#!/usr/bin/env python3
"""
Rotary Phone Audio Handler with Web Interface
Detects handset pickup, plays custom sound, records audio, and provides web UI
"""

import json
import os
import socket
import sys
import threading
import time
import wave
from datetime import datetime

import numpy as np
import pyaudio
import RPi.GPIO as GPIO
from flask import Flask, render_template, request, send_file, jsonify, redirect, url_for
from werkzeug.utils import secure_filename


# Load configuration
def load_system_config():
    """Load system configuration from config.json next to this script.

    Returns:
        The parsed JSON document.

    Exits the process with status 1 (after printing a hint) when config.json
    does not exist.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(script_dir, 'config.json')

    # Check if config exists, otherwise point the user at the example file
    if not os.path.exists(config_path):
        example_path = os.path.join(script_dir, 'config.example.json')
        if os.path.exists(example_path):
            print(f"Config file not found. Please copy {example_path} to {config_path}")
            print("Command: cp config.example.json config.json")
        else:
            print("ERROR: No configuration file found!")
        sys.exit(1)

    with open(config_path, 'r') as f:
        return json.load(f)


# Load system configuration
SYS_CONFIG = load_system_config()

# GPIO Configuration
HOOK_PIN = SYS_CONFIG['gpio']['hook_pin']
# Pin level read while the handset is lifted (see phone_loop)
HOOK_PRESSED = GPIO.LOW if SYS_CONFIG['gpio']['hook_pressed_state'] == 'LOW' else GPIO.HIGH

# Extra Button Configuration
EXTRA_BUTTON_ENABLED = SYS_CONFIG['gpio'].get('extra_button_enabled', False)
EXTRA_BUTTON_PIN = SYS_CONFIG['gpio'].get('extra_button_pin', 27)
EXTRA_BUTTON_PRESSED = (
    GPIO.LOW
    if SYS_CONFIG['gpio'].get('extra_button_pressed_state', 'LOW') == 'LOW'
    else GPIO.HIGH
)

# Audio settings
AUDIO_DEVICE_INDEX = SYS_CONFIG['audio']['device_index']
CHUNK = SYS_CONFIG['audio']['chunk_size']
FORMAT = pyaudio.paInt16  # Fixed format
CHANNELS = SYS_CONFIG['audio']['channels']
RATE = SYS_CONFIG['audio']['sample_rate']
RECORD_SECONDS = SYS_CONFIG['audio']['max_record_seconds']

# Directories
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# A base_dir starting with '.' is resolved relative to this script;
# anything else is used as given.
if SYS_CONFIG['paths']['base_dir'].startswith('.'):
    BASE_DIR = os.path.join(SCRIPT_DIR, SYS_CONFIG['paths']['base_dir'])
else:
    BASE_DIR = SYS_CONFIG['paths']['base_dir']
BASE_DIR = os.path.abspath(BASE_DIR)
OUTPUT_DIR = os.path.join(BASE_DIR, SYS_CONFIG['paths']['recordings_dir'])
SOUNDS_DIR = os.path.join(BASE_DIR, SYS_CONFIG['paths']['sounds_dir'])
USER_CONFIG_FILE = os.path.join(BASE_DIR, "user_config.json")  # Runtime user settings
DIALTONE_FILE = os.path.join(SOUNDS_DIR, "dialtone.wav")

# Web server settings
WEB_PORT = SYS_CONFIG['web']['port']

# Flask app
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = SYS_CONFIG['web']['max_upload_size_mb'] * 1024 * 1024


class RotaryPhone:
    """Hook-switch driven greeting player and recorder.

    Owns the PyAudio instance, the GPIO setup and the user runtime
    configuration (active greeting, extra button sound, volume).
    """

    def __init__(self):
        self.audio = pyaudio.PyAudio()
        self.recording = False
        self.phone_status = "on_hook"
        self.current_recording = None
        # Always defined so the attribute exists whether or not the extra
        # button is enabled.
        self.extra_button_playing = False

        # Setup GPIO
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(HOOK_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)

        # Setup extra button if enabled
        if EXTRA_BUTTON_ENABLED:
            GPIO.setup(EXTRA_BUTTON_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)

        # Create directories
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        os.makedirs(SOUNDS_DIR, exist_ok=True)

        # Initialize configuration
        self.config = self.load_config()

        # Generate default dial tone if none exists
        if not os.path.exists(DIALTONE_FILE):
            self.generate_default_dialtone()

    def load_config(self):
        """Load user runtime configuration from JSON file.

        Returns:
            The stored configuration dict (with a "volume" key guaranteed),
            or defaults derived from SYS_CONFIG when the file is missing,
            unreadable, or does not contain a JSON object.
        """
        default_config = {
            "active_greeting": SYS_CONFIG['system']['active_greeting'],
            "extra_button_sound": SYS_CONFIG['system'].get('extra_button_sound', 'button_sound.wav'),
            "greetings": [],
            "volume": SYS_CONFIG['system']['volume'],
        }

        if not os.path.exists(USER_CONFIG_FILE):
            return default_config

        try:
            with open(USER_CONFIG_FILE, 'r') as f:
                config = json.load(f)
        except (OSError, ValueError) as e:
            # json.JSONDecodeError is a ValueError. Falling back keeps the
            # phone usable, but say so instead of hiding the problem.
            print(f"Could not read {USER_CONFIG_FILE}, using defaults: {e}")
            return default_config

        if not isinstance(config, dict):
            print(f"Unexpected content in {USER_CONFIG_FILE}, using defaults")
            return default_config

        # Ensure volume key exists
        config.setdefault("volume", SYS_CONFIG['system']['volume'])
        return config

    def save_config(self):
        """Save user runtime configuration to JSON file"""
        with open(USER_CONFIG_FILE, 'w') as f:
            json.dump(self.config, f, indent=2)

    def get_active_greeting_path(self):
        """Get the full path to the active greeting file"""
        active = self.config.get("active_greeting", "dialtone.wav")
        return os.path.join(SOUNDS_DIR, active)

    def set_active_greeting(self, filename):
        """Set which greeting message to play and persist the choice"""
        self.config["active_greeting"] = filename
        self.save_config()

    def get_extra_button_sound_path(self):
        """Get the full path to the extra button sound file"""
        sound = self.config.get("extra_button_sound", "button_sound.wav")
        return os.path.join(SOUNDS_DIR, sound)

    def set_extra_button_sound(self, filename):
        """Set which sound plays for extra button and persist the choice"""
        self.config["extra_button_sound"] = filename
        self.save_config()

    def set_volume(self, volume):
        """Set playback volume (0-100) and persist it.

        Args:
            volume: Anything int() accepts; clamped to 0..100.

        Returns:
            The clamped value that was stored.

        Raises:
            TypeError, ValueError: if volume cannot be converted by int().
        """
        volume = max(0, min(100, int(volume)))  # Clamp between 0-100
        self.config["volume"] = volume
        self.save_config()
        return volume

    def get_volume(self):
        """Get current volume setting"""
        return self.config.get("volume", 70)

    def generate_default_dialtone(self):
        """Generate a classic dial tone (350Hz + 440Hz) and save as default"""
        print("Generating default dial tone...")

        duration = 3
        sample_rate = 44100
        t = np.linspace(0, duration, int(sample_rate * duration), False)

        # Generate two frequencies and combine them
        tone1 = np.sin(2 * np.pi * 350 * t)
        tone2 = np.sin(2 * np.pi * 440 * t)
        tone = (tone1 + tone2) / 2

        # Convert to 16-bit PCM
        tone = (tone * 32767).astype(np.int16)

        # Save as WAV file
        with wave.open(DIALTONE_FILE, 'wb') as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)  # 16-bit
            wf.setframerate(sample_rate)
            wf.writeframes(tone.tobytes())

        print(f"Default dial tone saved to {DIALTONE_FILE}")

    def _handset_off_hook(self):
        """Poll the hook switch and report whether playback may continue.

        phone_loop is blocked while a greeting plays, so nothing else updates
        phone_status during playback; the pin has to be read here. When the
        pin no longer reads HOOK_PRESSED the status is set to "on_hook".
        """
        if GPIO.input(HOOK_PIN) != HOOK_PRESSED:
            self.phone_status = "on_hook"
        return self.phone_status == "off_hook"

    @staticmethod
    def _close_stream(stream):
        """Stop and close a PyAudio stream, reporting (not raising) failures."""
        if stream is None:
            return
        try:
            stream.stop_stream()
            stream.close()
        except Exception as e:
            print(f"Error closing audio stream: {e}")

    def play_sound_file(self, filepath, check_hook_status=True):
        """Play a WAV file with volume control

        Args:
            filepath: Path to WAV file
            check_hook_status: If True, only play while off-hook (for greeting).
                               If False, play completely regardless of hook (for button)

        Returns:
            True when playback ran without error (including an early stop
            because the handset was hung up), False when the file is missing
            or an error occurred.
        """
        if not os.path.exists(filepath):
            print(f"Sound file not found: {filepath}")
            return False

        print(f"Playing sound: {filepath}")

        stream = None
        try:
            with wave.open(filepath, 'rb') as wf:
                sample_width = wf.getsampwidth()

                # Use configured audio device
                stream = self.audio.open(
                    format=self.audio.get_format_from_width(sample_width),
                    channels=wf.getnchannels(),
                    rate=wf.getframerate(),
                    output=True,
                    output_device_index=AUDIO_DEVICE_INDEX,
                    frames_per_buffer=CHUNK,
                )

                # Get volume multiplier (0.0 to 1.0)
                volume = self.get_volume() / 100.0
                # The scaling below reinterprets the bytes as int16, which is
                # only valid for 16-bit files; other widths play unscaled.
                scale_samples = volume < 1.0 and sample_width == 2

                # Play the sound with volume control
                data = wf.readframes(CHUNK)
                while data:
                    # For greeting sounds, stop if handset is hung up
                    if check_hook_status and not self._handset_off_hook():
                        break

                    if scale_samples:
                        samples = np.frombuffer(data, dtype=np.int16)
                        data = (samples * volume).astype(np.int16).tobytes()

                    stream.write(data)
                    data = wf.readframes(CHUNK)

            print("Sound playback finished")
            return True
        except Exception as e:
            print(f"Error playing sound: {e}")
            return False
        finally:
            # Release the output stream even when playback failed midway
            self._close_stream(stream)

    def record_audio(self):
        """Record audio from the microphone

        Records until the hook pin stops reading HOOK_PRESSED, `recording` is
        cleared, RECORD_SECONDS elapse, or a read fails, then writes the
        captured frames to a timestamped WAV file in OUTPUT_DIR.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = os.path.join(OUTPUT_DIR, f"recording_{timestamp}.wav")
        self.current_recording = filename

        print(f"Recording to {filename}")

        stream = None
        try:
            # Use configured audio device
            stream = self.audio.open(
                format=FORMAT,
                channels=CHANNELS,
                rate=RATE,
                input=True,
                input_device_index=AUDIO_DEVICE_INDEX,
                frames_per_buffer=CHUNK,
            )

            frames = []
            self.recording = True

            # Record until handset is hung up or max time reached
            start_time = time.time()
            while self.recording and (time.time() - start_time) < RECORD_SECONDS:
                # Check if handset is still off hook
                if GPIO.input(HOOK_PIN) != HOOK_PRESSED:
                    print("Handset hung up, stopping recording")
                    break

                try:
                    frames.append(stream.read(CHUNK, exception_on_overflow=False))
                except Exception as e:
                    print(f"Error reading audio: {e}")
                    break

            # Release the input device before writing the file
            self._close_stream(stream)
            stream = None

            # Save the recording
            if frames:
                with wave.open(filename, 'wb') as wf:
                    wf.setnchannels(CHANNELS)
                    wf.setsampwidth(self.audio.get_sample_size(FORMAT))
                    wf.setframerate(RATE)
                    wf.writeframes(b''.join(frames))

                duration = len(frames) * CHUNK / RATE
                print(f"Recording saved: {filename} ({duration:.1f}s)")
            else:
                print("No audio recorded")
        except Exception as e:
            print(f"Recording error: {e}")
        finally:
            self._close_stream(stream)
            self.recording = False
            self.current_recording = None

    def get_status(self):
        """Get current phone status"""
        return {
            "status": self.phone_status,
            "recording": self.recording,
            "current_recording": self.current_recording,
        }

    def play_extra_button_sound(self):
        """Play sound when extra button is pressed (only when off-hook/in call)"""
        if not EXTRA_BUTTON_ENABLED:
            return

        # Only play if phone is OFF hook (during a call)
        if self.phone_status != "off_hook":
            print("Extra button ignored - phone is on hook")
            return

        button_sound = self.get_extra_button_sound_path()
        if not os.path.exists(button_sound):
            print(f"Extra button sound not found: {button_sound}")
            return

        print("\n=== Extra button pressed ===")
        self.extra_button_playing = True
        try:
            # Play without checking hook status - play entire sound
            self.play_sound_file(button_sound, check_hook_status=False)
        finally:
            # Never leave the flag stuck, or the button would stay disabled
            self.extra_button_playing = False

    def phone_loop(self):
        """Main phone handling loop

        Polls the extra button and the hook switch every 0.1 s. On pickup it
        waits the configured greeting delay, plays the active greeting, and
        records if the handset is still lifted afterwards. Runs until
        KeyboardInterrupt, then cleans up.
        """
        print("Rotary Phone System Started")
        print(f"Hook pin: GPIO {HOOK_PIN}")
        if EXTRA_BUTTON_ENABLED:
            print(f"Extra button: GPIO {EXTRA_BUTTON_PIN}")
        print(f"Recordings will be saved to: {OUTPUT_DIR}")

        try:
            while True:
                # Check extra button first (higher priority)
                # NOTE(review): this loop is blocked while a greeting plays or
                # a recording runs, so the button is only polled between
                # calls - confirm that is the intended behaviour.
                if EXTRA_BUTTON_ENABLED and GPIO.input(EXTRA_BUTTON_PIN) == EXTRA_BUTTON_PRESSED:
                    if not self.extra_button_playing:
                        # Play extra button sound in a separate thread to not block
                        button_thread = threading.Thread(
                            target=self.play_extra_button_sound, daemon=True
                        )
                        button_thread.start()
                        time.sleep(0.5)  # Debounce

                # Wait for handset pickup
                if GPIO.input(HOOK_PIN) == HOOK_PRESSED and self.phone_status == "on_hook":
                    self.phone_status = "off_hook"
                    print("\n=== Handset picked up ===")

                    # Apply greeting delay if configured
                    greeting_delay = SYS_CONFIG['system'].get('greeting_delay_seconds', 0)
                    if greeting_delay > 0:
                        print(f"Waiting {greeting_delay} seconds before greeting...")
                        time.sleep(greeting_delay)

                    # Play active greeting message
                    greeting_file = self.get_active_greeting_path()
                    self.play_sound_file(greeting_file)

                    # Start recording
                    if self.phone_status == "off_hook":  # Still off hook after sound
                        self.record_audio()

                    self.phone_status = "on_hook"

                time.sleep(0.1)
        except KeyboardInterrupt:
            print("\nShutting down phone system...")
        finally:
            self.cleanup()

    def cleanup(self):
        """Clean up resources"""
        self.audio.terminate()
        GPIO.cleanup()
        print("Cleanup complete")


# Global phone instance
phone = RotaryPhone()


def _json_payload():
    """Return the request's JSON body as a dict ({} when absent or not an object)."""
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _requested_sound_file(not_found_message):
    """Validate the 'filename' field of a JSON request against SOUNDS_DIR.

    Returns:
        (filename, None) on success, or (None, error_response) where
        error_response is a ready-to-return Flask (body, status) pair.
    """
    filename = _json_payload().get('filename')
    if not filename:
        return None, (jsonify({"error": "No filename provided"}), 400)

    # The name comes from the client: accept a bare file name only, so it
    # cannot point outside the sounds directory.
    is_plain_name = (
        isinstance(filename, str)
        and filename not in ('.', '..')
        and '\\' not in filename
        and os.path.basename(filename) == filename
    )
    if not is_plain_name:
        return None, (jsonify({"error": "Invalid filename"}), 400)

    if not os.path.isfile(os.path.join(SOUNDS_DIR, filename)):
        return None, (jsonify({"error": not_found_message}), 404)

    return filename, None


# Flask Routes
@app.route('/')
def index():
    """Main page"""
    return render_template(
        'index.html',
        recordings=get_recordings(),
        greetings=get_greetings(),
        active_greeting=phone.config.get("active_greeting", "dialtone.wav"),
        extra_button_sound=phone.config.get("extra_button_sound", "button_sound.wav"),
        extra_button_enabled=EXTRA_BUTTON_ENABLED,
        status=phone.get_status(),
        volume=phone.get_volume(),
    )


@app.route('/api/status')
def api_status():
    """API endpoint for phone status"""
    return jsonify(phone.get_status())


@app.route('/api/recordings')
def api_recordings():
    """API endpoint for recordings list"""
    return jsonify(get_recordings())


@app.route('/api/greetings')
def api_greetings():
    """API endpoint for greetings list"""
    return jsonify(get_greetings())


@app.route('/api/volume', methods=['GET'])
def api_get_volume():
    """Get current volume setting"""
    return jsonify({"volume": phone.get_volume()})


@app.route('/api/volume', methods=['POST'])
def api_set_volume():
    """Set volume level

    Expects a JSON object with a "volume" field (default 70). Responds 400
    when the value cannot be converted to an integer.
    """
    try:
        new_volume = phone.set_volume(_json_payload().get('volume', 70))
    except (TypeError, ValueError):
        return jsonify({"error": "Volume must be a number between 0 and 100"}), 400
    return jsonify({"success": True, "volume": new_volume})


@app.route('/upload_greeting', methods=['POST'])
def upload_greeting():
    """Upload a new greeting message"""
    if 'soundfile' not in request.files:
        return jsonify({"error": "No file provided"}), 400

    file = request.files['soundfile']
    if file.filename == '':
        return jsonify({"error": "No file selected"}), 400

    if file and file.filename.lower().endswith('.wav'):
        filename = secure_filename(file.filename)
        # Sanitising can strip characters; make sure a usable .wav name is left
        if not filename.lower().endswith('.wav') or len(filename) <= len('.wav'):
            return jsonify({"error": "Only WAV files are supported"}), 400

        # Ensure unique filename
        counter = 1
        base_name = os.path.splitext(filename)[0]
        while os.path.exists(os.path.join(SOUNDS_DIR, filename)):
            filename = f"{base_name}_{counter}.wav"
            counter += 1

        file.save(os.path.join(SOUNDS_DIR, filename))

        return jsonify({
            "success": True,
            "message": f"Greeting '{filename}' uploaded successfully",
            "filename": filename,
        })

    return jsonify({"error": "Only WAV files are supported"}), 400


@app.route('/set_active_greeting', methods=['POST'])
def set_active_greeting():
    """Set which greeting to play"""
    filename, error = _requested_sound_file("Greeting file not found")
    if error is not None:
        return error

    phone.set_active_greeting(filename)
    return jsonify({"success": True, "message": f"Active greeting set to '{filename}'"})


@app.route('/set_extra_button_sound', methods=['POST'])
def set_extra_button_sound():
    """Set which sound plays for extra button"""
    filename, error = _requested_sound_file("Sound file not found")
    if error is not None:
        return error

    phone.set_extra_button_sound(filename)
    return jsonify({"success": True, "message": f"Extra button sound set to '{filename}'"})


# The view takes `filename`, so the rule must declare it as a URL variable.
@app.route('/delete_greeting/<filename>', methods=['POST'])
def delete_greeting(filename):
    """Delete a greeting file"""
    filename = secure_filename(filename)
    filepath = os.path.join(SOUNDS_DIR, filename)

    # Don't delete the active greeting
    if filename == phone.config.get("active_greeting"):
        return jsonify({"error": "Cannot delete the active greeting. Please select a different greeting first."}), 400

    # isfile (not exists): an empty sanitised name would resolve to the directory
    if os.path.isfile(filepath):
        os.remove(filepath)
        return jsonify({"success": True})
    return jsonify({"error": "File not found"}), 404


@app.route('/play_audio/<audio_type>/<filename>')
def play_audio(audio_type, filename):
    """Serve audio file for web playback"""
    filename = secure_filename(filename)

    if audio_type == 'recording':
        filepath = os.path.join(OUTPUT_DIR, filename)
    elif audio_type == 'greeting':
        filepath = os.path.join(SOUNDS_DIR, filename)
    else:
        return "Invalid audio type", 400

    if os.path.isfile(filepath):
        return send_file(filepath, mimetype='audio/wav')
    return "File not found", 404


@app.route('/download/<filename>')
def download_recording(filename):
    """Download a recording"""
    filepath = os.path.join(OUTPUT_DIR, secure_filename(filename))
    if os.path.isfile(filepath):
        return send_file(filepath, as_attachment=True)
    return "File not found", 404


@app.route('/delete/<filename>', methods=['POST'])
def delete_recording(filename):
    """Delete a recording"""
    filepath = os.path.join(OUTPUT_DIR, secure_filename(filename))
    if os.path.isfile(filepath):
        os.remove(filepath)
        return jsonify({"success": True})
    return jsonify({"error": "File not found"}), 404


@app.route('/restore_default_sound', methods=['POST'])
def restore_default_sound():
    """Restore default dial tone"""
    if os.path.exists(DIALTONE_FILE):
        os.remove(DIALTONE_FILE)

    phone.generate_default_dialtone()
    phone.set_active_greeting("dialtone.wav")

    return jsonify({"success": True, "message": "Default dial tone restored"})


def _wav_duration_seconds(filepath):
    """Return the WAV file's duration in seconds, or 0 when it cannot be read."""
    try:
        with wave.open(filepath, 'rb') as wf:
            return wf.getnframes() / float(wf.getframerate())
    except (wave.Error, EOFError, OSError, ZeroDivisionError):
        return 0


def _list_wav_files(directory, reverse=False):
    """Describe every .wav file in `directory`, sorted by file name.

    Returns a list of dicts with filename, size, size_mb, date and duration.
    """
    entries = []
    if not os.path.isdir(directory):
        return entries

    for filename in sorted(os.listdir(directory), reverse=reverse):
        # Case-insensitive so uploads named e.g. "X.WAV" are listed too
        if not filename.lower().endswith('.wav'):
            continue

        filepath = os.path.join(directory, filename)
        try:
            stat = os.stat(filepath)
        except OSError:
            # File vanished between listing and stat (e.g. deleted via the UI)
            continue

        entries.append({
            "filename": filename,
            "size": stat.st_size,
            "size_mb": stat.st_size / (1024 * 1024),
            "date": datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
            "duration": _wav_duration_seconds(filepath),
        })
    return entries


def get_greetings():
    """Get list of all greeting sound files"""
    active = phone.config.get("active_greeting", "dialtone.wav")
    button_sound = phone.config.get("extra_button_sound", "button_sound.wav")

    greetings = _list_wav_files(SOUNDS_DIR)
    for greeting in greetings:
        greeting["is_active"] = greeting["filename"] == active
        greeting["is_button_sound"] = greeting["filename"] == button_sound
    return greetings


def get_recordings():
    """Get list of all recordings with metadata"""
    # Timestamped names sort chronologically; reverse puts the newest first
    return _list_wav_files(OUTPUT_DIR, reverse=True)


def get_local_ip():
    """Get local IP address (falls back to 127.0.0.1 on any socket error)"""
    try:
        # connect() on a UDP socket sends nothing; it only selects the
        # outgoing interface whose address is then read back.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except OSError:
        return "127.0.0.1"


if __name__ == "__main__":
    # Create templates directory and HTML template
    script_dir = os.path.dirname(os.path.abspath(__file__))
    templates_dir = os.path.join(script_dir, 'templates')
    os.makedirs(templates_dir, exist_ok=True)

    # Create the HTML template if it doesn't exist
    template_file = os.path.join(templates_dir, 'index.html')
    if not os.path.exists(template_file):
        print(f"Creating template at {template_file}")
        # NOTE(review): the embedded template below contains no HTML markup in
        # this copy of the file - confirm against the original template.
        # UTF-8 is explicit because the template contains emoji.
        with open(template_file, 'w', encoding='utf-8') as f:
            f.write(''' Rotary Phone Control Panel

📞 Rotary Phone Control Panel

Manage your vintage phone system

Phone Status

{% if status.recording %} 🔴 Recording in progress... {% elif status.status == 'off_hook' %} 📞 Handset off hook {% else %} ✅ Ready (handset on hook) {% endif %}
{% if status.current_recording %}

Recording to: {{ status.current_recording.split('/')[-1] }}

{% endif %}

🔊 Volume Control

🔇 🔊 {{ volume }}%

Adjust the playback volume for greeting messages

🎵 Greeting Messages

ℹ️ Active greeting: {{ active_greeting }} {% if extra_button_enabled %}
🔘 Extra button sound: {{ extra_button_sound }} {% endif %}

Upload WAV files to play when the handset is picked up

{% if greetings %}

Available Greetings

{% for greeting in greetings %}

{% if greeting.is_active %}⭐{% endif %} {% if greeting.is_button_sound and extra_button_enabled %}🔘{% endif %} {{ greeting.filename }}

📅 {{ greeting.date }} | ⏱️ {{ "%.1f"|format(greeting.duration) }}s | 💾 {{ "%.2f"|format(greeting.size_mb) }} MB
{% if not greeting.is_active %} {% if extra_button_enabled and not greeting.is_button_sound %} {% endif %} {% else %} {% if extra_button_enabled and not greeting.is_button_sound %} {% endif %} {% endif %}
{% endfor %}
{% else %}

🎵

No greeting messages uploaded yet. Upload your first greeting!

{% endif %}

🎙️ Recordings

{% if recordings %}
{{ recordings|length }}
Total Recordings
{{ "%.1f"|format(recordings|sum(attribute='size_mb')) }} MB
Total Size
{{ "%.1f"|format(recordings|sum(attribute='duration')/60) }} min
Total Duration
{% for recording in recordings %}

{{ recording.filename }}

📅 {{ recording.date }} | ⏱️ {{ "%.1f"|format(recording.duration) }}s | 💾 {{ "%.2f"|format(recording.size_mb) }} MB
{% endfor %}
{% else %}

📭

No recordings yet. Pick up the phone to start recording!

{% endif %}

🎵 Audio Player

''')
    else:
        print(f"Template already exists at {template_file}")

    # Start phone handling in separate thread
    phone_thread = threading.Thread(target=phone.phone_loop, daemon=True)
    phone_thread.start()

    # Get and display local IP
    local_ip = get_local_ip()
    print("\n" + "=" * 60)
    print("Web Interface Available At:")
    print(f" http://{local_ip}:{WEB_PORT}")
    print(f" http://localhost:{WEB_PORT}")
    print("=" * 60 + "\n")

    # Start Flask web server
    app.run(host='0.0.0.0', port=WEB_PORT, debug=False, threaded=True)