hxp
2025-06-04 f4a514d5ac952110da846636ecbb9de951eaf3d2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
<?php
include_once "/Common/Logging.php";
include_once '/db/RedisOper.php';
 
\Logging\CreateLogging("chatmonitorserver.php");
 
$interfaceConfig = parse_ini_file("/InterfaceConfig.php", true);
$GameName = $interfaceConfig["ServerInfo"]["GameName"];
$ChannelList = explode(",", $interfaceConfig["ServerInfo"]["ChannelList"]);
$address = $interfaceConfig["Chatmonitor"]["SocketHost"];
$port = $interfaceConfig["Chatmonitor"]["SocketPort"];
 
$logDate = date("Y-m-d");
LogConsole("");
LogConsole("GameName:" . $GameName);
LogConsole("ChannelList:" . json_encode($ChannelList));
 
if (($master = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) == false) {
    LogConsole("socket_create() failed: reason: " . socket_strerror(socket_last_error()), true);
}
if (($ret = socket_bind($master, $address, $port)) == false) {
    LogConsole("socket_bind() failed: reason: " . socket_strerror(socket_last_error($master)), true);
}
if (($ret = socket_listen($master, 3)) == false) {
    LogConsole("socket_listen() failed: reason: " . socket_strerror(socket_last_error($master)), true);
}
 
LogConsole("chatmonitor server start ok. " . $address . ":" . $port);
 
// $master = null;  //socket的resource,即前期初始化socket时返回的socket资源
$all_sockets = [$master];    // socket 集合
$channelInfo = array();  // 渠道对应socket信息
foreach ($ChannelList as $spid) {
    $channelInfo[$spid] = array(
        "clients" => array(),
        "broadcastTime" => 0
    );
}
 
do {
    $copy_sockets = $all_sockets;   // 单独拷贝一份
 
    // 因为客户端是长连接,如果客户端非正常断开,服务端会在 socket_accept 阻塞,现在使用 select 非阻塞模式 socket
    $num_changed_sockets = socket_select($copy_sockets, $write, $except, 0);
    if ($num_changed_sockets === false) {
        /* 错误处理 */
        LogConsole("sosket_select error: " . socket_strerror(socket_last_error()), true);
        continue;
    } else if ($num_changed_sockets > 0) {
        /*至少有一个套接字发生改变 */
    }
    // echo "all_sockets count:", count($all_sockets), " num_changed_sockets:", $num_changed_sockets, PHP_EOL;
 
    // 接收第一次 socket 连入,连入后移除服务端 socket
    if (in_array($master, $copy_sockets)) {
        $client = socket_accept($master);
        if ($client) {
            $buf = socket_read($client, 1024);
            // echo $buf;
 
            // 匹配 Sec-Websocket-Key 标识
            if (preg_match("/Sec-WebSocket-Key: (.*)\r\n/i", $buf, $match)) {
                // 需要将 Sec-WebSocket-Key 值累加字符串,并依次进行 SHA-1 加密和 base64 加密
                $key = base64_encode(sha1($match[1] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true));
                // 拼凑响应内容
                $res = "HTTP/1.1 101 Switching Protocol" . PHP_EOL
                    . "Upgrade: WebSocket" . PHP_EOL
                    . "Connection: Upgrade" . PHP_EOL
                    . "WebSocket-Location: ws://" . $address . ":" . $port . PHP_EOL
                    . "Sec-WebSocket-Accept: " . $key . PHP_EOL . PHP_EOL;  // 注意这里,需要两个换行
                // 向客户端应答 Sec-WebSocket-Accept
                socket_write($client, $res, strlen($res));
 
                $key = uniqid(rand(1000, 9999), true);
 
                // 加入客户端 socket
                $all_sockets[$key] = $client;
                LogConsole("new client key:" . $key . " now count:" . (count($all_sockets) - 1));
            }
            // 移除服务端 socket
            $key = array_search($master, $copy_sockets);
            unset($copy_sockets[$key]);
        }
    }
 
    // 循环所有客户端 sockets
    foreach ($copy_sockets as $key => $s) {
        // 获取客户端发给服务端的内容
        $buf = socket_read($s, 8024);
        // echo strlen($buf), '---', PHP_EOL;
        // 代表客户端主动关闭
        if (strlen($buf) < 9) {
            $key = array_search($s, $all_sockets);
            unset($all_sockets[$key]);
            LogConsole("client close key:" . $key . " remain client count:" . (count($all_sockets) - 1));
            foreach ($channelInfo as $spid => $info) {
                $clients = $info["clients"];
                if (array_key_exists($key, $clients)) {
                    unset($clients[$key]);
                    $channelInfo[$spid]["clients"] = $clients;
                    LogConsole("    remove from spid:" . $spid . " key:" . $key . " spid clients count:" . count($channelInfo[$spid]["clients"]));
                    break;
                }
            }
            socket_close($s);
            continue;
        }
 
        $data = message($buf);
        $data = json_decode($data, true);
        if (!isset($data)) {
            send($s, "HEARTBEAT", "hello");
            continue;
        }
 
        if ($data["MsgType"] == "SPID") {
            $spid = $data["Msg"];
            if (!in_array($spid, $ChannelList)) {
                continue;
            }
            $clients = $channelInfo[$spid]["clients"];
            $clients[$key] = $s;
            $channelInfo[$spid]["clients"] = $clients;
            LogConsole("update spid client spid:" . $spid . " key:" . $key . " spid clients count:" . count($channelInfo[$spid]["clients"]));
            // 同步最近历史聊天,待扩展,有需要再说
        }
 
        $curSpid = "";
        foreach ($channelInfo as $spid => $info) {
            if (array_key_exists($key, $info["clients"])) {
                $curSpid = $spid;
                break;
            }
        }
        if ($curSpid && broadcastChat($curSpid)) {
            continue;
        }
        send($s, "HEARTBEAT", "hello");
    }
 
    // 广播最新消息
 
} while (true);
socket_close($master);
 
function broadcastChat($spid)
{
    global $channelInfo, $GameName;
 
    $info = $channelInfo[$spid];
    $broadcastTime = $info["broadcastTime"];
 
    $curTime = time();
    // 1 秒最多广播一次
    if (($curTime - $broadcastTime) < 1) {
        return;
    }
    $channelInfo[$spid]["broadcastTime"] = $curTime;
    if (count($info["clients"]) <= 0) {
        return;
    }
 
    $redisKey = \RedisOper\GetFCRedisKey($GameName, "Chatmonitor", $spid);
    if (!\RedisOper\ListRange($redisKey, $retArray, 0, -1)) {
        return;
    }
    if (!isset($retArray) || count($retArray) <= 0) {
        return;
    }
    \RedisOper\DelKey($redisKey);
    LogConsole("broadcast spid:" . $spid . " clients count:" . count($info["clients"]) . " chatCount:" . count($retArray));
    sendAll($info["clients"], "CHAT", $retArray);
    return true;
}
 
function LogConsole($msg, $isErr = false)
{
    global $logDate;
    if ($logDate != date("Y-m-d")) {
        $logDate = date("Y-m-d");
        echo "new logDate", $logDate, PHP_EOL;
        \Logging\CreateLogging("chatmonitorserver.php", true);
    }
    echo date("Y-m-d H:i:s"), " ", $msg, PHP_EOL;
    if ($isErr) {
        \Logging\LogError($msg);
    } else {
        \Logging\LogInfo($msg);
    }
}
 
/**
 * 解析接收数据
 * @param $buffer
 * @return null|string
 */
function message($buffer)
{
    $len = $masks = $data = $decoded = null;
    $len = ord($buffer[1]) & 127;
    if ($len === 126) {
        $masks = substr($buffer, 4, 4);
        $data = substr($buffer, 8);
    } else if ($len === 127) {
        $masks = substr($buffer, 10, 4);
        $data = substr($buffer, 14);
    } else {
        $masks = substr($buffer, 2, 4);
        $data = substr($buffer, 6);
    }
    for ($index = 0; $index < strlen($data); $index++) {
        $decoded .= $data[$index] ^ $masks[$index % 4];
    }
    return $decoded;
}
 
/**
 * 消息广播
 */
function sendAll($clients, $msgType, $msg)
{
    global $all_sockets, $master;
    $sendData = getsenddata($msgType, $msg);
    foreach ($clients as $key => $so) {
        if ($so != $master && array_key_exists($key, $all_sockets)) {
            socket_write($so, $sendData, strlen($sendData));
        }
    }
}
 
/**
 * 发送数据
 */
function send($client, $msgType, $msg)
{
    $sendData = getsenddata($msg, $msgType, $msg);
    socket_write($client, $sendData, strlen($sendData));
}
 
function getsenddata($msgType, $msg)
{
    // HEARTBEAT MSG CHAT
    $msg = array("MsgType" => $msgType, "Msg" => $msg);
    $msg = json_encode($msg) . "#end#";
    $msg = frame($msg);
    return $msg;
}
 
/**
 * 处理数据帧
 *
 * @param [type] $s
 */
function frame($s)
{
    $a = str_split($s, 125);
    if (count($a) == 1) {
        return "\x81" . chr(strlen($a[0])) . $a[0];
    }
    $ns = "";
    foreach ($a as $o) {
        $ns .= "\x81" . chr(strlen($o)) . $o;
    }
    return $ns;
}