#!/usr/bin/env python3 """ Rotary Phone Audio Handler with Web Interface Detects handset pickup, plays custom sound, records audio, and provides web UI """ import pyaudio import wave import time import RPi.GPIO as GPIO from datetime import datetime import os import numpy as np import threading from flask import Flask, render_template, request, send_file, jsonify, redirect, url_for from werkzeug.utils import secure_filename import json import sys # Load configuration def load_system_config(): """Load system configuration from config.json""" config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json') # Check if config exists, otherwise use example if not os.path.exists(config_path): example_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.example.json') if os.path.exists(example_path): print(f"Config file not found. Please copy {example_path} to {config_path}") print("Command: cp config.example.json config.json") sys.exit(1) else: print("ERROR: No configuration file found!") sys.exit(1) with open(config_path, 'r') as f: return json.load(f) # Load system configuration SYS_CONFIG = load_system_config() # GPIO Configuration HOOK_PIN = SYS_CONFIG['gpio']['hook_pin'] HOOK_PRESSED = GPIO.LOW if SYS_CONFIG['gpio']['hook_pressed_state'] == 'LOW' else GPIO.HIGH # Audio settings AUDIO_DEVICE_INDEX = SYS_CONFIG['audio']['device_index'] CHUNK = SYS_CONFIG['audio']['chunk_size'] FORMAT = pyaudio.paInt16 # Fixed format CHANNELS = SYS_CONFIG['audio']['channels'] RATE = SYS_CONFIG['audio']['sample_rate'] RECORD_SECONDS = SYS_CONFIG['audio']['max_record_seconds'] # Directories SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) BASE_DIR = os.path.join(SCRIPT_DIR, SYS_CONFIG['paths']['base_dir']) if SYS_CONFIG['paths']['base_dir'].startswith('.') else SYS_CONFIG['paths']['base_dir'] BASE_DIR = os.path.abspath(BASE_DIR) OUTPUT_DIR = os.path.join(BASE_DIR, SYS_CONFIG['paths']['recordings_dir']) SOUNDS_DIR = os.path.join(BASE_DIR, SYS_CONFIG['paths']['sounds_dir']) USER_CONFIG_FILE = os.path.join(BASE_DIR, "user_config.json") # Runtime user settings DIALTONE_FILE = os.path.join(SOUNDS_DIR, "dialtone.wav") # Web server settings WEB_PORT = SYS_CONFIG['web']['port'] # Flask app app = Flask(__name__) app.config['MAX_CONTENT_LENGTH'] = SYS_CONFIG['web']['max_upload_size_mb'] * 1024 * 1024 class RotaryPhone: def __init__(self): self.audio = pyaudio.PyAudio() self.recording = False self.phone_status = "on_hook" self.current_recording = None # Setup GPIO GPIO.setmode(GPIO.BCM) GPIO.setup(HOOK_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP) # Create directories os.makedirs(OUTPUT_DIR, exist_ok=True) os.makedirs(SOUNDS_DIR, exist_ok=True) # Initialize configuration self.config = self.load_config() # Generate default dial tone if none exists if not os.path.exists(DIALTONE_FILE): self.generate_default_dialtone() def load_config(self): """Load user runtime configuration from JSON file""" default_config = { "active_greeting": SYS_CONFIG['system']['active_greeting'], "greetings": [], "volume": SYS_CONFIG['system']['volume'] } if os.path.exists(USER_CONFIG_FILE): try: with open(USER_CONFIG_FILE, 'r') as f: config = json.load(f) # Ensure volume key exists if "volume" not in config: config["volume"] = SYS_CONFIG['system']['volume'] return config except: pass return default_config def save_config(self): """Save user runtime configuration to JSON file""" with open(USER_CONFIG_FILE, 'w') as f: json.dump(self.config, f, indent=2) def get_active_greeting_path(self): """Get the full path to the active greeting file""" active = self.config.get("active_greeting", "dialtone.wav") return os.path.join(SOUNDS_DIR, active) def set_active_greeting(self, filename): """Set which greeting message to play""" self.config["active_greeting"] = filename self.save_config() def set_volume(self, volume): """Set playback volume (0-100)""" volume = max(0, min(100, int(volume))) # Clamp between 0-100 self.config["volume"] = volume self.save_config() return volume def get_volume(self): """Get current volume setting""" return self.config.get("volume", 70) def generate_default_dialtone(self): """Generate a classic dial tone (350Hz + 440Hz) and save as default""" print("Generating default dial tone...") duration = 3 sample_rate = 44100 t = np.linspace(0, duration, int(sample_rate * duration), False) # Generate two frequencies and combine them tone1 = np.sin(2 * np.pi * 350 * t) tone2 = np.sin(2 * np.pi * 440 * t) tone = (tone1 + tone2) / 2 # Convert to 16-bit PCM tone = (tone * 32767).astype(np.int16) # Save as WAV file wf = wave.open(DIALTONE_FILE, 'wb') wf.setnchannels(1) wf.setsampwidth(2) # 16-bit wf.setframerate(sample_rate) wf.writeframes(tone.tobytes()) wf.close() print(f"Default dial tone saved to {DIALTONE_FILE}") def play_sound_file(self, filepath): """Play a WAV file with volume control""" if not os.path.exists(filepath): print(f"Sound file not found: {filepath}") return False print(f"Playing sound: {filepath}") try: wf = wave.open(filepath, 'rb') # Use configured audio device stream = self.audio.open( format=self.audio.get_format_from_width(wf.getsampwidth()), channels=wf.getnchannels(), rate=wf.getframerate(), output=True, output_device_index=AUDIO_DEVICE_INDEX, frames_per_buffer=CHUNK ) # Get volume multiplier (0.0 to 1.0) volume = self.get_volume() / 100.0 # Play the sound with volume control data = wf.readframes(CHUNK) while data and self.phone_status == "off_hook": # Apply volume by converting to numpy array and scaling if volume < 1.0: audio_data = np.frombuffer(data, dtype=np.int16) audio_data = (audio_data * volume).astype(np.int16) data = audio_data.tobytes() stream.write(data) data = wf.readframes(CHUNK) stream.stop_stream() stream.close() wf.close() print("Sound playback finished") return True except Exception as e: print(f"Error playing sound: {e}") return False def record_audio(self): """Record audio from the microphone""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = os.path.join(OUTPUT_DIR, f"recording_{timestamp}.wav") self.current_recording = filename print(f"Recording to {filename}") try: # Use configured audio device stream = self.audio.open( format=FORMAT, channels=CHANNELS, rate=RATE, input=True, input_device_index=AUDIO_DEVICE_INDEX, frames_per_buffer=CHUNK ) frames = [] self.recording = True # Record until handset is hung up or max time reached start_time = time.time() while self.recording and (time.time() - start_time) < RECORD_SECONDS: # Check if handset is still off hook if GPIO.input(HOOK_PIN) != HOOK_PRESSED: print("Handset hung up, stopping recording") break try: data = stream.read(CHUNK, exception_on_overflow=False) frames.append(data) except Exception as e: print(f"Error reading audio: {e}") break stream.stop_stream() stream.close() # Save the recording if frames: wf = wave.open(filename, 'wb') wf.setnchannels(CHANNELS) wf.setsampwidth(self.audio.get_sample_size(FORMAT)) wf.setframerate(RATE) wf.writeframes(b''.join(frames)) wf.close() duration = len(frames) * CHUNK / RATE print(f"Recording saved: {filename} ({duration:.1f}s)") else: print("No audio recorded") except Exception as e: print(f"Recording error: {e}") self.recording = False self.current_recording = None def get_status(self): """Get current phone status""" return { "status": self.phone_status, "recording": self.recording, "current_recording": self.current_recording } def phone_loop(self): """Main phone handling loop""" print("Rotary Phone System Started") print(f"Hook pin: GPIO {HOOK_PIN}") print(f"Recordings will be saved to: {OUTPUT_DIR}") try: while True: # Wait for handset pickup if GPIO.input(HOOK_PIN) == HOOK_PRESSED and self.phone_status == "on_hook": self.phone_status = "off_hook" print("\n=== Handset picked up ===") # Play active greeting message greeting_file = self.get_active_greeting_path() self.play_sound_file(greeting_file) # Start recording if self.phone_status == "off_hook": # Still off hook after sound self.record_audio() self.phone_status = "on_hook" time.sleep(0.1) except KeyboardInterrupt: print("\nShutting down phone system...") finally: self.cleanup() def cleanup(self): """Clean up resources""" self.audio.terminate() GPIO.cleanup() print("Cleanup complete") # Global phone instance phone = RotaryPhone() # Flask Routes @app.route('/') def index(): """Main page""" recordings = get_recordings() greetings = get_greetings() status = phone.get_status() active_greeting = phone.config.get("active_greeting", "dialtone.wav") volume = phone.get_volume() return render_template('index.html', recordings=recordings, greetings=greetings, active_greeting=active_greeting, status=status, volume=volume) @app.route('/api/status') def api_status(): """API endpoint for phone status""" return jsonify(phone.get_status()) @app.route('/api/recordings') def api_recordings(): """API endpoint for recordings list""" return jsonify(get_recordings()) @app.route('/api/greetings') def api_greetings(): """API endpoint for greetings list""" return jsonify(get_greetings()) @app.route('/api/volume', methods=['GET']) def api_get_volume(): """Get current volume setting""" return jsonify({"volume": phone.get_volume()}) @app.route('/api/volume', methods=['POST']) def api_set_volume(): """Set volume level""" data = request.get_json() volume = data.get('volume', 70) new_volume = phone.set_volume(volume) return jsonify({"success": True, "volume": new_volume}) @app.route('/upload_greeting', methods=['POST']) def upload_greeting(): """Upload a new greeting message""" if 'soundfile' not in request.files: return jsonify({"error": "No file provided"}), 400 file = request.files['soundfile'] if file.filename == '': return jsonify({"error": "No file selected"}), 400 if file and file.filename.lower().endswith('.wav'): filename = secure_filename(file.filename) # Ensure unique filename counter = 1 base_name = os.path.splitext(filename)[0] while os.path.exists(os.path.join(SOUNDS_DIR, filename)): filename = f"{base_name}_{counter}.wav" counter += 1 filepath = os.path.join(SOUNDS_DIR, filename) file.save(filepath) return jsonify({"success": True, "message": f"Greeting '{filename}' uploaded successfully", "filename": filename}) return jsonify({"error": "Only WAV files are supported"}), 400 @app.route('/set_active_greeting', methods=['POST']) def set_active_greeting(): """Set which greeting to play""" data = request.get_json() filename = data.get('filename') if not filename: return jsonify({"error": "No filename provided"}), 400 filepath = os.path.join(SOUNDS_DIR, filename) if not os.path.exists(filepath): return jsonify({"error": "Greeting file not found"}), 404 phone.set_active_greeting(filename) return jsonify({"success": True, "message": f"Active greeting set to '{filename}'"}) @app.route('/delete_greeting/', methods=['POST']) def delete_greeting(filename): """Delete a greeting file""" filename = secure_filename(filename) filepath = os.path.join(SOUNDS_DIR, filename) # Don't delete the active greeting if filename == phone.config.get("active_greeting"): return jsonify({"error": "Cannot delete the active greeting. Please select a different greeting first."}), 400 if os.path.exists(filepath): os.remove(filepath) return jsonify({"success": True}) return jsonify({"error": "File not found"}), 404 @app.route('/play_audio//') def play_audio(audio_type, filename): """Serve audio file for web playback""" filename = secure_filename(filename) if audio_type == 'recording': filepath = os.path.join(OUTPUT_DIR, filename) elif audio_type == 'greeting': filepath = os.path.join(SOUNDS_DIR, filename) else: return "Invalid audio type", 400 if os.path.exists(filepath): return send_file(filepath, mimetype='audio/wav') return "File not found", 404 @app.route('/download/') def download_recording(filename): """Download a recording""" filepath = os.path.join(OUTPUT_DIR, secure_filename(filename)) if os.path.exists(filepath): return send_file(filepath, as_attachment=True) return "File not found", 404 @app.route('/delete/', methods=['POST']) def delete_recording(filename): """Delete a recording""" filepath = os.path.join(OUTPUT_DIR, secure_filename(filename)) if os.path.exists(filepath): os.remove(filepath) return jsonify({"success": True}) return jsonify({"error": "File not found"}), 404 @app.route('/restore_default_sound', methods=['POST']) def restore_default_sound(): """Restore default dial tone""" if os.path.exists(DIALTONE_FILE): os.remove(DIALTONE_FILE) phone.generate_default_dialtone() phone.set_active_greeting("dialtone.wav") return jsonify({"success": True, "message": "Default dial tone restored"}) def get_greetings(): """Get list of all greeting sound files""" greetings = [] if os.path.exists(SOUNDS_DIR): for filename in sorted(os.listdir(SOUNDS_DIR)): if filename.endswith('.wav'): filepath = os.path.join(SOUNDS_DIR, filename) stat = os.stat(filepath) # Get duration from WAV file try: wf = wave.open(filepath, 'rb') frames = wf.getnframes() rate = wf.getframerate() duration = frames / float(rate) wf.close() except: duration = 0 greetings.append({ "filename": filename, "size": stat.st_size, "size_mb": stat.st_size / (1024 * 1024), "date": datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), "duration": duration, "is_active": filename == phone.config.get("active_greeting", "dialtone.wav") }) return greetings def get_recordings(): """Get list of all recordings with metadata""" recordings = [] if os.path.exists(OUTPUT_DIR): for filename in sorted(os.listdir(OUTPUT_DIR), reverse=True): if filename.endswith('.wav'): filepath = os.path.join(OUTPUT_DIR, filename) stat = os.stat(filepath) # Get duration from WAV file try: wf = wave.open(filepath, 'rb') frames = wf.getnframes() rate = wf.getframerate() duration = frames / float(rate) wf.close() except: duration = 0 recordings.append({ "filename": filename, "size": stat.st_size, "size_mb": stat.st_size / (1024 * 1024), "date": datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), "duration": duration }) return recordings def get_local_ip(): """Get local IP address""" import socket try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] s.close() return ip except: return "127.0.0.1" if __name__ == "__main__": # Create templates directory and HTML template script_dir = os.path.dirname(os.path.abspath(__file__)) templates_dir = os.path.join(script_dir, 'templates') os.makedirs(templates_dir, exist_ok=True) # Create the HTML template if it doesn't exist template_file = os.path.join(templates_dir, 'index.html') if not os.path.exists(template_file): print(f"Creating template at {template_file}") with open(template_file, 'w') as f: f.write(''' Rotary Phone Control Panel

📞 Rotary Phone Control Panel

Manage your vintage phone system

Phone Status

{% if status.recording %} 🔴 Recording in progress... {% elif status.status == 'off_hook' %} 📞 Handset off hook {% else %} ✅ Ready (handset on hook) {% endif %}
{% if status.current_recording %}

Recording to: {{ status.current_recording.split('/')[-1] }}

{% endif %}

🔊 Volume Control

🔇 🔊 {{ volume }}%

Adjust the playback volume for greeting messages

đŸŽĩ Greeting Messages

â„šī¸ Active greeting: {{ active_greeting }}

Upload WAV files to play when the handset is picked up

{% if greetings %}

Available Greetings

{% for greeting in greetings %}

{% if greeting.is_active %}⭐{% endif %} {{ greeting.filename }}

📅 {{ greeting.date }} | âąī¸ {{ "%.1f"|format(greeting.duration) }}s | 💾 {{ "%.2f"|format(greeting.size_mb) }} MB
{% if not greeting.is_active %} {% else %} {% endif %}
{% endfor %}
{% else %}

đŸŽĩ

No greeting messages uploaded yet. Upload your first greeting!

{% endif %}

đŸŽ™ī¸ Recordings

{% if recordings %}
{{ recordings|length }}
Total Recordings
{{ "%.1f"|format(recordings|sum(attribute='size_mb')) }} MB
Total Size
{{ "%.1f"|format(recordings|sum(attribute='duration')/60) }} min
Total Duration
{% for recording in recordings %}

{{ recording.filename }}

📅 {{ recording.date }} | âąī¸ {{ "%.1f"|format(recording.duration) }}s | 💾 {{ "%.2f"|format(recording.size_mb) }} MB
{% endfor %}
{% else %}

📭

No recordings yet. Pick up the phone to start recording!

{% endif %}

đŸŽĩ Audio Player

''') else: print(f"Template already exists at {template_file}") # Start phone handling in separate thread phone_thread = threading.Thread(target=phone.phone_loop, daemon=True) phone_thread.start() # Get and display local IP local_ip = get_local_ip() print("\n" + "="*60) print(f"Web Interface Available At:") print(f" http://{local_ip}:{WEB_PORT}") print(f" http://localhost:{WEB_PORT}") print("="*60 + "\n") # Start Flask web server app.run(host='0.0.0.0', port=WEB_PORT, debug=False, threaded=True)