#!/usr/bin/env python3 """ Rotary Phone Audio Handler with Web Interface Detects handset pickup, plays custom sound, records audio, and provides web UI """ import pyaudio import wave import time import RPi.GPIO as GPIO from datetime import datetime import os import numpy as np import threading from flask import Flask, render_template, request, send_file, jsonify, redirect, url_for from werkzeug.utils import secure_filename import json # Configuration HOOK_PIN = 17 # GPIO pin for hook switch (change to your pin) HOOK_PRESSED = GPIO.LOW # Change to GPIO.HIGH if your switch is active high # Audio settings CHUNK = 1024 FORMAT = pyaudio.paInt16 CHANNELS = 1 RATE = 44100 RECORD_SECONDS = 300 # Maximum recording time (5 minutes) # Directories BASE_DIR = "/home/berwn/rotary_phone_data" OUTPUT_DIR = os.path.join(BASE_DIR, "recordings") SOUNDS_DIR = os.path.join(BASE_DIR, "sounds") CONFIG_FILE = os.path.join(BASE_DIR, "config.json") DIALTONE_FILE = os.path.join(SOUNDS_DIR, "dialtone.wav") # Legacy default # Web server settings WEB_PORT = 8080 # Flask app app = Flask(__name__) app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024 # 50MB max file size class RotaryPhone: def __init__(self): self.audio = pyaudio.PyAudio() self.recording = False self.phone_status = "on_hook" self.current_recording = None # Setup GPIO GPIO.setmode(GPIO.BCM) GPIO.setup(HOOK_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP) # Create directories os.makedirs(OUTPUT_DIR, exist_ok=True) os.makedirs(SOUNDS_DIR, exist_ok=True) # Initialize configuration self.config = self.load_config() # Generate default dial tone if none exists if not os.path.exists(DIALTONE_FILE): self.generate_default_dialtone() def load_config(self): """Load configuration from JSON file""" default_config = { "active_greeting": "dialtone.wav", "greetings": [] } if os.path.exists(CONFIG_FILE): try: with open(CONFIG_FILE, 'r') as f: return json.load(f) except: pass return default_config def save_config(self): """Save configuration to JSON file""" with open(CONFIG_FILE, 'w') as f: json.dump(self.config, f, indent=2) def get_active_greeting_path(self): """Get the full path to the active greeting file""" active = self.config.get("active_greeting", "dialtone.wav") return os.path.join(SOUNDS_DIR, active) def set_active_greeting(self, filename): """Set which greeting message to play""" self.config["active_greeting"] = filename self.save_config() def generate_default_dialtone(self): """Generate a classic dial tone (350Hz + 440Hz) and save as default""" print("Generating default dial tone...") duration = 3 sample_rate = 44100 t = np.linspace(0, duration, int(sample_rate * duration), False) # Generate two frequencies and combine them tone1 = np.sin(2 * np.pi * 350 * t) tone2 = np.sin(2 * np.pi * 440 * t) tone = (tone1 + tone2) / 2 # Convert to 16-bit PCM tone = (tone * 32767).astype(np.int16) # Save as WAV file wf = wave.open(DIALTONE_FILE, 'wb') wf.setnchannels(1) wf.setsampwidth(2) # 16-bit wf.setframerate(sample_rate) wf.writeframes(tone.tobytes()) wf.close() print(f"Default dial tone saved to {DIALTONE_FILE}") def play_sound_file(self, filepath): """Play a WAV file""" if not os.path.exists(filepath): print(f"Sound file not found: {filepath}") return False print(f"Playing sound: {filepath}") try: wf = wave.open(filepath, 'rb') # Use device index 1 for HiFiBerry (change if needed) stream = self.audio.open( format=self.audio.get_format_from_width(wf.getsampwidth()), channels=wf.getnchannels(), rate=wf.getframerate(), output=True, output_device_index=1, # HiFiBerry device index frames_per_buffer=CHUNK ) # Play the sound data = wf.readframes(CHUNK) while data and self.phone_status == "off_hook": stream.write(data) data = wf.readframes(CHUNK) stream.stop_stream() stream.close() wf.close() print("Sound playback finished") return True except Exception as e: print(f"Error playing sound: {e}") return False def record_audio(self): """Record audio from the microphone""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = os.path.join(OUTPUT_DIR, f"recording_{timestamp}.wav") self.current_recording = filename print(f"Recording to {filename}") try: # Use device index 1 for HiFiBerry (change if needed) stream = self.audio.open( format=FORMAT, channels=CHANNELS, rate=RATE, input=True, input_device_index=1, # HiFiBerry device index frames_per_buffer=CHUNK ) frames = [] self.recording = True # Record until handset is hung up or max time reached start_time = time.time() while self.recording and (time.time() - start_time) < RECORD_SECONDS: # Check if handset is still off hook if GPIO.input(HOOK_PIN) != HOOK_PRESSED: print("Handset hung up, stopping recording") break try: data = stream.read(CHUNK, exception_on_overflow=False) frames.append(data) except Exception as e: print(f"Error reading audio: {e}") break stream.stop_stream() stream.close() # Save the recording if frames: wf = wave.open(filename, 'wb') wf.setnchannels(CHANNELS) wf.setsampwidth(self.audio.get_sample_size(FORMAT)) wf.setframerate(RATE) wf.writeframes(b''.join(frames)) wf.close() duration = len(frames) * CHUNK / RATE print(f"Recording saved: {filename} ({duration:.1f}s)") else: print("No audio recorded") except Exception as e: print(f"Recording error: {e}") self.recording = False self.current_recording = None def get_status(self): """Get current phone status""" return { "status": self.phone_status, "recording": self.recording, "current_recording": self.current_recording } def phone_loop(self): """Main phone handling loop""" print("Rotary Phone System Started") print(f"Hook pin: GPIO {HOOK_PIN}") print(f"Recordings will be saved to: {OUTPUT_DIR}") try: while True: # Wait for handset pickup if GPIO.input(HOOK_PIN) == HOOK_PRESSED and self.phone_status == "on_hook": self.phone_status = "off_hook" print("\n=== Handset picked up ===") # Play active greeting message greeting_file = self.get_active_greeting_path() self.play_sound_file(greeting_file) # Start recording if self.phone_status == "off_hook": # Still off hook after sound self.record_audio() self.phone_status = "on_hook" time.sleep(0.1) except KeyboardInterrupt: print("\nShutting down phone system...") finally: self.cleanup() def cleanup(self): """Clean up resources""" self.audio.terminate() GPIO.cleanup() print("Cleanup complete") # Global phone instance phone = RotaryPhone() # Flask Routes @app.route('/') def index(): """Main page""" recordings = get_recordings() greetings = get_greetings() status = phone.get_status() active_greeting = phone.config.get("active_greeting", "dialtone.wav") return render_template('index.html', recordings=recordings, greetings=greetings, active_greeting=active_greeting, status=status) @app.route('/api/status') def api_status(): """API endpoint for phone status""" return jsonify(phone.get_status()) @app.route('/api/recordings') def api_recordings(): """API endpoint for recordings list""" return jsonify(get_recordings()) @app.route('/api/greetings') def api_greetings(): """API endpoint for greetings list""" return jsonify(get_greetings()) @app.route('/upload_greeting', methods=['POST']) def upload_greeting(): """Upload a new greeting message""" if 'soundfile' not in request.files: return jsonify({"error": "No file provided"}), 400 file = request.files['soundfile'] if file.filename == '': return jsonify({"error": "No file selected"}), 400 if file and file.filename.lower().endswith('.wav'): filename = secure_filename(file.filename) # Ensure unique filename counter = 1 base_name = os.path.splitext(filename)[0] while os.path.exists(os.path.join(SOUNDS_DIR, filename)): filename = f"{base_name}_{counter}.wav" counter += 1 filepath = os.path.join(SOUNDS_DIR, filename) file.save(filepath) return jsonify({"success": True, "message": f"Greeting '{filename}' uploaded successfully", "filename": filename}) return jsonify({"error": "Only WAV files are supported"}), 400 @app.route('/set_active_greeting', methods=['POST']) def set_active_greeting(): """Set which greeting to play""" data = request.get_json() filename = data.get('filename') if not filename: return jsonify({"error": "No filename provided"}), 400 filepath = os.path.join(SOUNDS_DIR, filename) if not os.path.exists(filepath): return jsonify({"error": "Greeting file not found"}), 404 phone.set_active_greeting(filename) return jsonify({"success": True, "message": f"Active greeting set to '{filename}'"}) @app.route('/delete_greeting/', methods=['POST']) def delete_greeting(filename): """Delete a greeting file""" filename = secure_filename(filename) filepath = os.path.join(SOUNDS_DIR, filename) # Don't delete the active greeting if filename == phone.config.get("active_greeting"): return jsonify({"error": "Cannot delete the active greeting. Please select a different greeting first."}), 400 if os.path.exists(filepath): os.remove(filepath) return jsonify({"success": True}) return jsonify({"error": "File not found"}), 404 @app.route('/play_audio//') def play_audio(audio_type, filename): """Serve audio file for web playback""" filename = secure_filename(filename) if audio_type == 'recording': filepath = os.path.join(OUTPUT_DIR, filename) elif audio_type == 'greeting': filepath = os.path.join(SOUNDS_DIR, filename) else: return "Invalid audio type", 400 if os.path.exists(filepath): return send_file(filepath, mimetype='audio/wav') return "File not found", 404 @app.route('/download/') def download_recording(filename): """Download a recording""" filepath = os.path.join(OUTPUT_DIR, secure_filename(filename)) if os.path.exists(filepath): return send_file(filepath, as_attachment=True) return "File not found", 404 @app.route('/delete/', methods=['POST']) def delete_recording(filename): """Delete a recording""" filepath = os.path.join(OUTPUT_DIR, secure_filename(filename)) if os.path.exists(filepath): os.remove(filepath) return jsonify({"success": True}) return jsonify({"error": "File not found"}), 404 @app.route('/restore_default_sound', methods=['POST']) def restore_default_sound(): """Restore default dial tone""" if os.path.exists(DIALTONE_FILE): os.remove(DIALTONE_FILE) phone.generate_default_dialtone() phone.set_active_greeting("dialtone.wav") return jsonify({"success": True, "message": "Default dial tone restored"}) def get_greetings(): """Get list of all greeting sound files""" greetings = [] if os.path.exists(SOUNDS_DIR): for filename in sorted(os.listdir(SOUNDS_DIR)): if filename.endswith('.wav'): filepath = os.path.join(SOUNDS_DIR, filename) stat = os.stat(filepath) # Get duration from WAV file try: wf = wave.open(filepath, 'rb') frames = wf.getnframes() rate = wf.getframerate() duration = frames / float(rate) wf.close() except: duration = 0 greetings.append({ "filename": filename, "size": stat.st_size, "size_mb": stat.st_size / (1024 * 1024), "date": datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), "duration": duration, "is_active": filename == phone.config.get("active_greeting", "dialtone.wav") }) return greetings def get_recordings(): """Get list of all recordings with metadata""" recordings = [] if os.path.exists(OUTPUT_DIR): for filename in sorted(os.listdir(OUTPUT_DIR), reverse=True): if filename.endswith('.wav'): filepath = os.path.join(OUTPUT_DIR, filename) stat = os.stat(filepath) # Get duration from WAV file try: wf = wave.open(filepath, 'rb') frames = wf.getnframes() rate = wf.getframerate() duration = frames / float(rate) wf.close() except: duration = 0 recordings.append({ "filename": filename, "size": stat.st_size, "size_mb": stat.st_size / (1024 * 1024), "date": datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), "duration": duration }) return recordings def get_local_ip(): """Get local IP address""" import socket try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] s.close() return ip except: return "127.0.0.1" if __name__ == "__main__": # Create templates directory and HTML template script_dir = os.path.dirname(os.path.abspath(__file__)) templates_dir = os.path.join(script_dir, 'templates') os.makedirs(templates_dir, exist_ok=True) # Create the HTML template if it doesn't exist template_file = os.path.join(templates_dir, 'index.html') if not os.path.exists(template_file): print(f"Creating template at {template_file}") with open(template_file, 'w') as f: f.write(''' Rotary Phone Control Panel

📞 Rotary Phone Control Panel

Manage your vintage phone system

Phone Status

{% if status.recording %} 🔴 Recording in progress... {% elif status.status == 'off_hook' %} 📞 Handset off hook {% else %} ✅ Ready (handset on hook) {% endif %}
{% if status.current_recording %}

Recording to: {{ status.current_recording.split('/')[-1] }}

{% endif %}

đŸŽĩ Greeting Messages

â„šī¸ Active greeting: {{ active_greeting }}

Upload WAV files to play when the handset is picked up

{% if greetings %}

Available Greetings

{% for greeting in greetings %}

{% if greeting.is_active %}⭐{% endif %} {{ greeting.filename }}

📅 {{ greeting.date }} | âąī¸ {{ "%.1f"|format(greeting.duration) }}s | 💾 {{ "%.2f"|format(greeting.size_mb) }} MB
{% if not greeting.is_active %} {% else %} {% endif %}
{% endfor %}
{% else %}

đŸŽĩ

No greeting messages uploaded yet. Upload your first greeting!

{% endif %}

đŸŽ™ī¸ Recordings

{% if recordings %}
{{ recordings|length }}
Total Recordings
{{ "%.1f"|format(recordings|sum(attribute='size_mb')) }} MB
Total Size
{{ "%.1f"|format(recordings|sum(attribute='duration')/60) }} min
Total Duration
{% for recording in recordings %}

{{ recording.filename }}

📅 {{ recording.date }} | âąī¸ {{ "%.1f"|format(recording.duration) }}s | 💾 {{ "%.2f"|format(recording.size_mb) }} MB
{% endfor %}
{% else %}

📭

No recordings yet. Pick up the phone to start recording!

{% endif %}

đŸŽĩ Audio Player

''') else: print(f"Template already exists at {template_file}") # Start phone handling in separate thread phone_thread = threading.Thread(target=phone.phone_loop, daemon=True) phone_thread.start() # Get and display local IP local_ip = get_local_ip() print("\n" + "="*60) print(f"Web Interface Available At:") print(f" http://{local_ip}:{WEB_PORT}") print(f" http://localhost:{WEB_PORT}") print("="*60 + "\n") # Start Flask web server app.run(host='0.0.0.0', port=WEB_PORT, debug=False, threaded=True)