WebsocketOk.php 6.3 KB
<?php

namespace Jiaoyin;

use Swoole\Coroutine;
use Swoole\Coroutine\Http\Client;
use Swoole\WebSocket\Frame;

class WebsocketOk
{
    private string $host;
    private string $path;
    private int    $port;
    private bool   $ssl;
    private int    $recvTimeout  = 60;
    public ?Client $client       = null;
    public string  $url          = '';
    private int    $lastRecvTime = 0;
    private        $onOpen       = null;
    private        $onMessage    = null;
    private        $onClose      = null;
    private        $onPing       = null;
    private        $onPong       = null;
    private bool   $pingState    = true;
    private bool   $reconnecting = false; //防频繁重连
    private ?int   $timerCheck   = 0;
    private string $desc         = 'websocket未命名';
    private int    $pingCount    = 0;
    private int    $pingDelay    = 25000; // ping tick时间

    public function __construct($url, $desc = null)
    {
        $this->url = $url;
        $pattern   = '/(ws{1,2}):\/\/([\w\.-]+):*(\d*)([\/\-\w\.\?&=]*)/';
        preg_match($pattern, $url, $result);
        $this->ssl  = $result[1] === 'wss';
        $this->host = $result[2];
        $this->port = empty($result[3]) ? ($this->ssl ? 443 : 80) : (int)$result[3];
        $this->path = empty($result[4]) ? '/' : $result[4];
        if ($desc) {
            $this->desc = 'websocket ' . $desc;
        }
        output($this->host, $this->port, $this->ssl, $this->path);
    }

    public function push($data): void
    {
        if ($this->client && $this->client->connected) {
            $this->client->push($data);
        } else {
            output('push error, client not connected');
        }
    }

    public function close(): void
    {
        swoole_timer_clear($this->timerCheck);
        if ($this->client) {
            $this->client->close();
        }
        $this->client = null;
    }

    public function connect(
        callable $onOpen = null,
        callable $onMessage = null,
        callable $onClose = null,
        $onPing = null,
        $onPong = null
    ): void {
        $this->close();
        $this->pingCount = 0;
        $this->onOpen    = $onOpen;
        $this->onMessage = $onMessage;
        $this->onClose   = $onClose;
        $this->onPing    = $onPing;
        $this->onPong    = $onPong;

        Coroutine::create(function () use ($onOpen, $onMessage, $onClose, $onPing, $onPong) {
            $client = new Client($this->host, $this->port, $this->ssl);
            $client->set(['timeout' => 5]);
            $ret = $client->upgrade($this->path);

            if ($ret) {
                output($this->desc, "连接成功");
                $this->reconnecting = false; //解除重连状态
                $this->client = &$client;
                swoole_timer_after(50, function () use ($onOpen, $client) {
                    if ($onOpen) {
                        call_user_func($onOpen, $client);
                    }
                    $this->sendPing();
                });

                while ($client) {
                    $frame = $client->recv($this->recvTimeout);
                    if (!$frame && $client->errCode !== 60) {
                        output($this->desc, '错误码:' . $client->errCode . ',状态码:' . $client->statusCode . ',body:' . $client->body . ";错误数据:" . $frame);
                        break;
                    }

                    $this->lastRecvTime = time();

                    if (is_object($frame) && get_class($frame) === Frame::class) {
                        if ($frame->opcode === WEBSOCKET_OPCODE_TEXT) {
                            $data = json_decode($frame->data, true);
                            if (isset($data['event']) && $data['event'] === 'pong') {
                                $this->recvPongData($data['ts'] ?? null);
                                if ($onPong) {
                                    call_user_func($onPong, $data);
                                }
                            } else {
                                $this->pingState = true; // 收到任何消息都算活跃
                                call_user_func($onMessage, $frame->data);
                            }
                        }

                        if ($frame->opcode === WEBSOCKET_OPCODE_CLOSE) {
                            output($this->desc, "服务器主动关闭连接,连接关闭");
                            break;
                        }
                    }
                }

                Coroutine::defer(function () use ($onClose) {
                    output($this->desc, "协程退出");
                    if ($this->client) {
                        $this->client->close();
                    }
                    $this->client = null;
                    if ($onClose) {
                        call_user_func($onClose);
                    }
                });
            } else {
                if ($onClose) {
                    call_user_func($onClose);
                }
                output($this->desc, "升级websocket连接失败,1s后重连");
                swoole_timer_after(3000, function () {
                    $this->connect($this->onOpen, $this->onMessage, $this->onClose, $this->onPing, $this->onPong);
                });
            }
        });
    }

    private function sendPing(): void
    {
        $pingData = 'ping';
        $this->push($pingData);
        $this->pingState = false;

        $this->timerCheck = swoole_timer_after(15000, function () {
            if (!$this->pingState && $this->lastRecvTime < time() - 10 && !$this->reconnecting) {
                $this->reconnecting = true;
                output($this->desc, 'ping pong 超时且未收到数据,重新连接');
                $this->connect($this->onOpen, $this->onMessage, $this->onClose, $this->onPing, $this->onPong);
            } else {
                if (!$this->client || !$this->client->connected) {
                    output($this->desc, '客户端已断开,停止Ping');
                    return;
                } else {
                    $this->sendPing();
                }
            }
        });
    }
    private function recvPongData($ts = null): void
    {
        $this->pingState = true;
        if ($ts) {
            $now = round(microtime(true) * 1000);
            $delay = $now - $ts;
            output($this->desc, "收到pong,延迟:{$delay}ms");
        }
    }
}