一个基于Python的语音聊天室聊天记录监听方案,包括实时语音转文字和记录保存功能。

方案一:基于WebSocket的语音聊天室监听

import asyncio
import websockets
import json
import speech_recognition as sr
import pyaudio
import wave
import threading
from datetime import datetime
import os
import queue
from typing import Optional, Dict, List
import logging

class VoiceChatMonitor:
    """WebSocket-based voice chat room monitor.

    Connects to a chat room over WebSocket, logs text events, queues
    incoming audio payloads for background speech-to-text, and optionally
    persists the raw audio to disk.
    """

    def __init__(self, 
                 room_url: str,
                 save_audio: bool = False,
                 audio_save_path: str = "./recordings",
                 log_file: str = "./chat_logs.txt"):
        """
        Initialize the monitor.

        :param room_url: WebSocket URL of the chat room
        :param save_audio: whether to persist raw audio payloads to disk
        :param audio_save_path: directory for saved audio files
        :param log_file: path of the text log file
        """
        self.room_url = room_url
        self.save_audio = save_audio
        self.audio_save_path = audio_save_path
        self.log_file = log_file
        self.is_listening = False          # True only while the receive loop runs
        self.audio_queue = queue.Queue()   # audio payloads awaiting transcription
        self.recognizer = sr.Recognizer()

        # Make sure the recording directory exists before any audio arrives.
        if save_audio and not os.path.exists(audio_save_path):
            os.makedirs(audio_save_path)

        # Log to both the file and the console.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    async def connect_to_room(self):
        """Connect to the chat room and receive messages until stopped."""
        try:
            async with websockets.connect(self.room_url) as websocket:
                self.logger.info(f"成功连接到聊天室: {self.room_url}")
                self.is_listening = True

                # Background daemon thread that drains the audio queue
                # (speech-to-text) while this coroutine receives messages.
                processing_thread = threading.Thread(target=self.process_audio_queue)
                processing_thread.daemon = True
                processing_thread.start()

                # Receive loop; individual receive errors are logged and the
                # loop continues, a closed connection ends it.
                while self.is_listening:
                    try:
                        message = await websocket.recv()
                        await self.handle_message(message)
                    except websockets.exceptions.ConnectionClosed:
                        self.logger.warning("连接已关闭")
                        break
                    except Exception as e:
                        self.logger.error(f"接收消息时出错: {e}")

        except Exception as e:
            self.logger.error(f"连接失败: {e}")
        finally:
            # Bug fix: clear the flag so process_audio_queue terminates
            # instead of spinning forever after the connection drops.
            self.is_listening = False

    async def handle_message(self, message):
        """Dispatch one incoming message by its 'type' field.

        Non-JSON payloads are treated as plain text from 'System'.
        """
        try:
            data = json.loads(message)

            if data.get('type') == 'audio':
                await self.handle_audio_data(data)
            elif data.get('type') == 'text':
                self.handle_text_message(data)
            elif data.get('type') == 'user_join':
                self.logger.info(f"用户加入: {data.get('username')}")
            elif data.get('type') == 'user_leave':
                self.logger.info(f"用户离开: {data.get('username')}")

        except json.JSONDecodeError:
            # Not JSON — assume it is a plain text message.
            self.handle_text_message({'content': message, 'username': 'System'})

    async def handle_audio_data(self, audio_data: Dict):
        """Persist (optional) and enqueue one audio payload for transcription.

        :param audio_data: dict with at least 'username' and 'audio' keys;
            the 'audio' value is the raw payload from the server
            (presumably base64-encoded — confirm against the server protocol).
        """
        try:
            username = audio_data.get('username', 'Unknown')
            audio_content = audio_data.get('audio', '')
            timestamp = datetime.now()

            if self.save_audio:
                audio_file = self.save_audio_file(username, audio_content, timestamp)
                self.logger.info(f"音频已保存: {audio_file}")

            # Hand the payload to the background speech-to-text thread.
            self.audio_queue.put({
                'username': username,
                'audio': audio_content,
                'timestamp': timestamp
            })

        except Exception as e:
            self.logger.error(f"处理音频数据时出错: {e}")

    def handle_text_message(self, text_data: Dict):
        """Log one text message with a timestamp.

        :param text_data: dict with optional 'username' and 'content' keys.
        """
        username = text_data.get('username', 'System')
        content = text_data.get('content', '')
        timestamp = datetime.now()

        log_entry = f"[{timestamp}] {username}: {content}"
        self.logger.info(log_entry)

    def process_audio_queue(self):
        """Drain the audio queue and log speech-to-text results.

        Runs on a background thread; exits when is_listening goes False
        (the 1-second get timeout bounds the shutdown latency).
        """
        while self.is_listening:
            try:
                audio_item = self.audio_queue.get(timeout=1)

                text = self.speech_to_text(audio_item['audio'])

                if text:
                    log_entry = f"[{audio_item['timestamp']}] {audio_item['username']} [语音]: {text}"
                    self.logger.info(log_entry)

            except queue.Empty:
                continue
            except Exception as e:
                self.logger.error(f"处理音频队列时出错: {e}")

    def speech_to_text(self, audio_data) -> Optional[str]:
        """Convert one audio payload to text; returns None on failure.

        NOTE(review): this is a placeholder — it returns a fixed sample
        string. Wire up one of the approaches below for real recognition.
        """
        try:
            # Option 1: Google speech recognition (online)
            # audio_file = sr.AudioFile(audio_data)
            # with audio_file as source:
            #     audio = self.recognizer.record(source)
            # text = self.recognizer.recognize_google(audio, language='zh-CN')

            # Option 2: local recognition with Vosk (offline)
            # import vosk
            # model = vosk.Model("model_path")
            # rec = vosk.KaldiRecognizer(model, 16000)
            # ...

            # Placeholder result.
            return "这是一段语音转换的文本"

        except sr.UnknownValueError:
            self.logger.warning("无法识别语音")
        except sr.RequestError as e:
            self.logger.error(f"语音识别服务错误: {e}")
        except Exception as e:
            self.logger.error(f"语音转文字时出错: {e}")

        return None

    def save_audio_file(self, username: str, audio_data: str, timestamp: datetime) -> str:
        """Write one audio payload to a timestamped .wav file and return its path.

        Bug fix: the original returned a path without writing anything while
        the caller logged "audio saved". Assumes the payload is
        base64-encoded — adjust the decode step if the server sends raw bytes.
        Write failures are logged and the intended path is still returned.
        """
        import base64

        filename = f"{timestamp.strftime('%Y%m%d_%H%M%S')}_{username}.wav"
        filepath = os.path.join(self.audio_save_path, filename)

        try:
            with open(filepath, 'wb') as f:
                f.write(base64.b64decode(audio_data))
        except (ValueError, OSError) as e:
            self.logger.error(f"保存音频文件失败: {e}")

        return filepath

    def stop_listening(self):
        """Stop the receive loop and the background transcription thread."""
        self.is_listening = False
        self.logger.info("监听已停止")

# Usage example
async def main():
    """Build a WebSocket monitor and run it until interrupted."""
    monitor = VoiceChatMonitor(
        room_url="ws://your-chat-server.com/room/123",
        log_file="./chat_history.txt",
        save_audio=True,
        audio_save_path="./recordings",
    )

    try:
        await monitor.connect_to_room()
    except KeyboardInterrupt:
        # Ctrl-C: shut the monitor down cleanly.
        monitor.stop_listening()


if __name__ == "__main__":
    asyncio.run(main())

方案二:基于本地音频捕获的监听

import pyaudio
import wave
import threading
import speech_recognition as sr
from datetime import datetime
import queue
import os
import numpy as np

class LocalAudioMonitor:
    """Local audio monitor that captures microphone input.

    Uses a simple energy threshold for voice activity detection, splits
    speech into utterances at silence boundaries, optionally saves each
    utterance as a .wav file, and transcribes it with Google speech
    recognition on a background thread.
    """

    def __init__(self, 
                 log_file: str = "./chat_logs.txt",
                 save_audio: bool = False,
                 audio_save_path: str = "./recordings"):
        
        self.log_file = log_file
        self.save_audio = save_audio
        self.audio_save_path = audio_save_path
        self.is_recording = False          # True while the capture loop runs
        self.audio_queue = queue.Queue()   # utterances awaiting recognition
        self.recognizer = sr.Recognizer()

        # Audio parameters: 16 kHz mono, 16-bit samples, 1024-frame chunks.
        self.CHUNK = 1024
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16000

        # Make sure the recording directory exists before any audio arrives.
        if save_audio and not os.path.exists(audio_save_path):
            os.makedirs(audio_save_path)

    def start_monitoring(self):
        """Start the capture and recognition daemon threads."""
        self.is_recording = True

        capture_thread = threading.Thread(target=self.capture_audio)
        capture_thread.daemon = True
        capture_thread.start()

        recognition_thread = threading.Thread(target=self.process_audio)
        recognition_thread.daemon = True
        recognition_thread.start()

        print("开始监听语音...")

    def capture_audio(self):
        """Capture microphone audio with energy-based voice activity detection.

        Buffers chunks while the mean absolute amplitude exceeds a fixed
        threshold and flushes an utterance after a run of silent chunks.
        """
        p = pyaudio.PyAudio()
        # Bug fix: define stream before the try block so the finally clause
        # does not raise NameError when p.open() itself fails.
        stream = None

        try:
            stream = p.open(
                format=self.FORMAT,
                channels=self.CHANNELS,
                rate=self.RATE,
                input=True,
                frames_per_buffer=self.CHUNK
            )

            frames = []
            silent_chunks = 0
            recording = False

            while self.is_recording:
                data = stream.read(self.CHUNK, exception_on_overflow=False)

                # Crude VAD: mean absolute amplitude of the chunk.
                audio_data = np.frombuffer(data, dtype=np.int16)
                volume = np.abs(audio_data).mean()

                if volume > 500:  # amplitude threshold — tune per microphone
                    if not recording:
                        recording = True
                        frames = []
                        print("检测到语音开始...")

                    frames.append(data)
                    silent_chunks = 0
                else:
                    if recording:
                        silent_chunks += 1

                        # End the utterance after sustained silence.
                        # 20 chunks * 1024 frames / 16000 Hz ~= 1.28 s
                        # (the original comment claimed ~0.5 s).
                        if silent_chunks > 20:
                            recording = False
                            self.save_audio_chunk(frames)
                            frames = []

        except Exception as e:
            print(f"音频捕获错误: {e}")
        finally:
            if stream is not None:
                stream.stop_stream()
                stream.close()
            p.terminate()

    def save_audio_chunk(self, frames):
        """Optionally persist one utterance to .wav and enqueue it for recognition.

        :param frames: list of raw 16-bit PCM chunks; empty lists are ignored.
        """
        if not frames:
            return

        timestamp = datetime.now()

        if self.save_audio:
            filename = f"audio_{timestamp.strftime('%Y%m%d_%H%M%S')}.wav"
            filepath = os.path.join(self.audio_save_path, filename)

            # Improvement: the original created a throwaway PyAudio() instance
            # per chunk just to look up the sample width; paInt16 is 2 bytes.
            with wave.open(filepath, 'wb') as wf:
                wf.setnchannels(self.CHANNELS)
                wf.setsampwidth(2)  # bytes per sample for paInt16
                wf.setframerate(self.RATE)
                wf.writeframes(b''.join(frames))

            print(f"音频已保存: {filepath}")

        self.audio_queue.put({
            'frames': frames,
            'timestamp': timestamp
        })

    def process_audio(self):
        """Transcribe queued utterances; runs on a background thread.

        Exits when is_recording goes False (bounded by the 1 s get timeout).
        """
        while self.is_recording:
            try:
                audio_item = self.audio_queue.get(timeout=1)

                audio_data = sr.AudioData(
                    b''.join(audio_item['frames']),
                    self.RATE,
                    2  # sample width in bytes (paInt16)
                )

                try:
                    text = self.recognizer.recognize_google(
                        audio_data, 
                        language='zh-CN'
                    )

                    log_entry = f"[{audio_item['timestamp']}] [语音识别]: {text}"
                    self.save_log(log_entry)
                    print(log_entry)

                except sr.UnknownValueError:
                    pass  # deliberately skip unintelligible audio
                except sr.RequestError as e:
                    print(f"语音识别服务错误: {e}")

            except queue.Empty:
                continue

    def save_log(self, log_entry: str):
        """Append one line to the log file (UTF-8)."""
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(log_entry + '\n')

    def stop_monitoring(self):
        """Signal both worker threads to stop."""
        self.is_recording = False
        print("监听已停止")

# Usage example
def main():
    """Run the local microphone monitor until Enter is pressed."""
    monitor = LocalAudioMonitor(
        save_audio=True,
        log_file="./voice_chat_log.txt",
        audio_save_path="./recordings",
    )

    try:
        monitor.start_monitoring()
        input("按回车键停止监听...\n")
    finally:
        # Always signal the worker threads to stop, even on Ctrl-C.
        monitor.stop_monitoring()


if __name__ == "__main__":
    main()

方案三:基于Vosk的离线语音识别

import json
import wave
import sys
import queue
import threading
import pyaudio
from vosk import Model, KaldiRecognizer
from datetime import datetime
import os

class VoskVoiceMonitor:
    """Offline voice monitor built on Vosk.

    Streams raw microphone audio into a KaldiRecognizer on a background
    thread and appends each recognized utterance to a log file. Works
    without network access once the model is downloaded.
    """

    def __init__(self, 
                 model_path: str = "vosk-model-small-cn-0.22",
                 log_file: str = "./chat_logs.txt"):
        
        self.model_path = model_path
        self.log_file = log_file
        self.is_recording = False          # True while capture/recognition run
        self.audio_queue = queue.Queue()   # raw PCM chunks awaiting recognition

        # The Vosk model must be downloaded manually; bail out early with a
        # pointer to the download page rather than failing later.
        if not os.path.exists(model_path):
            print(f"请下载Vosk模型: {model_path}")
            print("下载地址: https://alphacephei.com/vosk/models")
            sys.exit(1)

        # Recognizer sample rate must match the capture rate (16 kHz below).
        self.model = Model(model_path)
        self.recognizer = KaldiRecognizer(self.model, 16000)

        # Audio parameters: 16 kHz mono, 16-bit samples, 1024-frame chunks.
        self.CHUNK = 1024
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16000

    def start_monitoring(self):
        """Start the capture and recognition daemon threads."""
        self.is_recording = True

        capture_thread = threading.Thread(target=self.capture_audio)
        capture_thread.daemon = True
        capture_thread.start()

        recognition_thread = threading.Thread(target=self.process_audio)
        recognition_thread.daemon = True
        recognition_thread.start()

        print("Vosk离线语音监听已启动...")

    def capture_audio(self):
        """Read raw audio from the microphone and feed the recognition queue."""
        p = pyaudio.PyAudio()
        # Bug fix: the original had no cleanup guard at all — any exception
        # leaked the stream and the PyAudio instance. stream is predefined so
        # the finally clause is safe even if p.open() raises.
        stream = None

        try:
            stream = p.open(
                format=self.FORMAT,
                channels=self.CHANNELS,
                rate=self.RATE,
                input=True,
                frames_per_buffer=self.CHUNK
            )

            while self.is_recording:
                data = stream.read(self.CHUNK, exception_on_overflow=False)
                self.audio_queue.put(data)
        finally:
            if stream is not None:
                stream.stop_stream()
                stream.close()
            p.terminate()

    def process_audio(self):
        """Feed queued audio to the recognizer and log completed utterances.

        Runs on a background thread; exits when is_recording goes False
        (bounded by the 1 s get timeout). On exit, flushes audio still
        buffered in the recognizer so the trailing utterance is not lost.
        """
        while self.is_recording:
            try:
                data = self.audio_queue.get(timeout=1)

                # AcceptWaveform returns True only at utterance boundaries;
                # partial results stay buffered inside the recognizer.
                if self.recognizer.AcceptWaveform(data):
                    result = json.loads(self.recognizer.Result())
                    text = result.get('text', '')

                    if text:
                        timestamp = datetime.now()
                        log_entry = f"[{timestamp}] {text}"
                        self.save_log(log_entry)
                        print(f"识别: {text}")

            except queue.Empty:
                continue

        # Bug fix: drain the recognizer's final buffered result on shutdown.
        final = json.loads(self.recognizer.FinalResult())
        text = final.get('text', '')
        if text:
            log_entry = f"[{datetime.now()}] {text}"
            self.save_log(log_entry)
            print(f"识别: {text}")

    def save_log(self, log_entry: str):
        """Append one line to the log file (UTF-8)."""
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(log_entry + '\n')

    def stop_monitoring(self):
        """Signal both worker threads to stop."""
        self.is_recording = False
        print("监听已停止")

# Usage example
def main():
    """Run the Vosk offline monitor until Enter is pressed."""
    monitor = VoskVoiceMonitor(
        log_file="./voice_chat_log.txt",
        model_path="vosk-model-small-cn-0.22",  # download the model first
    )

    try:
        monitor.start_monitoring()
        input("按回车键停止监听...\n")
    finally:
        # Always signal the worker threads to stop, even on Ctrl-C.
        monitor.stop_monitoring()


if __name__ == "__main__":
    main()

安装依赖

# 基础依赖
pip install websockets pyaudio numpy

# 语音识别依赖
pip install SpeechRecognition

# 或者使用Vosk(推荐离线使用)
pip install vosk

# 可能需要安装系统依赖
# Ubuntu/Debian:
sudo apt-get install portaudio19-dev python3-pyaudio

# macOS:
brew install portaudio

# Windows:
# 通常直接安装pyaudio即可

使用建议

  1. 选择合适的方案

    • 方案一:适合监听WebSocket协议的语音聊天室
    • 方案二:适合本地音频捕获
    • 方案三:适合离线环境,无需网络
  2. 性能优化

    • 使用多线程处理音频和识别
    • 实现声音活动检测(VAD)减少处理量
    • 考虑使用GPU加速语音识别
  3. 错误处理

    • 添加重连机制
    • 记录错误日志
    • 异常情况的处理
  4. 隐私保护

    • 确保遵守相关法律法规
    • 明确告知用户正在录音
    • 安全存储录音文件

这个方案可以根据具体需求进行调整和扩展。

已有 14 条评论

    1. EmmaW EmmaW

      试了一下方案二,录音保存功能很稳定。不过那个音量阈值500对于不同麦克风可能需要调整,建议可以加一个自动校准的功能就更完美了。

    2. Mike_Jazz Mike_Jazz

      As a podcaster, I'm thinking about using this to automatically transcribe my live shows. The audio capture logic with the silence detection is really clever. Saved me hours of work!

    3. Sophia Sophia

      方案三的Vosk离线识别太实用了,我们公司内部网络不能连外网,这个正好解决了大问题。代码几乎可以直接拿来用,点赞!

    4. AlexChen AlexChen

      This is exactly what I've been looking for! I've been trying to build a voice logger for my online gaming community to catch toxic players, and the WebSocket example is pure gold. Thanks for the detailed breakdown!