"""Hybrid WhisperLive server.

Runs the WhisperLive WebSocket transcription server in a background thread
and a Flask HTTP server in the main thread.  The HTTP server exposes:

* ``/health``                    - JSON health/status report
* ``/``                          - HTML test/dashboard page
* ``/transcribe``                - multipart file transcription
* ``/transcribe/url``            - placeholder URL transcription endpoint
* ``/v1/audio/transcriptions``   - OpenAI-compatible transcription
* ``/v1/audio/translations``     - OpenAI-compatible translation
* ``/v1/models``                 - OpenAI-compatible model listing
* ``/ws``                        - WebSocket bridge to the internal WS server
"""
import argparse
import json
import logging
import os
import socket
import ssl
import subprocess
import tempfile
import threading
from pathlib import Path

import websocket as ws_client
from flask import Flask, request, jsonify, send_file, Response
from flask_sock import Sock
from werkzeug.utils import secure_filename


def _split_timestamp(s):
    """Split a time in seconds into ``(hours, minutes, seconds, milliseconds)``.

    The value is rounded to whole milliseconds *once* and then split with
    integer arithmetic.  Computing the fraction as ``(s - int(s)) * 1000`` and
    truncating loses a millisecond to float error (1.001 -> 0 ms) and cannot
    carry into the seconds field.  Negative inputs are clamped to zero because
    neither SRT nor WebVTT can express a negative timestamp.
    """
    total_ms = max(0, int(round(s * 1000)))
    total_seconds, milliseconds = divmod(total_ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return hours, minutes, seconds, milliseconds


def format_time_srt(s):
    """Format *s* (seconds) as an SRT timestamp ``HH:MM:SS,mmm``."""
    hours, minutes, seconds, milliseconds = _split_timestamp(s)
    return f"{hours:02}:{minutes:02}:{seconds:02},{milliseconds:03}"


def format_time_vtt(s):
    """Format *s* (seconds) as a WebVTT timestamp ``HH:MM:SS.mmm``."""
    hours, minutes, seconds, milliseconds = _split_timestamp(s)
    return f"{hours:02}:{minutes:02}:{seconds:02}.{milliseconds:03}"


def generate_srt(segments):
    """Render *segments* as an SRT document.

    Args:
        segments: iterable of mappings with ``'start'``, ``'end'`` (seconds,
            anything ``float()`` accepts) and ``'text'`` keys.

    Returns:
        The SRT text; an empty string when there are no segments.
    """
    cues = []
    for index, segment in enumerate(segments, start=1):
        start_time = format_time_srt(float(segment['start']))
        end_time = format_time_srt(float(segment['end']))
        text = segment['text'].strip()
        cues.append(f"{index}\n{start_time} --> {end_time}\n{text}\n\n")
    return "".join(cues)


def generate_vtt(segments):
    """Render *segments* as a WebVTT document.

    Args:
        segments: iterable of mappings with ``'start'``, ``'end'`` (seconds,
            anything ``float()`` accepts) and ``'text'`` keys.

    Returns:
        The WebVTT text, always starting with the ``WEBVTT`` header.
    """
    cues = ["WEBVTT\n\n"]
    for segment in segments:
        start_time = format_time_vtt(float(segment['start']))
        end_time = format_time_vtt(float(segment['end']))
        text = segment['text'].strip()
        cues.append(f"{start_time} --> {end_time}\n{text}\n\n")
    return "".join(cues)


# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# File extensions accepted by the /transcribe endpoint.
TRANSCRIBE_EXTENSIONS = frozenset(
    {'wav', 'mp3', 'flac', 'm4a', 'ogg', 'webm', 'opus', 'oga'}
)
# File extensions accepted by the OpenAI-compatible /v1/audio/* endpoints.
OPENAI_EXTENSIONS = frozenset(
    {'wav', 'mp3', 'flac', 'm4a', 'ogg', 'webm', 'mp4', 'mpeg', 'mpga',
     'opus', 'oga'}
)


def _has_allowed_extension(filename, allowed_extensions):
    """Return True when *filename* ends (case-insensitively) in an allowed extension."""
    return filename.lower().endswith(
        tuple('.' + ext for ext in allowed_extensions)
    )


def check_port_availability(port):
    """Return True when nothing accepts TCP connections on local *port*.

    This is a best-effort probe: it attempts a loopback connection and treats
    any failure as "port is free".  The socket is closed even if the probe
    itself raises.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Probe the loopback address; 0.0.0.0 is a bind wildcard and is not a
        # portable connect() destination.
        return sock.connect_ex(('127.0.0.1', port)) != 0


class HybridWhisperServer:
    """WhisperLive WebSocket server plus a Flask HTTP front end."""

    def __init__(self, websocket_port, http_port, backend="faster_whisper",
                 faster_whisper_custom_model_path=None, whisper_tensorrt_path=None,
                 trt_multilingual=False, single_model=True, ssl_context=None):
        """Create the Flask app, register routes and load the shared model.

        Args:
            websocket_port: port for the internal WhisperLive WebSocket server.
            http_port: port for the Flask HTTP server.
            backend: backend name; a shared HTTP transcriber is only created
                for ``"faster_whisper"``.
            faster_whisper_custom_model_path: optional model path/name used
                instead of ``"base"`` for the shared HTTP transcriber.
            whisper_tensorrt_path: forwarded to the WebSocket server.
            trt_multilingual: forwarded to the WebSocket server.
            single_model: forwarded to the WebSocket server.
            ssl_context: forwarded to the WebSocket server.
        """
        self.websocket_port = websocket_port
        self.http_port = http_port
        self.backend = backend
        self.faster_whisper_custom_model_path = faster_whisper_custom_model_path
        self.whisper_tensorrt_path = whisper_tensorrt_path
        self.trt_multilingual = trt_multilingual
        self.single_model = single_model
        self.ssl_context = ssl_context

        # Initialize Flask app
        self.app = Flask(__name__)
        self.app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100MB max file size
        self.sock = Sock(self.app)
        self.setup_routes()

        # Initialize WhisperLive server
        from whisper_live.server import TranscriptionServer
        self.whisper_server = TranscriptionServer()

        # Create a shared transcriber instance for HTTP requests
        self.shared_transcriber = None
        if self.backend == "faster_whisper":
            from whisper_live.transcriber import WhisperModel
            # Use base model as default for HTTP requests
            model_size = "base"
            if self.faster_whisper_custom_model_path:
                model_size = self.faster_whisper_custom_model_path
            self.shared_transcriber = WhisperModel(model_size)

    def setup_routes(self):
        """Register all HTTP routes and the ``/ws`` WebSocket bridge."""

        @self.app.route('/health', methods=['GET'])
        def health_check():
            """Report service status, GPU memory in use and WS client count."""
            # Get GPU memory from nvidia-smi (GPU 1). An argument list is used
            # instead of a shell string so no shell is involved.
            try:
                raw = subprocess.check_output([
                    'nvidia-smi',
                    '--query-gpu=memory.used',
                    '--format=csv,noheader,nounits',
                    '-i', '1',
                ])
                gpu_mem = float(raw.decode().strip()) / 1024.0
            except (OSError, subprocess.SubprocessError, ValueError):
                # nvidia-smi missing, failing, or printing something that is
                # not a number: report zero rather than failing the check.
                gpu_mem = 0.0

            # Get active WS connections
            active = len(self.whisper_server.clients) if hasattr(self.whisper_server, 'clients') else 0

            return jsonify({
                'status': 'healthy',
                'service': 'WhisperLive Hybrid Server',
                'model_loaded': self.shared_transcriber is not None,
                'gpu_memory_used_gb': round(gpu_mem, 1),
                'active_connections': active
            })

        @self.app.route('/', methods=['GET'])
        def serve_test_form():
            """Serve the HTML test form"""
            # The cURL example escapes its trailing backslashes: in a non-raw
            # string literal a bare backslash-newline is a line continuation
            # and would silently drop both the backslash and the line break.
            html_content = """ WhisperLive Dashboard

WhisperLive

High-Performance Real-Time Audio Transcription

Connection Settings

HTTP Status: Checking...
Click Start Recording to begin live transcription...

OpenAI Compatible API

WhisperLive acts as a drop-in replacement for OpenAI's Whisper API. You can use any standard OpenAI client by changing the base URL.

Python (openai package)

from openai import OpenAI

client = OpenAI(
    api_key="sk-no-key-required",
    base_url="https://whisperlive.classroomcopilot.ai/v1/"
)

with open("audio.wav", "rb") as file:
    transcription = client.audio.transcriptions.create(
        file=file,
        model="base",
        response_format="verbose_json"
    )
    
print(transcription.text)

cURL

curl https://whisperlive.classroomcopilot.ai/v1/audio/transcriptions \\
  -H "Content-Type: multipart/form-data" \\
  -F file="@audio.wav" \\
  -F model="base" \\
  -F response_format="verbose_json"
"""
            return html_content, 200, {'Content-Type': 'text/html'}

        @self.app.route('/transcribe', methods=['POST'])
        def transcribe_file():
            """Transcribe an uploaded audio file and return segments as JSON.

            Form fields: ``file`` (required), ``language``, ``task``
            (``'transcribe'`` or ``'translate'``) and ``use_vad`` (also
            accepted as a query parameter).  A ``model`` field may be sent but
            is ignored: the shared transcriber is always used.
            """
            try:
                if 'file' not in request.files:
                    return jsonify({'error': 'No file provided'}), 400

                file = request.files['file']
                if file.filename == '':
                    return jsonify({'error': 'No file selected'}), 400

                # Get optional parameters
                language = request.form.get('language', None)
                task = request.form.get('task', 'transcribe')  # 'transcribe' or 'translate'
                use_vad = request.args.get('use_vad', request.form.get('use_vad', 'true')).lower() == 'true'

                # Validate file type
                if not _has_allowed_extension(file.filename, TRANSCRIBE_EXTENSIONS):
                    allowed = ", ".join(sorted(TRANSCRIBE_EXTENSIONS))
                    return jsonify({'error': f'Unsupported file type. Allowed: {allowed}'}), 400

                # Reject unsupported setups before touching the disk.
                if self.backend != "faster_whisper":
                    # For other backends, use the server's transcriber
                    # This would need to be adapted based on your specific backend setup
                    return jsonify({'error': 'Backend not yet supported for file transcription'}), 501
                if self.shared_transcriber is None:
                    return jsonify({'error': 'Transcriber not initialized'}), 500

                temp_path = None
                try:
                    # Save file temporarily. The upload is written through the
                    # already-open handle (reopening a NamedTemporaryFile by
                    # name fails on some platforms), and temp_path is recorded
                    # first so a failed save is still cleaned up below.
                    with tempfile.NamedTemporaryFile(delete=False, suffix=Path(file.filename).suffix) as temp_file:
                        temp_path = temp_file.name
                        file.save(temp_file)

                    segments, info = self.shared_transcriber.transcribe(
                        temp_path,
                        language=language,
                        task=task,
                        vad_filter=use_vad
                    )

                    # Convert segments to serializable format. This must
                    # happen before the temp file is removed in case
                    # `segments` is produced lazily.
                    transcript_segments = [
                        {
                            'start': segment.start,
                            'end': segment.end,
                            'text': segment.text,
                            'no_speech_prob': segment.no_speech_prob
                        }
                        for segment in segments
                    ]

                    # Get transcription info
                    transcription_info = {
                        'language': info.language,
                        'language_probability': info.language_probability,
                        'duration': info.duration,
                        'duration_after_vad': info.duration_after_vad,
                        'transcription_options': info.transcription_options
                    }

                    return jsonify({
                        'success': True,
                        'segments': transcript_segments,
                        'info': transcription_info,
                        'filename': file.filename
                    })
                finally:
                    # Clean up temporary file
                    if temp_path is not None and os.path.exists(temp_path):
                        os.unlink(temp_path)

            except Exception as e:
                # Top-level boundary: log with traceback, answer with JSON.
                logger.exception("Error transcribing file: %s", e)
                return jsonify({'error': f'Transcription failed: {str(e)}'}), 500

        @self.app.route('/transcribe/url', methods=['POST'])
        def transcribe_url():
            """Validate a URL transcription request (not yet implemented).

            Expects a JSON object with a ``url`` key.  A body that is missing,
            malformed, or not a JSON object yields a 400 rather than a 500.
            """
            try:
                # silent=True returns None for a missing or malformed JSON
                # body instead of raising, so it is reported as a client error.
                data = request.get_json(silent=True)
                if not isinstance(data, dict) or 'url' not in data:
                    return jsonify({'error': 'No URL provided'}), 400

                url = data['url']

                # Validate URL
                if not isinstance(url, str) or not url.startswith(('http://', 'https://', 'rtsp://', 'hls://')):
                    return jsonify({'error': 'Invalid URL format'}), 400

                # For now, we'll return a message that this endpoint is available
                # but the actual implementation would depend on your specific needs
                return jsonify({
                    'message': 'URL transcription endpoint available',
                    'url': url,
                    'note': 'This endpoint is ready for implementation based on your specific requirements'
                })

            except Exception as e:
                logger.exception("Error processing URL transcription request: %s", e)
                return jsonify({'error': f'URL transcription failed: {str(e)}'}), 500

        def openai_error(message, error_type, status, code=None):
            """Build an OpenAI-style ``{'error': {...}}`` JSON response."""
            error = {'message': message, 'type': error_type}
            if code is not None:
                error['code'] = code
            return jsonify({'error': error}), status

        def handle_openai_audio_request(task_type):
            """Shared implementation of the ``/v1/audio/*`` endpoints.

            Args:
                task_type: ``'transcribe'`` or ``'translate'``; passed to the
                    transcriber as ``task`` and echoed in ``verbose_json``.

            Form fields: ``file`` (required), ``language``, ``prompt``,
            ``response_format`` (``json``, ``text``, ``srt``, ``vtt`` or
            ``verbose_json``; anything else is treated as ``json``) and
            ``temperature``.  A ``model`` field may be sent but is ignored.
            """
            try:
                if 'file' not in request.files:
                    return openai_error('No file provided', 'invalid_request_error', 400, 'invalid_parameters')

                file = request.files['file']
                if file.filename == '':
                    return openai_error('No file selected', 'invalid_request_error', 400, 'invalid_parameters')

                # Get OpenAI specific parameters
                language = request.form.get('language', None)
                prompt = request.form.get('prompt', None)
                response_format = request.form.get('response_format', 'json')
                try:
                    temperature = float(request.form.get('temperature', 0))
                except (TypeError, ValueError):
                    # Unparseable temperature falls back to the default.
                    temperature = 0.0

                if not _has_allowed_extension(file.filename, OPENAI_EXTENSIONS):
                    return openai_error('Unsupported file type.', 'invalid_request_error', 400, 'invalid_file_format')

                # Reject unsupported setups before touching the disk.
                if self.backend != "faster_whisper":
                    return openai_error('Backend not yet supported for file transcription', 'internal_server_error', 501)
                if self.shared_transcriber is None:
                    return openai_error('Transcriber not initialized', 'internal_server_error', 500)

                temp_path = None
                try:
                    # Written through the open handle; temp_path is recorded
                    # first so a failed save is still cleaned up below.
                    with tempfile.NamedTemporaryFile(delete=False, suffix=Path(file.filename).suffix) as temp_file:
                        temp_path = temp_file.name
                        file.save(temp_file)

                    kwargs = {
                        "language": language,
                        "task": task_type,
                        "temperature": temperature
                    }
                    if prompt:
                        kwargs["initial_prompt"] = prompt

                    segments, info = self.shared_transcriber.transcribe(temp_path, **kwargs)

                    # Consume the segments before the temp file is removed in
                    # case they are produced lazily.
                    transcript_segments = [
                        {
                            'id': segment.id,
                            'seek': segment.seek,
                            'start': segment.start,
                            'end': segment.end,
                            'text': segment.text,
                            'tokens': segment.tokens,
                            'temperature': segment.temperature,
                            'avg_logprob': segment.avg_logprob,
                            'compression_ratio': segment.compression_ratio,
                            'no_speech_prob': segment.no_speech_prob
                        }
                        for segment in segments
                    ]
                    full_text = "".join(
                        entry['text'] for entry in transcript_segments
                    ).strip()

                    if response_format == 'text':
                        return Response(full_text, mimetype='text/plain')
                    if response_format == 'srt':
                        return Response(generate_srt(transcript_segments), mimetype='text/plain')
                    if response_format == 'vtt':
                        return Response(generate_vtt(transcript_segments), mimetype='text/plain')
                    if response_format == 'verbose_json':
                        return jsonify({
                            'task': task_type,
                            'language': info.language,
                            'duration': info.duration,
                            'text': full_text,
                            'segments': transcript_segments
                        })
                    # 'json' and any unrecognised format.
                    return jsonify({'text': full_text})
                finally:
                    if temp_path is not None and os.path.exists(temp_path):
                        os.unlink(temp_path)

            except Exception as e:
                logger.exception("Error processing OpenAI audio request: %s", e)
                return openai_error(f'Transcription failed: {str(e)}', 'internal_server_error', 500)

        @self.app.route('/v1/audio/transcriptions', methods=['POST'])
        def openai_transcriptions():
            """OpenAI-compatible transcription endpoint."""
            return handle_openai_audio_request('transcribe')

        @self.app.route('/v1/audio/translations', methods=['POST'])
        def openai_translations():
            """OpenAI-compatible translation endpoint."""
            return handle_openai_audio_request('translate')

        @self.app.route('/v1/models', methods=['GET'])
        def list_models():
            """Return a static OpenAI-style model list."""
            # Standard Whisper models supported by faster-whisper
            model_names = [
                "whisper-1", "tiny", "tiny.en", "base", "base.en",
                "small", "small.en", "medium", "medium.en",
                "large", "large-v1", "large-v2", "large-v3"
            ]
            models = [
                {
                    "id": name,
                    "object": "model",
                    "created": 1677532384,
                    "owned_by": "openai" if name == "whisper-1" else "local",
                    "permission": [],
                    "root": name,
                    "parent": None
                }
                for name in model_names
            ]
            return jsonify({
                "object": "list",
                "data": models
            })

        # ===== WebSocket Bridge =====
        # Bridges browser WebSocket connections arriving on the HTTP port
        # (self.http_port) to the internal WhisperLive WebSocket server
        # (self.websocket_port), so live transcription can go through a
        # single externally exposed port.
        @self.sock.route('/ws')
        def ws_bridge(ws):
            """Bridge WebSocket from HTTP port to internal WhisperLive WS server"""
            # NOTE(review): the bridge always dials plain ws://. If the
            # internal server is started with an SSL context this probably
            # needs wss:// - confirm against TranscriptionServer.run.
            internal_url = f"ws://127.0.0.1:{self.websocket_port}"
            logger.info("WebSocket bridge: new connection, proxying to %s", internal_url)

            internal = None
            try:
                internal = ws_client.create_connection(internal_url)

                # Thread: internal server -> browser
                def server_to_browser():
                    try:
                        while True:
                            opcode, data = internal.recv_data()
                            if opcode == ws_client.ABNF.OPCODE_TEXT:
                                ws.send(data.decode('utf-8'))
                            elif opcode == ws_client.ABNF.OPCODE_BINARY:
                                ws.send(data)
                            elif opcode == ws_client.ABNF.OPCODE_CLOSE:
                                break
                    except Exception as exc:
                        # Best effort: either side going away ends the relay.
                        # Logged (not raised) so the cause is not lost.
                        logger.debug("WebSocket bridge: relay stopped: %s", exc)
                    # NOTE(review): when the internal server closes first, the
                    # loop below stays blocked in ws.receive() until the
                    # browser sends or disconnects - consider closing `ws`.

                relay_thread = threading.Thread(target=server_to_browser, daemon=True)
                relay_thread.start()

                # Main thread: browser -> internal server
                while True:
                    data = ws.receive()
                    if data is None:
                        break
                    if isinstance(data, bytes):
                        internal.send_binary(data)
                    elif data == "END_OF_AUDIO":
                        # The end-of-audio marker is forwarded as a binary frame.
                        internal.send_binary(b"END_OF_AUDIO")
                    else:
                        # Pass through session_metadata if present in config message
                        try:
                            msg = json.loads(data)
                            session_metadata = msg.get('session_metadata')
                            if session_metadata:
                                logger.info(
                                    "Session metadata received: session_id=%s, teacher_id=%s",
                                    session_metadata.get('session_id'),
                                    session_metadata.get('teacher_id'),
                                )
                        except (json.JSONDecodeError, AttributeError):
                            # Not JSON, or not a JSON object: forward untouched.
                            pass
                        internal.send(data)

            except Exception as e:
                logger.error("WebSocket bridge error: %s", e)
            finally:
                if internal:
                    try:
                        internal.close()
                    except Exception as exc:
                        logger.debug("WebSocket bridge: error closing internal socket: %s", exc)
                logger.info("WebSocket bridge: connection closed")

    def run_websocket_server(self):
        """Run the WebSocket server in a separate thread"""
        logger.info("Starting WebSocket server on port %s", self.websocket_port)
        self.whisper_server.run(
            "0.0.0.0",
            port=self.websocket_port,
            backend=self.backend,
            faster_whisper_custom_model_path=self.faster_whisper_custom_model_path,
            whisper_tensorrt_path=self.whisper_tensorrt_path,
            trt_multilingual=self.trt_multilingual,
            single_model=self.single_model,
            ssl_context=self.ssl_context
        )

    def run_http_server(self):
        """Run the HTTP server"""
        logger.info("Starting HTTP server on port %s", self.http_port)
        self.app.run(host='0.0.0.0', port=self.http_port, debug=False, threaded=True)

    def start(self):
        """Start both servers"""
        # Start WebSocket server in a separate thread
        websocket_thread = threading.Thread(target=self.run_websocket_server, daemon=True)
        websocket_thread.start()

        # Start HTTP server in main thread
        self.run_http_server()


def _build_arg_parser():
    """Build the command-line parser for the hybrid server."""
    parser = argparse.ArgumentParser(description='WhisperLive Hybrid Server (WebSocket + HTTP)')
    parser.add_argument('--websocket-port', '-wp',
                        type=int,
                        default=int(os.getenv('PORT_WHISPERLIVE', 9090)),
                        help="WebSocket port to run the server on.")
    parser.add_argument('--http-port', '-hp',
                        type=int,
                        default=int(os.getenv('HTTP_PORT', 8080)),
                        help="HTTP port to run the server on.")
    parser.add_argument('--backend', '-b',
                        type=str,
                        default='faster_whisper',
                        help='Backends from ["tensorrt", "faster_whisper"]')
    parser.add_argument('--faster_whisper_custom_model_path', '-fw',
                        type=str, default=None,
                        help="Custom Faster Whisper Model")
    parser.add_argument('--trt_model_path', '-trt',
                        type=str,
                        default=None,
                        help='Whisper TensorRT model path')
    parser.add_argument('--trt_multilingual', '-m',
                        action="store_true",
                        help='Boolean only for TensorRT model. True if multilingual.')
    parser.add_argument('--ssl_cert_path', '-ssl',
                        type=str,
                        default=None,
                        help='Directory containing cert.pem and privkey.pem if ssl should be used.')
    parser.add_argument('--omp_num_threads', '-omp',
                        type=int,
                        default=1,
                        help="Number of threads to use for OpenMP")
    parser.add_argument('--no_single_model', '-nsm',
                        action='store_true',
                        help='Set this if every connection should instantiate its own model. Only relevant for custom model, passed using -trt or -fw.')
    return parser


def _load_ssl_context(cert_dir):
    """Create a server-side TLS context from ``cert.pem``/``privkey.pem`` in *cert_dir*.

    Any failure is reported on stdout and then re-raised.
    """
    try:
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(
            certfile=os.path.join(cert_dir, "cert.pem"),
            keyfile=os.path.join(cert_dir, "privkey.pem")
        )
    except Exception as e:
        print(f"Failed to load SSL certificates: {str(e)}")
        raise
    print("SSL context created successfully")
    return context


def main():
    """Parse command-line arguments and run the hybrid server.

    Raises:
        ValueError: when the ``tensorrt`` backend is selected without
            ``--trt_model_path``.
    """
    args = _build_arg_parser().parse_args()

    if args.backend == "tensorrt" and args.trt_model_path is None:
        raise ValueError("Please Provide a valid tensorrt model path")

    websocket_port = args.websocket_port
    http_port = args.http_port

    if not check_port_availability(websocket_port):
        print(f"Warning: WebSocket port {websocket_port} might already be in use!")

    if not check_port_availability(http_port):
        print(f"Warning: HTTP port {http_port} might already be in use!")

    ssl_context = None
    if args.ssl_cert_path is not None:
        ssl_context = _load_ssl_context(args.ssl_cert_path)

    # Only set the thread count when the environment has not already done so.
    if "OMP_NUM_THREADS" not in os.environ:
        print(f"Setting OMP_NUM_THREADS to {args.omp_num_threads}")
        os.environ["OMP_NUM_THREADS"] = str(args.omp_num_threads)

    print(f"Running hybrid server with args: {args}")

    server = HybridWhisperServer(
        websocket_port=websocket_port,
        http_port=http_port,
        backend=args.backend,
        faster_whisper_custom_model_path=args.faster_whisper_custom_model_path,
        whisper_tensorrt_path=args.trt_model_path,
        trt_multilingual=args.trt_multilingual,
        single_model=not args.no_single_model,
        ssl_context=ssl_context
    )

    print(f"Starting hybrid server with WebSocket on port {websocket_port} and HTTP on port {http_port}")
    print(f"Backend: {args.backend}, SSL: {args.ssl_cert_path is not None}")

    server.start()


if __name__ == "__main__":
    main()