一个基于Python的语音聊天室聊天记录监听方案。这个方案包括实时语音转文字和记录保存功能。

方案一:基于WebSocket的语音聊天室监听

import asyncio
import websockets
import json
import speech_recognition as sr
import pyaudio
import wave
import threading
from datetime import datetime
import os
import queue
from typing import Optional, Dict, List
import logging

class VoiceChatMonitor:
    """Monitor a WebSocket-based voice chat room.

    Receives JSON messages from the room, logs text messages, queues audio
    payloads for speech-to-text conversion, and optionally saves the raw
    audio payloads to disk.
    """
    
    def __init__(self, 
                 room_url: str,
                 save_audio: bool = False,
                 audio_save_path: str = "./recordings",
                 log_file: str = "./chat_logs.txt"):
        """
        Initialize the monitor.

        :param room_url: WebSocket URL of the chat room
        :param save_audio: whether to persist raw audio payloads to disk
        :param audio_save_path: directory where audio files are written
        :param log_file: path of the text log file
        """
        self.room_url = room_url
        self.save_audio = save_audio
        self.audio_save_path = audio_save_path
        self.log_file = log_file
        self.is_listening = False
        self.audio_queue = queue.Queue()
        self.recognizer = sr.Recognizer()
        
        # Create the recording directory if audio saving is enabled.
        # exist_ok avoids a race if the directory appears between check and create.
        if save_audio:
            os.makedirs(audio_save_path, exist_ok=True)
            
        # Log to both the log file and the console.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
        
    async def connect_to_room(self):
        """Connect to the chat room and receive messages until stopped."""
        try:
            async with websockets.connect(self.room_url) as websocket:
                self.logger.info(f"成功连接到聊天室: {self.room_url}")
                self.is_listening = True
                
                # Background daemon thread that drains the audio queue
                # (speech-to-text) while this coroutine receives messages.
                processing_thread = threading.Thread(target=self.process_audio_queue)
                processing_thread.daemon = True
                processing_thread.start()
                
                # Receive loop: one message per iteration.
                while self.is_listening:
                    try:
                        message = await websocket.recv()
                        await self.handle_message(message)
                    except websockets.exceptions.ConnectionClosed:
                        self.logger.warning("连接已关闭")
                        break
                    except Exception as e:
                        self.logger.error(f"接收消息时出错: {e}")
                        
        except Exception as e:
            self.logger.error(f"连接失败: {e}")
        finally:
            # Ensure the processing thread's loop condition goes false even
            # when the connection drops or fails; otherwise it spins forever.
            self.is_listening = False
            
    async def handle_message(self, message):
        """Dispatch one incoming message by its 'type' field."""
        try:
            # Messages are expected to be JSON objects.
            data = json.loads(message)
            
            if data.get('type') == 'audio':
                await self.handle_audio_data(data)
            elif data.get('type') == 'text':
                self.handle_text_message(data)
            elif data.get('type') == 'user_join':
                self.logger.info(f"用户加入: {data.get('username')}")
            elif data.get('type') == 'user_leave':
                self.logger.info(f"用户离开: {data.get('username')}")
                
        except json.JSONDecodeError:
            # Not JSON: treat the raw payload as a plain-text system message.
            self.handle_text_message({'content': message, 'username': 'System'})
            
    async def handle_audio_data(self, audio_data: Dict):
        """Save (optionally) and queue an audio payload for recognition."""
        try:
            username = audio_data.get('username', 'Unknown')
            audio_content = audio_data.get('audio', '')
            timestamp = datetime.now()
            
            # Persist the raw payload if requested.
            if self.save_audio:
                audio_file = self.save_audio_file(username, audio_content, timestamp)
                self.logger.info(f"音频已保存: {audio_file}")
            
            # Hand the payload to the speech-to-text worker thread.
            self.audio_queue.put({
                'username': username,
                'audio': audio_content,
                'timestamp': timestamp
            })
            
        except Exception as e:
            self.logger.error(f"处理音频数据时出错: {e}")
            
    def handle_text_message(self, text_data: Dict):
        """Log one text chat message with a timestamp."""
        username = text_data.get('username', 'System')
        content = text_data.get('content', '')
        timestamp = datetime.now()
        
        log_entry = f"[{timestamp}] {username}: {content}"
        self.logger.info(log_entry)
        
    def process_audio_queue(self):
        """Worker loop: convert queued audio to text and log the result."""
        while self.is_listening:
            try:
                # Short timeout so the loop re-checks is_listening regularly.
                audio_item = self.audio_queue.get(timeout=1)
                
                text = self.speech_to_text(audio_item['audio'])
                
                if text:
                    log_entry = f"[{audio_item['timestamp']}] {audio_item['username']} [语音]: {text}"
                    self.logger.info(log_entry)
                    
            except queue.Empty:
                continue
            except Exception as e:
                self.logger.error(f"处理音频队列时出错: {e}")
                
    def speech_to_text(self, audio_data) -> Optional[str]:
        """Convert an audio payload to text.

        NOTE(review): this is a template placeholder — it always returns a
        fixed sample string. Replace the body with a real recognizer
        (e.g. recognize_google on an sr.AudioFile, or Vosk offline models)
        for the payload format your chat server actually sends.
        """
        try:
            # Option 1: Google speech recognition via SpeechRecognition
            # audio_file = sr.AudioFile(audio_data)
            # with audio_file as source:
            #     audio = self.recognizer.record(source)
            # text = self.recognizer.recognize_google(audio, language='zh-CN')
            
            # Option 2: offline recognition with Vosk
            # import vosk
            # model = vosk.Model("model_path")
            # rec = vosk.KaldiRecognizer(model, 16000)
            # ...
            
            # Placeholder result.
            return "这是一段语音转换的文本"
            
        except sr.UnknownValueError:
            self.logger.warning("无法识别语音")
        except sr.RequestError as e:
            self.logger.error(f"语音识别服务错误: {e}")
        except Exception as e:
            self.logger.error(f"语音转文字时出错: {e}")
            
        return None
        
    def save_audio_file(self, username: str, audio_data: str, timestamp: datetime) -> str:
        """Write one audio payload to a timestamped file and return its path.

        Accepts either raw bytes or a base64-encoded string (the common
        WebSocket transport encoding). A string that is not valid base64 is
        written as its UTF-8 bytes so the payload is never silently dropped.
        """
        import base64
        import binascii

        filename = f"{timestamp.strftime('%Y%m%d_%H%M%S')}_{username}.wav"
        filepath = os.path.join(self.audio_save_path, filename)
        
        if isinstance(audio_data, bytes):
            audio_bytes = audio_data
        else:
            try:
                audio_bytes = base64.b64decode(audio_data, validate=True)
            except (binascii.Error, ValueError):
                # Not base64 — keep the raw text bytes rather than losing data.
                audio_bytes = audio_data.encode('utf-8')
        
        with open(filepath, 'wb') as f:
            f.write(audio_bytes)
        
        return filepath
        
    def stop_listening(self):
        """Request all loops (receive + worker) to stop."""
        self.is_listening = False
        self.logger.info("监听已停止")

# Usage example
async def main():
    """Run the WebSocket chat-room monitor until interrupted."""
    chat_monitor = VoiceChatMonitor(
        room_url="ws://your-chat-server.com/room/123",
        log_file="./chat_history.txt",
        save_audio=True,
        audio_save_path="./recordings",
    )

    try:
        await chat_monitor.connect_to_room()
    except KeyboardInterrupt:
        chat_monitor.stop_listening()

if __name__ == "__main__":
    asyncio.run(main())

方案二:基于本地音频捕获的监听

import pyaudio
import wave
import threading
import speech_recognition as sr
from datetime import datetime
import queue
import os
import numpy as np

class LocalAudioMonitor:
    """Local audio monitor: captures microphone input, segments it with a
    simple volume-based voice activity detector, and transcribes each
    segment with Google speech recognition."""
    
    def __init__(self, 
                 log_file: str = "./chat_logs.txt",
                 save_audio: bool = False,
                 audio_save_path: str = "./recordings"):
        
        self.log_file = log_file
        self.save_audio = save_audio
        self.audio_save_path = audio_save_path
        self.is_recording = False
        self.audio_queue = queue.Queue()
        self.recognizer = sr.Recognizer()
        
        # Capture parameters: 16 kHz mono, 16-bit samples, 1024-frame chunks.
        self.CHUNK = 1024
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16000
        
        # Create the recording directory if needed (exist_ok avoids a race).
        if save_audio:
            os.makedirs(audio_save_path, exist_ok=True)
            
    def start_monitoring(self):
        """Start the capture and recognition worker threads."""
        self.is_recording = True
        
        # Daemon thread: microphone capture + voice activity detection.
        capture_thread = threading.Thread(target=self.capture_audio)
        capture_thread.daemon = True
        capture_thread.start()
        
        # Daemon thread: speech-to-text for completed segments.
        recognition_thread = threading.Thread(target=self.process_audio)
        recognition_thread.daemon = True
        recognition_thread.start()
        
        print("开始监听语音...")
        
    def capture_audio(self):
        """Capture microphone audio and segment it by voice activity."""
        p = pyaudio.PyAudio()
        # Initialized before the try block so the finally clause never
        # references an unbound name when p.open() itself raises.
        stream = None
        
        try:
            stream = p.open(
                format=self.FORMAT,
                channels=self.CHANNELS,
                rate=self.RATE,
                input=True,
                frames_per_buffer=self.CHUNK
            )
            
            frames = []
            silent_chunks = 0
            recording = False
            
            while self.is_recording:
                data = stream.read(self.CHUNK, exception_on_overflow=False)
                
                # Simple voice activity detection via mean absolute amplitude.
                audio_data = np.frombuffer(data, dtype=np.int16)
                volume = np.abs(audio_data).mean()
                
                if volume > 500:  # volume threshold
                    if not recording:
                        recording = True
                        frames = []
                        print("检测到语音开始...")
                    
                    frames.append(data)
                    silent_chunks = 0
                else:
                    if recording:
                        silent_chunks += 1
                        
                        # Enough consecutive silence marks the end of an
                        # utterance (~0.5 s at 16 kHz / 1024-frame chunks).
                        if silent_chunks > 20:
                            recording = False
                            self.save_audio_chunk(frames)
                            frames = []
                            
        except Exception as e:
            print(f"音频捕获错误: {e}")
        finally:
            # Release the stream only if it was successfully opened.
            if stream is not None:
                stream.stop_stream()
                stream.close()
            p.terminate()
            
    def save_audio_chunk(self, frames):
        """Optionally write a finished segment to a WAV file, then queue it
        for recognition."""
        if not frames:
            return
            
        timestamp = datetime.now()
        
        if self.save_audio:
            filename = f"audio_{timestamp.strftime('%Y%m%d_%H%M%S')}.wav"
            filepath = os.path.join(self.audio_save_path, filename)
            
            # A throwaway PyAudio instance is needed only for the sample width.
            p = pyaudio.PyAudio()
            wf = wave.open(filepath, 'wb')
            wf.setnchannels(self.CHANNELS)
            wf.setsampwidth(p.get_sample_size(self.FORMAT))
            wf.setframerate(self.RATE)
            wf.writeframes(b''.join(frames))
            wf.close()
            p.terminate()
            
            print(f"音频已保存: {filepath}")
            
        # Hand the raw frames to the recognition worker.
        self.audio_queue.put({
            'frames': frames,
            'timestamp': timestamp
        })
        
    def process_audio(self):
        """Worker loop: transcribe queued segments and log the results."""
        while self.is_recording:
            try:
                audio_item = self.audio_queue.get(timeout=1)
                
                # Wrap the raw PCM frames for the recognizer.
                audio_data = sr.AudioData(
                    b''.join(audio_item['frames']),
                    self.RATE,
                    2  # sample width in bytes (paInt16)
                )
                
                try:
                    text = self.recognizer.recognize_google(
                        audio_data, 
                        language='zh-CN'
                    )
                    
                    log_entry = f"[{audio_item['timestamp']}] [语音识别]: {text}"
                    self.save_log(log_entry)
                    print(log_entry)
                    
                except sr.UnknownValueError:
                    pass  # segment contained no recognizable speech
                except sr.RequestError as e:
                    print(f"语音识别服务错误: {e}")
                    
            except queue.Empty:
                continue
                
    def save_log(self, log_entry: str):
        """Append one line to the log file (UTF-8)."""
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(log_entry + '\n')
            
    def stop_monitoring(self):
        """Signal both worker threads to exit."""
        self.is_recording = False
        print("监听已停止")

# Usage example
def main():
    """Start the local microphone monitor; stop when Enter is pressed."""
    audio_monitor = LocalAudioMonitor(
        save_audio=True,
        log_file="./voice_chat_log.txt",
        audio_save_path="./recordings",
    )

    try:
        audio_monitor.start_monitoring()
        input("按回车键停止监听...\n")
    finally:
        audio_monitor.stop_monitoring()

if __name__ == "__main__":
    main()

方案三:基于Vosk的离线语音识别

import json
import wave
import sys
import queue
import threading
import pyaudio
from vosk import Model, KaldiRecognizer
from datetime import datetime
import os

class VoskVoiceMonitor:
    """Offline voice monitor using Vosk: captures microphone audio and
    transcribes it locally without any network service."""
    
    def __init__(self, 
                 model_path: str = "vosk-model-small-cn-0.22",
                 log_file: str = "./chat_logs.txt"):
        
        self.model_path = model_path
        self.log_file = log_file
        self.is_recording = False
        self.audio_queue = queue.Queue()
        
        # The model must be downloaded separately; fail early if missing.
        if not os.path.exists(model_path):
            print(f"请下载Vosk模型: {model_path}")
            print("下载地址: https://alphacephei.com/vosk/models")
            sys.exit(1)
            
        # Load the model once; the recognizer sample rate must match RATE.
        self.model = Model(model_path)
        self.recognizer = KaldiRecognizer(self.model, 16000)
        
        # Capture parameters: 16 kHz mono, 16-bit samples, 1024-frame chunks.
        self.CHUNK = 1024
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16000
        
    def start_monitoring(self):
        """Start the capture and recognition worker threads."""
        self.is_recording = True
        
        # Daemon thread: raw microphone capture.
        capture_thread = threading.Thread(target=self.capture_audio)
        capture_thread.daemon = True
        capture_thread.start()
        
        # Daemon thread: offline recognition.
        recognition_thread = threading.Thread(target=self.process_audio)
        recognition_thread.daemon = True
        recognition_thread.start()
        
        print("Vosk离线语音监听已启动...")
        
    def capture_audio(self):
        """Read fixed-size chunks from the microphone into the queue."""
        p = pyaudio.PyAudio()
        # Initialized before the try block so cleanup is safe even if
        # p.open() raises.
        stream = None
        
        try:
            stream = p.open(
                format=self.FORMAT,
                channels=self.CHANNELS,
                rate=self.RATE,
                input=True,
                frames_per_buffer=self.CHUNK
            )
            
            while self.is_recording:
                data = stream.read(self.CHUNK, exception_on_overflow=False)
                self.audio_queue.put(data)
        finally:
            # Always release audio resources, even on an unexpected error.
            if stream is not None:
                stream.stop_stream()
                stream.close()
            p.terminate()
        
    def process_audio(self):
        """Worker loop: feed chunks to Vosk and log finalized utterances."""
        while self.is_recording:
            try:
                data = self.audio_queue.get(timeout=1)
                
                # AcceptWaveform returns True when an utterance is complete.
                if self.recognizer.AcceptWaveform(data):
                    result = json.loads(self.recognizer.Result())
                    text = result.get('text', '')
                    
                    if text:
                        timestamp = datetime.now()
                        log_entry = f"[{timestamp}] {text}"
                        self.save_log(log_entry)
                        print(f"识别: {text}")
                        
            except queue.Empty:
                continue
                
    def save_log(self, log_entry: str):
        """Append one line to the log file (UTF-8)."""
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(log_entry + '\n')
            
    def stop_monitoring(self):
        """Signal both worker threads to exit."""
        self.is_recording = False
        print("监听已停止")

# Usage example
def main():
    """Start the offline Vosk monitor; stop when Enter is pressed."""
    vosk_monitor = VoskVoiceMonitor(
        log_file="./voice_chat_log.txt",
        model_path="vosk-model-small-cn-0.22",  # download the model first
    )

    try:
        vosk_monitor.start_monitoring()
        input("按回车键停止监听...\n")
    finally:
        vosk_monitor.stop_monitoring()

if __name__ == "__main__":
    main()

安装依赖

# 基础依赖
pip install websockets pyaudio numpy

# 语音识别依赖
pip install SpeechRecognition

# 或者使用Vosk(推荐离线使用)
pip install vosk

# 可能需要安装系统依赖
# Ubuntu/Debian:
sudo apt-get install portaudio19-dev python3-pyaudio

# macOS:
brew install portaudio

# Windows:
# 通常直接安装pyaudio即可

使用建议

  1. 选择合适的方案

    • 方案一:适合监听WebSocket协议的语音聊天室
    • 方案二:适合本地音频捕获
    • 方案三:适合离线环境,无需网络
  2. 性能优化

    • 使用多线程处理音频和识别
    • 实现声音活动检测(VAD)减少处理量
    • 考虑使用GPU加速语音识别
  3. 错误处理

    • 添加重连机制
    • 记录错误日志
    • 异常情况的处理
  4. 隐私保护

    • 确保遵守相关法律法规
    • 明确告知用户正在录音
    • 安全存储录音文件

这个方案可以根据具体需求进行调整和扩展。

已有 14 条评论

    1. JamesWilson JamesWilson

      This code structure is excellent for learning. The way you've separated concerns into different classes and methods makes it super easy to customize. I've already adapted the WebSocket version for my Discord bot!

    2. Isabella Isabella

      中文识别准确率出乎意料的高,特别是Vosk那个方案。之前试过其他的离线库,对中文支持都不太好,这个真心推荐。

    3. William William

      Very comprehensive guide! I appreciate that you included installation instructions for all major OSes. The portaudio setup on Linux can be a nightmare, so those commands are a lifesaver.

    4. Olivia Olivia

      听觉上感觉方案一的实时性最好,延迟很低。我们用在远程会议记录上,基本能做到说完话5秒内就看到文字,体验很棒!

    5. David_Lee David_Lee

      The error handling and logging setup is very professional. Most tutorials just show the happy path, but you've covered connection drops and queue management. Great job!