首页 » 前端 » Javascript » 正文

基于 pcntl 多进程扩展的 WebSocket 服务端代码,支持并发处理多个客户端连接

发布者:站点默认
2011/04/30 浏览数(2,067) 分类:Javascript, PHP 基于 pcntl 多进程扩展的 WebSocket 服务端代码,支持并发处理多个客户端连接已关闭评论

服务端 ws_multi_process.php

<?php
// 需启用 sockets + pcntl 扩展(php.ini 取消对应注释)
if (!extension_loaded('sockets') || !extension_loaded('pcntl')) {
    die('请启用 sockets 和 pcntl 扩展');
}

// 服务配置
$host = '0.0.0.0';
$port = 8080;
$workerNum = 4; // 工作进程数(建议等于 CPU 核心数)
$connections = []; // 存储所有客户端连接(主进程共享)

// 1. 创建主监听套接字(主进程持有)
$mainSocket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_set_option($mainSocket, SOL_SOCKET, SO_REUSEADDR, 1);
socket_bind($mainSocket, $host, $port);
socket_listen($mainSocket, 1024); // 监听队列大小
echo "多进程 WebSocket 服务启动:ws://{$host}:{$port},工作进程数:{$workerNum}\n";

// 2. 创建指定数量的工作进程
for ($i = 0; $i < $workerNum; $i++) {
    $pid = pcntl_fork();
    if ($pid === -1) {
        die('创建进程失败');
    } elseif ($pid === 0) {
        // 子进程:处理客户端连接
        workerProcess($mainSocket);
        exit(); // 子进程处理完后退出
    }
}

// 3. 主进程:监听子进程退出信号,重启异常退出的进程
while (true) {
    $pid = pcntl_wait($status, WNOHANG);
    if ($pid > 0) {
        echo "工作进程 {$pid} 退出,重启中...\n";
        $newPid = pcntl_fork();
        if ($newPid === 0) {
            workerProcess($mainSocket);
            exit();
        }
    }
    usleep(100000); // 100ms 轮询一次
}

// 4. 工作进程逻辑:接收连接 + 处理消息
function workerProcess($mainSocket) {
    global $connections;
    $workerPid = getmypid();
    echo "工作进程 {$workerPid} 启动\n";

    while (true) {
        // 监听主套接字(非阻塞)
        $read = [$mainSocket];
        $write = [];
        $except = [];
        socket_select($read, $write, $except, 0);

        if (!empty($read)) {
            // 接收新客户端连接
            $clientSocket = socket_accept($mainSocket);
            if ($clientSocket) {
                $clientId = (int)$clientSocket;
                $connections[$clientId] = $clientSocket;
                echo "进程 {$workerPid} 新增连接:{$clientId}\n";

                // 处理握手(带 Token 认证)
                handshake($clientSocket);
            }
        }

        // 处理已连接客户端的消息
        if (!empty($connections)) {
            $readClients = $connections;
            socket_select($readClients, $write, $except, 0);
            foreach ($readClients as $clientSocket) {
                $data = socket_read($clientSocket, 1024);
                if (!$data) {
                    // 客户端断开连接
                    $clientId = (int)$clientSocket;
                    unset($connections[$clientId]);
                    socket_close($clientSocket);
                    echo "进程 {$workerPid} 断开连接:{$clientId}\n";
                    continue;
                }

                // 解析消息并回复
                $msg = parseWebSocketFrame($data);
                echo "进程 {$workerPid} 收到消息:{$msg}\n";
                sendWebSocketFrame($clientSocket, "服务端(进程 {$workerPid})已接收:{$msg}");
            }
        }

        usleep(10000); // 降低 CPU 占用
    }
}

// 5. WebSocket 握手 + Token 认证
function handshake($clientSocket) {
    $request = socket_read($clientSocket, 1024);
    // 解析 Token 并校验(实际项目替换为你的校验逻辑)
    preg_match('#GET /\?token=(.*) HTTP/1.1#', $request, $tokenMatches);
    $token = $tokenMatches[1] ?? '';
    if (!$token || $token !== 'valid_token_123') { // 示例:仅允许 valid_token_123
        socket_write($clientSocket, "HTTP/1.1 403 Forbidden\r\n\r\n");
        socket_close($clientSocket);
        return false;
    }

    // 握手响应
    preg_match('#Sec-WebSocket-Key: (.*)\r\n#', $request, $keyMatches);
    $key = base64_encode(sha1($keyMatches[1] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true));
    $response = "HTTP/1.1 101 Switching Protocols\r\n";
    $response .= "Upgrade: websocket\r\n";
    $response .= "Connection: Upgrade\r\n";
    $response .= "Sec-WebSocket-Accept: {$key}\r\n\r\n";
    socket_write($clientSocket, $response);
    return true;
}

// 6. 解析 WebSocket 帧
function parseWebSocketFrame($data) {
    $length = ord($data[1]) & 127;
    $mask = substr($data, 2, 4);
    $payload = substr($data, 6, $length);
    $unmasked = '';
    for ($i = 0; $i < $length; $i++) {
        $unmasked .= $payload[$i] ^ $mask[$i % 4];
    }
    return $unmasked;
}

// 7. 发送 WebSocket 帧
function sendWebSocketFrame($clientSocket, $msg) {
    $len = strlen($msg);
    $frame = chr(0x81) . ($len <= 125 ? chr($len) : chr(126) . pack('n', $len)) . $msg;
    socket_write($clientSocket, $frame);
}
?>

核心优化点

1. 多进程并发:通过 pcntl_fork 创建多个工作进程,同时处理多个客户端连接,突破单进程瓶颈。

2. 连接管理:主进程持有监听套接字,子进程共享连接列表,实现负载分担。

3. 信号监听:主进程自动重启异常退出的子进程,保证服务稳定性。

4. 保留认证:集成 Token 校验(客户端需带 ?token=valid_token_123 连接)。

运行

启用扩展 extension=sockets, extension=pcntl

ulimit -n 65535    # 临时提高文件描述符上限
echo 100000 > /proc/sys/net/core/somaxconn # 提高监听队列大小

php ws_multi_process.php # 启动服务

客户端

const ws = new WebSocket('ws://localhost:8080?token=valid_token_123');
ws.onopen = () => ws.send('多进程测试');
ws.onmessage = (e) => console.log('服务端回复:', e.data);

配置为服务

sudo vim /etc/systemd/system/websocket-service.service

websocket-service.service

[Unit]
Description=多进程 WebSocket 服务(基于 pcntl+sockets)
After=network.target
Documentation=无

[Service]
Type=simple
# 核心:替换为你的 PHP 路径和脚本绝对路径(必填)
ExecStart=/usr/bin/php /opt/websocket/ws_multi_process.php
# 运行用户(建议非 root,如 www-data 或自定义用户)
User=www-data
# 工作目录(脚本所在目录,必填)
WorkingDirectory=/opt/websocket
# 自动重启(崩溃、退出均重启,契合脚本自身进程守护)
Restart=always
# 重启延迟(避免频繁重启)
RestartSec=3
# 提高文件描述符上限(支持更多连接)
LimitNOFILE=65535

[Install]
WantedBy=multi-user.target

启用

# 重载配置使服务生效
sudo systemctl daemon-reload
# 设为开机自启
sudo systemctl enable websocket-service
# 启动服务
sudo systemctl start websocket-service
# 查看运行状态(验证是否成功)
sudo systemctl status websocket-service
点击返回顶部
  1. 留言
  2. 联系方式