基于 pcntl 多进程扩展的 WebSocket 服务端代码,支持并发处理多个客户端连接
服务端 ws_multi_process.php
<?php
// 需启用 sockets + pcntl 扩展(php.ini 取消对应注释)
if (!extension_loaded('sockets') || !extension_loaded('pcntl')) {
die('请启用 sockets 和 pcntl 扩展');
}
// 服务配置
$host = '0.0.0.0';
$port = 8080;
$workerNum = 4; // 工作进程数(建议等于 CPU 核心数)
$connections = []; // 存储所有客户端连接(主进程共享)
// 1. 创建主监听套接字(主进程持有)
$mainSocket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_set_option($mainSocket, SOL_SOCKET, SO_REUSEADDR, 1);
socket_bind($mainSocket, $host, $port);
socket_listen($mainSocket, 1024); // 监听队列大小
echo "多进程 WebSocket 服务启动:ws://{$host}:{$port},工作进程数:{$workerNum}\n";
// 2. 创建指定数量的工作进程
for ($i = 0; $i < $workerNum; $i++) {
$pid = pcntl_fork();
if ($pid === -1) {
die('创建进程失败');
} elseif ($pid === 0) {
// 子进程:处理客户端连接
workerProcess($mainSocket);
exit(); // 子进程处理完后退出
}
}
// 3. 主进程:监听子进程退出信号,重启异常退出的进程
while (true) {
$pid = pcntl_wait($status, WNOHANG);
if ($pid > 0) {
echo "工作进程 {$pid} 退出,重启中...\n";
$newPid = pcntl_fork();
if ($newPid === 0) {
workerProcess($mainSocket);
exit();
}
}
usleep(100000); // 100ms 轮询一次
}
// 4. 工作进程逻辑:接收连接 + 处理消息
function workerProcess($mainSocket) {
global $connections;
$workerPid = getmypid();
echo "工作进程 {$workerPid} 启动\n";
while (true) {
// 监听主套接字(非阻塞)
$read = [$mainSocket];
$write = [];
$except = [];
socket_select($read, $write, $except, 0);
if (!empty($read)) {
// 接收新客户端连接
$clientSocket = socket_accept($mainSocket);
if ($clientSocket) {
$clientId = (int)$clientSocket;
$connections[$clientId] = $clientSocket;
echo "进程 {$workerPid} 新增连接:{$clientId}\n";
// 处理握手(带 Token 认证)
handshake($clientSocket);
}
}
// 处理已连接客户端的消息
if (!empty($connections)) {
$readClients = $connections;
socket_select($readClients, $write, $except, 0);
foreach ($readClients as $clientSocket) {
$data = socket_read($clientSocket, 1024);
if (!$data) {
// 客户端断开连接
$clientId = (int)$clientSocket;
unset($connections[$clientId]);
socket_close($clientSocket);
echo "进程 {$workerPid} 断开连接:{$clientId}\n";
continue;
}
// 解析消息并回复
$msg = parseWebSocketFrame($data);
echo "进程 {$workerPid} 收到消息:{$msg}\n";
sendWebSocketFrame($clientSocket, "服务端(进程 {$workerPid})已接收:{$msg}");
}
}
usleep(10000); // 降低 CPU 占用
}
}
// 5. WebSocket 握手 + Token 认证
function handshake($clientSocket) {
$request = socket_read($clientSocket, 1024);
// 解析 Token 并校验(实际项目替换为你的校验逻辑)
preg_match('#GET /\?token=(.*) HTTP/1.1#', $request, $tokenMatches);
$token = $tokenMatches[1] ?? '';
if (!$token || $token !== 'valid_token_123') { // 示例:仅允许 valid_token_123
socket_write($clientSocket, "HTTP/1.1 403 Forbidden\r\n\r\n");
socket_close($clientSocket);
return false;
}
// 握手响应
preg_match('#Sec-WebSocket-Key: (.*)\r\n#', $request, $keyMatches);
$key = base64_encode(sha1($keyMatches[1] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true));
$response = "HTTP/1.1 101 Switching Protocols\r\n";
$response .= "Upgrade: websocket\r\n";
$response .= "Connection: Upgrade\r\n";
$response .= "Sec-WebSocket-Accept: {$key}\r\n\r\n";
socket_write($clientSocket, $response);
return true;
}
// 6. 解析 WebSocket 帧
function parseWebSocketFrame($data) {
$length = ord($data[1]) & 127;
$mask = substr($data, 2, 4);
$payload = substr($data, 6, $length);
$unmasked = '';
for ($i = 0; $i < $length; $i++) {
$unmasked .= $payload[$i] ^ $mask[$i % 4];
}
return $unmasked;
}
// 7. 发送 WebSocket 帧
function sendWebSocketFrame($clientSocket, $msg) {
$len = strlen($msg);
$frame = chr(0x81) . ($len <= 125 ? chr($len) : chr(126) . pack('n', $len)) . $msg;
socket_write($clientSocket, $frame);
}
?>
核心优化点
1. 多进程并发:通过 pcntl_fork 创建多个工作进程,同时处理多个客户端连接,突破单进程瓶颈。
2. 连接管理:主进程持有监听套接字,子进程共享连接列表,实现负载分担。
3. 信号监听:主进程自动重启异常退出的子进程,保证服务稳定性。
4. 保留认证:集成 Token 校验(客户端需带 ?token=valid_token_123 连接)。
运行
启用扩展 extension=sockets, extension=pcntl
ulimit -n 65535 # 临时提高文件描述符上限 echo 100000 > /proc/sys/net/core/somaxconn # 提高监听队列大小 php ws_multi_process.php # 启动服务
客户端
const ws = new WebSocket('ws://localhost:8080?token=valid_token_123');
ws.onopen = () => ws.send('多进程测试');
ws.onmessage = (e) => console.log('服务端回复:', e.data);
配置为服务
sudo vim /etc/systemd/system/websocket-service.service
websocket-service.service
[Unit] Description=多进程 WebSocket 服务(基于 pcntl+sockets) After=network.target Documentation=无 [Service] Type=simple # 核心:替换为你的 PHP 路径和脚本绝对路径(必填) ExecStart=/usr/bin/php /opt/websocket/ws_multi_process.php # 运行用户(建议非 root,如 www-data 或自定义用户) User=www-data # 工作目录(脚本所在目录,必填) WorkingDirectory=/opt/websocket # 自动重启(崩溃、退出均重启,契合脚本自身进程守护) Restart=always # 重启延迟(避免频繁重启) RestartSec=3 # 提高文件描述符上限(支持更多连接) LimitNOFILE=65535 [Install] WantedBy=multi-user.target
启用
# 重载配置使服务生效 sudo systemctl daemon-reload # 设为开机自启 sudo systemctl enable websocket-service # 启动服务 sudo systemctl start websocket-service # 查看运行状态(验证是否成功) sudo systemctl status websocket-service