#!/usr/bin/env python3 """ Rotary Phone Audio Handler with Web Interface Detects handset pickup, plays custom sound, records audio, and provides web UI """ import pyaudio import wave import time import RPi.GPIO as GPIO from datetime import datetime import os import numpy as np import threading from flask import Flask, render_template, request, send_file, jsonify, redirect, url_for from werkzeug.utils import secure_filename import json import sys import zlib import shutil # Load configuration def load_system_config(): """Load system configuration from config.json""" config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json') # Check if config exists, otherwise use example if not os.path.exists(config_path): example_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.example.json') if os.path.exists(example_path): print(f"Config file not found. Please copy {example_path} to {config_path}") print("Command: cp config.example.json config.json") sys.exit(1) else: print("ERROR: No configuration file found!") sys.exit(1) with open(config_path, 'r') as f: return json.load(f) # Load system configuration SYS_CONFIG = load_system_config() # GPIO Configuration HOOK_PIN = SYS_CONFIG['gpio']['hook_pin'] HOOK_PRESSED = GPIO.LOW if SYS_CONFIG['gpio']['hook_pressed_state'] == 'LOW' else GPIO.HIGH # Extra Button Configuration EXTRA_BUTTON_ENABLED = SYS_CONFIG['gpio'].get('extra_button_enabled', False) EXTRA_BUTTON_PIN = SYS_CONFIG['gpio'].get('extra_button_pin', 27) EXTRA_BUTTON_PRESSED = GPIO.LOW if SYS_CONFIG['gpio'].get('extra_button_pressed_state', 'LOW') == 'LOW' else GPIO.HIGH # Audio settings AUDIO_DEVICE_INDEX = SYS_CONFIG['audio']['device_index'] CHUNK = SYS_CONFIG['audio']['chunk_size'] FORMAT = pyaudio.paInt16 # Fixed format CHANNELS = SYS_CONFIG['audio']['channels'] RATE = SYS_CONFIG['audio']['sample_rate'] RECORD_SECONDS = SYS_CONFIG['audio']['max_record_seconds'] # Directories SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) BASE_DIR = os.path.join(SCRIPT_DIR, SYS_CONFIG['paths']['base_dir']) if SYS_CONFIG['paths']['base_dir'].startswith('.') else SYS_CONFIG['paths']['base_dir'] BASE_DIR = os.path.abspath(BASE_DIR) OUTPUT_DIR = os.path.join(BASE_DIR, SYS_CONFIG['paths']['recordings_dir']) SOUNDS_DIR = os.path.join(BASE_DIR, SYS_CONFIG['paths']['sounds_dir']) USER_CONFIG_FILE = os.path.join(BASE_DIR, "user_config.json") # Runtime user settings DIALTONE_FILE = os.path.join(SOUNDS_DIR, "dialtone.wav") # Backup Configuration BACKUP_CONFIG = SYS_CONFIG.get('backup', {}) BACKUP_ENABLED = BACKUP_CONFIG.get('enabled', False) BACKUP_USB_PATHS = BACKUP_CONFIG.get('usb_paths', []) BACKUP_VERIFY_CRC = BACKUP_CONFIG.get('verify_crc', True) BACKUP_ON_WRITE = BACKUP_CONFIG.get('backup_on_write', True) # Web server settings WEB_PORT = SYS_CONFIG['web']['port'] # Flask app app = Flask(__name__) app.config['MAX_CONTENT_LENGTH'] = SYS_CONFIG['web']['max_upload_size_mb'] * 1024 * 1024 # Backup and CRC Functions def calculate_crc32(filepath): """Calculate CRC32 checksum of a file""" try: with open(filepath, 'rb') as f: checksum = 0 while chunk := f.read(65536): # Read in 64KB chunks checksum = zlib.crc32(chunk, checksum) return checksum & 0xFFFFFFFF except Exception as e: print(f"Error calculating CRC for {filepath}: {e}") return None def backup_file_to_usb(source_file, relative_path): """ Backup a file to all configured USB drives with CRC verification Args: source_file: Full path to source file relative_path: Relative path from base dir (e.g., 'recordings/file.wav') Returns: dict: Status of each USB backup attempt """ if not BACKUP_ENABLED or not BACKUP_ON_WRITE: return {"enabled": False} if not os.path.exists(source_file): return {"error": f"Source file not found: {source_file}"} # Calculate source CRC if verification is enabled source_crc = None if BACKUP_VERIFY_CRC: source_crc = calculate_crc32(source_file) if source_crc is None: return {"error": "Failed to calculate source CRC"} results = {} for usb_path in BACKUP_USB_PATHS: usb_name = os.path.basename(usb_path) # Check if USB is mounted if not os.path.exists(usb_path) or not os.path.ismount(usb_path): results[usb_name] = {"status": "not_mounted", "path": usb_path} continue try: # Create backup directory structure backup_dir = os.path.join(usb_path, "wedding-phone-backup") dest_dir = os.path.join(backup_dir, os.path.dirname(relative_path)) os.makedirs(dest_dir, exist_ok=True) # Copy file dest_file = os.path.join(backup_dir, relative_path) shutil.copy2(source_file, dest_file) # Verify CRC if enabled if BACKUP_VERIFY_CRC: dest_crc = calculate_crc32(dest_file) if dest_crc != source_crc: results[usb_name] = { "status": "crc_mismatch", "path": dest_file, "source_crc": hex(source_crc), "dest_crc": hex(dest_crc) if dest_crc else None } # Delete corrupted backup try: os.remove(dest_file) except: pass continue results[usb_name] = { "status": "success", "path": dest_file, "crc": hex(source_crc) if source_crc else None } except Exception as e: results[usb_name] = {"status": "error", "error": str(e)} return results def get_usb_backup_status(): """Get status of all configured USB backup drives""" status = { "enabled": BACKUP_ENABLED, "verify_crc": BACKUP_VERIFY_CRC, "backup_on_write": BACKUP_ON_WRITE, "drives": [] } for usb_path in BACKUP_USB_PATHS: drive_info = { "path": usb_path, "name": os.path.basename(usb_path), "mounted": os.path.exists(usb_path) and os.path.ismount(usb_path), "writable": False, "free_space": None } if drive_info["mounted"]: try: # Check if writable test_file = os.path.join(usb_path, ".wedding_phone_test") with open(test_file, 'w') as f: f.write("test") os.remove(test_file) drive_info["writable"] = True # Get free space stat = os.statvfs(usb_path) drive_info["free_space"] = stat.f_bavail * stat.f_frsize drive_info["free_space_mb"] = drive_info["free_space"] / (1024 * 1024) except Exception as e: drive_info["error"] = str(e) status["drives"].append(drive_info) return status class RotaryPhone: def __init__(self): self.audio = pyaudio.PyAudio() self.recording = False self.phone_status = "on_hook" self.current_recording = None # Setup GPIO GPIO.setmode(GPIO.BCM) GPIO.setup(HOOK_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP) # Setup extra button if enabled if EXTRA_BUTTON_ENABLED: GPIO.setup(EXTRA_BUTTON_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP) self.extra_button_playing = False # Create directories os.makedirs(OUTPUT_DIR, exist_ok=True) os.makedirs(SOUNDS_DIR, exist_ok=True) # Initialize configuration self.config = self.load_config() # Generate default dial tone if none exists if not os.path.exists(DIALTONE_FILE): self.generate_default_dialtone() def load_config(self): """Load user runtime configuration from JSON file""" default_config = { "active_greeting": SYS_CONFIG['system']['active_greeting'], "extra_button_sound": SYS_CONFIG['system'].get('extra_button_sound', 'button_sound.wav'), "greetings": [], "volume": SYS_CONFIG['system']['volume'], "greeting_delay": SYS_CONFIG['system'].get('greeting_delay_seconds', 0) } if os.path.exists(USER_CONFIG_FILE): try: with open(USER_CONFIG_FILE, 'r') as f: config = json.load(f) # Ensure required keys exist if "volume" not in config: config["volume"] = SYS_CONFIG['system']['volume'] if "greeting_delay" not in config: config["greeting_delay"] = SYS_CONFIG['system'].get('greeting_delay_seconds', 0) return config except: pass return default_config def save_config(self): """Save user runtime configuration to JSON file""" with open(USER_CONFIG_FILE, 'w') as f: json.dump(self.config, f, indent=2) def get_active_greeting_path(self): """Get the full path to the active greeting file""" active = self.config.get("active_greeting", "dialtone.wav") return os.path.join(SOUNDS_DIR, active) def set_active_greeting(self, filename): """Set which greeting message to play""" self.config["active_greeting"] = filename self.save_config() def get_extra_button_sound_path(self): """Get the full path to the extra button sound file""" sound = self.config.get("extra_button_sound", "button_sound.wav") return os.path.join(SOUNDS_DIR, sound) def set_extra_button_sound(self, filename): """Set which sound plays for extra button""" self.config["extra_button_sound"] = filename self.save_config() def set_volume(self, volume): """Set playback volume (0-100)""" volume = max(0, min(100, int(volume))) # Clamp between 0-100 self.config["volume"] = volume self.save_config() return volume def get_volume(self): """Get current volume setting""" return self.config.get("volume", 70) def set_greeting_delay(self, delay): """Set greeting delay in seconds (0-10)""" delay = max(0, min(10, int(delay))) # Clamp between 0-10 self.config["greeting_delay"] = delay self.save_config() return delay def get_greeting_delay(self): """Get current greeting delay""" return self.config.get("greeting_delay", 0) def generate_default_dialtone(self): """Generate a classic dial tone (350Hz + 440Hz) and save as default""" print("Generating default dial tone...") duration = 3 sample_rate = 44100 t = np.linspace(0, duration, int(sample_rate * duration), False) # Generate two frequencies and combine them tone1 = np.sin(2 * np.pi * 350 * t) tone2 = np.sin(2 * np.pi * 440 * t) tone = (tone1 + tone2) / 2 # Convert to 16-bit PCM tone = (tone * 32767).astype(np.int16) # Save as WAV file wf = wave.open(DIALTONE_FILE, 'wb') wf.setnchannels(1) wf.setsampwidth(2) # 16-bit wf.setframerate(sample_rate) wf.writeframes(tone.tobytes()) wf.close() print(f"Default dial tone saved to {DIALTONE_FILE}") def play_sound_file(self, filepath, check_hook_status=True): """Play a WAV file with volume control Args: filepath: Path to WAV file check_hook_status: If True, only play while off-hook (for greeting). If False, play completely regardless of hook (for button) """ if not os.path.exists(filepath): print(f"Sound file not found: {filepath}") return False print(f"Playing sound: {filepath}") try: wf = wave.open(filepath, 'rb') # Use configured audio device stream = self.audio.open( format=self.audio.get_format_from_width(wf.getsampwidth()), channels=wf.getnchannels(), rate=wf.getframerate(), output=True, output_device_index=AUDIO_DEVICE_INDEX, frames_per_buffer=CHUNK ) # Get volume multiplier (0.0 to 1.0) volume = self.get_volume() / 100.0 # Play the sound with volume control data = wf.readframes(CHUNK) while data: # For greeting sounds, stop immediately if handset is hung up if check_hook_status and GPIO.input(HOOK_PIN) != HOOK_PRESSED: print("Handset hung up, stopping playback immediately") break # Apply volume by converting to numpy array and scaling if volume < 1.0: audio_data = np.frombuffer(data, dtype=np.int16) audio_data = (audio_data * volume).astype(np.int16) data = audio_data.tobytes() stream.write(data) data = wf.readframes(CHUNK) stream.stop_stream() stream.close() wf.close() print("Sound playback finished") return True except Exception as e: print(f"Error playing sound: {e}") return False def record_audio(self): """Record audio from the microphone""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = os.path.join(OUTPUT_DIR, f"recording_{timestamp}.wav") self.current_recording = filename print(f"Recording to {filename}") MIN_RECORDING_DURATION = 1.0 # Minimum 1 second to save recording try: # Use configured audio device stream = self.audio.open( format=FORMAT, channels=CHANNELS, rate=RATE, input=True, input_device_index=AUDIO_DEVICE_INDEX, frames_per_buffer=CHUNK ) frames = [] self.recording = True # Record until handset is hung up or max time reached start_time = time.time() while self.recording and (time.time() - start_time) < RECORD_SECONDS: # Check if handset is still off hook - immediate detection if GPIO.input(HOOK_PIN) != HOOK_PRESSED: print("Handset hung up, stopping recording immediately") break try: data = stream.read(CHUNK, exception_on_overflow=False) frames.append(data) except Exception as e: print(f"Error reading audio: {e}") break stream.stop_stream() stream.close() # Calculate recording duration duration = len(frames) * CHUNK / RATE if frames else 0 # Only save if recording is long enough if frames and duration >= MIN_RECORDING_DURATION: wf = wave.open(filename, 'wb') wf.setnchannels(CHANNELS) wf.setsampwidth(self.audio.get_sample_size(FORMAT)) wf.setframerate(RATE) wf.writeframes(b''.join(frames)) wf.close() print(f"Recording saved: {filename} ({duration:.1f}s)") # Backup to USB drives if enabled if BACKUP_ENABLED: relative_path = os.path.join(SYS_CONFIG['paths']['recordings_dir'], os.path.basename(filename)) backup_results = backup_file_to_usb(filename, relative_path) print(f"Backup results: {backup_results}") else: # Delete aborted/too short recording if os.path.exists(filename): os.remove(filename) print(f"Recording aborted or too short ({duration:.1f}s), not saved") except Exception as e: print(f"Recording error: {e}") # Clean up failed recording file if os.path.exists(filename): try: os.remove(filename) print(f"Cleaned up failed recording: {filename}") except: pass self.recording = False self.current_recording = None def get_status(self): """Get current phone status""" return { "status": self.phone_status, "recording": self.recording, "current_recording": self.current_recording, "extra_button_playing": self.extra_button_playing if EXTRA_BUTTON_ENABLED else False } def play_extra_button_sound(self): """Play sound when extra button is pressed (only during recording)""" if not EXTRA_BUTTON_ENABLED: return # Only play if currently recording if not self.recording: print("Extra button ignored - not recording") return button_sound = self.get_extra_button_sound_path() if not os.path.exists(button_sound): print(f"Extra button sound not found: {button_sound}") return print(f"\n=== Extra button pressed ===") self.extra_button_playing = True try: # Use a separate PyAudio instance for playback during recording # This allows simultaneous input (recording) and output (button sound) audio_playback = pyaudio.PyAudio() wf = wave.open(button_sound, 'rb') stream = audio_playback.open( format=audio_playback.get_format_from_width(wf.getsampwidth()), channels=wf.getnchannels(), rate=wf.getframerate(), output=True, output_device_index=AUDIO_DEVICE_INDEX, frames_per_buffer=CHUNK ) # Get volume multiplier volume = self.get_volume() / 100.0 # Play the sound data = wf.readframes(CHUNK) while data: # Apply volume if volume < 1.0: audio_data = np.frombuffer(data, dtype=np.int16) audio_data = (audio_data * volume).astype(np.int16) data = audio_data.tobytes() stream.write(data) data = wf.readframes(CHUNK) stream.stop_stream() stream.close() wf.close() audio_playback.terminate() print("Extra button sound playback finished") except Exception as e: print(f"Error playing button sound: {e}") finally: self.extra_button_playing = False def phone_loop(self): """Main phone handling loop""" print("Rotary Phone System Started") print(f"Hook pin: GPIO {HOOK_PIN}") if EXTRA_BUTTON_ENABLED: print(f"Extra button: GPIO {EXTRA_BUTTON_PIN}") print(f"Recordings will be saved to: {OUTPUT_DIR}") try: while True: # Check extra button first (higher priority) if EXTRA_BUTTON_ENABLED and GPIO.input(EXTRA_BUTTON_PIN) == EXTRA_BUTTON_PRESSED: if not self.extra_button_playing: # Play extra button sound in a separate thread to not block button_thread = threading.Thread(target=self.play_extra_button_sound, daemon=True) button_thread.start() time.sleep(0.5) # Debounce # Wait for handset pickup if GPIO.input(HOOK_PIN) == HOOK_PRESSED and self.phone_status == "on_hook": self.phone_status = "off_hook" print("\n=== Handset picked up ===") # Apply greeting delay if configured greeting_delay = self.get_greeting_delay() if greeting_delay > 0: print(f"Waiting {greeting_delay} seconds before greeting...") time.sleep(greeting_delay) # Play active greeting message greeting_file = self.get_active_greeting_path() self.play_sound_file(greeting_file) # Start recording if self.phone_status == "off_hook": # Still off hook after sound self.record_audio() self.phone_status = "on_hook" time.sleep(0.1) except KeyboardInterrupt: print("\nShutting down phone system...") finally: self.cleanup() def cleanup(self): """Clean up resources""" self.audio.terminate() GPIO.cleanup() print("Cleanup complete") # Global phone instance phone = RotaryPhone() # Flask Routes @app.route('/') def index(): """Main page""" recordings = get_recordings() greetings = get_greetings() status = phone.get_status() active_greeting = phone.config.get("active_greeting", "dialtone.wav") extra_button_sound = phone.config.get("extra_button_sound", "button_sound.wav") volume = phone.get_volume() greeting_delay = phone.get_greeting_delay() return render_template('index.html', recordings=recordings, greetings=greetings, active_greeting=active_greeting, extra_button_sound=extra_button_sound, extra_button_enabled=EXTRA_BUTTON_ENABLED, status=status, volume=volume, greeting_delay=greeting_delay) @app.route('/api/status') def api_status(): """API endpoint for phone status""" return jsonify(phone.get_status()) @app.route('/api/recordings') def api_recordings(): """API endpoint for recordings list""" return jsonify(get_recordings()) @app.route('/api/greetings') def api_greetings(): """API endpoint for greetings list""" return jsonify(get_greetings()) @app.route('/api/volume', methods=['GET']) def api_get_volume(): """Get current volume setting""" return jsonify({"volume": phone.get_volume()}) @app.route('/api/volume', methods=['POST']) def api_set_volume(): """Set volume level""" data = request.get_json() volume = data.get('volume', 70) new_volume = phone.set_volume(volume) return jsonify({"success": True, "volume": new_volume}) @app.route('/api/greeting_delay', methods=['GET']) def api_get_greeting_delay(): """Get current greeting delay setting""" return jsonify({"delay": phone.get_greeting_delay()}) @app.route('/api/greeting_delay', methods=['POST']) def api_set_greeting_delay(): """Set greeting delay in seconds""" data = request.get_json() delay = data.get('delay', 0) new_delay = phone.set_greeting_delay(delay) return jsonify({"success": True, "delay": new_delay}) @app.route('/api/backup/status', methods=['GET']) def api_backup_status(): """Get USB backup drive status""" return jsonify(get_usb_backup_status()) @app.route('/api/backup/test', methods=['POST']) def api_backup_test(): """Test backup to all USB drives""" try: # Create a test file test_file = os.path.join(OUTPUT_DIR, ".backup_test.txt") with open(test_file, 'w') as f: f.write(f"Backup test at {datetime.now()}") # Try to backup results = backup_file_to_usb(test_file, os.path.join(SYS_CONFIG['paths']['recordings_dir'], ".backup_test.txt")) # Clean up test file try: os.remove(test_file) except: pass return jsonify({"success": True, "results": results}) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @app.route('/upload_greeting', methods=['POST']) def upload_greeting(): """Upload a new greeting message""" if 'soundfile' not in request.files: return jsonify({"error": "No file provided"}), 400 file = request.files['soundfile'] if file.filename == '': return jsonify({"error": "No file selected"}), 400 if file and file.filename.lower().endswith('.wav'): filename = secure_filename(file.filename) # Ensure unique filename counter = 1 base_name = os.path.splitext(filename)[0] while os.path.exists(os.path.join(SOUNDS_DIR, filename)): filename = f"{base_name}_{counter}.wav" counter += 1 filepath = os.path.join(SOUNDS_DIR, filename) file.save(filepath) # Backup to USB drives if enabled if BACKUP_ENABLED: relative_path = os.path.join(SYS_CONFIG['paths']['sounds_dir'], filename) backup_results = backup_file_to_usb(filepath, relative_path) print(f"Greeting backup results: {backup_results}") return jsonify({"success": True, "message": f"Greeting '{filename}' uploaded successfully", "filename": filename}) return jsonify({"error": "Only WAV files are supported"}), 400 @app.route('/set_active_greeting', methods=['POST']) def set_active_greeting(): """Set which greeting to play""" data = request.get_json() filename = data.get('filename') if not filename: return jsonify({"error": "No filename provided"}), 400 filepath = os.path.join(SOUNDS_DIR, filename) if not os.path.exists(filepath): return jsonify({"error": "Greeting file not found"}), 404 phone.set_active_greeting(filename) return jsonify({"success": True, "message": f"Active greeting set to '{filename}'"}) @app.route('/set_extra_button_sound', methods=['POST']) def set_extra_button_sound(): """Set which sound plays for extra button""" data = request.get_json() filename = data.get('filename') if not filename: return jsonify({"error": "No filename provided"}), 400 filepath = os.path.join(SOUNDS_DIR, filename) if not os.path.exists(filepath): return jsonify({"error": "Sound file not found"}), 404 phone.set_extra_button_sound(filename) return jsonify({"success": True, "message": f"Extra button sound set to '{filename}'"}) @app.route('/delete_greeting/', methods=['POST']) def delete_greeting(filename): """Delete a greeting file""" filename = secure_filename(filename) filepath = os.path.join(SOUNDS_DIR, filename) # Don't delete the active greeting if filename == phone.config.get("active_greeting"): return jsonify({"error": "Cannot delete the active greeting. Please select a different greeting first."}), 400 if os.path.exists(filepath): os.remove(filepath) return jsonify({"success": True}) return jsonify({"error": "File not found"}), 404 @app.route('/play_audio//') def play_audio(audio_type, filename): """Serve audio file for web playback""" filename = secure_filename(filename) if audio_type == 'recording': filepath = os.path.join(OUTPUT_DIR, filename) elif audio_type == 'greeting': filepath = os.path.join(SOUNDS_DIR, filename) else: return "Invalid audio type", 400 if os.path.exists(filepath): return send_file(filepath, mimetype='audio/wav') return "File not found", 404 @app.route('/download/') def download_recording(filename): """Download a recording""" filepath = os.path.join(OUTPUT_DIR, secure_filename(filename)) if os.path.exists(filepath): return send_file(filepath, as_attachment=True) return "File not found", 404 @app.route('/delete/', methods=['POST']) def delete_recording(filename): """Delete a recording""" filepath = os.path.join(OUTPUT_DIR, secure_filename(filename)) if os.path.exists(filepath): os.remove(filepath) return jsonify({"success": True}) return jsonify({"error": "File not found"}), 404 @app.route('/restore_default_sound', methods=['POST']) def restore_default_sound(): """Restore default dial tone""" if os.path.exists(DIALTONE_FILE): os.remove(DIALTONE_FILE) phone.generate_default_dialtone() phone.set_active_greeting("dialtone.wav") return jsonify({"success": True, "message": "Default dial tone restored"}) def get_greetings(): """Get list of all greeting sound files""" greetings = [] if os.path.exists(SOUNDS_DIR): for filename in sorted(os.listdir(SOUNDS_DIR)): if filename.endswith('.wav'): filepath = os.path.join(SOUNDS_DIR, filename) stat = os.stat(filepath) # Get duration from WAV file try: wf = wave.open(filepath, 'rb') frames = wf.getnframes() rate = wf.getframerate() duration = frames / float(rate) wf.close() except: duration = 0 greetings.append({ "filename": filename, "size": stat.st_size, "size_mb": stat.st_size / (1024 * 1024), "date": datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), "duration": duration, "is_active": filename == phone.config.get("active_greeting", "dialtone.wav"), "is_button_sound": filename == phone.config.get("extra_button_sound", "button_sound.wav") }) return greetings def get_recordings(): """Get list of all recordings with metadata""" recordings = [] if os.path.exists(OUTPUT_DIR): for filename in sorted(os.listdir(OUTPUT_DIR), reverse=True): if filename.endswith('.wav'): filepath = os.path.join(OUTPUT_DIR, filename) stat = os.stat(filepath) # Get duration from WAV file try: wf = wave.open(filepath, 'rb') frames = wf.getnframes() rate = wf.getframerate() duration = frames / float(rate) wf.close() except: duration = 0 recordings.append({ "filename": filename, "size": stat.st_size, "size_mb": stat.st_size / (1024 * 1024), "date": datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), "duration": duration }) return recordings def get_local_ip(): """Get local IP address""" import socket try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] s.close() return ip except: return "127.0.0.1" def main(): """Main entry point for the wedding phone application""" # Create templates directory and HTML template script_dir = os.path.dirname(os.path.abspath(__file__)) templates_dir = os.path.join(script_dir, 'templates') os.makedirs(templates_dir, exist_ok=True) # Create the HTML template if it doesn't exist template_file = os.path.join(templates_dir, 'index.html') if not os.path.exists(template_file): print(f"Creating template at {template_file}") with open(template_file, 'w') as f: f.write(''' Rotary Phone Control Panel

๐Ÿ“ž Rotary Phone Control Panel

Manage your vintage phone system

Phone Status

{% if status.recording %} ๐Ÿ”ด Recording in progress... {% elif status.status == 'off_hook' %} ๐Ÿ“ž Handset off hook {% else %} โœ… Ready (handset on hook) {% endif %}
{% if status.current_recording %}

Recording to: {{ status.current_recording.split('/')[-1] }}

{% endif %} {% if extra_button_enabled %}
{% if status.extra_button_playing %} ๐Ÿ”˜ Button sound playing... {% else %} ๐Ÿ”˜ Button ready {% endif %}
{% endif %}

๐Ÿ”Š Volume & Delay Control

Volume

๐Ÿ”‡ ๐Ÿ”Š {{ volume }}%

Playback volume for greeting messages

Greeting Delay

โฑ๏ธ โฐ {{ greeting_delay }}s

Delay before greeting plays after pickup (0-10 seconds)

๐ŸŽต Greeting Messages

โ„น๏ธ Active greeting: {{ active_greeting }} {% if extra_button_enabled %}
๐Ÿ”˜ Extra button sound: {{ extra_button_sound }} {% endif %}

Upload WAV files to play when the handset is picked up

{% if greetings %}

Available Greetings

{% for greeting in greetings %}

{% if greeting.is_active %}โญ{% endif %} {% if greeting.is_button_sound and extra_button_enabled %}๐Ÿ”˜{% endif %} {{ greeting.filename }}

๐Ÿ“… {{ greeting.date }} | โฑ๏ธ {{ "%.1f"|format(greeting.duration) }}s | ๐Ÿ’พ {{ "%.2f"|format(greeting.size_mb) }} MB
{% if not greeting.is_active %} {% if extra_button_enabled and not greeting.is_button_sound %} {% endif %} {% else %} {% if extra_button_enabled and not greeting.is_button_sound %} {% endif %} {% endif %}
{% endfor %}
{% else %}

๐ŸŽต

No greeting messages uploaded yet. Upload your first greeting!

{% endif %}

๐Ÿ’พ USB Backup

Automatic backup to USB drives with CRC verification

Loading backup status...

๐ŸŽ™๏ธ Recordings

{% if recordings %}
{{ recordings|length }}
Total Recordings
{{ "%.1f"|format(recordings|sum(attribute='size_mb')) }} MB
Total Size
{{ "%.1f"|format(recordings|sum(attribute='duration')/60) }} min
Total Duration
{% for recording in recordings %}

{{ recording.filename }}

๐Ÿ“… {{ recording.date }} | โฑ๏ธ {{ "%.1f"|format(recording.duration) }}s | ๐Ÿ’พ {{ "%.2f"|format(recording.size_mb) }} MB
{% endfor %}
{% else %}

๐Ÿ“ญ

No recordings yet. Pick up the phone to start recording!

{% endif %}

๐ŸŽต Audio Player

''') else: print(f"Template already exists at {template_file}") # Start phone handling in separate thread phone_thread = threading.Thread(target=phone.phone_loop, daemon=True) phone_thread.start() # Get and display local IP local_ip = get_local_ip() print("\n" + "="*60) print(f"Web Interface Available At:") print(f" http://{local_ip}:{WEB_PORT}") print(f" http://localhost:{WEB_PORT}") print("="*60 + "\n") # Start Flask web server app.run(host='0.0.0.0', port=WEB_PORT, debug=False, threaded=True) if __name__ == "__main__": main()