hxp
6 天以前 ab57c59ab33f5e4bf7e3bfd5514b04cb3d38dc2b
Merge branch 'master' of http://192.168.1.20:10010/r/Project_SG_ServerCode
10个文件已添加
1515 ■■■■■ 已修改文件
ServerPython/EventServerPY/Config.ini 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ServerPython/EventServerPY/README.md 87 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ServerPython/EventServerPY/clients_manager.py 126 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ServerPython/EventServerPY/config.py 97 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ServerPython/EventServerPY/file_manager.py 232 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ServerPython/EventServerPY/main.py 147 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ServerPython/EventServerPY/packet_logger.py 159 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ServerPython/EventServerPY/packet_processor.py 173 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ServerPython/EventServerPY/protocol.py 214 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ServerPython/EventServerPY/server.py 264 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ServerPython/EventServerPY/Config.ini
New file
@@ -0,0 +1,16 @@
[WriteFile]
# 写文件方式(1: 按天存储, 2: 按小时存文件, 3: 按文件大小存文件, 4: 存为单个文件)
WriteMode=1
# 每个日志文件的大小(KB)(仅当WriteMode == 3时生效)
MaxFileSize=1024
# 事件记录文件的路径
LogFilePath=.\EventData
[PacketLog]
# 是否启用封包日志(0: 否, 1: 是)
EnablePacketLog=1
# 封包日志基础路径(自动按日期创建子目录)
PacketLogPath=C:\ServerLog
[Network]
ListenPort=60005
ServerPython/EventServerPY/README.md
New file
@@ -0,0 +1,87 @@
# EventServer Python版本
这是一个Python 2.7版本的游戏事件接收与记录服务器,用于接收游戏客户端发送的事件数据并记录到本地文件中。
## 功能特性
- TCP网络服务,接收游戏客户端/服务器事件
- 支持多种事件协议(二进制事件、字符串事件)
- 客户端登录/登出/心跳管理
- 灵活的文件写入策略(按天/按小时/按大小/单文件)
- 线程安全的数据处理
- 实时状态监控
## 文件结构
```
EventServerPY/
├── main.py              # 主程序入口
├── config.py            # 配置文件管理
├── protocol.py          # 网络协议定义
├── server.py            # TCP服务器
├── clients_manager.py   # 客户端管理
├── file_manager.py      # 文件写入管理
├── packet_processor.py  # 数据包处理
├── packet_logger.py     # 封包日志
└── Config.ini           # 配置文件
```
## 配置说明
Config.ini 配置文件说明:
```ini
[WriteFile]
WriteMode=1              ; 1=按天, 2=按小时, 3=按大小, 4=单文件
MaxFileSize=1024         ; 最大文件大小(KB),仅WriteMode=3时有效
LogFilePath=.\EventLogs  ; 日志文件路径
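[PacketLog]
EnablePacketLog=1        ; 是否启用封包日志: 0=否, 1=是
PacketLogPath=C:\ServerLog ; 封包日志基础路径(自动按日期创建子目录)
[Network]
ListenPort=60000         ; 监听端口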
```
## 启动方式
```bash
python main.py
```
## 数据格式
事件数据(数据包中的 `sz_data` 字段)按原样逐行写入日志文件,每条记录以 `\r\n` 结尾;服务器本身不会对内容做JSON封装。若发送端使用JSON格式,单条记录形如:
```json
{
    "event_id": "1001",
    "data": "event_data_string",
    "timestamp": "2026-02-01 12:00:00",
    "cid": 1
}
```
## 协议说明
### 协议头 (2字节)
- Cmd: 主命令 (1字节)
- SubCmd: 子命令 (1字节)
### 主要协议
- `0x01 0x01` - 登录
- `0x01 0x02` - 登出
- `0x01 0x03` - 事件发送
- `0x01 0x04` - 心跳
- `0x01 0x05` - 字符串事件发送
## 注意事项
1. 确保Python版本为2.7
2. 确保有足够的磁盘空间存储日志文件
3. 监听端口(Config.ini 中的 ListenPort,默认60000)需要在防火墙中开放
4. 建议使用命令行启动以便查看实时日志
## 状态监控
服务器每5秒显示一次状态:
- Connections: 当前连接数
- Queue: 待处理数据包数量
- Success: 成功写入次数(增量/总计)
- Fail: 失败写入次数(增量/总计)
ServerPython/EventServerPY/clients_manager.py
New file
@@ -0,0 +1,126 @@
#!/usr/bin/python
# -*- coding: GBK -*-
"""
客户端管理器
用于管理客户端连接状态和登录状态
"""
import threading
from protocol import LOGIN_MAGIC_CODE
from packet_logger import packet_logger
# Client connection states
CLIENT_STATUS_NOT_CONNECT = 0
CLIENT_STATUS_NOT_LOGIN = 1
CLIENT_STATUS_LOGIN = 2


class ClientsMgr(object):
    """Singleton registry of per-client login status and server id.

    Two dictionaries keyed by client id (cid) are kept: ``status_map`` holds
    one of the ``CLIENT_STATUS_*`` values and ``server_id_map`` holds the
    server id recorded for the client.  Every access to either map is
    serialized through ``map_lock``.

    ``ClientsMgr()`` and ``ClientsMgr.instance()`` always yield the same
    object.  The class derives from ``object`` on purpose: a Python 2 classic
    class never invokes ``__new__``, so without the explicit base each
    ``ClientsMgr()`` call would silently build a separate, empty registry.
    """

    _instance = None
    _lock = threading.Lock()  # guards creation of the singleton

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    inst = super(ClientsMgr, cls).__new__(cls)
                    # Build the state while still holding the lock and only
                    # then publish the instance, so no caller can observe a
                    # half-built registry or reset one that is in use.
                    inst._setup()
                    cls._instance = inst
        return cls._instance

    @classmethod
    def instance(cls):
        """Return the shared ClientsMgr, creating it on first use."""
        return cls()

    def __init__(self):
        # __init__ runs on every ClientsMgr() call; state is created once.
        if getattr(self, '_initialized', False):
            return
        self._setup()

    def _setup(self):
        """Create the empty maps and their lock."""
        self.status_map = {}  # cid -> status
        self.server_id_map = {}  # cid -> server_id
        self.map_lock = threading.Lock()
        self._initialized = True

    def set_client_status(self, cid, status):
        """Record *status* (a ``CLIENT_STATUS_*`` value) for client *cid*."""
        with self.map_lock:
            self.status_map[cid] = status

    def get_client_status(self, cid):
        """Return the status of *cid*, or CLIENT_STATUS_NOT_CONNECT if unknown."""
        with self.map_lock:
            return self.status_map.get(cid, CLIENT_STATUS_NOT_CONNECT)

    def set_client_server_id(self, cid, server_id):
        """Record the server id associated with client *cid*."""
        with self.map_lock:
            self.server_id_map[cid] = server_id

    def get_client_server_id(self, cid):
        """Return the server id of *cid*, or 0 when none has been recorded."""
        with self.map_lock:
            return self.server_id_map.get(cid, 0)

    def del_client_cache(self, cid):
        """Drop every cached value for *cid*; unknown ids are ignored."""
        with self.map_lock:
            self.status_map.pop(cid, None)
            self.server_id_map.pop(cid, None)

    def on_client_disconnect(self, cid):
        """Forget client *cid* after its connection has gone away."""
        self.del_client_cache(cid)

    def get_logged_in_clients(self):
        """Return a list of the cids whose status is CLIENT_STATUS_LOGIN."""
        with self.map_lock:
            return [cid for cid, status in self.status_map.items()
                    if status == CLIENT_STATUS_LOGIN]

    def clear(self):
        """Remove the cached data of all clients."""
        with self.map_lock:
            self.status_map.clear()
            self.server_id_map.clear()
def handle_login(cid, packet):
    """Process a login request from client *cid*.

    The packet's ``magic_code`` must equal ``LOGIN_MAGIC_CODE``.  On a match
    the packet's ``server_id`` is stored for the client, the client is marked
    as logged in and True is returned.  On a mismatch an ERROR line is written
    to the packet log and False is returned without touching the registry.
    """
    registry = ClientsMgr.instance()
    received_code = packet.magic_code
    if received_code == LOGIN_MAGIC_CODE:
        registry.set_client_server_id(cid, packet.server_id)
        registry.set_client_status(cid, CLIENT_STATUS_LOGIN)
        success_text = 'Client %d logged in - Server: %d' % (cid, packet.server_id)
        packet_logger.log_text(success_text, 'INFO')
        return True
    failure_text = 'Invalid magic code from client %d, expected 0x%08X, got 0x%08X' % (
        cid, LOGIN_MAGIC_CODE, packet.magic_code)
    packet_logger.log_text(failure_text, 'ERROR')
    return False
def handle_logout(cid, packet):
    """Process a logout request from client *cid*.

    Removes the client's cached status and server id, then writes an INFO
    line to the packet log.  *packet* is part of the handler signature but is
    not inspected here.
    """
    ClientsMgr.instance().on_client_disconnect(cid)
    logout_text = 'Client %d logged out' % cid
    packet_logger.log_text(logout_text, 'INFO')
def handle_heartbeat(cid, packet):
    """Process a heartbeat from client *cid*.

    Currently a placeholder: the client's status is looked up, but neither
    the logged-in nor the not-logged-in case performs any action yet.
    """
    current_status = ClientsMgr.instance().get_client_status(cid)
    if current_status != CLIENT_STATUS_LOGIN:
        # Extension point: heartbeat from a client that has not logged in.
        pass
    else:
        # Extension point: heartbeat from a logged-in client.
        pass
ServerPython/EventServerPY/config.py
New file
@@ -0,0 +1,97 @@
#!/usr/bin/python
# -*- coding: GBK -*-
"""
配置文件管理模块
"""
import ConfigParser
import os
import threading
class ConfigReader(object):
    """Singleton reader for ``Config.ini`` located next to this module.

    ``ConfigReader()`` and ``ConfigReader.instance()`` always return the same
    object; the ini file is parsed once, on first use.  If the file does not
    exist, one holding the built-in defaults is written first.

    The class derives from ``object`` on purpose: a Python 2 classic class
    never invokes ``__new__``, so the singleton logic would be bypassed and
    every ``ConfigReader()`` call would parse the file again.
    """

    _instance = None
    # Created with the class so that concurrent first callers always contend
    # on the same lock object.
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    inst = super(ConfigReader, cls).__new__(cls)
                    inst._initialized = False
                    cls._instance = inst
        return cls._instance

    @classmethod
    def instance(cls):
        """Return the shared, fully loaded ConfigReader."""
        return cls()

    def __init__(self):
        # __init__ runs on every ConfigReader() call; load only once.
        if getattr(self, '_initialized', False):
            return
        with self._lock:
            if getattr(self, '_initialized', False):
                return
            self.config = ConfigParser.ConfigParser()
            self.config_path = os.path.join(
                os.path.dirname(os.path.abspath(__file__)), 'Config.ini')
            self.load_config()
            # Flag the instance only after a successful load so that a failed
            # first attempt is retried instead of leaving an empty config.
            self._initialized = True

    def load_config(self):
        """Parse the ini file, writing a default one first if it is missing."""
        if not os.path.exists(self.config_path):
            self._create_default_config()
        self.config.read(self.config_path)

    def _create_default_config(self):
        """Populate the parser with default values and write them to disk."""
        self.config.add_section('WriteFile')
        self.config.set('WriteFile', 'WriteMode', '1')  # 1 = one file per day
        self.config.set('WriteFile', 'MaxFileSize', '1024')
        self.config.set('WriteFile', 'LogFilePath', '.\\EventLogs')
        self.config.add_section('PacketLog')
        self.config.set('PacketLog', 'EnablePacketLog', '1')
        self.config.set('PacketLog', 'PacketLogPath', 'C:\\ServerLog')
        self.config.add_section('Network')
        self.config.set('Network', 'ListenPort', '60000')
        self.save()

    def get_listen_port(self):
        """Return the TCP listen port ([Network] ListenPort) as an int."""
        return self.config.getint('Network', 'ListenPort')

    def get_write_mode(self):
        """Return the write mode: 1=per day, 2=per hour, 3=by size, 4=single file."""
        return self.config.getint('WriteFile', 'WriteMode')

    def get_max_file_size(self):
        """Return the maximum file size in KB ([WriteFile] MaxFileSize)."""
        return self.config.getint('WriteFile', 'MaxFileSize')

    def get_log_file_path(self):
        """Return the event log directory ([WriteFile] LogFilePath)."""
        return self.config.get('WriteFile', 'LogFilePath')

    def get_enable_packet_log(self):
        """Return the packet-log switch ([PacketLog] EnablePacketLog) as an int."""
        return self.config.getint('PacketLog', 'EnablePacketLog')

    def get_packet_log_path(self):
        """Return the packet log base directory ([PacketLog] PacketLogPath)."""
        return self.config.get('PacketLog', 'PacketLogPath')

    def save(self):
        """Write the current configuration back to the ini file."""
        with open(self.config_path, 'wb') as f:
            self.config.write(f)
ServerPython/EventServerPY/file_manager.py
New file
@@ -0,0 +1,232 @@
#!/usr/bin/python
# -*- coding: GBK -*-
"""
文件管理模块
负责日志文件的创建、写入和管理
"""
import os
import time
import datetime
import threading
from collections import namedtuple
from packet_logger import packet_logger
from clients_manager import ClientsMgr
# Key identifying one output file: the server id plus the event type.
FileInfo = namedtuple('FileInfo', ['server_id', 'event_type'])
class FileWriteStats:
    """Lock-protected counters for successful and failed file writes."""

    def __init__(self):
        self.lock = threading.Lock()
        self.success_count = self.fail_count = 0

    def add_success(self):
        """Count one successful write."""
        with self.lock:
            self.success_count = self.success_count + 1

    def add_fail(self):
        """Count one failed write."""
        with self.lock:
            self.fail_count = self.fail_count + 1

    def get_success_count(self):
        """Return the number of successful writes counted so far."""
        with self.lock:
            total = self.success_count
        return total

    def get_fail_count(self):
        """Return the number of failed writes counted so far."""
        with self.lock:
            total = self.fail_count
        return total

    def reset(self):
        """Set both counters back to zero."""
        with self.lock:
            self.success_count, self.fail_count = 0, 0
class WriteFile(object):
    """Append-only writer for a single log file.

    The file is opened in binary append mode when the object is created and
    its parent directory is created if missing.  ``is_ready`` tells whether
    the writer currently holds a usable handle.

    Args:
        filename: path of the file to append to.
        success_callback: called with no arguments after each successful
            write; may be None.
        fail_callback: called with no arguments after each failed write; may
            be None.
    """

    def __init__(self, filename, success_callback, fail_callback):
        self.filename = filename
        self.file_handle = None
        self.is_ready = False
        self.success_callback = success_callback
        self.fail_callback = fail_callback
        self.open_file()

    def open_file(self):
        """Open the file for appending, creating its directory when needed.

        Failures are printed and leave ``is_ready`` False; nothing is raised.
        """
        try:
            dir_path = os.path.dirname(self.filename)
            if dir_path and not os.path.exists(dir_path):
                os.makedirs(dir_path)
            self.file_handle = open(self.filename, 'ab')
            self.is_ready = True
        except Exception as e:
            self.is_ready = False
            print('[WriteFile] Failed to open file: %s, error: %s' % (self.filename, str(e)))

    def write_string(self, data):
        """Append *data* followed by CRLF and flush.

        Byte strings are written unchanged; text strings are encoded as GBK
        first.  Calling ``.encode`` on a Python 2 byte string would first
        decode it as ASCII and therefore fail on any non-ASCII payload, so
        bytes must bypass the encode step.

        Returns:
            True when the line was written, False otherwise.  On an error the
            writer marks itself not ready and the fail callback is invoked.
        """
        if not self.is_ready:
            if self.fail_callback:
                self.fail_callback()
            return False
        try:
            if not isinstance(data, bytes):
                data = data.encode('gbk')
            # The file is opened in binary mode, so the Windows line ending
            # is written explicitly.
            self.file_handle.write(data + b'\r\n')
            self.file_handle.flush()
            if self.success_callback:
                self.success_callback()
            return True
        except Exception as e:
            print('[WriteFile] Failed to write: %s, error: %s' % (self.filename, str(e)))
            self.is_ready = False
            if self.fail_callback:
                self.fail_callback()
            return False

    def close(self):
        """Close the file handle if one is open; errors while closing are ignored."""
        if self.file_handle:
            try:
                self.file_handle.close()
            except Exception:
                # Best effort: the handle is discarded either way.
                pass
            self.file_handle = None
            self.is_ready = False

    def __del__(self):
        self.close()
class FileMgr(object):
    """Owns the open log files, one per (server id, event type) pair.

    Files live under ``<log path>/S<server id>/<YYYY-MM-DD>/``.  The file
    name depends on the configured write mode: 1 = one file per day, 2 = one
    per hour, 3 = a new file once the size limit is reached, anything else =
    a single file per event type (inside the dated folder).

    Args:
        config: object providing ``get_log_file_path()``,
            ``get_write_mode()`` and ``get_max_file_size()``.
    """

    # Characters replaced by '_' when an event type becomes part of a file
    # name.  String event ids arrive from the network, so path separators
    # and the drive colon must not be able to redirect the write elsewhere.
    # NOTE(review): replacement is per str element; on Python 2 byte strings
    # a GBK trail byte equal to one of these characters is replaced as well.
    _UNSAFE_NAME_CHARS = '\\/:*?"<>|'

    def __init__(self, config):
        self.config = config
        self.lock = threading.Lock()
        self.file_map = {}  # FileInfo -> WriteFile
        self.stats = FileWriteStats()
        self.log_path = self.config.get_log_file_path()
        # Make sure the base log directory exists; failure is only reported.
        if not os.path.exists(self.log_path):
            try:
                os.makedirs(self.log_path)
            except OSError as e:
                print('[FileMgr] Failed to create log dir: %s, error: %s' % (self.log_path, str(e)))

    def get_instance(self, cid, event_type):
        """Return the WriteFile to use right now for *cid* and *event_type*.

        The server id is looked up from the client registry (0 when the
        client never logged in, which is logged as a warning).  A cached
        writer is reused while it is still valid; otherwise it is closed and
        replaced by a writer for the current file path.
        """
        clients_mgr = ClientsMgr.instance()
        server_id = clients_mgr.get_client_server_id(cid)
        if server_id == 0:
            packet_logger.log_text('FileMgr: WARNING - cid=%d has no server_id set! Defaulting to S0' % cid, 'WARN')
        file_info = FileInfo(server_id, event_type)
        with self.lock:
            write_file = self.file_map.get(file_info)
            if write_file is not None and not self._check_file_valid(write_file, file_info):
                # Stale or broken writer: release it before reopening.
                write_file.close()
                del self.file_map[file_info]
                write_file = None
            if write_file is None:
                write_file = self._create_file_writer(file_info)
            return write_file

    def _safe_name(self, event_type):
        """Return *event_type* as text with file-name-hostile characters replaced."""
        text = '%s' % (event_type,)
        unsafe = self._UNSAFE_NAME_CHARS
        return ''.join('_' if (ch in unsafe or ord(ch) < 32) else ch for ch in text)

    def _get_file_path(self, file_info):
        """Build the path of the file that *file_info* should be written to now."""
        write_mode = self.config.get_write_mode()
        # Sample the clock once so folder and file name cannot disagree.
        now = datetime.datetime.now()
        date_str = now.strftime('%Y-%m-%d')
        name = self._safe_name(file_info.event_type)
        if write_mode == 1:  # one file per day
            filename = '%s_%s.txt' % (name, date_str)
        elif write_mode == 2:  # one file per hour
            filename = '%s_%s.txt' % (name, now.strftime('%Y-%m-%d_%H'))
        elif write_mode == 3:  # rotate by size; name carries creation time
            filename = '%s_%s.txt' % (name, now.strftime('%Y%m%d_%H%M%S'))
        else:  # single file
            filename = '%s.txt' % name
        # Layout: <log path>/S<server id>/<date>/<file>
        return os.path.join(self.log_path, 'S%d' % file_info.server_id, date_str, filename)

    def _check_file_valid(self, write_file, file_info):
        """Return True while *write_file* may still be used for *file_info*.

        A writer is stale when it is not ready, when the date has changed,
        when (hourly mode) the hour has changed, or when (size mode) the file
        has reached the configured maximum size.
        """
        if not write_file.is_ready:
            return False
        write_mode = self.config.get_write_mode()
        if write_mode != 3:
            # For these modes the path is a pure function of the current
            # date (and hour), so any difference means rotation is due.
            return write_file.filename == self._get_file_path(file_info)
        # Size mode: the name embeds the creation time and cannot be
        # recomputed, so check the dated folder and the size instead.
        current_date = datetime.datetime.now().strftime('%Y-%m-%d')
        if current_date not in write_file.filename:
            return False
        max_size = self.config.get_max_file_size() * 1024  # KB -> bytes
        try:
            return os.path.getsize(write_file.filename) < max_size
        except OSError:
            # Size unknown (e.g. file removed externally): keep the writer.
            return True

    def _create_file_writer(self, file_info):
        """Open a writer for the current path of *file_info*, cache and return it."""
        filepath = self._get_file_path(file_info)
        write_file = WriteFile(
            filepath,
            self.stats.add_success,
            self.stats.add_fail
        )
        self.file_map[file_info] = write_file
        return write_file

    def write_event(self, cid, event_type, event_data):
        """Append *event_data* to the file selected by *cid* and *event_type*.

        Returns the result of ``WriteFile.write_string``.
        """
        write_file = self.get_instance(cid, event_type)
        return write_file.write_string(event_data)

    def get_success_count(self):
        """Return the number of successful writes so far."""
        return self.stats.get_success_count()

    def get_fail_count(self):
        """Return the number of failed writes so far."""
        return self.stats.get_fail_count()

    def close_all(self):
        """Close every open file and empty the cache."""
        with self.lock:
            for write_file in self.file_map.values():
                write_file.close()
            self.file_map.clear()
ServerPython/EventServerPY/main.py
New file
@@ -0,0 +1,147 @@
#!/usr/bin/python
# -*- coding: GBK -*-
"""
EventServer - 事件服务器
用于接收游戏客户端事件并记录到本地文件
"""
import os
import sys
import datetime
import time
import threading
import ctypes
from config import ConfigReader
from server import EventServer
from packet_logger import packet_logger
from clients_manager import ClientsMgr
def disable_close_button():
    """Remove the system menu (and with it the close button) from the console window.

    Uses the Win32 API through ``ctypes.windll``.  This is best effort: on
    platforms without ``windll`` or when any API call fails, the function
    silently does nothing.
    """
    GWL_STYLE = -16        # index of the window-style value
    WS_SYSMENU = 0x80000   # style bit for the title-bar system menu
    try:
        kernel32 = ctypes.windll.kernel32
        hwnd = kernel32.GetConsoleWindow()
        if hwnd:
            user32 = ctypes.windll.user32
            style = user32.GetWindowLongW(hwnd, GWL_STYLE)
            user32.SetWindowLongW(hwnd, GWL_STYLE, style & ~WS_SYSMENU)
    except Exception:
        # Cosmetic feature only; never let it stop the server.  Catching
        # Exception (not a bare except) keeps Ctrl+C / SystemExit working.
        pass
def main():
    """Program entry point: start the event server and run until interrupted.

    Sets the console title, disables the close button, reports the packet
    logger state, creates the singletons, prints the banner and starts the
    server.  If the server fails to start the function waits for Enter and
    returns.  Otherwise a daemon thread displays the status while the main
    thread idles; Ctrl+C (or any other exception in the idle loop) leads to
    an orderly stop.
    """
    window_title_cmd = "title EventServer-%s" % datetime.datetime.today()
    os.system(window_title_cmd)

    disable_close_button()

    # Report whether the packet logger is active.
    if not packet_logger.enable:
        print('[Main] Packet logger disabled')
    else:
        print('[Main] Packet logger initialized: %s' % packet_logger.log_dir)

    # Create the singletons up front, before any worker thread exists.
    ConfigReader.instance()
    ClientsMgr.instance()

    rule = '=' * 60
    for banner_line in (
        rule,
        '        EventServer - 游戏事件服务器',
        '        Version: 1.0.0',
        '        Python: 2.7',
        rule,
        '',
    ):
        print(banner_line)

    server = EventServer()
    started = server.start()
    if not started:
        print('[Main] Failed to start server!')
        raw_input('Press Enter to exit...')
        return

    for info_line in ('[Main] Server is running...', '[Main] Press Ctrl+C to stop', ''):
        print(info_line)

    # Background thread that periodically redraws the status screen.
    stop_event = threading.Event()
    status_thread = threading.Thread(target=show_status, args=(server, stop_event))
    status_thread.daemon = True
    status_thread.start()

    try:
        # Idle until an interrupt arrives.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print('')
        print('[Main] Received interrupt signal, stopping...')
    except Exception as e:
        print('[Main] Error: %s' % str(e))

    stop_event.set()
    server.stop()
    print('[Main] Goodbye!')
def show_status(server, stop_event, interval=5):
    """Redraw the status screen every *interval* seconds until stopped.

    Args:
        server: object providing ``get_connection_count()``,
            ``get_queue_size()``, ``get_success_count()`` and
            ``get_fail_count()``.
        stop_event: event whose ``wait(timeout)`` returns True once the
            display should stop (e.g. ``threading.Event``).
        interval: seconds between two refreshes (default 5).

    Waiting on the event instead of sleeping makes the loop end as soon as
    the event is set, so no stale frame is drawn during shutdown.
    """
    last_success = 0
    last_fail = 0
    while not stop_event.wait(interval):
        # Clear the console before redrawing.
        os.system('cls' if os.name == 'nt' else 'clear')
        # Banner
        print('=' * 60)
        print('        EventServer - 游戏事件服务器')
        print('        Version: 1.0.0')
        print('        Python: ' + sys.version)
        print('=' * 60)
        print('[Main] Server is running...')
        print('[Main] Press Ctrl+C to stop')
        print('')
        # Current totals
        conn_count = server.get_connection_count()
        queue_size = server.get_queue_size()
        success_count = server.get_success_count()
        fail_count = server.get_fail_count()
        # Change since the previous refresh
        success_delta = success_count - last_success
        fail_delta = fail_count - last_fail
        print('[Status] Connections: %d | Queue: %d | Success: +%d (Total: %d) | Fail: +%d (Total: %d)' % (
            conn_count,
            queue_size,
            success_delta,
            success_count,
            fail_delta,
            fail_count
        ))
        last_success = success_count
        last_fail = fail_count
# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    main()
ServerPython/EventServerPY/packet_logger.py
New file
@@ -0,0 +1,159 @@
#!/usr/bin/python
# -*- coding: GBK -*-
"""
封包日志模块
提供统一的日志记录功能,支持十六进制格式输出
"""
import os
import datetime
from config import ConfigReader
class PacketLogger(object):
    """Singleton logger writing text lines (and optional hex dumps) to daily files.

    Log files are named ``packet_YYYYMMDD.log`` and live in
    ``<PacketLogPath>/<YYYY-MM>/EventServer``.  Whether logging is enabled and
    where the base path is are read from the ConfigReader singleton.

    The class derives from ``object`` on purpose: a Python 2 classic class
    never invokes ``__new__``, so ``PacketLogger()`` and
    ``PacketLogger.instance()`` would otherwise return different objects.
    """

    _instance = None

    # Per-packet hex dumps are switched off by default: ``log()`` used to
    # return unconditionally before doing any work.  Set this to True to have
    # ``log()`` write dumps again (packet logging must be enabled as well).
    ENABLE_HEX_LOG = False

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(PacketLogger, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    @classmethod
    def instance(cls):
        """Return the shared PacketLogger, creating it on first use."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def __init__(self):
        # __init__ runs on every PacketLogger() call; set up only once.
        if getattr(self, '_initialized', False):
            return
        self._initialized = True
        config = ConfigReader.instance()
        self.enable = config.get_enable_packet_log()
        self.log_path = config.get_packet_log_path()
        self.log_dir = None   # directory of the current log file
        self.log_file = None  # full path of the current log file
        if self.enable:
            self._init()

    def _init(self):
        """Create the log directories and select today's log file.

        Any failure is printed and disables logging (``enable`` = False).
        """
        try:
            if not os.path.exists(self.log_path):
                os.makedirs(self.log_path)
            # Monthly sub-directory, e.g. <base>\2026-02\EventServer
            today = datetime.datetime.now()
            date_path = today.strftime('%Y-%m')
            self.log_dir = os.path.join(self.log_path, date_path, 'EventServer')
            if not os.path.exists(self.log_dir):
                os.makedirs(self.log_dir)
            log_filename = 'packet_%s.log' % today.strftime('%Y%m%d')
            self.log_file = os.path.join(self.log_dir, log_filename)
        except Exception as e:
            print('[PacketLogger] Init error: %s' % str(e))
            self.enable = False

    def _check_date_change(self):
        """Switch to a new log file when the calendar day has changed."""
        if not self.enable:
            return
        today = datetime.datetime.now()
        log_filename = 'packet_%s.log' % today.strftime('%Y%m%d')
        current_log_file = os.path.join(self.log_dir, log_filename)
        if current_log_file != self.log_file:
            # Re-run the setup so a new month also gets its directory.
            self._init()

    def _append_line(self, line):
        """Append *line* to the current log file, rotating the file first if needed."""
        self._check_date_change()
        with open(self.log_file, 'a') as f:
            f.write(line)

    @staticmethod
    def _timestamp():
        """Return the current local time with millisecond precision."""
        return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]

    def log(self, cid, packet_data, packet_type, direction='RECV'):
        """Write a hex dump of one packet, if hex logging is switched on.

        Does nothing unless both ``enable`` and ``ENABLE_HEX_LOG`` are true.

        Args:
            cid: client id.
            packet_data: packet bytes.
            packet_type: label such as LOGIN, EVENT or HEARTBEAT.
            direction: RECV for received, SEND for sent packets.
        """
        if not (self.enable and self.ENABLE_HEX_LOG):
            return
        try:
            # Iterating yields ints (bytes on Python 3, bytearray) or
            # one-character strings (str on Python 2); accept both.
            hex_data = ' '.join(
                '%02X' % (b if isinstance(b, int) else ord(b)) for b in packet_data)
            log_line = '[%s] [%s] CID=%d Type=%s Len=%d Data=%s\n' % (
                self._timestamp(),
                direction,
                cid,
                packet_type,
                len(packet_data),
                hex_data
            )
            self._append_line(log_line)
        except Exception as e:
            print('[PacketLogger] Write error: %s' % str(e))

    def log_text(self, text, prefix='INFO'):
        """Write one text line to the log.

        Args:
            text: message to record.
            prefix: severity tag such as INFO, WARN or ERROR.

        Errors while writing are printed, never raised.
        """
        if not self.enable:
            return
        try:
            log_line = '[%s] [%s] %s\n' % (self._timestamp(), prefix, text)
            self._append_line(log_line)
        except Exception as e:
            print('[PacketLogger] Write error: %s' % str(e))
# Module-level logger instance, created when this module is imported.
packet_logger = PacketLogger()
ServerPython/EventServerPY/packet_processor.py
New file
@@ -0,0 +1,173 @@
#!/usr/bin/python
# -*- coding: GBK -*-
"""
数据包处理模块
负责数据包的解析、分发和处理
"""
import threading
import datetime
import time
from protocol import (
    ProtocolHead, LoginPacket, LogoutPacket, EventSendPacket, EventStrSendPacket,
    HeartBeatPacket, CMD_INTERFACE, SUB_CMD_LOGIN, SUB_CMD_LOGOUT,
    SUB_CMD_EVENT_SEND, SUB_CMD_HEARTBEAT, SUB_CMD_EVENT_STR
)
from clients_manager import handle_login, handle_logout, handle_heartbeat
from packet_logger import packet_logger
class PacketQueue(object):
    """FIFO queue of ``(cid, data)`` pairs guarded by a condition variable."""

    def __init__(self):
        # Local import, matching the function-scope imports used elsewhere in
        # this code base; deque gives O(1) removal from the front.
        from collections import deque
        self.queue = deque()
        self.condition = threading.Condition()
        self.running = True

    def put(self, cid, data):
        """Append one packet and wake a waiting consumer."""
        with self.condition:
            self.queue.append((cid, data))
            self.condition.notify()

    def get(self, timeout=None):
        """Remove and return the oldest ``(cid, data)`` pair.

        Args:
            timeout: maximum number of seconds to wait for a packet; None or
                0 means wait until a packet arrives or the queue is stopped.

        Returns:
            The ``(cid, data)`` tuple, or ``(None, None)`` when the timeout
            expired or the queue has been stopped.
        """
        with self.condition:
            # Track the deadline explicitly instead of trusting the return
            # value of Condition.wait(): on Python 2.7 it is always None, so
            # testing it would report a timeout on every wake-up.
            deadline = time.time() + timeout if timeout else None
            while not self.queue and self.running:
                if deadline is None:
                    self.condition.wait()
                    continue
                remaining = deadline - time.time()
                if remaining <= 0:
                    return None, None
                self.condition.wait(remaining)
            if not self.running:
                return None, None
            return self.queue.popleft()

    def stop(self):
        """Mark the queue as stopped and wake every waiting consumer."""
        with self.condition:
            self.running = False
            self.condition.notify_all()

    def size(self):
        """Return the number of packets currently queued."""
        with self.condition:
            return len(self.queue)
class PacketProcessor:
    """Consumes queued packets on a worker thread and dispatches them by command."""

    def __init__(self, file_mgr):
        self.file_mgr = file_mgr
        self.packet_queue = PacketQueue()
        self.process_thread = None
        self.running = False

    def add_packet(self, cid, data):
        """Queue the raw packet *data* received from client *cid*."""
        self.packet_queue.put(cid, data)

    def start(self):
        """Start the worker thread; does nothing when already running."""
        if not self.running:
            self.running = True
            worker = threading.Thread(target=self._process_loop)
            worker.daemon = True
            self.process_thread = worker
            worker.start()

    def stop(self):
        """Stop the worker thread, waiting up to 5 seconds for it to finish."""
        if self.running:
            self.running = False
            self.packet_queue.stop()
            worker = self.process_thread
            if worker:
                worker.join(timeout=5)

    def _process_loop(self):
        """Worker body: fetch packets one by one and process each of them."""
        while self.running:
            cid, data = self.packet_queue.get(timeout=1)
            have_packet = cid is not None and data is not None
            if have_packet:
                try:
                    self._process_packet(cid, data)
                except Exception as e:
                    # One bad packet must not end the worker.
                    print('[PacketProcessor] Process packet error: %s' % str(e))

    def _process_packet(self, cid, data):
        """Parse the 2-byte head of *data* and route the packet by its command."""
        if len(data) >= 2:
            head = ProtocolHead(data[:2])
            # Raw dump of the packet before it is interpreted.
            packet_logger.log(cid, data, 'RAW', 'RECV')
            if head.cmd != CMD_INTERFACE:
                print('[PacketProcessor] Unknown cmd: 0x%02X' % head.cmd)
            else:
                self._process_interface_packet(cid, head, data)

    def _process_interface_packet(self, cid, head, data):
        """Handle a CMD_INTERFACE packet according to its sub command."""
        # sub command -> (packet class, handler, log label)
        routes = {
            SUB_CMD_LOGIN: (LoginPacket, handle_login, 'LOGIN'),
            SUB_CMD_LOGOUT: (LogoutPacket, handle_logout, 'LOGOUT'),
            SUB_CMD_HEARTBEAT: (HeartBeatPacket, handle_heartbeat, 'HEARTBEAT'),
            SUB_CMD_EVENT_SEND: (EventSendPacket, self._process_event_send, 'EVENT'),
            SUB_CMD_EVENT_STR: (EventStrSendPacket, self._process_event_str_send, 'EVENT_STR'),
        }
        route = routes.get(head.sub_cmd)
        if route is None:
            print('[PacketProcessor] Unknown interface subcmd: 0x%02X' % head.sub_cmd)
            return
        packet_cls, handler, label = route
        packet = packet_cls(data)
        handler(cid, packet)
        packet_logger.log(cid, data, label, 'RECV')

    def _process_event_send(self, cid, packet):
        """Write an event whose type is the packet's ``event_id``."""
        self.file_mgr.write_event(cid, packet.event_id, packet.sz_data)

    def _process_event_str_send(self, cid, packet):
        """Write an event whose type is the packet's ``sz_event_id_string``."""
        self.file_mgr.write_event(cid, packet.sz_event_id_string, packet.sz_data)

    def get_queue_size(self):
        """Return the number of packets waiting to be processed."""
        return self.packet_queue.size()
ServerPython/EventServerPY/protocol.py
New file
@@ -0,0 +1,214 @@
#!/usr/bin/python
# -*- coding: GBK -*-
"""
网络协议定义模块
定义数据包结构和协议头
"""
import struct
class ProtocolHead(object):
    """Two-byte protocol head: main command followed by sub command.

    Args:
        data: optional byte string; when it holds at least two bytes the
            first two are parsed into ``cmd`` and ``sub_cmd``.
    """

    def __init__(self, data=None):
        self.cmd = 0
        self.sub_cmd = 0
        if data:
            self.from_bytes(data)

    def from_bytes(self, data):
        """Read ``cmd`` and ``sub_cmd`` from the first two bytes of *data*.

        Input shorter than two bytes leaves both fields unchanged.
        """
        if len(data) >= 2:
            # struct mirrors to_bytes() and reads a byte string the same way
            # on Python 2 and Python 3.
            self.cmd, self.sub_cmd = struct.unpack_from('BB', data)

    def to_bytes(self):
        """Return the head packed as two unsigned bytes."""
        return struct.pack('BB', self.cmd, self.sub_cmd)

    def __str__(self):
        return 'Head: (Cmd:0x%02X, SubCmd:0x%02X)' % (self.cmd, self.sub_cmd)

    def __eq__(self, other):
        # Comparing with an unrelated object is "not equal", not an error.
        if not isinstance(other, ProtocolHead):
            return NotImplemented
        return self.cmd == other.cmd and self.sub_cmd == other.sub_cmd

    def __ne__(self, other):
        # Python 2 does not derive != from ==; keep the two consistent.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        return hash((self.cmd, self.sub_cmd))
# Protocol constants
CMD_INTERFACE = 0x01
SUB_CMD_LOGIN = 0x01        # login
SUB_CMD_LOGOUT = 0x02       # logout
SUB_CMD_EVENT_SEND = 0x03   # event send
SUB_CMD_HEARTBEAT = 0x04    # heartbeat
SUB_CMD_EVENT_STR = 0x05    # string event
CMD_MONITOR = 0x0A
SUB_CMD_OTHER_DAY_LOGIN = 0x01  # login on a different day
SUB_CMD_PLAYER_LOGIN = 0x02     # player login
SUB_CMD_LOGIN_VALID = 0x03      # valid login
SUB_CMD_FIRST_LOGIN = 0x04      # first login
# Value a login packet's magic_code field is compared against.
LOGIN_MAGIC_CODE = 0x34128621
class PacketParser:
    """Static helpers that decode fields from a packet buffer.

    Every reader takes the buffer and a start offset and returns a
    ``(value, next_offset)`` pair.  A read that would run past the end of
    the buffer raises ``Exception``.
    """

    @staticmethod
    def read_byte(data, pos):
        """Decode one byte at *pos* as an integer."""
        if pos < len(data):
            return ord(data[pos]), pos + 1
        raise Exception('read_byte: pos out of range')

    @staticmethod
    def read_uint32(data, pos):
        """Decode a little-endian unsigned 32-bit integer at *pos*."""
        end = pos + 4
        if end > len(data):
            raise Exception('read_uint32: pos out of range')
        (number,) = struct.unpack('<I', data[pos:end])
        return number, end

    @staticmethod
    def read_int64(data, pos):
        """Decode a little-endian signed 64-bit integer at *pos*."""
        end = pos + 8
        if end > len(data):
            raise Exception('read_int64: pos out of range')
        (number,) = struct.unpack('<q', data[pos:end])
        return number, end

    @staticmethod
    def read_string(data, pos, length):
        """Return the *length* elements of *data* starting at *pos*."""
        end = pos + length
        if end > len(data):
            raise Exception('read_string: pos out of range')
        return data[pos:end], end

    @staticmethod
    def read_var_string(data, pos):
        """Read a string prefixed by a one-byte length."""
        size, start = PacketParser.read_byte(data, pos)
        return PacketParser.read_string(data, start, size)
class LoginPacket:
    """Login packet: head, group id, server id and magic code.

    When *data* is given (and non-empty) the fields are parsed from it;
    otherwise they keep their zero defaults.
    """

    def __init__(self, data=None):
        self.head = ProtocolHead()
        self.group_id = self.server_id = self.magic_code = 0
        if data:
            self.from_bytes(data)

    def from_bytes(self, data):
        """Parse the 2-byte head followed by three uint32 fields."""
        self.head = ProtocolHead(data[0:2])
        offset = 2
        self.group_id, offset = PacketParser.read_uint32(data, offset)
        self.server_id, offset = PacketParser.read_uint32(data, offset)
        self.magic_code, offset = PacketParser.read_uint32(data, offset)
class LogoutPacket:
    """Logout packet: head followed by a single reserved byte.

    When *data* is given (and non-empty) the fields are parsed from it.
    """

    def __init__(self, data=None):
        self.head = ProtocolHead()
        self.reserve = 0
        if data:
            self.from_bytes(data)

    def from_bytes(self, data):
        """Parse the 2-byte head and the reserved byte that follows it."""
        self.head = ProtocolHead(data[0:2])
        offset = 2
        self.reserve, offset = PacketParser.read_byte(data, offset)
class EventSendPacket:
    """Event packet keyed by a numeric event id.

    Layout after the 2-byte head: event id (uint32), payload length
    (uint32), payload, time (int64); whatever follows is kept in ``data``.
    """

    def __init__(self, data=None):
        self.head = ProtocolHead()
        self.event_id = 0
        self.data_len = 0
        self.sz_data = ''
        self.time = 0
        self.data = ''
        if data:
            self.from_bytes(data)

    def from_bytes(self, data):
        """Parse all fields from *data* in wire order."""
        self.head = ProtocolHead(data[0:2])
        offset = 2
        self.event_id, offset = PacketParser.read_uint32(data, offset)
        self.data_len, offset = PacketParser.read_uint32(data, offset)
        self.sz_data, offset = PacketParser.read_string(data, offset, self.data_len)
        self.time, offset = PacketParser.read_int64(data, offset)
        # Keep any trailing bytes untouched.
        self.data = data[offset:]
class EventStrSendPacket:
    """Event packet keyed by a string event id.

    Layout after the 2-byte head: id length (1 byte), id string, payload
    length (uint32), payload, extension count (1 byte), that many uint32
    extension values; whatever follows is kept in ``data``.
    """

    def __init__(self, data=None):
        self.head = ProtocolHead()
        self.event_id_string_len = 0
        self.sz_event_id_string = ''
        self.data_len = 0
        self.sz_data = ''
        self.ext_len = 0
        self.ext = []
        self.data = ''
        if data:
            self.from_bytes(data)

    def from_bytes(self, data):
        """Parse all fields from *data* in wire order."""
        self.head = ProtocolHead(data[0:2])
        offset = 2
        self.event_id_string_len, offset = PacketParser.read_byte(data, offset)
        self.sz_event_id_string, offset = PacketParser.read_string(
            data, offset, self.event_id_string_len)
        self.data_len, offset = PacketParser.read_uint32(data, offset)
        self.sz_data, offset = PacketParser.read_string(data, offset, self.data_len)
        self.ext_len, offset = PacketParser.read_byte(data, offset)
        self.ext = []
        remaining = self.ext_len
        while remaining > 0:
            item, offset = PacketParser.read_uint32(data, offset)
            self.ext.append(item)
            remaining -= 1
        # Keep any trailing bytes untouched.
        self.data = data[offset:]
class HeartBeatPacket:
    """Heartbeat packet: head followed by a uint32 time value.

    When *data* is given (and non-empty) the fields are parsed from it.
    """

    def __init__(self, data=None):
        self.head = ProtocolHead()
        self.time = 0
        if data:
            self.from_bytes(data)

    def from_bytes(self, data):
        """Parse the 2-byte head and the uint32 time that follows it."""
        self.head = ProtocolHead(data[0:2])
        offset = 2
        self.time, offset = PacketParser.read_uint32(data, offset)
ServerPython/EventServerPY/server.py
New file
@@ -0,0 +1,264 @@
#!/usr/bin/python
# -*- coding: GBK -*-
"""
EventServer主服务器模块
实现TCP服务器,接收游戏客户端事件并记录到本地文件
"""
import sys
import socket
import select
import threading
import datetime
from config import ConfigReader
from clients_manager import ClientsMgr
from file_manager import FileMgr
from packet_processor import PacketProcessor
from packet_logger import packet_logger
class Connection:
    """One accepted client socket plus the daemon thread that reads from it.

    Received bytes are accumulated in ``recv_buffer`` and split into frames of
    the form ``FF CC | payload length (uint16, little-endian) | payload``.
    Each complete payload is passed to ``packet_processor.add_packet``.
    """

    # Frame header: magic code FF CC (2 bytes) + payload length (2 bytes).
    PACKET_HEADER_SIZE = 4
    PACKET_HEADER_MC = 0xCCFF
    MAX_PACKET_LEN = 8192

    def __init__(self, sock, addr, cid, packet_processor):
        """Store the connection state and start the receive thread.

        Args:
            sock: connected client socket.
            addr: peer address as returned by ``accept()``.
            cid: connection id assigned by the server.
            packet_processor: object exposing ``add_packet(cid, data)``.
        """
        self.sock = sock
        self.addr = addr
        self.cid = cid
        self.packet_processor = packet_processor
        self.running = True
        # b'' is the same object type as '' on Python 2 and matches what
        # recv() returns.
        self.recv_buffer = b''
        self.lock = threading.Lock()
        # close() is reached both from the receive thread and from
        # EventServer.stop(); this lock/flag pair makes teardown one-shot.
        self._close_lock = threading.Lock()
        self._closed = False
        # Start the receive thread.
        self.recv_thread = threading.Thread(target=self._recv_loop)
        self.recv_thread.daemon = True
        self.recv_thread.start()

    def _recv_loop(self):
        """Read from the socket until it closes, fails, or close() is called."""
        while self.running:
            # Snapshot the socket: close() on another thread sets self.sock
            # to None, and select() must never be handed None.
            sock = self.sock
            if sock is None:
                break
            try:
                # Poll with a 1s timeout so a cleared ``running`` flag is
                # noticed even when the peer is silent.
                readable, _, _ = select.select([sock], [], [], 1.0)
                if not readable:
                    continue
                # 32768 bytes holds several maximum-size frames
                # (4-byte header + 8192-byte payload) per recv() call.
                data = sock.recv(32768)
                if not data:
                    # Peer closed the connection.
                    break
                with self.lock:
                    self.recv_buffer += data
                    # Drain every complete frame currently buffered.
                    while self._try_parse_packet():
                        pass
            except socket.error as e:
                # Errors caused by close() tearing the socket down are
                # expected; only report failures on a live connection.
                if self.running:
                    print('[Connection] Socket error: %s' % str(e))
                break
            except Exception as e:
                if self.running:
                    print('[Connection] Error: %s' % str(e))
                break
        self.close()

    def _try_parse_packet(self):
        """Extract at most one complete frame from ``recv_buffer``.

        Returns:
            True when a frame was removed from the buffer and forwarded to the
            packet processor; False when more data is needed or the buffer
            was discarded because its header was invalid.
        """
        # Kept local: the module's import block does not pull in struct.
        import struct
        header_size = self.PACKET_HEADER_SIZE
        if len(self.recv_buffer) < header_size:
            return False
        # Magic code and payload length are both little-endian uint16.
        magic_code, packet_len = struct.unpack('<HH', self.recv_buffer[:header_size])
        if magic_code != self.PACKET_HEADER_MC:
            # The whole buffer is considered invalid: drop it and wait for
            # fresh data.
            packet_logger.log_text('Invalid magic code: 0x%04X, clear buffer data: %s' % (magic_code, self.recv_buffer), 'WARN')
            self.recv_buffer = b''
            return False
        # packet_len does not include the header itself.
        if packet_len > self.MAX_PACKET_LEN:
            packet_logger.log_text('Packet length %d exceeds max %d, clear buffer. Buffer data: %s' % (packet_len, self.MAX_PACKET_LEN, self.recv_buffer), 'WARN')
            self.recv_buffer = b''
            return False
        total_len = header_size + packet_len
        if len(self.recv_buffer) < total_len:
            # Frame incomplete; wait for more data.
            return False
        # Strip the 4-byte header; only the bytes after it are forwarded.
        packet_data = self.recv_buffer[header_size:total_len]
        self.recv_buffer = self.recv_buffer[total_len:]
        self.packet_processor.add_packet(self.cid, packet_data)
        return True

    def send(self, data):
        """Send *data* on the socket.

        Returns:
            True on success, False if sending raised (the error is printed).
        """
        try:
            self.sock.sendall(data)
            return True
        except Exception as e:
            print('[Connection] Send error: %s' % str(e))
            return False

    def close(self):
        """Shut the connection down; safe to call more than once.

        Only the first call closes the socket and reports the disconnect to
        ClientsMgr. Later calls return immediately, so the client manager is
        notified exactly once per connection.
        """
        with self._close_lock:
            if self._closed:
                return
            self._closed = True
        self.running = False
        sock, self.sock = self.sock, None
        if sock is not None:
            try:
                sock.close()
            except Exception:
                # Best effort: nothing more can be done with a failing socket.
                pass
        # Notify the client manager.
        ClientsMgr.instance().on_client_disconnect(self.cid)
class EventServer:
    """TCP event server.

    Listens on the configured port, wraps every accepted socket in a
    Connection, and exposes counters from the packet processor and file
    manager.
    """

    def __init__(self):
        """Build the file manager and packet processor from the shared config."""
        self.config = ConfigReader.instance()
        self.file_mgr = FileMgr(self.config)
        self.packet_processor = PacketProcessor(self.file_mgr)
        self.server_sock = None
        self.connections = {}  # cid -> Connection
        self.next_cid = 1
        self.lock = threading.Lock()
        self.running = False
        self.accept_thread = None

    def start(self):
        """Bind the listening socket and start the worker threads.

        Returns:
            True when the server is listening, False if startup failed (the
            error is printed and the listening socket is closed).
        """
        port = self.config.get_listen_port()
        # Create the listening socket.
        self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            self.server_sock.bind(('0.0.0.0', port))
            self.server_sock.listen(5)
            print('[EventServer] Server started on port %d' % port)
            print('[EventServer] Log path: %s' % self.config.get_log_file_path())
            print('[EventServer] Write mode: %d' % self.config.get_write_mode())
            print('')
            # Start the packet processor.
            self.packet_processor.start()
            # Start the accept thread.
            self.running = True
            self.accept_thread = threading.Thread(target=self._accept_loop)
            self.accept_thread.daemon = True
            self.accept_thread.start()
            return True
        except Exception as e:
            print('[EventServer] Failed to start server: %s' % str(e))
            # Never report "running" with a closed listening socket.
            self.running = False
            self.server_sock.close()
            return False

    def _accept_loop(self):
        """Accept clients until the server stops or the socket fails."""
        while self.running:
            try:
                # Poll with a 1s timeout so a cleared ``running`` flag is
                # noticed.
                readable, _, _ = select.select([self.server_sock], [], [], 1.0)
                if not readable:
                    continue
                client_sock, addr = self.server_sock.accept()
                with self.lock:
                    cid = self.next_cid
                    self.next_cid += 1
                connection = Connection(client_sock, addr, cid, self.packet_processor)
                with self.lock:
                    # Forget connections that have already closed so the
                    # table does not grow for the life of the process.
                    self._prune_closed_locked()
                    self.connections[cid] = connection
                print('[EventServer] New connection %d from %s:%d' % (cid, addr[0], addr[1]))
            except socket.error as e:
                if self.running:
                    print('[EventServer] Accept error: %s' % str(e))
                break
            except Exception as e:
                # stop() closing the listening socket can surface here too;
                # only report while the server is meant to be running.
                if self.running:
                    print('[EventServer] Error: %s' % str(e))
                break

    def _prune_closed_locked(self):
        """Drop connections whose ``running`` flag is cleared.

        The caller must hold ``self.lock``.
        """
        closed = [cid for cid, conn in self.connections.items() if not conn.running]
        for cid in closed:
            del self.connections[cid]

    def get_connection_count(self):
        """Return the number of connections that are still open."""
        with self.lock:
            self._prune_closed_locked()
            return len(self.connections)

    def get_queue_size(self):
        """Return the packet processor's queue size."""
        return self.packet_processor.get_queue_size()

    def get_success_count(self):
        """Return the file manager's successful-write count."""
        return self.file_mgr.get_success_count()

    def get_fail_count(self):
        """Return the file manager's failed-write count."""
        return self.file_mgr.get_fail_count()

    def stop(self):
        """Stop the server; does nothing if it is not running."""
        if not self.running:
            return
        print('[EventServer] Stopping server...')
        self.running = False
        # Snapshot under the lock, close outside it: Connection.close() calls
        # into ClientsMgr, which should not run while self.lock is held.
        with self.lock:
            open_conns = list(self.connections.values())
            self.connections.clear()
        for conn in open_conns:
            conn.close()
        # Stop the packet processor.
        self.packet_processor.stop()
        # Close the listening socket.
        if self.server_sock:
            try:
                self.server_sock.close()
            except Exception:
                # Best effort during shutdown.
                pass
        # Close all files.
        self.file_mgr.close_all()
        print('[EventServer] Server stopped')