Websocket.php 6.8 KB
<?php

namespace jytools;

use Swoole\Coroutine;
use Swoole\Coroutine\Http\Client;
use Swoole\WebSocket\Frame;

/**
 * Coroutine-based WebSocket client built on Swoole\Coroutine\Http\Client.
 *
 * Responsibilities visible in this class:
 *  - parse a ws:// or wss:// URL into host / port / TLS flag / request path;
 *  - upgrade the HTTP connection and run a receive loop inside a coroutine;
 *  - dispatch TEXT / PING / PONG frames to user callbacks;
 *  - run a ping/pong keep-alive: a ping is sent shortly after connecting and
 *    then $pingDelay ms after every pong; if no pong arrives within 5s and
 *    nothing has been received for 10s, the connection is re-established.
 *
 * Relies on the project helpers output() and getMicrotime(), which are
 * defined outside this file.
 */
class Websocket
{
    /**
     * errno values treated as "recv() timed out": ETIMEDOUT is 60 on
     * macOS/BSD and 110 on Linux.
     */
    private const RECV_TIMEOUT_ERRNOS = [60, 110];

    private string $host;
    private string $path;
    private int    $port;
    private bool   $ssl;
    private int    $recvTimeout    = 60;
    public ?Client $client         = null;
    public string  $url            = '';
    private int    $lastRecvTime   = 0;
    private        $onOpen         = null;
    private        $onMessage      = null;
    private        $onClose        = null;
    private        $onPing         = null;
    private        $onPong         = null;
    private bool   $pingState      = true; // true = last ping answered (or none pending)
    private ?int   $timerCheck     = 0;    // id of the "did the pong arrive?" timer
    private int    $timerPing      = 0;    // id of the "send next ping" timer
    private int    $timerReconnect = 0;    // id of the retry timer after a failed upgrade
    private string $desc           = 'websocket未命名';
    private int    $pingCount      = 0;
    private int    $pingDelay      = 10000; // delay between a pong and the next ping, in ms (10s)

    /**
     * @param string      $url  WebSocket URL, e.g. wss://fstream.binance.com/stream
     *                          or ws://175.178.36.217:9528/spot/stream
     * @param string|null $desc Optional label used as a prefix in log output.
     *
     * @throws \InvalidArgumentException when $url is not a usable ws:// or wss:// URL.
     */
    public function __construct($url, $desc = null)
    {
        $this->url = $url;

        // parse_url() handles hyphenated hosts and query characters such as
        // "@" or "%", which a hand-written character-class regex would
        // silently truncate.
        $parts = parse_url(trim((string) $url));
        if (!is_array($parts)) {
            $parts = [];
        }
        $scheme = strtolower($parts['scheme'] ?? '');
        $host   = $parts['host'] ?? '';
        if (!in_array($scheme, ['ws', 'wss'], true) || $host === '') {
            throw new \InvalidArgumentException('invalid websocket url: ' . (string) $url);
        }

        $this->ssl  = $scheme === 'wss';
        $this->host = $host;
        // A missing (or zero) port falls back to the scheme default.
        $this->port = empty($parts['port']) ? ($this->ssl ? 443 : 80) : (int) $parts['port'];

        // The upgrade request target is the path plus the query string.
        $path = $parts['path'] ?? '';
        if ($path === '') {
            $path = '/';
        }
        if (isset($parts['query']) && $parts['query'] !== '') {
            $path .= '?' . $parts['query'];
        }
        $this->path = $path;

        if ($desc) {
            $this->desc = 'websocket ' . $desc;
        }
        output($this->host, $this->port, $this->ssl, $this->path);
    }

    /**
     * Send data (a string or a Frame) over the current connection.
     * Only logs when there is no active client; nothing is thrown.
     *
     * @param mixed $data Payload forwarded unchanged to Client::push().
     */
    public function push($data): void
    {
        if ($this->client) {
            $this->client->push($data);
        } else {
            output('push error, client is null');
        }
    }

    /**
     * Cancel every pending timer owned by this instance (ping, pong check and
     * upgrade retry) and close the current client, if any.
     */
    public function close(): void
    {
        self::clearTimer($this->timerPing);
        self::clearTimer($this->timerCheck);
        self::clearTimer($this->timerReconnect);
        $this->timerPing      = 0;
        $this->timerCheck     = 0;
        $this->timerReconnect = 0;
        $this->client?->close();
    }

    /**
     * (Re)connect and start the receive loop in a new coroutine.
     *
     * Any existing connection and pending timers are closed first. The
     * callbacks are remembered so automatic reconnects reuse them.
     * If the upgrade fails, $onClose is invoked and a retry is scheduled 1s later.
     *
     * @param callable|null $onOpen    Called with the Client about 50ms after a successful upgrade.
     * @param callable|null $onMessage Called with the payload of every TEXT frame.
     * @param callable|null $onClose   Called when the receive coroutine ends or the upgrade fails.
     * @param callable|null $onPing    Called with the payload of every PING frame.
     * @param callable|null $onPong    Called with the payload of every PONG frame.
     */
    public function connect(
        ?callable $onOpen = null,
        ?callable $onMessage = null,
        ?callable $onClose = null,
        $onPing = null,
        $onPong = null
    ): void {
        $this->close();
        $this->pingCount = 0;
        $this->onOpen    = $onOpen;
        $this->onMessage = $onMessage;
        $this->onClose   = $onClose;
        $this->onPing    = $onPing;
        $this->onPong    = $onPong;
        Coroutine::create(function () use ($onOpen, $onMessage, $onClose, $onPing, $onPong) {
            $client = new Client($this->host, $this->port, $this->ssl);
            $client->set(['timeout' => 5]);

            if (!$client->upgrade($this->path)) {
                if ($onClose) {
                    call_user_func($onClose);
                }
                output($this->desc, "升级websocket连接失败,1s后重连");
                // Keep the timer id so close() can cancel a pending retry.
                $this->timerReconnect = (int) swoole_timer_after(1000, function () {
                    $this->timerReconnect = 0;
                    $this->connect($this->onOpen, $this->onMessage, $this->onClose, $this->onPing, $this->onPong);
                });
                return;
            }

            output($this->desc, "连接成功");
            $this->client = $client;

            // Registered up front so the cleanup also runs if a handler throws.
            // The ping timers are deliberately left running here: they are what
            // notices the dead connection and triggers the automatic reconnect.
            Coroutine::defer(function () use ($onClose, $client) {
                output($this->desc, "协程退出");
                // Only clear the property if it still refers to this
                // coroutine's client; a reconnect may already have replaced it.
                if ($this->client === $client) {
                    $this->client = null;
                }
                if ($onClose) {
                    call_user_func($onClose);
                }
            });

            // Fire onOpen and the first ping slightly later, once the receive
            // loop below is already waiting in recv().
            swoole_timer_after(50, function () use ($onOpen, $client) {
                if ($onOpen) {
                    call_user_func($onOpen, $client);
                }
                $this->sendPing();
            });

            // Loop for as long as this client is still the active one.
            while ($this->client === $client) {
                $frame = $client->recv($this->recvTimeout);
                if (!$frame) {
                    if (in_array((int) $client->errCode, self::RECV_TIMEOUT_ERRNOS, true)) {
                        // Idle timeout: nothing was received, so lastRecvTime
                        // must not be refreshed.
                        continue;
                    }
                    output($this->desc, '错误码:' . $client->errCode . ",错误数据:", $frame);
                    break;
                }

                $this->lastRecvTime = time();
                // instanceof (not an exact class match) so that CloseFrame,
                // a Frame subclass, is recognised as well.
                if (!($frame instanceof Frame)) {
                    continue;
                }

                if ($frame->opcode === WEBSOCKET_OPCODE_CLOSE) {
                    output($this->desc, "服务器主动关闭连接,连接关闭");
                    break;
                }

                switch ($frame->opcode) {
                    case WEBSOCKET_OPCODE_PING:
                        $this->push(self::createPongData($frame->data));
                        if ($onPing) {
                            call_user_func($onPing, $frame->data);
                        }
                        break;
                    case WEBSOCKET_OPCODE_PONG:
                        $this->recvPongData($frame->data);
                        if ($onPong) {
                            call_user_func($onPong, $frame->data);
                        }
                        break;
                    case WEBSOCKET_OPCODE_TEXT:
                        if ($onMessage) {
                            call_user_func($onMessage, $frame->data);
                        }
                        break;
                }
            }
        });
    }

    /**
     * Build a PONG frame, echoing the given payload when there is one.
     *
     * @param string|null $data Payload of the PING being answered.
     */
    public static function createPongData($data = null): Frame
    {
        $frame = new Frame();
        // Compare against null/'' explicitly: a plain truthiness test would
        // drop the valid payload "0", and a pong must echo the ping payload.
        if ($data !== null && $data !== '') {
            $frame->data = $data;
        }
        $frame->opcode = WEBSOCKET_OPCODE_PONG;
        return $frame;
    }

    /**
     * Build a PING frame whose payload is the value returned by the project
     * helper getMicrotime().
     */
    public static function createPingData(): Frame
    {
        $frame         = new Frame();
        $frame->opcode = WEBSOCKET_OPCODE_PING;
        $frame->data   = getMicrotime();
        return $frame;
    }

    /**
     * Send a ping and arm a 5s check for the matching pong.
     *
     * When the check fires without a pong: if nothing at all was received in
     * the last 10s the connection is assumed dead and re-established;
     * otherwise (data is flowing although the pong is missing) another ping is sent.
     */
    private function sendPing(): void
    {
        $this->push(self::createPingData());
        $this->pingState = false;

        // Never leave an older check timer running behind the new one.
        self::clearTimer($this->timerCheck);
        $this->timerCheck = (int) swoole_timer_after(5000, function () {
            $this->timerCheck = 0; // this one-shot timer has fired
            if ($this->pingState) {
                return;
            }
            // No pong received and no data within 10s: assume the connection
            // is gone. (A missing pong while data still arrives is tolerated.)
            if ($this->lastRecvTime < time() - 10) {
                output($this->desc, 'ping pong 超时且未收到数据,重新连接');
                $this->connect($this->onOpen, $this->onMessage, $this->onClose, $this->onPing, $this->onPong);
            } else {
                $this->sendPing();
            }
        });
    }

    /**
     * Handle a received pong: mark the ping as answered, cancel the pending
     * pong check and schedule the next ping after $pingDelay ms.
     *
     * @param mixed $data Pong payload (currently unused).
     */
    private function recvPongData($data): void
    {
        $this->pingState = true;
        $this->pingCount += 1;
        if ($this->pingCount % 100 == 0) {
            output($this->desc, '连接稳定,已连接时间:', $this->pingCount * $this->pingDelay / 1000, '秒');
        }

        self::clearTimer($this->timerCheck);
        $this->timerCheck = 0;

        // Replace (rather than stack) any ping timer that is still pending.
        self::clearTimer($this->timerPing);
        $this->timerPing = (int) swoole_timer_after($this->pingDelay, function () {
            $this->timerPing = 0; // this one-shot timer has fired
            // Guard against sending a second ping while one is outstanding.
            if ($this->pingState) {
                $this->sendPing();
            }
        });
    }

    /**
     * Clear a Swoole timer if the id refers to one (ids start at 1; 0/null
     * mean "no timer").
     */
    private static function clearTimer(?int $timerId): void
    {
        if ($timerId) {
            swoole_timer_clear($timerId);
        }
    }
}