array(), "broadcastTime" => 0 ); }

// ---------------------------------------------------------------------------
// Main event loop.
//
// $all_sockets holds the listening socket ($master) plus every connected
// client, keyed by a generated client key. $channelInfo maps a channel id
// (spid) to array("clients" => array(key => socket), "broadcastTime" => int).
// Both are initialised before this point in the file.
// ---------------------------------------------------------------------------
do {
    // socket_select() rewrites the array it is handed, so always work on a copy.
    $copy_sockets = $all_sockets;

    // Clients hold long-lived connections, so select() is used instead of a
    // blocking accept(). A 1 second timeout (instead of 0) keeps the loop from
    // spinning at 100% CPU while idle; select() still returns as soon as any
    // socket becomes readable.
    $num_changed_sockets = socket_select($copy_sockets, $write, $except, 1);
    if ($num_changed_sockets === false) {
        LogConsole("sosket_select error: " . socket_strerror(socket_last_error()), true);
        continue;
    }

    // A readable listening socket means a new connection is pending.
    if (in_array($master, $copy_sockets, true)) {
        // Always drop the listening socket from the readable set, even when
        // accept() fails; otherwise the client loop below would read from it,
        // mistake it for a disconnected client and close it.
        $masterKey = array_search($master, $copy_sockets, true);
        unset($copy_sockets[$masterKey]);

        $client = socket_accept($master);
        if ($client) {
            // NOTE(review): this read blocks until the peer sends its HTTP
            // upgrade request; a peer that connects and stays silent stalls
            // the whole loop.
            $buf = socket_read($client, 1024);

            // Look for the Sec-WebSocket-Key request header.
            if (is_string($buf) && preg_match("/Sec-WebSocket-Key: (.*)\r\n/i", $buf, $match)) {
                // Accept token = base64(sha1(key + fixed GUID)), per RFC 6455.
                $acceptToken = base64_encode(sha1($match[1] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true));

                // HTTP header lines must end in CRLF regardless of platform
                // (PHP_EOL is a bare LF on Unix); the header block ends with
                // an empty line.
                $res = "HTTP/1.1 101 Switching Protocol" . "\r\n"
                    . "Upgrade: WebSocket" . "\r\n"
                    . "Connection: Upgrade" . "\r\n"
                    . "WebSocket-Location: ws://" . $address . ":" . $port . "\r\n"
                    . "Sec-WebSocket-Accept: " . $acceptToken . "\r\n" . "\r\n";
                socket_write($client, $res, strlen($res));

                // Register the client under a fresh unique key.
                $clientKey = uniqid(rand(1000, 9999), true);
                $all_sockets[$clientKey] = $client;
                LogConsole("new client key:" . $clientKey . " now count:" . (count($all_sockets) - 1));
            } else {
                // Not a WebSocket handshake: close the socket instead of leaking it.
                socket_close($client);
            }
        }
    }

    // Handle every client socket that has data waiting.
    foreach ($copy_sockets as $key => $s) {
        $buf = socket_read($s, 8024);

        // A read error or a frame shorter than 9 bytes is treated as the
        // client having closed the connection.
        if ($buf === false || strlen($buf) < 9) {
            $registeredKey = array_search($s, $all_sockets, true);
            if ($registeredKey !== false) {
                // Only unset a key that was really found; unset($arr[false])
                // would silently remove index 0.
                $key = $registeredKey;
                unset($all_sockets[$key]);
            }
            LogConsole("client close key:" . $key . " remain client count:" . (count($all_sockets) - 1));

            // Remove the client from every channel it joined so no stale
            // socket is left behind in $channelInfo.
            foreach ($channelInfo as $spid => $info) {
                if (array_key_exists($key, $info["clients"])) {
                    unset($channelInfo[$spid]["clients"][$key]);
                    LogConsole(" remove from spid:" . $spid . " key:" . $key . " spid clients count:" . count($channelInfo[$spid]["clients"]));
                }
            }

            socket_close($s);
            continue;
        }

        $data = json_decode((string) message($buf), true);
        if (!isset($data)) {
            // Payload is not JSON: just answer with a heartbeat.
            send($s, "HEARTBEAT", "hello");
            continue;
        }

        // The payload is untrusted: it may decode to a scalar or lack keys.
        $msgType = (is_array($data) && isset($data["MsgType"])) ? $data["MsgType"] : null;

        if ($msgType === "SPID") {
            // The client subscribes to a channel.
            $spid = isset($data["Msg"]) ? $data["Msg"] : null;

            // Only string/int ids are usable as array keys; this also stops
            // values such as `true` from passing the loose in_array() check.
            if ((!is_string($spid) && !is_int($spid)) || !in_array($spid, $ChannelList)) {
                continue;
            }

            $channelInfo[$spid]["clients"][$key] = $s;
            LogConsole("update spid client spid:" . $spid . " key:" . $key . " spid clients count:" . count($channelInfo[$spid]["clients"]));
            // Syncing recent chat history could be added here if ever needed.
        }

        // Find the (first) channel this client belongs to.
        $curSpid = null;
        foreach ($channelInfo as $spid => $info) {
            if (array_key_exists($key, $info["clients"])) {
                $curSpid = $spid;
                break;
            }
        }

        // Any client message is an opportunity to flush pending chat to the
        // channel; when nothing was broadcast, answer with a heartbeat.
        if ($curSpid !== null && broadcastChat($curSpid)) {
            continue;
        }

        send($s, "HEARTBEAT", "hello");
    }
} while (true);

// Not reached while the loop above runs forever; kept for completeness.
socket_close($master);

/**
 * Broadcast the pending chat messages of one channel to all of its clients.
 *
 * Rate limited to one attempt per second per channel. The pending messages
 * are read from a Redis list and the list is deleted afterwards.
 * NOTE(review): read-then-delete looks non-atomic; entries pushed between the
 * two calls may be lost - confirm against the RedisOper helpers.
 *
 * @param string|int $spid channel id, a key of the global $channelInfo
 * @return bool true when messages were broadcast, false otherwise
 */
function broadcastChat($spid)
{
    global $channelInfo, $GameName;

    $info = $channelInfo[$spid];
    $curTime = time();

    // Broadcast at most once per second.
    if (($curTime - $info["broadcastTime"]) < 1) {
        return false;
    }
    $channelInfo[$spid]["broadcastTime"] = $curTime;

    if (count($info["clients"]) <= 0) {
        return false;
    }

    $redisKey = \RedisOper\GetFCRedisKey($GameName, "Chatmonitor", $spid);
    // $retArray is filled by reference by ListRange().
    if (!\RedisOper\ListRange($redisKey, $retArray, 0, -1)) {
        return false;
    }
    if (!isset($retArray) || count($retArray) <= 0) {
        return false;
    }
    \RedisOper\DelKey($redisKey);

    LogConsole("broadcast spid:" . $spid . " clients count:" . count($info["clients"]) . " chatCount:" . count($retArray));
    sendAll($info["clients"], "CHAT", $retArray);

    return true;
}

/**
 * Write a timestamped line to stdout and to the project logger.
 *
 * When the calendar date differs from the one stored in the global $logDate,
 * \Logging\CreateLogging() is called again before logging.
 *
 * @param string $msg   text to log
 * @param bool   $isErr true to log through LogError, false through LogInfo
 */
function LogConsole($msg, $isErr = false)
{
    global $logDate;

    $today = date("Y-m-d");
    if ($logDate != $today) {
        $logDate = $today;
        echo "new logDate", $logDate, PHP_EOL;
        \Logging\CreateLogging("chatmonitorserver.php", true);
    }

    echo date("Y-m-d H:i:s"), " ", $msg, PHP_EOL;

    if ($isErr) {
        \Logging\LogError($msg);
    } else {
        \Logging\LogInfo($msg);
    }
}

/**
 * Decode the payload of a masked WebSocket frame received from a client.
 *
 * Only the 7-bit length indicator is inspected to locate the masking key;
 * everything after the key is unmasked as payload.
 * NOTE(review): opcode, FIN bit and the declared payload length are ignored,
 * so a buffer holding several frames or a partial frame is not handled.
 *
 * @param string $buffer raw bytes read from the socket
 * @return null|string unmasked payload, or null when there is none
 */
function message($buffer)
{
    if (!is_string($buffer) || strlen($buffer) < 2) {
        return null;
    }

    $len = ord($buffer[1]) & 127;
    if ($len === 126) {
        // 2 header bytes + 16-bit extended length.
        $maskOffset = 4;
    } else if ($len === 127) {
        // 2 header bytes + 64-bit extended length.
        $maskOffset = 10;
    } else {
        $maskOffset = 2;
    }

    $masks = (string) substr($buffer, $maskOffset, 4);
    $data = (string) substr($buffer, $maskOffset + 4);
    if (strlen($masks) < 4) {
        // Truncated frame: no complete masking key to decode with.
        return null;
    }

    $decoded = null;
    $dataLen = strlen($data);
    for ($index = 0; $index < $dataLen; $index++) {
        $decoded .= $data[$index] ^ $masks[$index % 4];
    }

    return $decoded;
}

/**
 * Send one message to every given client that is still registered.
 *
 * @param array  $clients client key => socket
 * @param string $msgType message type tag (HEARTBEAT, MSG, CHAT)
 * @param mixed  $msg     JSON-encodable payload
 */
function sendAll($clients, $msgType, $msg)
{
    global $all_sockets, $master;

    $sendData = getsenddata($msgType, $msg);
    foreach ($clients as $key => $so) {
        // Skip the listening socket and clients that already disconnected.
        if ($so !== $master && array_key_exists($key, $all_sockets)) {
            socket_write($so, $sendData, strlen($sendData));
        }
    }
}

/**
 * Send one message to a single client.
 *
 * @param resource|\Socket $client  destination socket
 * @param string           $msgType message type tag (HEARTBEAT, MSG, CHAT)
 * @param mixed            $msg     JSON-encodable payload
 */
function send($client, $msgType, $msg)
{
    // The type comes first, then the payload - same order as in sendAll().
    $sendData = getsenddata($msgType, $msg);
    socket_write($client, $sendData, strlen($sendData));
}

/**
 * Build the wire representation of a message.
 *
 * The message is JSON-encoded as {"MsgType": ..., "Msg": ...}, suffixed with
 * the "#end#" terminator and wrapped into WebSocket frames.
 *
 * @param string $msgType message type tag (HEARTBEAT, MSG, CHAT)
 * @param mixed  $msg     JSON-encodable payload
 * @return string framed bytes ready for socket_write()
 */
function getsenddata($msgType, $msg)
{
    $payload = array("MsgType" => $msgType, "Msg" => $msg);

    return frame(json_encode($payload) . "#end#");
}

/**
 * Wrap a string into unmasked WebSocket text frames.
 *
 * The string is cut into chunks of at most 125 bytes (the largest payload a
 * 7-bit length field can describe) and every chunk is emitted as its own
 * final text frame (first byte 0x81).
 * NOTE(review): receivers presumably re-assemble the chunks using the "#end#"
 * terminator appended by getsenddata() - confirm against the client code.
 *
 * @param string $s data to frame
 * @return string concatenated frames
 */
function frame($s)
{
    $frames = "";
    foreach (str_split($s, 125) as $chunk) {
        $frames .= "\x81" . chr(strlen($chunk)) . $chunk;
    }

    return $frames;
}