一个基于Python的语音聊天室聊天记录监听方案。这个方案包括实时语音转文字和记录保存功能。
方案一:基于WebSocket的语音聊天室监听
import asyncio
import websockets
import json
import speech_recognition as sr
import pyaudio
import wave
import threading
from datetime import datetime
import os
import queue
from typing import Optional, Dict, List
import logging
class VoiceChatMonitor:
    """Voice chat room monitor.

    Connects to a chat room over WebSocket, logs text events, and hands
    incoming audio payloads to a background speech-to-text worker thread.
    """

    def __init__(self,
                 room_url: str,
                 save_audio: bool = False,
                 audio_save_path: str = "./recordings",
                 log_file: str = "./chat_logs.txt"):
        """
        Initialize the monitor.

        :param room_url: WebSocket URL of the chat room
        :param save_audio: whether to keep raw audio payloads on disk
        :param audio_save_path: directory for saved audio files
        :param log_file: path of the text log file
        """
        self.room_url = room_url
        self.save_audio = save_audio
        self.audio_save_path = audio_save_path
        self.log_file = log_file
        self.is_listening = False
        self.audio_queue = queue.Queue()
        self.recognizer = sr.Recognizer()
        # exist_ok avoids the check-then-create race of the original
        # os.path.exists() test.
        if save_audio:
            os.makedirs(audio_save_path, exist_ok=True)
        # Log to both the log file and the console.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    async def connect_to_room(self):
        """Connect to the chat room and receive messages until stopped."""
        try:
            async with websockets.connect(self.room_url) as websocket:
                self.logger.info(f"成功连接到聊天室: {self.room_url}")
                self.is_listening = True
                # Daemon thread drains the audio queue so slow speech
                # recognition never blocks the receive loop.
                processing_thread = threading.Thread(target=self.process_audio_queue)
                processing_thread.daemon = True
                processing_thread.start()
                # Receive loop: one message per iteration.
                while self.is_listening:
                    try:
                        message = await websocket.recv()
                        await self.handle_message(message)
                    except websockets.exceptions.ConnectionClosed:
                        self.logger.warning("连接已关闭")
                        break
                    except Exception as e:
                        self.logger.error(f"接收消息时出错: {e}")
        except Exception as e:
            self.logger.error(f"连接失败: {e}")

    async def handle_message(self, message):
        """Dispatch one raw message according to its declared type."""
        try:
            data = json.loads(message)
        except json.JSONDecodeError:
            # Not JSON at all: treat it as a plain text line from the server.
            self.handle_text_message({'content': message, 'username': 'System'})
            return
        if not isinstance(data, dict):
            # Valid JSON but not an object (bare string/number/list); the
            # original crashed on .get() here — fall back to a text message.
            self.handle_text_message({'content': message, 'username': 'System'})
            return
        msg_type = data.get('type')
        if msg_type == 'audio':
            await self.handle_audio_data(data)
        elif msg_type == 'text':
            self.handle_text_message(data)
        elif msg_type == 'user_join':
            self.logger.info(f"用户加入: {data.get('username')}")
        elif msg_type == 'user_leave':
            self.logger.info(f"用户离开: {data.get('username')}")

    async def handle_audio_data(self, audio_data: Dict):
        """Optionally save one audio payload, then queue it for recognition."""
        try:
            username = audio_data.get('username', 'Unknown')
            audio_content = audio_data.get('audio', '')
            timestamp = datetime.now()
            if self.save_audio:
                audio_file = self.save_audio_file(username, audio_content, timestamp)
                self.logger.info(f"音频已保存: {audio_file}")
            # Recognition happens on the worker thread, not here.
            self.audio_queue.put({
                'username': username,
                'audio': audio_content,
                'timestamp': timestamp
            })
        except Exception as e:
            self.logger.error(f"处理音频数据时出错: {e}")

    def handle_text_message(self, text_data: Dict):
        """Log one text message with its sender and a local timestamp."""
        username = text_data.get('username', 'System')
        content = text_data.get('content', '')
        timestamp = datetime.now()
        log_entry = f"[{timestamp}] {username}: {content}"
        self.logger.info(log_entry)

    def process_audio_queue(self):
        """Worker loop: convert queued audio to text and log the result."""
        while self.is_listening:
            try:
                audio_item = self.audio_queue.get(timeout=1)
            except queue.Empty:
                # Timeout lets the loop re-check is_listening periodically.
                continue
            try:
                text = self.speech_to_text(audio_item['audio'])
                if text:
                    log_entry = f"[{audio_item['timestamp']}] {audio_item['username']} [语音]: {text}"
                    self.logger.info(log_entry)
            except Exception as e:
                self.logger.error(f"处理音频队列时出错: {e}")

    def speech_to_text(self, audio_data) -> Optional[str]:
        """Speech-to-text stub; always returns placeholder text.

        Replace the body with a real recognizer, e.g.

        Google (online)::

            audio_file = sr.AudioFile(audio_data)
            with audio_file as source:
                audio = self.recognizer.record(source)
            return self.recognizer.recognize_google(audio, language='zh-CN')

        Vosk (offline)::

            import vosk
            model = vosk.Model("model_path")
            rec = vosk.KaldiRecognizer(model, 16000)
            ...

        :param audio_data: audio payload (path or raw data, backend-dependent)
        :return: recognized text, or None on recognition failure
        """
        try:
            # Placeholder result until a real backend is wired in.
            return "这是一段语音转换的文本"
        except sr.UnknownValueError:
            self.logger.warning("无法识别语音")
        except sr.RequestError as e:
            self.logger.error(f"语音识别服务错误: {e}")
        except Exception as e:
            self.logger.error(f"语音转文字时出错: {e}")
        return None

    def save_audio_file(self, username: str, audio_data: str, timestamp: datetime) -> str:
        """Build the target path for one audio payload (writing is a stub).

        The actual write depends on the wire format; e.g. for base64 data::

            import base64
            with open(filepath, 'wb') as f:
                f.write(base64.b64decode(audio_data))

        :return: the path the audio would be written to
        """
        filename = f"{timestamp.strftime('%Y%m%d_%H%M%S')}_{username}.wav"
        filepath = os.path.join(self.audio_save_path, filename)
        return filepath

    def stop_listening(self):
        """Stop the receive loop and the recognition worker."""
        self.is_listening = False
        self.logger.info("监听已停止")
# 使用示例
async def main():
    """Example entry point: run a monitor against a sample chat room."""
    settings = {
        "room_url": "ws://your-chat-server.com/room/123",
        "save_audio": True,
        "audio_save_path": "./recordings",
        "log_file": "./chat_history.txt",
    }
    monitor = VoiceChatMonitor(**settings)
    try:
        # Listen until the connection closes or the user interrupts.
        await monitor.connect_to_room()
    except KeyboardInterrupt:
        monitor.stop_listening()
if __name__ == "__main__":
    asyncio.run(main())

方案二:基于本地音频捕获的监听
import pyaudio
import wave
import threading
import speech_recognition as sr
from datetime import datetime
import queue
import os
import numpy as np
class LocalAudioMonitor:
    """Local audio monitor (captures microphone input).

    Records speech segments from the default microphone using a simple
    energy-based voice-activity detector, optionally saves each segment as
    a WAV file, and transcribes it with Google speech recognition.
    """

    def __init__(self,
                 log_file: str = "./chat_logs.txt",
                 save_audio: bool = False,
                 audio_save_path: str = "./recordings",
                 volume_threshold: int = 500):
        """
        :param log_file: path of the transcript log file
        :param save_audio: whether to save each speech segment as WAV
        :param audio_save_path: directory for saved recordings
        :param volume_threshold: mean absolute sample amplitude above which
            a chunk counts as speech; tune per microphone (default keeps
            the original hard-coded value of 500)
        """
        self.log_file = log_file
        self.save_audio = save_audio
        self.audio_save_path = audio_save_path
        self.volume_threshold = volume_threshold
        self.is_recording = False
        self.audio_queue = queue.Queue()
        self.recognizer = sr.Recognizer()
        # Capture parameters: 16 kHz mono, 16-bit samples, 1024-frame chunks.
        self.CHUNK = 1024
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16000
        # exist_ok avoids a race between the existence check and creation.
        if save_audio:
            os.makedirs(audio_save_path, exist_ok=True)

    def start_monitoring(self):
        """Start the capture and recognition worker threads."""
        self.is_recording = True
        capture_thread = threading.Thread(target=self.capture_audio)
        capture_thread.daemon = True
        capture_thread.start()
        recognition_thread = threading.Thread(target=self.process_audio)
        recognition_thread.daemon = True
        recognition_thread.start()
        print("开始监听语音...")

    def capture_audio(self):
        """Read microphone chunks and segment them by voice activity."""
        p = pyaudio.PyAudio()
        stream = None  # so the finally block is safe if open() fails
        try:
            stream = p.open(
                format=self.FORMAT,
                channels=self.CHANNELS,
                rate=self.RATE,
                input=True,
                frames_per_buffer=self.CHUNK
            )
            frames = []
            silent_chunks = 0
            recording = False
            while self.is_recording:
                data = stream.read(self.CHUNK, exception_on_overflow=False)
                # Simple voice-activity detection on mean absolute amplitude.
                audio_data = np.frombuffer(data, dtype=np.int16)
                volume = np.abs(audio_data).mean()
                if volume > self.volume_threshold:
                    if not recording:
                        recording = True
                        frames = []
                        print("检测到语音开始...")
                    frames.append(data)
                    silent_chunks = 0
                else:
                    if recording:
                        silent_chunks += 1
                        # 20 silent chunks end the utterance.  At 16 kHz /
                        # 1024 frames that is ~1.28 s (the original comment
                        # claiming ~0.5 s was wrong).
                        if silent_chunks > 20:
                            recording = False
                            self.save_audio_chunk(frames)
                            frames = []
            # Flush a segment still in progress when monitoring stops, so
            # the final utterance is not silently dropped.
            if recording and frames:
                self.save_audio_chunk(frames)
        except Exception as e:
            print(f"音频捕获错误: {e}")
        finally:
            # The original referenced `stream` unconditionally here and
            # raised UnboundLocalError when open() failed.
            if stream is not None:
                stream.stop_stream()
                stream.close()
            p.terminate()

    def save_audio_chunk(self, frames):
        """Persist one speech segment (optional) and queue it for recognition."""
        if not frames:
            return
        timestamp = datetime.now()
        if self.save_audio:
            filename = f"audio_{timestamp.strftime('%Y%m%d_%H%M%S')}.wav"
            filepath = os.path.join(self.audio_save_path, filename)
            # Throwaway PyAudio instance just for the sample-size lookup.
            p = pyaudio.PyAudio()
            try:
                wf = wave.open(filepath, 'wb')
                try:
                    wf.setnchannels(self.CHANNELS)
                    wf.setsampwidth(p.get_sample_size(self.FORMAT))
                    wf.setframerate(self.RATE)
                    wf.writeframes(b''.join(frames))
                finally:
                    wf.close()
            finally:
                p.terminate()
            print(f"音频已保存: {filepath}")
        # Recognition happens on the other worker thread.
        self.audio_queue.put({
            'frames': frames,
            'timestamp': timestamp
        })

    def process_audio(self):
        """Worker loop: transcribe queued segments with Google STT."""
        while self.is_recording:
            try:
                audio_item = self.audio_queue.get(timeout=1)
            except queue.Empty:
                # Timeout lets the loop re-check is_recording periodically.
                continue
            # paInt16 capture format → 2 bytes per sample.
            audio_data = sr.AudioData(
                b''.join(audio_item['frames']),
                self.RATE,
                2
            )
            try:
                text = self.recognizer.recognize_google(
                    audio_data,
                    language='zh-CN'
                )
            except sr.UnknownValueError:
                continue  # unintelligible speech: skip silently
            except sr.RequestError as e:
                print(f"语音识别服务错误: {e}")
                continue
            log_entry = f"[{audio_item['timestamp']}] [语音识别]: {text}"
            self.save_log(log_entry)
            print(log_entry)

    def save_log(self, log_entry: str):
        """Append one line to the transcript log file."""
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(log_entry + '\n')

    def stop_monitoring(self):
        """Stop both worker threads."""
        self.is_recording = False
        print("监听已停止")
# 使用示例
def main():
    """Example usage: monitor the microphone until Enter is pressed."""
    options = dict(
        log_file="./voice_chat_log.txt",
        save_audio=True,
        audio_save_path="./recordings",
    )
    monitor = LocalAudioMonitor(**options)
    try:
        monitor.start_monitoring()
        # Block the main thread; the workers are daemon threads.
        input("按回车键停止监听...\n")
    finally:
        monitor.stop_monitoring()
if __name__ == "__main__":
    main()

方案三:基于Vosk的离线语音识别
import json
import wave
import sys
import queue
import threading
import pyaudio
from vosk import Model, KaldiRecognizer
from datetime import datetime
import os
class VoskVoiceMonitor:
    """Offline voice monitor backed by the Vosk speech recognizer."""

    def __init__(self,
                 model_path: str = "vosk-model-small-cn-0.22",
                 log_file: str = "./chat_logs.txt"):
        """
        :param model_path: directory containing a downloaded Vosk model
        :param log_file: path of the transcript log file
        """
        self.model_path = model_path
        self.log_file = log_file
        self.is_recording = False
        self.audio_queue = queue.Queue()
        # Bail out early with a download hint if the model is missing;
        # loading it would otherwise fail with a far less helpful error.
        if not os.path.exists(model_path):
            print(f"请下载Vosk模型: {model_path}")
            print("下载地址: https://alphacephei.com/vosk/models")
            sys.exit(1)
        self.model = Model(model_path)
        # Recognizer sample rate must match the capture rate below.
        self.recognizer = KaldiRecognizer(self.model, 16000)
        # Capture parameters: 16 kHz mono, 16-bit samples, 1024-frame chunks.
        self.CHUNK = 1024
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16000

    def start_monitoring(self):
        """Start the capture and recognition worker threads."""
        self.is_recording = True
        capture_thread = threading.Thread(target=self.capture_audio)
        capture_thread.daemon = True
        capture_thread.start()
        recognition_thread = threading.Thread(target=self.process_audio)
        recognition_thread.daemon = True
        recognition_thread.start()
        print("Vosk离线语音监听已启动...")

    def capture_audio(self):
        """Stream raw microphone chunks into the recognition queue."""
        p = pyaudio.PyAudio()
        stream = None
        try:
            stream = p.open(
                format=self.FORMAT,
                channels=self.CHANNELS,
                rate=self.RATE,
                input=True,
                frames_per_buffer=self.CHUNK
            )
            while self.is_recording:
                data = stream.read(self.CHUNK, exception_on_overflow=False)
                self.audio_queue.put(data)
        finally:
            # Always release the device — the original leaked the stream
            # whenever read() raised.
            if stream is not None:
                stream.stop_stream()
                stream.close()
            p.terminate()

    def process_audio(self):
        """Worker loop: feed queued chunks to Vosk and log utterances."""
        while self.is_recording:
            try:
                data = self.audio_queue.get(timeout=1)
            except queue.Empty:
                # Timeout lets the loop re-check is_recording periodically.
                continue
            if self.recognizer.AcceptWaveform(data):
                self._log_result(self.recognizer.Result())
        # Flush the recognizer so the utterance in progress at stop time
        # is not lost (the original dropped it).
        self._log_result(self.recognizer.FinalResult())

    def _log_result(self, result_json: str):
        """Parse one Vosk JSON result payload and log any non-empty text."""
        text = json.loads(result_json).get('text', '')
        if text:
            timestamp = datetime.now()
            log_entry = f"[{timestamp}] {text}"
            self.save_log(log_entry)
            print(f"识别: {text}")

    def save_log(self, log_entry: str):
        """Append one line to the transcript log file."""
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(log_entry + '\n')

    def stop_monitoring(self):
        """Stop both worker threads."""
        self.is_recording = False
        print("监听已停止")
# 使用示例
def main():
    """Example usage: run the offline Vosk monitor until Enter is pressed."""
    monitor = VoskVoiceMonitor(
        log_file="./voice_chat_log.txt",
        model_path="vosk-model-small-cn-0.22",  # download the model first
    )
    try:
        monitor.start_monitoring()
        # Block the main thread; the workers are daemon threads.
        input("按回车键停止监听...\n")
    finally:
        monitor.stop_monitoring()
if __name__ == "__main__":
    main()

安装依赖
# 基础依赖
pip install websockets pyaudio numpy
# 语音识别依赖
pip install SpeechRecognition
# 或者使用Vosk(推荐离线使用)
pip install vosk
# 可能需要安装系统依赖
# Ubuntu/Debian:
sudo apt-get install portaudio19-dev python3-pyaudio
# macOS:
brew install portaudio
# Windows:
# 通常直接安装pyaudio即可

使用建议
选择合适的方案:
- 方案一:适合监听WebSocket协议的语音聊天室
- 方案二:适合本地音频捕获
- 方案三:适合离线环境,无需网络
性能优化:
- 使用多线程处理音频和识别
- 实现声音活动检测(VAD)减少处理量
- 考虑使用GPU加速语音识别
错误处理:
- 添加重连机制
- 记录错误日志
- 异常情况的处理
隐私保护:
- 确保遵守相关法律法规
- 明确告知用户正在录音
- 安全存储录音文件
这个方案可以根据具体需求进行调整和扩展。
试了一下方案二,录音保存功能很稳定。不过那个音量阈值500对于不同麦克风可能需要调整,建议可以加一个自动校准的功能就更完美了。
As a podcaster, I'm thinking about using this to automatically transcribe my live shows. The audio capture logic with the silence detection is really clever. Saved me hours of work!
方案三的Vosk离线识别太实用了,我们公司内部网络不能连外网,这个正好解决了大问题。代码几乎可以直接拿来用,点赞!
This is exactly what I've been looking for! I've been trying to build a voice logger for my online gaming community to catch toxic players, and the WebSocket example is pure gold. Thanks for the detailed breakdown!