完整的语音聊天室监听记录方案
import asyncio
import html
import json
import logging
import os
import signal
import sqlite3
import sys
import threading
import time
from contextlib import contextmanager
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional, Dict, List, Any

import websockets
# ============ 数据模型 ============
@dataclass
class UserEvent:
    """A single "user joined" or "user left" occurrence in a room."""

    # Database row key; optional because it may not be known yet.
    event_id: Optional[int]
    user_id: str
    username: str
    # Either 'join' or 'leave'.
    event_type: str
    room_id: str
    timestamp: datetime
    ip_address: Optional[str] = None
@dataclass
class ChatMessage:
    """One chat message (text or voice) posted in a room."""

    # Database row key; optional because it may not be known yet.
    message_id: Optional[int]
    user_id: str
    username: str
    content: str
    # Either 'text' or 'voice'.
    message_type: str
    room_id: str
    timestamp: datetime
    is_deleted: bool = False
@dataclass
class RoomInfo:
    """Descriptive metadata and occupancy counters for one room."""

    room_id: str
    room_name: str
    created_at: datetime
    is_active: bool = True
    max_users: int = 100
    current_users: int = 0
# ============ 数据库管理器 ============
class DatabaseManager:
    """Thin SQLite persistence layer for the chat-room recorder.

    The database lives in the file ``db_path``. Constructing the manager
    creates the tables and indexes if they are missing. Every method opens
    its own connection through :meth:`get_connection` and closes it before
    returning.
    """

    # Idempotent DDL run by init_database(): three tables, then three indexes.
    _SCHEMA = (
        '''
        CREATE TABLE IF NOT EXISTS user_events (
            event_id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id TEXT NOT NULL,
            username TEXT NOT NULL,
            event_type TEXT NOT NULL,
            room_id TEXT NOT NULL,
            timestamp DATETIME NOT NULL,
            ip_address TEXT
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS chat_messages (
            message_id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id TEXT NOT NULL,
            username TEXT NOT NULL,
            content TEXT,
            message_type TEXT NOT NULL,
            room_id TEXT NOT NULL,
            timestamp DATETIME NOT NULL,
            is_deleted BOOLEAN DEFAULT 0
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS rooms (
            room_id TEXT PRIMARY KEY,
            room_name TEXT NOT NULL,
            created_at DATETIME NOT NULL,
            is_active BOOLEAN DEFAULT 1,
            max_users INTEGER DEFAULT 100,
            current_users INTEGER DEFAULT 0
        )
        ''',
        '''
        CREATE INDEX IF NOT EXISTS idx_user_events_timestamp
        ON user_events(timestamp)
        ''',
        '''
        CREATE INDEX IF NOT EXISTS idx_chat_messages_timestamp
        ON chat_messages(timestamp)
        ''',
        '''
        CREATE INDEX IF NOT EXISTS idx_chat_messages_room
        ON chat_messages(room_id, timestamp)
        ''',
    )

    def __init__(self, db_path: str = "chatroom_records.db"):
        """Remember the database path and make sure the schema exists."""
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Create the tables and indexes that do not exist yet."""
        with self.get_connection() as db:
            for statement in self._SCHEMA:
                db.execute(statement)
            db.commit()

    @contextmanager
    def get_connection(self):
        """Yield a new connection with ``sqlite3.Row`` rows; close it on exit.

        Nothing is committed automatically; callers commit themselves.
        """
        connection = sqlite3.connect(self.db_path)
        connection.row_factory = sqlite3.Row
        try:
            yield connection
        finally:
            connection.close()

    # NOTE: the model classes are annotated by name (quoted) so this class
    # does not depend on where in the module they are declared.
    def save_user_event(self, event: "UserEvent"):
        """Insert a join/leave event and return the new row id."""
        row = (
            event.user_id, event.username, event.event_type,
            event.room_id, event.timestamp.isoformat(), event.ip_address,
        )
        with self.get_connection() as db:
            inserted = db.execute(
                '''
                INSERT INTO user_events
                (user_id, username, event_type, room_id, timestamp, ip_address)
                VALUES (?, ?, ?, ?, ?, ?)
                ''',
                row,
            )
            db.commit()
            return inserted.lastrowid

    def save_chat_message(self, message: "ChatMessage"):
        """Insert a chat message and return the new row id."""
        row = (
            message.user_id, message.username, message.content,
            message.message_type, message.room_id,
            message.timestamp.isoformat(), message.is_deleted,
        )
        with self.get_connection() as db:
            inserted = db.execute(
                '''
                INSERT INTO chat_messages
                (user_id, username, content, message_type, room_id, timestamp, is_deleted)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ''',
                row,
            )
            db.commit()
            return inserted.lastrowid

    def update_room_users(self, room_id: str, user_count: int):
        """Set ``current_users`` for a room.

        This is a plain UPDATE: when no ``rooms`` row exists for ``room_id``
        nothing is written.
        """
        with self.get_connection() as db:
            db.execute(
                '''
                UPDATE rooms SET current_users = ?
                WHERE room_id = ?
                ''',
                (user_count, room_id),
            )
            db.commit()

    def get_recent_messages(self, room_id: str, limit: int = 100) -> List[Dict]:
        """Return up to ``limit`` non-deleted messages of a room, newest first."""
        with self.get_connection() as db:
            rows = db.execute(
                '''
                SELECT * FROM chat_messages
                WHERE room_id = ? AND is_deleted = 0
                ORDER BY timestamp DESC LIMIT ?
                ''',
                (room_id, limit),
            ).fetchall()
            return list(map(dict, rows))

    def get_user_activity(self, user_id: str, limit: int = 50) -> List[Dict]:
        """Return up to ``limit`` join/leave events of a user, newest first."""
        with self.get_connection() as db:
            rows = db.execute(
                '''
                SELECT * FROM user_events
                WHERE user_id = ?
                ORDER BY timestamp DESC LIMIT ?
                ''',
                (user_id, limit),
            ).fetchall()
            return list(map(dict, rows))
# ============ 聊天室监听器 ============
class ChatRoomMonitor:
    """Record the traffic of one chat room received over a WebSocket.

    Incoming frames are expected to be JSON objects and are dispatched on
    their ``type`` field. Text/voice messages and join/leave events are
    persisted through ``DatabaseManager`` and mirrored to the log. The
    ``on_new_message`` / ``on_user_join`` / ``on_user_leave`` hooks are
    no-ops intended to be overridden.

    Reconnection policy: after a dropped or failed connection the monitor
    waits ``reconnect_delay`` seconds and retries, giving up (and calling
    :meth:`stop`) after ``max_reconnect_attempts`` consecutive failures.
    """

    def __init__(self,
                 websocket_url: str,
                 room_id: str,
                 db_path: str = "chatroom_records.db",
                 log_file: str = "chatroom_monitor.log"):
        """Set up storage, logging and signal handling.

        :param websocket_url: URL of the WebSocket endpoint to connect to.
        :param room_id: identifier of the room to join and record.
        :param db_path: path of the SQLite database file.
        :param log_file: path of the log file.
        """
        self.websocket_url = websocket_url
        self.room_id = room_id
        self.db_manager = DatabaseManager(db_path)
        self.is_running = False
        self.websocket = None
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = 5
        self.reconnect_delay = 5  # seconds
        # user_id -> {'username': ..., 'join_time': ...}
        self.online_users = {}
        # Holds the close() task scheduled by stop() so it is not collected.
        self._close_task = None
        self.setup_logging(log_file)
        try:
            signal.signal(signal.SIGINT, self.signal_handler)
            signal.signal(signal.SIGTERM, self.signal_handler)
        except ValueError:
            # signal.signal() may only be called from the main thread; the
            # monitor is still usable without the handlers.
            self.logger.warning("无法注册信号处理器(当前不在主线程)")

    def setup_logging(self, log_file: str):
        """Configure root logging (file + console) and create ``self.logger``."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file, encoding='utf-8'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def signal_handler(self, signum, frame):
        """SIGINT/SIGTERM handler: stop the monitor and exit the process."""
        self.logger.info("接收到停止信号,正在关闭...")
        self.stop()
        sys.exit(0)

    # ------------------------------------------------------------------
    # Connection management
    # ------------------------------------------------------------------
    async def _close_websocket(self):
        """Close and forget the current connection, ignoring close errors."""
        ws, self.websocket = self.websocket, None
        if ws is None:
            return
        try:
            await ws.close()
        except Exception as e:
            self.logger.debug(f"关闭连接时出错: {e}")

    async def _open_connection(self) -> bool:
        """Make one connection attempt and join the room.

        Any previous socket is closed first so reconnecting never leaks it.

        :return: ``True`` when connected and the join message was sent.
        """
        await self._close_websocket()
        try:
            self.websocket = await websockets.connect(
                self.websocket_url,
                ping_interval=20,
                ping_timeout=60
            )
            self.logger.info(f"成功连接到聊天室: {self.websocket_url}")
            await self.send_join_message()
        except Exception as e:
            self.logger.error(f"连接失败: {e}")
            await self._close_websocket()
            return False
        self.reconnect_attempts = 0
        return True

    async def _wait_before_retry(self) -> bool:
        """Consume one retry slot and sleep for ``reconnect_delay`` seconds.

        :return: ``False`` (after calling :meth:`stop`) when the retry
            budget is exhausted, ``True`` otherwise.
        """
        if self.reconnect_attempts >= self.max_reconnect_attempts:
            self.logger.error("达到最大重连次数,停止监听")
            self.stop()
            return False
        self.reconnect_attempts += 1
        self.logger.info(f"尝试重连 ({self.reconnect_attempts}/{self.max_reconnect_attempts})...")
        await asyncio.sleep(self.reconnect_delay)
        return True

    async def connect(self) -> bool:
        """Connect to the server, retrying until success or budget exhaustion.

        Retries are done in a loop rather than by mutual recursion with
        :meth:`handle_reconnect`.

        :return: ``True`` once connected, ``False`` if the monitor gave up.
        """
        while True:
            if await self._open_connection():
                return True
            if not await self._wait_before_retry():
                return False

    async def send_join_message(self):
        """Send the ``join_room`` request for ``self.room_id``."""
        join_message = {
            'type': 'join_room',
            'room_id': self.room_id,
            'timestamp': datetime.now().isoformat()
        }
        await self.websocket.send(json.dumps(join_message))

    async def handle_reconnect(self) -> bool:
        """Wait for the retry delay, then reconnect.

        :return: ``True`` when a new connection is established, ``False``
            when the retry budget is exhausted.
        """
        if not await self._wait_before_retry():
            return False
        return await self.connect()

    async def listen(self):
        """Connect and process incoming frames until stopped.

        The connection is opened once up front; after every drop exactly one
        reconnect is performed via :meth:`handle_reconnect`. A clean server
        close is treated like a drop (logged, then retried after the delay).
        """
        self.is_running = True
        connected = await self.connect()
        while self.is_running and connected:
            try:
                async for message in self.websocket:
                    await self.process_message(message)
                # Iteration ended without an exception: closed cleanly.
                self.logger.warning("连接已关闭")
            except websockets.exceptions.ConnectionClosed:
                self.logger.warning("连接已关闭")
            except Exception as e:
                self.logger.error(f"监听过程发生错误: {e}")
            if self.is_running:
                connected = await self.handle_reconnect()
        await self._close_websocket()

    # ------------------------------------------------------------------
    # Message handling
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_timestamp(raw: Any) -> datetime:
        """Parse an ISO-8601 timestamp, falling back to the local time now.

        A trailing ``Z`` is accepted as UTC. Missing, non-string or
        malformed values yield ``datetime.now()`` so that a bad timestamp
        never causes the record itself to be dropped.
        """
        if isinstance(raw, str):
            text = raw.strip()
            if text.endswith(('Z', 'z')):
                text = text[:-1] + '+00:00'
            try:
                return datetime.fromisoformat(text)
            except ValueError:
                pass
        return datetime.now()

    async def process_message(self, message: str):
        """Decode one frame and dispatch it to the handler for its ``type``.

        Frames that are not valid JSON, or whose JSON is not an object, are
        logged and ignored instead of raising into the listen loop.
        """
        try:
            data = json.loads(message)
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            self.logger.error(f"无法解析的消息: {message[:100]}")
            return
        if not isinstance(data, dict):
            self.logger.error(f"无法解析的消息: {message[:100]}")
            return

        message_type = data.get('type', 'unknown')
        handlers = {
            'text': self.handle_text_message,
            'user_join': self.handle_user_join,
            'user_leave': self.handle_user_leave,
            'room_info': self.handle_room_info,
            'user_list': self.handle_user_list,
            'voice': self.handle_voice_message,
            'system': self.handle_system_message
        }
        # A non-string type (e.g. a list) is unhashable and cannot be looked up.
        handler = handlers.get(message_type) if isinstance(message_type, str) else None
        if handler:
            await handler(data)
        else:
            self.logger.debug(f"未处理的消息类型: {message_type}")

    async def handle_text_message(self, data: Dict[str, Any]):
        """Persist and log a text message, then fire ``on_new_message``."""
        try:
            message = ChatMessage(
                message_id=None,
                user_id=data.get('user_id', 'unknown'),
                username=data.get('username', '未知用户'),
                content=data.get('content', ''),
                message_type='text',
                room_id=self.room_id,
                timestamp=self._parse_timestamp(data.get('timestamp'))
            )
            self.db_manager.save_chat_message(message)
            self.logger.info(f"[文本消息] {message.username}: {message.content}")
            self.on_new_message(message)
        except Exception as e:
            # Best effort: one bad message must not stop the listener.
            self.logger.error(f"处理文本消息失败: {e}")

    async def handle_user_join(self, data: Dict[str, Any]):
        """Persist a join event, add the user to the roster, fire the hook."""
        try:
            event = UserEvent(
                event_id=None,
                user_id=data.get('user_id', 'unknown'),
                username=data.get('username', '未知用户'),
                event_type='join',
                room_id=self.room_id,
                timestamp=self._parse_timestamp(data.get('timestamp')),
                ip_address=data.get('ip_address')
            )
            self.db_manager.save_user_event(event)
            self.online_users[event.user_id] = {
                'username': event.username,
                'join_time': event.timestamp
            }
            self.db_manager.update_room_users(self.room_id, len(self.online_users))
            self.logger.info(f"[用户加入] {event.username} (ID: {event.user_id})")
            self.logger.info(f"当前在线人数: {len(self.online_users)}")
            self.on_user_join(event)
        except Exception as e:
            self.logger.error(f"处理用户加入事件失败: {e}")

    async def handle_user_leave(self, data: Dict[str, Any]):
        """Persist a leave event, drop the user from the roster, fire the hook."""
        try:
            user_id = data.get('user_id', 'unknown')
            username = data.get('username', '未知用户')
            event = UserEvent(
                event_id=None,
                user_id=user_id,
                username=username,
                event_type='leave',
                room_id=self.room_id,
                timestamp=self._parse_timestamp(data.get('timestamp'))
            )
            self.db_manager.save_user_event(event)
            self.online_users.pop(user_id, None)
            self.db_manager.update_room_users(self.room_id, len(self.online_users))
            self.logger.info(f"[用户离开] {username} (ID: {user_id})")
            self.logger.info(f"当前在线人数: {len(self.online_users)}")
            self.on_user_leave(event)
        except Exception as e:
            self.logger.error(f"处理用户离开事件失败: {e}")

    async def handle_room_info(self, data: Dict[str, Any]):
        """Insert or replace the ``rooms`` row for the monitored room."""
        try:
            room_info = RoomInfo(
                room_id=self.room_id,
                room_name=data.get('room_name', '未知房间'),
                created_at=self._parse_timestamp(data.get('created_at')),
                max_users=data.get('max_users', 100),
                current_users=data.get('current_users', 0)
            )
            with self.db_manager.get_connection() as conn:
                conn.execute(
                    '''
                    INSERT OR REPLACE INTO rooms
                    (room_id, room_name, created_at, max_users, current_users)
                    VALUES (?, ?, ?, ?, ?)
                    ''',
                    (
                        room_info.room_id, room_info.room_name,
                        room_info.created_at.isoformat(),
                        room_info.max_users, room_info.current_users
                    )
                )
                conn.commit()
            self.logger.info(f"[房间信息] {room_info.room_name} (最大人数: {room_info.max_users})")
        except Exception as e:
            self.logger.error(f"处理房间信息失败: {e}")

    async def handle_user_list(self, data: Dict[str, Any]):
        """Replace the in-memory roster with the list sent by the server.

        The new roster is built separately and swapped in only when every
        entry was valid, so a malformed payload leaves the old one intact.
        """
        try:
            roster = {}
            for user in data.get('users', []):
                roster[user['user_id']] = {
                    'username': user['username'],
                    'join_time': self._parse_timestamp(user.get('join_time'))
                }
            self.online_users = roster
            self.logger.info(f"用户列表已更新,当前在线: {len(self.online_users)}人")
        except Exception as e:
            self.logger.error(f"处理用户列表失败: {e}")

    async def handle_voice_message(self, data: Dict[str, Any]):
        """Persist and log the metadata of a voice message (no audio is kept)."""
        try:
            message = ChatMessage(
                message_id=None,
                user_id=data.get('user_id', 'unknown'),
                username=data.get('username', '未知用户'),
                content="[语音消息]",
                message_type='voice',
                room_id=self.room_id,
                timestamp=self._parse_timestamp(data.get('timestamp'))
            )
            self.db_manager.save_chat_message(message)
            self.logger.info(f"[语音消息] {message.username}: 发送了一条语音消息")
        except Exception as e:
            self.logger.error(f"处理语音消息失败: {e}")

    async def handle_system_message(self, data: Dict[str, Any]):
        """Log a system message; nothing is persisted."""
        try:
            content = data.get('content', '')
            self.logger.info(f"[系统消息] {content}")
        except Exception as e:
            self.logger.error(f"处理系统消息失败: {e}")

    # ------------------------------------------------------------------
    # Hooks (override in a subclass or rebind on the instance)
    # ------------------------------------------------------------------
    def on_new_message(self, message: "ChatMessage"):
        """Hook called after a text message has been stored."""
        pass

    def on_user_join(self, event: "UserEvent"):
        """Hook called after a join event has been stored."""
        pass

    def on_user_leave(self, event: "UserEvent"):
        """Hook called after a leave event has been stored."""
        pass

    # ------------------------------------------------------------------
    # Lifecycle / reporting
    # ------------------------------------------------------------------
    def stop(self):
        """Stop listening and close the socket if an event loop is running.

        Safe to call from synchronous code: when no loop is running the
        socket cannot be closed asynchronously, so only the running flag is
        cleared and no coroutine is created.
        """
        self.is_running = False
        ws = self.websocket
        if ws is not None:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None
            if loop is not None:
                # Keep a reference so the task is not garbage-collected early.
                self._close_task = loop.create_task(ws.close())
        self.logger.info("监听器已停止")

    def get_statistics(self) -> Dict:
        """Return message/event/user counts for this room.

        :return: dict with ``total_messages``, ``total_events``,
            ``unique_users`` (distinct user ids in ``user_events``) and
            ``current_online`` (size of the in-memory roster).
        """
        with self.db_manager.get_connection() as conn:
            room = (self.room_id,)
            total_messages = conn.execute(
                'SELECT COUNT(*) FROM chat_messages WHERE room_id = ?', room
            ).fetchone()[0]
            total_events = conn.execute(
                'SELECT COUNT(*) FROM user_events WHERE room_id = ?', room
            ).fetchone()[0]
            unique_users = conn.execute(
                'SELECT COUNT(DISTINCT user_id) FROM user_events WHERE room_id = ?', room
            ).fetchone()[0]
            return {
                'total_messages': total_messages,
                'total_events': total_events,
                'unique_users': unique_users,
                'current_online': len(self.online_users)
            }
# ============ 数据导出器 ============
class DataExporter:
    """Export the recorded data of one room to JSON or to an HTML report."""

    def __init__(self, db_path: str = "chatroom_records.db"):
        """Remember the path of the SQLite database to read from."""
        self.db_path = db_path

    def _load_room_data(self, room_id: str):
        """Fetch the room row, its messages and its user events.

        The connection is closed explicitly: using a ``sqlite3`` connection
        as a context manager only commits or rolls back, it does not close.

        :return: ``(room, messages, events)`` where ``room`` is a
            ``sqlite3.Row`` or ``None`` and the other two are lists of rows
            ordered by timestamp.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            conn.row_factory = sqlite3.Row
            room = conn.execute(
                'SELECT * FROM rooms WHERE room_id = ?', (room_id,)
            ).fetchone()
            messages = conn.execute(
                '''
                SELECT * FROM chat_messages
                WHERE room_id = ?
                ORDER BY timestamp
                ''',
                (room_id,),
            ).fetchall()
            events = conn.execute(
                '''
                SELECT * FROM user_events
                WHERE room_id = ?
                ORDER BY timestamp
                ''',
                (room_id,),
            ).fetchall()
        finally:
            conn.close()
        return room, messages, events

    @staticmethod
    def _escape(value) -> str:
        """Return ``value`` as text that is safe to embed in HTML."""
        return html.escape(str(value))

    def export_to_json(self, room_id: str, output_file: str):
        """Write room info, messages and user events to a UTF-8 JSON file.

        :param room_id: room whose records are exported.
        :param output_file: path of the JSON file to create or overwrite.
        """
        room, messages, events = self._load_room_data(room_id)
        data = {
            'room': dict(room) if room else None,
            'messages': [dict(m) for m in messages],
            'user_events': [dict(e) for e in events],
            'export_time': datetime.now().isoformat()
        }
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f"数据已导出到: {output_file}")

    def export_to_html(self, room_id: str, output_file: str):
        """Write a human-readable HTML report for a room.

        Every value taken from the database (room name, user names, message
        content, timestamps) is HTML-escaped, because it originates from
        chat participants and must not be able to inject markup or scripts.

        :param room_id: room whose records are exported.
        :param output_file: path of the HTML file to create or overwrite.
        """
        room, messages, events = self._load_room_data(room_id)
        esc = self._escape
        title = esc(room['room_name'] if room else room_id)

        parts = [f"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>聊天室记录 - {title}</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; }}
        .container {{ max-width: 800px; margin: 0 auto; }}
        .header {{ background: #f0f0f0; padding: 10px; border-radius: 5px; }}
        .message {{ margin: 10px 0; padding: 10px; border-left: 3px solid #007bff; }}
        .text-message {{ background: #f8f9fa; }}
        .voice-message {{ background: #e3f2fd; }}
        .event {{ margin: 5px 0; padding: 5px; background: #fff3cd; }}
        .join {{ color: #28a745; }}
        .leave {{ color: #dc3545; }}
        .timestamp {{ color: #6c757d; font-size: 0.9em; }}
        .username {{ font-weight: bold; color: #007bff; }}
    </style>
</head>
<body>
    <div class="container">
        <div class="header">
            <h1>聊天室记录: {title}</h1>
            <p>导出时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
            <p>总消息数: {len(messages)} | 用户事件数: {len(events)}</p>
        </div>
        <h2>消息记录</h2>
        <div class="messages">
"""]

        for msg in messages:
            # The CSS class comes from a fixed set, so it needs no escaping.
            msg_class = 'voice-message' if msg['message_type'] == 'voice' else 'text-message'
            content = msg['content'] or '[语音消息]'
            parts.append(f"""
            <div class="message {msg_class}">
                <span class="timestamp">{esc(msg['timestamp'])}</span>
                <span class="username">{esc(msg['username'])}:</span>
                <span class="content">{esc(content)}</span>
            </div>
""")

        parts.append("""
        </div>
        <h2>用户进出记录</h2>
        <div class="events">
""")

        for event in events:
            joined = event['event_type'] == 'join'
            event_class = 'join' if joined else 'leave'
            action = '加入' if joined else '离开'
            parts.append(f"""
            <div class="event {event_class}">
                <span class="timestamp">{esc(event['timestamp'])}</span>
                <span class="username">{esc(event['username'])}</span>
                <span>{action}了房间</span>
            </div>
""")

        parts.append("""
        </div>
    </div>
</body>
</html>
""")

        with open(output_file, 'w', encoding='utf-8') as f:
            f.write("".join(parts))
        print(f"HTML报告已生成: {output_file}")
# ============ 主程序 ============
async def main(stats_interval: float = 60):
    """Run the monitor, print periodic statistics, and export on exit.

    The listener runs as a background task. Statistics are printed every
    ``stats_interval`` seconds for as long as the listener is alive; when it
    ends (for example after the reconnect budget is exhausted) the loop ends
    too instead of printing forever. On the way out the listener task is
    cancelled if still running and the data is exported to JSON and HTML.

    :param stats_interval: seconds between two statistics reports.
    """
    # Configuration
    config = {
        'websocket_url': 'ws://your-chat-server.com/ws',  # replace with the real WebSocket URL
        'room_id': 'room_123',  # replace with the real room id
        'db_path': 'chatroom_records.db',
        'log_file': 'chatroom_monitor.log'
    }
    monitor = ChatRoomMonitor(
        websocket_url=config['websocket_url'],
        room_id=config['room_id'],
        db_path=config['db_path'],
        log_file=config['log_file']
    )
    listen_task = None
    try:
        print(f"开始监听房间: {config['room_id']}")
        print("按 Ctrl+C 停止监听")
        listen_task = asyncio.create_task(monitor.listen())
        while True:
            # Wake up either when the listener finishes or when it is time
            # for the next statistics report.
            done, _ = await asyncio.wait({listen_task}, timeout=stats_interval)
            if done:
                # Re-raise anything the listener died with.
                listen_task.result()
                break
            stats = monitor.get_statistics()
            print(f"\n=== 统计信息 ===")
            print(f"在线人数: {stats['current_online']}")
            print(f"总消息数: {stats['total_messages']}")
            print(f"总事件数: {stats['total_events']}")
            print(f"独立用户数: {stats['unique_users']}")
    except KeyboardInterrupt:
        print("\n正在停止监听...")
        monitor.stop()
    finally:
        if listen_task is not None and not listen_task.done():
            listen_task.cancel()
        # Export whatever has been recorded so far.
        exporter = DataExporter(config['db_path'])
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        exporter.export_to_json(config['room_id'], f"export_{timestamp}.json")
        exporter.export_to_html(config['room_id'], f"report_{timestamp}.html")
        print("数据导出完成")
if __name__ == "__main__":
    asyncio.run(main())

使用说明
1. 安装依赖
pip install websockets
(数据库通过 Python 标准库自带的 sqlite3 访问,代码并未使用 aiosqlite,无需额外安装。)
2. 配置参数
修改 main() 函数中的配置:
config = {
'websocket_url': 'ws://your-chat-server.com/ws', # WebSocket服务器地址
'room_id': 'room_123', # 要监听的房间ID
'db_path': 'chatroom_records.db', # 数据库文件路径
'log_file': 'chatroom_monitor.log' # 日志文件路径
}
3. 运行程序
python chatroom_monitor.py
4. 数据导出
程序会自动生成两种格式的导出文件:
- JSON格式:便于程序处理
- HTML格式:便于人类阅读
功能特点
完整的记录功能:
- 记录所有文字聊天消息
- 记录用户进出房间事件
- 记录语音消息的元数据
- 记录房间信息变更
数据持久化:
- 使用SQLite数据库存储
- 支持数据导出(JSON/HTML)
- 支持数据统计和分析
实时监控:
- 显示当前在线用户
- 实时记录用户行为
- 每分钟输出统计信息
可靠性:
- 自动重连机制
- 信号处理(优雅关闭)
- 错误日志记录
可扩展性:
- 支持添加自定义回调函数
- 模块化设计,易于扩展
- 支持多种消息类型
这个方案完整地实现了您需要的功能:监听并保存聊天室的文字消息,记录用户进出的ID和时间,并提供数据导出功能。
用了三天,记录了几万条消息,查询速度还是很快,索引优化做得不错。要是能加个简单的Web管理界面就更完美了,不过现在的功能已经完全够用了。
This is much more comprehensive than the basic WebSocket listeners I've seen. The separation into DatabaseManager, ChatRoomMonitor, and DataExporter classes makes it easy to customize. I've already adapted it for my Slack-like internal tool.
试了一下重连机制,很稳定。模拟断网5分钟,恢复后自动连上了,数据一条没丢。就是那个60秒的统计输出,建议可以做成可配置的,有些场景想要更频繁的监控。
The HTML export feature is a nice touch! I run a small gaming community and being able to generate readable reports for moderators is super helpful. The color coding for join/leave events makes it easy to spot patterns.
数据库设计得很专业,连ip_address都考虑到了,这对审计很有帮助。不过建议可以再加一个字段记录用户的操作系统或浏览器信息,有时候能发现一些有趣的数据。