#!/usr/bin/env python3 """ Rotary Phone Audio Handler with Web Interface Detects handset pickup, plays custom sound, records audio, and provides web UI """ import pyaudio import wave import time import RPi.GPIO as GPIO from datetime import datetime import os import numpy as np import threading from flask import Flask, render_template, request, send_file, jsonify, redirect, url_for from werkzeug.utils import secure_filename from waitress import serve import json import sys import zlib import shutil import zipfile import io # Load configuration def load_system_config(): """Load system configuration from config.json and auto-update with missing defaults""" config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json') # Check if config exists, otherwise use example if not os.path.exists(config_path): example_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.example.json') if os.path.exists(example_path): print(f"Config file not found. Please copy {example_path} to {config_path}") print("Command: cp config.example.json config.json") sys.exit(1) else: print("ERROR: No configuration file found!") sys.exit(1) with open(config_path, 'r') as f: config = json.load(f) # Default values for all settings defaults = { 'gpio': { 'hook_pin': 17, 'hook_pressed_state': 'LOW', 'extra_button_enabled': False, 'extra_button_pin': 27, 'extra_button_pressed_state': 'LOW' }, 'audio': { 'device_index': 1, 'chunk_size': 1024, 'format': 'paInt16', 'channels': 1, 'sample_rate': 48000, 'max_record_seconds': 300 }, 'paths': { 'base_dir': './rotary_phone_data', 'recordings_dir': 'recordings', 'sounds_dir': 'sounds' }, 'backup': { 'enabled': True, 'usb_paths': ['/media/usb0', '/media/usb1'], 'verify_crc': True, 'backup_on_write': True }, 'web': { 'port': 8080, 'max_upload_size_mb': 50 }, 'system': { 'active_greeting': 'dialtone.wav', 'extra_button_sound': 'button_sound.wav', 'beep_sound': 'beep.wav', 'beep_enabled': True, 'greeting_delay_seconds': 0, 'volume': 70, 'volume_greeting': 70, 'volume_button': 70, 'volume_beep': 70 } } # Check if any defaults are missing and add them updated = False for section, section_defaults in defaults.items(): if section not in config: config[section] = section_defaults updated = True print(f"[CONFIG] Added missing section: {section}") else: for key, default_value in section_defaults.items(): if key not in config[section]: config[section][key] = default_value updated = True print(f"[CONFIG] Added missing setting: {section}.{key} = {default_value}") # Save updated config if changes were made if updated: try: with open(config_path, 'w') as f: json.dump(config, f, indent=2) print(f"[CONFIG] Updated config.json with missing defaults") except Exception as e: print(f"[CONFIG] Warning: Could not save updated config: {e}") return config # Load system configuration SYS_CONFIG = load_system_config() # GPIO Configuration HOOK_PIN = SYS_CONFIG['gpio']['hook_pin'] HOOK_PRESSED = GPIO.LOW if SYS_CONFIG['gpio']['hook_pressed_state'] == 'LOW' else GPIO.HIGH # Extra Button Configuration EXTRA_BUTTON_ENABLED = SYS_CONFIG['gpio'].get('extra_button_enabled', False) EXTRA_BUTTON_PIN = SYS_CONFIG['gpio'].get('extra_button_pin', 27) EXTRA_BUTTON_PRESSED = GPIO.LOW if SYS_CONFIG['gpio'].get('extra_button_pressed_state', 'LOW') == 'LOW' else GPIO.HIGH # Audio settings AUDIO_DEVICE_INDEX = SYS_CONFIG['audio']['device_index'] CHUNK = SYS_CONFIG['audio']['chunk_size'] FORMAT = pyaudio.paInt16 # Fixed format CHANNELS = SYS_CONFIG['audio']['channels'] RATE = SYS_CONFIG['audio']['sample_rate'] RECORD_SECONDS = SYS_CONFIG['audio']['max_record_seconds'] # Directories SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) BASE_DIR = os.path.join(SCRIPT_DIR, SYS_CONFIG['paths']['base_dir']) if SYS_CONFIG['paths']['base_dir'].startswith('.') else SYS_CONFIG['paths']['base_dir'] BASE_DIR = os.path.abspath(BASE_DIR) OUTPUT_DIR = os.path.join(BASE_DIR, SYS_CONFIG['paths']['recordings_dir']) SOUNDS_DIR = os.path.join(BASE_DIR, SYS_CONFIG['paths']['sounds_dir']) USER_CONFIG_FILE = os.path.join(BASE_DIR, "user_config.json") # Runtime user settings DIALTONE_FILE = os.path.join(SOUNDS_DIR, "dialtone.wav") # Backup Configuration BACKUP_CONFIG = SYS_CONFIG.get('backup', {}) BACKUP_ENABLED = BACKUP_CONFIG.get('enabled', False) BACKUP_USB_PATHS = BACKUP_CONFIG.get('usb_paths', []) BACKUP_VERIFY_CRC = BACKUP_CONFIG.get('verify_crc', True) BACKUP_ON_WRITE = BACKUP_CONFIG.get('backup_on_write', True) # Web server settings WEB_PORT = SYS_CONFIG['web']['port'] # Template version - increment this when HTML template changes TEMPLATE_VERSION = "2.0.0" # Updated: Modern responsive design with mobile-first approach, improved layout and animations # Flask app app = Flask(__name__) app.config['MAX_CONTENT_LENGTH'] = SYS_CONFIG['web']['max_upload_size_mb'] * 1024 * 1024 # Backup and CRC Functions def calculate_crc32(filepath): """Calculate CRC32 checksum of a file""" try: with open(filepath, 'rb') as f: checksum = 0 while chunk := f.read(65536): # Read in 64KB chunks checksum = zlib.crc32(chunk, checksum) return checksum & 0xFFFFFFFF except Exception as e: print(f"Error calculating CRC for {filepath}: {e}") return None def backup_file_to_usb(source_file, relative_path): """ Backup a file to all configured USB drives with CRC verification Args: source_file: Full path to source file relative_path: Relative path from base dir (e.g., 'recordings/file.wav') Returns: dict: Status of each USB backup attempt """ if not BACKUP_ENABLED or not BACKUP_ON_WRITE: return {"enabled": False} if not os.path.exists(source_file): return {"error": f"Source file not found: {source_file}"} # Calculate source CRC if verification is enabled source_crc = None if BACKUP_VERIFY_CRC: source_crc = calculate_crc32(source_file) if source_crc is None: return {"error": "Failed to calculate source CRC"} results = {} for usb_path in BACKUP_USB_PATHS: usb_name = os.path.basename(usb_path) # Check if USB is mounted if not os.path.exists(usb_path) or not os.path.ismount(usb_path): results[usb_name] = {"status": "not_mounted", "path": usb_path} continue try: # Create backup directory structure backup_dir = os.path.join(usb_path, "wedding-phone-backup") dest_dir = os.path.join(backup_dir, os.path.dirname(relative_path)) os.makedirs(dest_dir, exist_ok=True) # Copy file dest_file = os.path.join(backup_dir, relative_path) shutil.copy2(source_file, dest_file) # Verify CRC if enabled if BACKUP_VERIFY_CRC: dest_crc = calculate_crc32(dest_file) if dest_crc != source_crc: results[usb_name] = { "status": "crc_mismatch", "path": dest_file, "source_crc": hex(source_crc), "dest_crc": hex(dest_crc) if dest_crc else None } # Delete corrupted backup try: os.remove(dest_file) except: pass continue results[usb_name] = { "status": "success", "path": dest_file, "crc": hex(source_crc) if source_crc else None } except Exception as e: results[usb_name] = {"status": "error", "error": str(e)} return results def get_usb_backup_status(): """Get status of all configured USB backup drives""" status = { "enabled": BACKUP_ENABLED, "verify_crc": BACKUP_VERIFY_CRC, "backup_on_write": BACKUP_ON_WRITE, "drives": [] } for usb_path in BACKUP_USB_PATHS: drive_info = { "path": usb_path, "name": os.path.basename(usb_path), "mounted": os.path.exists(usb_path) and os.path.ismount(usb_path), "writable": False, "free_space": None } if drive_info["mounted"]: try: # Try to create backup directory first backup_dir = os.path.join(usb_path, "wedding-phone-backup") try: os.makedirs(backup_dir, exist_ok=True) test_location = backup_dir except PermissionError: # Can't create backup dir, try root test_location = usb_path # Check if writable in backup directory test_file = os.path.join(test_location, ".wedding_phone_test") with open(test_file, 'w') as f: f.write("test") os.remove(test_file) drive_info["writable"] = True drive_info["backup_dir"] = backup_dir if test_location == backup_dir else None # Get free space stat = os.statvfs(usb_path) drive_info["free_space"] = stat.f_bavail * stat.f_frsize drive_info["free_space_mb"] = drive_info["free_space"] / (1024 * 1024) except PermissionError as e: drive_info["error"] = f"Permission denied. Mount with proper permissions or run: sudo chown -R $USER {usb_path}" except Exception as e: drive_info["error"] = str(e) status["drives"].append(drive_info) return status class RotaryPhone: def __init__(self): self.audio = pyaudio.PyAudio() self.recording = False self.phone_status = "on_hook" self.current_recording = None # Setup GPIO GPIO.setmode(GPIO.BCM) GPIO.setup(HOOK_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP) # Setup extra button if enabled if EXTRA_BUTTON_ENABLED: GPIO.setup(EXTRA_BUTTON_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP) self.extra_button_playing = False # Create directories os.makedirs(OUTPUT_DIR, exist_ok=True) os.makedirs(SOUNDS_DIR, exist_ok=True) # Initialize configuration self.config = self.load_config() # Generate default dial tone if none exists if not os.path.exists(DIALTONE_FILE): self.generate_default_dialtone() # Generate default beep if none exists beep_file = os.path.join(SOUNDS_DIR, "beep.wav") if not os.path.exists(beep_file): self.generate_default_beep() def load_config(self): """Load user runtime configuration from JSON file""" default_config = { "active_greeting": SYS_CONFIG['system']['active_greeting'], "extra_button_sound": SYS_CONFIG['system'].get('extra_button_sound', 'button_sound.wav'), "beep_sound": SYS_CONFIG['system'].get('beep_sound', 'beep.wav'), "beep_enabled": SYS_CONFIG['system'].get('beep_enabled', True), "greetings": [], "volume": SYS_CONFIG['system']['volume'], "volume_greeting": SYS_CONFIG['system'].get('volume_greeting', 70), "volume_button": SYS_CONFIG['system'].get('volume_button', 70), "volume_beep": SYS_CONFIG['system'].get('volume_beep', 70), "greeting_delay": SYS_CONFIG['system'].get('greeting_delay_seconds', 0) } if os.path.exists(USER_CONFIG_FILE): try: with open(USER_CONFIG_FILE, 'r') as f: config = json.load(f) # Check for missing keys and add defaults updated = False for key, default_value in default_config.items(): if key not in config: config[key] = default_value updated = True print(f"[USER_CONFIG] Added missing setting: {key} = {default_value}") # Save updated config if changes were made if updated: try: with open(USER_CONFIG_FILE, 'w') as fw: json.dump(config, fw, indent=2) print(f"[USER_CONFIG] Updated user_config.json with missing defaults") except Exception as e: print(f"[USER_CONFIG] Warning: Could not save updated config: {e}") return config except Exception as e: print(f"[USER_CONFIG] Error loading user config: {e}, using defaults") pass return default_config def save_config(self): """Save user runtime configuration to JSON file""" with open(USER_CONFIG_FILE, 'w') as f: json.dump(self.config, f, indent=2) def get_active_greeting_path(self): """Get the full path to the active greeting file""" active = self.config.get("active_greeting", "dialtone.wav") return os.path.join(SOUNDS_DIR, active) def set_active_greeting(self, filename): """Set which greeting message to play""" self.config["active_greeting"] = filename self.save_config() def get_extra_button_sound_path(self): """Get the full path to the extra button sound file""" sound = self.config.get("extra_button_sound", "button_sound.wav") return os.path.join(SOUNDS_DIR, sound) def set_extra_button_sound(self, filename): """Set which sound plays for extra button""" self.config["extra_button_sound"] = filename self.save_config() def get_beep_sound_path(self): """Get the full path to the beep sound file""" sound = self.config.get("beep_sound", "beep.wav") return os.path.join(SOUNDS_DIR, sound) def set_beep_sound(self, filename): """Set which sound plays as recording beep""" self.config["beep_sound"] = filename self.save_config() def is_beep_enabled(self): """Check if beep sound is enabled""" return self.config.get("beep_enabled", True) def set_beep_enabled(self, enabled): """Enable or disable beep sound""" self.config["beep_enabled"] = bool(enabled) self.save_config() def set_volume(self, volume): """Set playback volume (0-100)""" volume = max(0, min(100, int(volume))) # Clamp between 0-100 self.config["volume"] = volume self.save_config() return volume def get_volume(self): """Get current volume setting""" return self.config.get("volume", 70) def set_volume_greeting(self, volume): """Set greeting volume (0-100)""" volume = max(0, min(100, int(volume))) self.config["volume_greeting"] = volume self.save_config() return volume def get_volume_greeting(self): """Get greeting volume setting""" return self.config.get("volume_greeting", 70) def set_volume_button(self, volume): """Set button sound volume (0-100)""" volume = max(0, min(100, int(volume))) self.config["volume_button"] = volume self.save_config() return volume def get_volume_button(self): """Get button sound volume setting""" return self.config.get("volume_button", 70) def set_volume_beep(self, volume): """Set beep sound volume (0-100)""" volume = max(0, min(100, int(volume))) self.config["volume_beep"] = volume self.save_config() return volume def get_volume_beep(self): """Get beep sound volume setting""" return self.config.get("volume_beep", 70) def set_greeting_delay(self, delay): """Set greeting delay in seconds (0-10)""" delay = max(0, min(10, int(delay))) # Clamp between 0-10 self.config["greeting_delay"] = delay self.save_config() return delay def get_greeting_delay(self): """Get current greeting delay""" return self.config.get("greeting_delay", 0) def generate_default_dialtone(self): """Generate a classic dial tone (350Hz + 440Hz) and save as default""" print(f"Generating default dial tone at {RATE}Hz...") duration = 3 t = np.linspace(0, duration, int(RATE * duration), False) # Generate two frequencies and combine them tone1 = np.sin(2 * np.pi * 350 * t) tone2 = np.sin(2 * np.pi * 440 * t) tone = (tone1 + tone2) / 2 # Convert to 16-bit PCM tone = (tone * 32767).astype(np.int16) # Save as WAV file wf = wave.open(DIALTONE_FILE, 'wb') wf.setnchannels(1) wf.setsampwidth(2) # 16-bit wf.setframerate(RATE) wf.writeframes(tone.tobytes()) wf.close() print(f"Default dial tone saved to {DIALTONE_FILE} ({RATE}Hz, 16-bit)") def generate_default_beep(self): """Generate a simple beep tone (1000Hz) to indicate recording start""" print(f"Generating default beep sound at {RATE}Hz...") beep_file = os.path.join(SOUNDS_DIR, "beep.wav") duration = 0.5 # Half second beep t = np.linspace(0, duration, int(RATE * duration), False) # Generate 1000Hz tone frequency = 1000 tone = np.sin(2 * np.pi * frequency * t) # Convert to 16-bit PCM tone = (tone * 32767).astype(np.int16) # Save as WAV file wf = wave.open(beep_file, 'wb') wf.setnchannels(1) wf.setsampwidth(2) # 16-bit wf.setframerate(RATE) wf.writeframes(tone.tobytes()) wf.close() print(f"Default beep sound saved to {beep_file} ({RATE}Hz, 16-bit)") def play_sound_file(self, filepath, check_hook_status=True, volume_override=None): """Play a WAV file with volume control Args: filepath: Path to WAV file check_hook_status: If True, only play while off-hook (for greeting). If False, play completely regardless of hook (for button) volume_override: Override volume (0-100), or None to use default volume """ if not os.path.exists(filepath): print(f"Sound file not found: {filepath}") return False print(f"Playing sound: {filepath}") try: wf = wave.open(filepath, 'rb') # Check sample rate compatibility file_rate = wf.getframerate() if file_rate != RATE: print(f"WARNING: File sample rate ({file_rate}Hz) doesn't match configured rate ({RATE}Hz)") print(f"Audio may not play correctly. Please resample the file to {RATE}Hz") wf.close() return False # Use configured audio device stream = self.audio.open( format=self.audio.get_format_from_width(wf.getsampwidth()), channels=wf.getnchannels(), rate=file_rate, output=True, output_device_index=AUDIO_DEVICE_INDEX, frames_per_buffer=CHUNK ) # Get volume multiplier (0.0 to 1.0) volume = (volume_override if volume_override is not None else self.get_volume()) / 100.0 # Play the sound with volume control data = wf.readframes(CHUNK) while data: # For greeting sounds, stop immediately if handset is hung up if check_hook_status and GPIO.input(HOOK_PIN) != HOOK_PRESSED: print("Handset hung up, stopping playback immediately") break # Apply volume by converting to numpy array and scaling if volume < 1.0: audio_data = np.frombuffer(data, dtype=np.int16) audio_data = (audio_data * volume).astype(np.int16) data = audio_data.tobytes() stream.write(data) data = wf.readframes(CHUNK) stream.stop_stream() stream.close() wf.close() print("Sound playback finished") return True except Exception as e: print(f"Error playing sound: {e}") return False def record_audio(self): """Record audio from the microphone""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = os.path.join(OUTPUT_DIR, f"recording_{timestamp}.wav") self.current_recording = filename print(f"Recording to {filename}") MIN_RECORDING_DURATION = 1.0 # Minimum 1 second to save recording try: # Use configured audio device stream = self.audio.open( format=FORMAT, channels=CHANNELS, rate=RATE, input=True, input_device_index=AUDIO_DEVICE_INDEX, frames_per_buffer=CHUNK ) frames = [] self.recording = True # Record until handset is hung up or max time reached start_time = time.time() while self.recording and (time.time() - start_time) < RECORD_SECONDS: # Check if handset is still off hook - immediate detection if GPIO.input(HOOK_PIN) != HOOK_PRESSED: print("Handset hung up, stopping recording immediately") break # Check extra button during recording if EXTRA_BUTTON_ENABLED and GPIO.input(EXTRA_BUTTON_PIN) == EXTRA_BUTTON_PRESSED: print(f"[BUTTON] Button pressed during recording!") if not self.extra_button_playing: print(f"[BUTTON] Triggering button sound...") button_thread = threading.Thread(target=self.play_extra_button_sound, daemon=True) button_thread.start() time.sleep(0.5) # Debounce to prevent multiple triggers try: data = stream.read(CHUNK, exception_on_overflow=False) frames.append(data) except Exception as e: print(f"Error reading audio: {e}") break stream.stop_stream() stream.close() # Calculate recording duration duration = len(frames) * CHUNK / RATE if frames else 0 # Only save if recording is long enough if frames and duration >= MIN_RECORDING_DURATION: wf = wave.open(filename, 'wb') wf.setnchannels(CHANNELS) wf.setsampwidth(self.audio.get_sample_size(FORMAT)) wf.setframerate(RATE) wf.writeframes(b''.join(frames)) wf.close() print(f"Recording saved: {filename} ({duration:.1f}s)") # Backup to USB drives if enabled if BACKUP_ENABLED: relative_path = os.path.join(SYS_CONFIG['paths']['recordings_dir'], os.path.basename(filename)) backup_results = backup_file_to_usb(filename, relative_path) print(f"Backup results: {backup_results}") else: # Delete aborted/too short recording if os.path.exists(filename): os.remove(filename) print(f"Recording aborted or too short ({duration:.1f}s), not saved") except Exception as e: print(f"Recording error: {e}") # Clean up failed recording file if os.path.exists(filename): try: os.remove(filename) print(f"Cleaned up failed recording: {filename}") except: pass self.recording = False self.current_recording = None def get_status(self): """Get current phone status""" return { "status": self.phone_status, "recording": self.recording, "current_recording": self.current_recording, "extra_button_playing": self.extra_button_playing if EXTRA_BUTTON_ENABLED else False } def play_extra_button_sound(self): """Play sound when extra button is pressed (only during recording)""" if not EXTRA_BUTTON_ENABLED: print("[BUTTON] Extra button disabled in config") return # Only play if currently recording if not self.recording: print("[BUTTON] Extra button ignored - not recording") return button_sound = self.get_extra_button_sound_path() print(f"[BUTTON] Button sound path: {button_sound}") if not os.path.exists(button_sound): print(f"[BUTTON] ERROR: Sound file not found: {button_sound}") return print(f"[BUTTON] Playing extra button sound...") self.extra_button_playing = True try: # Use a separate PyAudio instance for playback during recording # This allows simultaneous input (recording) and output (button sound) print(f"[BUTTON] Opening audio file...") audio_playback = pyaudio.PyAudio() wf = wave.open(button_sound, 'rb') file_rate = wf.getframerate() file_channels = wf.getnchannels() file_width = wf.getsampwidth() print(f"[BUTTON] File info - Rate: {file_rate}Hz, Channels: {file_channels}, Width: {file_width}") if file_rate != RATE: print(f"[BUTTON] WARNING: File sample rate ({file_rate}Hz) != configured rate ({RATE}Hz)") print(f"[BUTTON] Opening audio stream on device {AUDIO_DEVICE_INDEX}...") stream = audio_playback.open( format=audio_playback.get_format_from_width(file_width), channels=file_channels, rate=file_rate, output=True, output_device_index=AUDIO_DEVICE_INDEX, frames_per_buffer=CHUNK ) # Get button volume multiplier button_volume = self.config.get("volume_button", 70) volume = button_volume / 100.0 print(f"[BUTTON] Volume: {int(volume * 100)}%") # Play the sound print(f"[BUTTON] Starting playback...") data = wf.readframes(CHUNK) chunk_count = 0 while data: # Apply volume if volume < 1.0: audio_data = np.frombuffer(data, dtype=np.int16) audio_data = (audio_data * volume).astype(np.int16) data = audio_data.tobytes() stream.write(data) data = wf.readframes(CHUNK) chunk_count += 1 stream.stop_stream() stream.close() wf.close() audio_playback.terminate() print(f"[BUTTON] Playback finished ({chunk_count} chunks)") except Exception as e: print(f"[BUTTON] ERROR playing button sound: {e}") import traceback traceback.print_exc() finally: self.extra_button_playing = False print(f"[BUTTON] Button playback complete") def phone_loop(self): """Main phone handling loop""" print("Rotary Phone System Started") print(f"Hook pin: GPIO {HOOK_PIN}") if EXTRA_BUTTON_ENABLED: print(f"Extra button: ENABLED on GPIO {EXTRA_BUTTON_PIN}") print(f"Extra button pressed state: {EXTRA_BUTTON_PRESSED} ({'LOW' if EXTRA_BUTTON_PRESSED == GPIO.LOW else 'HIGH'})") print(f"Extra button sound: {self.get_extra_button_sound_path()}") else: print(f"Extra button: DISABLED") print(f"Recordings will be saved to: {OUTPUT_DIR}") try: while True: # Check extra button first (higher priority) if EXTRA_BUTTON_ENABLED and GPIO.input(EXTRA_BUTTON_PIN) == EXTRA_BUTTON_PRESSED: print(f"[BUTTON] Button pressed detected (recording={self.recording}, playing={self.extra_button_playing})") if not self.extra_button_playing: # Play extra button sound in a separate thread to not block print(f"[BUTTON] Starting button sound thread...") button_thread = threading.Thread(target=self.play_extra_button_sound, daemon=True) button_thread.start() time.sleep(0.5) # Debounce else: print(f"[BUTTON] Button sound already playing, ignoring") # Wait for handset pickup if GPIO.input(HOOK_PIN) == HOOK_PRESSED and self.phone_status == "on_hook": self.phone_status = "off_hook" print("\n=== Handset picked up ===") # Apply greeting delay if configured greeting_delay = self.get_greeting_delay() if greeting_delay > 0: print(f"Waiting {greeting_delay} seconds before greeting...") time.sleep(greeting_delay) # Play active greeting message greeting_file = self.get_active_greeting_path() greeting_volume = self.config.get("volume_greeting", 70) self.play_sound_file(greeting_file, volume_override=greeting_volume) # Play beep sound to indicate recording will start if self.phone_status == "off_hook" and self.is_beep_enabled(): beep_file = self.get_beep_sound_path() beep_volume = self.config.get("volume_beep", 70) if os.path.exists(beep_file): self.play_sound_file(beep_file, volume_override=beep_volume) # Start recording if self.phone_status == "off_hook": # Still off hook after sound self.record_audio() self.phone_status = "on_hook" time.sleep(0.1) except KeyboardInterrupt: print("\nShutting down phone system...") finally: self.cleanup() def cleanup(self): """Clean up resources""" self.audio.terminate() GPIO.cleanup() print("Cleanup complete") # Global phone instance phone = RotaryPhone() # Flask Routes @app.route('/') def index(): """Main page""" # Get sort parameters from query string sort_by = request.args.get('sort', 'date') sort_order = request.args.get('order', 'desc') recordings = get_recordings(sort_by=sort_by, sort_order=sort_order) greetings = get_greetings() status = phone.get_status() active_greeting = phone.config.get("active_greeting", "dialtone.wav") extra_button_sound = phone.config.get("extra_button_sound", "button_sound.wav") beep_sound = phone.config.get("beep_sound", "beep.wav") beep_enabled = phone.is_beep_enabled() volume = phone.get_volume() volume_greeting = phone.get_volume_greeting() volume_button = phone.get_volume_button() volume_beep = phone.get_volume_beep() greeting_delay = phone.get_greeting_delay() return render_template('index.html', recordings=recordings, greetings=greetings, active_greeting=active_greeting, extra_button_sound=extra_button_sound, beep_sound=beep_sound, beep_enabled=beep_enabled, extra_button_enabled=EXTRA_BUTTON_ENABLED, status=status, volume=volume, volume_greeting=volume_greeting, volume_button=volume_button, volume_beep=volume_beep, greeting_delay=greeting_delay, sort_by=sort_by, sort_order=sort_order) @app.route('/api/status') def api_status(): """API endpoint for phone status""" return jsonify(phone.get_status()) @app.route('/api/recordings') def api_recordings(): """API endpoint for recordings list""" return jsonify(get_recordings()) @app.route('/api/greetings') def api_greetings(): """API endpoint for greetings list""" return jsonify(get_greetings()) @app.route('/api/volume', methods=['GET']) def api_get_volume(): """Get current volume setting""" return jsonify({"volume": phone.get_volume()}) @app.route('/api/volume', methods=['POST']) def api_set_volume(): """Set volume level""" data = request.get_json() volume = data.get('volume', 70) new_volume = phone.set_volume(volume) return jsonify({"success": True, "volume": new_volume}) @app.route('/api/volume/greeting', methods=['GET']) def api_get_volume_greeting(): """Get greeting volume setting""" return jsonify({"volume": phone.get_volume_greeting()}) @app.route('/api/volume/greeting', methods=['POST']) def api_set_volume_greeting(): """Set greeting volume level""" data = request.get_json() volume = data.get('volume', 70) new_volume = phone.set_volume_greeting(volume) return jsonify({"success": True, "volume": new_volume}) @app.route('/api/volume/button', methods=['GET']) def api_get_volume_button(): """Get button sound volume setting""" return jsonify({"volume": phone.get_volume_button()}) @app.route('/api/volume/button', methods=['POST']) def api_set_volume_button(): """Set button sound volume level""" data = request.get_json() volume = data.get('volume', 70) new_volume = phone.set_volume_button(volume) return jsonify({"success": True, "volume": new_volume}) @app.route('/api/volume/beep', methods=['GET']) def api_get_volume_beep(): """Get beep sound volume setting""" return jsonify({"volume": phone.get_volume_beep()}) @app.route('/api/volume/beep', methods=['POST']) def api_set_volume_beep(): """Set beep sound volume level""" data = request.get_json() volume = data.get('volume', 70) new_volume = phone.set_volume_beep(volume) return jsonify({"success": True, "volume": new_volume}) @app.route('/api/greeting_delay', methods=['GET']) def api_get_greeting_delay(): """Get current greeting delay setting""" return jsonify({"delay": phone.get_greeting_delay()}) @app.route('/api/greeting_delay', methods=['POST']) def api_set_greeting_delay(): """Set greeting delay in seconds""" data = request.get_json() delay = data.get('delay', 0) new_delay = phone.set_greeting_delay(delay) return jsonify({"success": True, "delay": new_delay}) @app.route('/api/backup/status', methods=['GET']) def api_backup_status(): """Get USB backup drive status""" return jsonify(get_usb_backup_status()) @app.route('/api/backup/test', methods=['POST']) def api_backup_test(): """Test backup to all USB drives""" try: # Create a test file test_file = os.path.join(OUTPUT_DIR, ".backup_test.txt") with open(test_file, 'w') as f: f.write(f"Backup test at {datetime.now()}") # Try to backup results = backup_file_to_usb(test_file, os.path.join(SYS_CONFIG['paths']['recordings_dir'], ".backup_test.txt")) # Clean up test file try: os.remove(test_file) except: pass return jsonify({"success": True, "results": results}) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @app.route('/api/system/shutdown', methods=['POST']) def api_system_shutdown(): """Shutdown the Raspberry Pi""" try: import subprocess # Schedule shutdown in 1 minute to allow response to be sent subprocess.Popen(['sudo', '/sbin/shutdown', '-h', '+1']) return jsonify({"success": True, "message": "System will shutdown in 1 minute"}) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @app.route('/api/system/restart', methods=['POST']) def api_system_restart(): """Restart the Raspberry Pi""" try: import subprocess # Schedule restart to allow response to be sent subprocess.Popen(['sudo', '/sbin/reboot']) return jsonify({"success": True, "message": "System will restart shortly"}) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @app.route('/upload_greeting', methods=['POST']) def upload_greeting(): """Upload a new greeting message""" if 'soundfile' not in request.files: return jsonify({"error": "No file provided"}), 400 file = request.files['soundfile'] if file.filename == '': return jsonify({"error": "No file selected"}), 400 if file and file.filename.lower().endswith('.wav'): filename = secure_filename(file.filename) # Ensure unique filename counter = 1 base_name = os.path.splitext(filename)[0] while os.path.exists(os.path.join(SOUNDS_DIR, filename)): filename = f"{base_name}_{counter}.wav" counter += 1 filepath = os.path.join(SOUNDS_DIR, filename) file.save(filepath) # Backup to USB drives if enabled if BACKUP_ENABLED: relative_path = os.path.join(SYS_CONFIG['paths']['sounds_dir'], filename) backup_results = backup_file_to_usb(filepath, relative_path) print(f"Greeting backup results: {backup_results}") return jsonify({"success": True, "message": f"Greeting '{filename}' uploaded successfully", "filename": filename}) return jsonify({"error": "Only WAV files are supported"}), 400 @app.route('/set_active_greeting', methods=['POST']) def set_active_greeting(): """Set which greeting to play""" data = request.get_json() filename = data.get('filename') if not filename: return jsonify({"error": "No filename provided"}), 400 filepath = os.path.join(SOUNDS_DIR, filename) if not os.path.exists(filepath): return jsonify({"error": "Greeting file not found"}), 404 phone.set_active_greeting(filename) return jsonify({"success": True, "message": f"Active greeting set to '{filename}'"}) @app.route('/set_extra_button_sound', methods=['POST']) def set_extra_button_sound(): """Set which sound plays for extra button""" data = request.get_json() filename = data.get('filename') if not filename: return jsonify({"error": "No filename provided"}), 400 filepath = os.path.join(SOUNDS_DIR, filename) if not os.path.exists(filepath): return jsonify({"error": "Sound file not found"}), 404 phone.set_extra_button_sound(filename) return jsonify({"success": True, "message": f"Extra button sound set to '{filename}'"}) @app.route('/set_beep_sound', methods=['POST']) def set_beep_sound(): """Set which sound plays as recording beep""" data = request.get_json() filename = data.get('filename') if not filename: return jsonify({"error": "No filename provided"}), 400 filepath = os.path.join(SOUNDS_DIR, filename) if not os.path.exists(filepath): return jsonify({"error": "Sound file not found"}), 404 phone.set_beep_sound(filename) return jsonify({"success": True, "message": f"Beep sound set to '{filename}'"}) @app.route('/api/beep_enabled', methods=['GET', 'POST']) def beep_enabled(): """Get or set beep enabled status""" if request.method == 'POST': data = request.get_json() enabled = data.get('enabled', True) phone.set_beep_enabled(enabled) return jsonify({"success": True, "enabled": phone.is_beep_enabled()}) else: return jsonify({"enabled": phone.is_beep_enabled()}) @app.route('/delete_greeting/', methods=['POST']) def delete_greeting(filename): """Delete a greeting file""" filename = secure_filename(filename) filepath = os.path.join(SOUNDS_DIR, filename) # Don't delete the active greeting if filename == phone.config.get("active_greeting"): return jsonify({"error": "Cannot delete the active greeting. Please select a different greeting first."}), 400 if os.path.exists(filepath): os.remove(filepath) return jsonify({"success": True}) return jsonify({"error": "File not found"}), 404 @app.route('/play_audio//') def play_audio(audio_type, filename): """Serve audio file for web playback""" filename = secure_filename(filename) if audio_type == 'recording': filepath = os.path.join(OUTPUT_DIR, filename) elif audio_type == 'greeting': filepath = os.path.join(SOUNDS_DIR, filename) else: return "Invalid audio type", 400 if os.path.exists(filepath): return send_file(filepath, mimetype='audio/wav') return "File not found", 404 @app.route('/download/') def download_recording(filename): """Download a recording""" filepath = os.path.join(OUTPUT_DIR, secure_filename(filename)) if os.path.exists(filepath): return send_file(filepath, as_attachment=True) return "File not found", 404 @app.route('/download_all') def download_all_recordings(): """Download all recordings as a ZIP file""" if not os.path.exists(OUTPUT_DIR): return "No recordings found", 404 # Get all recording files recordings = [f for f in os.listdir(OUTPUT_DIR) if f.endswith('.wav') and os.path.isfile(os.path.join(OUTPUT_DIR, f))] if not recordings: return "No recordings found", 404 # Create ZIP file in memory memory_file = io.BytesIO() with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf: for filename in recordings: filepath = os.path.join(OUTPUT_DIR, filename) # Add file to ZIP with just the filename (no path) zf.write(filepath, arcname=filename) # Seek to beginning of file memory_file.seek(0) # Generate filename with timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") zip_filename = f"wedding_recordings_{timestamp}.zip" return send_file( memory_file, mimetype='application/zip', as_attachment=True, download_name=zip_filename ) @app.route('/delete/', methods=['POST']) def delete_recording(filename): """Delete a recording""" filepath = os.path.join(OUTPUT_DIR, secure_filename(filename)) if os.path.exists(filepath): os.remove(filepath) return jsonify({"success": True}) return jsonify({"error": "File not found"}), 404 @app.route('/rename/', methods=['POST']) def rename_recording(filename): """Rename a recording""" data = request.get_json() new_name = data.get('new_name', '').strip() if not new_name: return jsonify({"error": "New name is required"}), 400 # Ensure .wav extension if not new_name.endswith('.wav'): new_name += '.wav' # Sanitize filenames old_filepath = os.path.join(OUTPUT_DIR, secure_filename(filename)) new_filepath = os.path.join(OUTPUT_DIR, secure_filename(new_name)) if not os.path.exists(old_filepath): return jsonify({"error": "File not found"}), 404 if os.path.exists(new_filepath): return jsonify({"error": "A file with that name already exists"}), 409 try: os.rename(old_filepath, new_filepath) # Also backup renamed file if backup is enabled if SYS_CONFIG['backup']['enabled'] and SYS_CONFIG['backup']['backup_on_write']: backup_file_to_usb(new_filepath, 'recordings') return jsonify({"success": True, "new_name": secure_filename(new_name)}) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/restore_default_sound', methods=['POST']) def restore_default_sound(): """Restore default dial tone""" if os.path.exists(DIALTONE_FILE): os.remove(DIALTONE_FILE) phone.generate_default_dialtone() phone.set_active_greeting("dialtone.wav") return jsonify({"success": True, "message": "Default dial tone restored"}) def get_greetings(): """Get list of all greeting sound files""" greetings = [] if os.path.exists(SOUNDS_DIR): for filename in sorted(os.listdir(SOUNDS_DIR)): if filename.endswith('.wav'): filepath = os.path.join(SOUNDS_DIR, filename) stat = os.stat(filepath) # Get duration from WAV file try: wf = wave.open(filepath, 'rb') frames = wf.getnframes() rate = wf.getframerate() duration = frames / float(rate) wf.close() except: duration = 0 greetings.append({ "filename": filename, "size": stat.st_size, "size_mb": stat.st_size / (1024 * 1024), "date": datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), "duration": duration, "is_active": filename == phone.config.get("active_greeting", "dialtone.wav"), "is_button_sound": filename == phone.config.get("extra_button_sound", "button_sound.wav"), "is_beep_sound": filename == phone.config.get("beep_sound", "beep.wav") }) return greetings def get_recordings(sort_by='date', sort_order='desc'): """Get list of all recordings with metadata Args: sort_by: Sort field - 'date', 'name', 'duration', or 'size' sort_order: Sort order - 'asc' or 'desc' """ recordings = [] if os.path.exists(OUTPUT_DIR): for filename in os.listdir(OUTPUT_DIR): if filename.endswith('.wav'): filepath = os.path.join(OUTPUT_DIR, filename) stat = os.stat(filepath) # Get duration from WAV file try: wf = wave.open(filepath, 'rb') frames = wf.getnframes() rate = wf.getframerate() duration = frames / float(rate) wf.close() except: duration = 0 recordings.append({ "filename": filename, "size": stat.st_size, "size_mb": stat.st_size / (1024 * 1024), "date": datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), "timestamp": stat.st_mtime, # For sorting "duration": duration }) # Sort recordings based on parameters reverse = (sort_order == 'desc') if sort_by == 'name': recordings.sort(key=lambda x: x['filename'].lower(), reverse=reverse) elif sort_by == 'duration': recordings.sort(key=lambda x: x['duration'], reverse=reverse) elif sort_by == 'size': recordings.sort(key=lambda x: x['size'], reverse=reverse) else: # default to date recordings.sort(key=lambda x: x['timestamp'], reverse=reverse) return recordings def get_local_ip(): """Get local IP address""" import socket try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] s.close() return ip except: return "127.0.0.1" def check_template_version(template_file): """Check if template needs regeneration based on version""" if not os.path.exists(template_file): return False # Template doesn't exist, needs creation try: with open(template_file, 'r') as f: content = f.read() # Look for version comment in template if f'' in content: return True # Version matches, no regeneration needed else: return False # Version mismatch or missing, needs regeneration except: return False # Error reading, regenerate to be safe def main(): """Main entry point for the wedding phone application""" # Create templates directory and HTML template script_dir = os.path.dirname(os.path.abspath(__file__)) templates_dir = os.path.join(script_dir, 'templates') os.makedirs(templates_dir, exist_ok=True) # Check if template needs regeneration template_file = os.path.join(templates_dir, 'index.html') template_up_to_date = check_template_version(template_file) if not template_up_to_date: if os.path.exists(template_file): print(f"Template version mismatch - regenerating template at {template_file}") else: print(f"Creating new template at {template_file}") with open(template_file, 'w', encoding='utf-8') as f: # Write version comment first, then the rest of the template f.write(f'\n') f.write(''' Rotary Phone Control Panel

๐Ÿ“ž Rotary Phone Control Panel

Manage your vintage phone system

Phone Status

{% if status.recording %} ๐Ÿ”ด Recording in progress... {% elif status.status == 'off_hook' %} ๐Ÿ“ž Handset off hook {% else %} โœ… Ready (handset on hook) {% endif %}
{% if status.current_recording %}

Recording to: {{ status.current_recording.split('/')[-1] }}

{% endif %} {% if extra_button_enabled %}
{% if status.extra_button_playing %} ๐Ÿ”˜ Button sound playing... {% else %} ๐Ÿ”˜ Button ready {% endif %}
{% endif %}

๐Ÿ”Š Volume & Delay Control

๐Ÿ”Š Volume Controls

๐Ÿ”‡ ๐Ÿ”Š {{ volume_greeting }}%
๐Ÿ”‡ ๐Ÿ”Š {{ volume_button }}%
๐Ÿ”‡ ๐Ÿ”Š {{ volume_beep }}%

Greeting Delay

โฑ๏ธ โฐ {{ greeting_delay }}s

Delay before greeting plays after pickup (0-10 seconds)

Recording Beep

๐Ÿ“ฃ

Audio cue to signal recording has started

โš™๏ธ System Control

Safely shutdown or restart the Raspberry Pi

โš ๏ธ System will shutdown/restart after confirmation

๐ŸŽต Sound Assignment

โญ Greeting Sound

Plays when handset is picked up

{% if extra_button_enabled %}

๐Ÿ”˜ Button Sound

Plays when extra button is pressed during recording

{% endif %}

๐Ÿ“ฃ Recording Beep

Plays after greeting to signal recording start

๐Ÿ”Š Available Sounds

Upload WAV audio files for greetings, button sounds, and beeps

{% if greetings %}

Uploaded Sounds

{% for greeting in greetings %}

{{ greeting.filename }}

๐Ÿ“… {{ greeting.date }} | โฑ๏ธ {{ "%.1f"|format(greeting.duration) }}s | ๐Ÿ’พ {{ "%.2f"|format(greeting.size_mb) }} MB
{% if greeting.is_active %}โญ Active Greeting{% endif %} {% if greeting.is_button_sound and extra_button_enabled %}{% if greeting.is_active %} | {% endif %}๐Ÿ”˜ Button Sound{% endif %} {% if greeting.is_beep_sound %}{% if greeting.is_active or greeting.is_button_sound %} | {% endif %}๐Ÿ“ฃ Beep Sound{% endif %}
{% endfor %}
{% else %}

๐ŸŽต

No sounds uploaded yet. Upload your first sound file!

{% endif %}

๐Ÿ’พ USB Backup

Automatic backup to USB drives with CRC verification

Loading backup status...

๐ŸŽ™๏ธ Recordings

{% if recordings %} {% endif %}
{% if recordings %}
๐Ÿ”„ Sort by:
{{ recordings|length }}
Total Recordings
{{ "%.1f"|format(recordings|sum(attribute='size_mb')) }} MB
Total Size
{{ "%.1f"|format(recordings|sum(attribute='duration')/60) }} min
Total Duration
{% for recording in recordings %}

{{ recording.filename }}

๐Ÿ“… {{ recording.date }} | โฑ๏ธ {{ "%.1f"|format(recording.duration) }}s | ๐Ÿ’พ {{ "%.2f"|format(recording.size_mb) }} MB
{% endfor %}
{% else %}

๐Ÿ“ญ

No recordings yet. Pick up the phone to start recording!

{% endif %}

๐ŸŽต Audio Player

''') else: print(f"Template up-to-date (v{TEMPLATE_VERSION}) at {template_file}") # Start phone handling in separate thread phone_thread = threading.Thread(target=phone.phone_loop, daemon=True) phone_thread.start() # Get and display local IP local_ip = get_local_ip() print("\n" + "="*60) print(f"๐Ÿš€ Wedding Phone System Starting...") print(f"Web Interface Available At:") print(f" http://{local_ip}:{WEB_PORT}") print(f" http://localhost:{WEB_PORT}") print("="*60 + "\n") # Start production WSGI server (waitress) print(f"Starting production server on 0.0.0.0:{WEB_PORT}") serve(app, host='0.0.0.0', port=WEB_PORT, threads=4) if __name__ == "__main__": main()