Websocket.php 6.5 KB
<?php
namespace Jiaoyin;

use Swoole\Coroutine;
use Swoole\Coroutine\Http\Client;
use Swoole\WebSocket\Frame;

/**
 * Coroutine based WebSocket client built on Swoole's coroutine HTTP client.
 *
 * The client parses a ws:// or wss:// URL, upgrades the HTTP connection to a
 * WebSocket, dispatches incoming frames to user callbacks and keeps the
 * connection alive with a ping/pong heartbeat. When a pong is not received in
 * time, or the upgrade fails, the client reconnects with the callbacks that
 * were passed to the last connect() call.
 */
class Websocket
{
    /** errno values meaning "timed out": ETIMEDOUT is 60 on macOS/BSD and 110 on Linux. */
    private const TIMEOUT_ERR_CODES = [60, 110];
    /** Connect/upgrade timeout in seconds passed to the Swoole client. */
    private const CONNECT_TIMEOUT = 5;
    /** Delay in milliseconds between a successful upgrade and the onOpen callback. */
    private const OPEN_DELAY_MS = 50;
    /** Milliseconds to wait for a pong before the connection is considered dead. */
    private const PONG_TIMEOUT_MS = 5000;
    /** Milliseconds to wait before retrying after a failed upgrade. */
    private const RECONNECT_DELAY_MS = 1000;

    private string $host = '';
    private string $path = '';
    private int $port = 443;
    private bool $ssl = true;
    /** Seconds a single recv() call may block before it returns with a timeout. */
    private int $recvTimeout = 60;
    public ?Client $client = null;
    public string $url = '';
    /** Unix timestamp of the last time recv() returned without a fatal error. */
    private $lastRecvTime = 0;
    private $onOpen = null;
    private $onMessage = null;
    private $onClose = null;
    private $onPing = null;
    private $onPong = null;
    /** True once the pong for the most recent ping has arrived. */
    private $pingState = true;
    /** Timer id of the pending "pong did not arrive" check (0 when none). */
    private $timerCheck = 0;
    /** Timer id of the pending "send next ping" timer (0 when none). */
    private $timerPing = 0;
    /** Timer id of the pending delayed onOpen callback (0 when none). */
    private $timerOpen = 0;
    /** Timer id of the pending reconnect after a failed upgrade (0 when none). */
    private $timerReconnect = 0;
    private $desc = 'websocket未命名';
    /** Number of pongs received since the last connect() call. */
    private $pingCount = 0;
    /** Interval between a received pong and the next ping, in milliseconds (10s). */
    private $pingDelay = 10000;

    /**
     * Parses the endpoint URL and stores host, port, TLS flag and request path.
     *
     * Examples: wss://fstream.binance.com/stream, ws://175.178.36.217:9528/spot/stream
     *
     * @param string      $url  WebSocket URL using the ws or wss scheme.
     * @param string|null $desc Optional label used as a prefix in log output.
     *
     * @throws \InvalidArgumentException When the URL has no ws/wss scheme or no host.
     */
    public function __construct($url, $desc = null)
    {
        $this->url = $url;

        // parse_url() copes with hyphenated hosts and arbitrary query strings.
        $parts  = parse_url(trim((string) $url));
        $scheme = is_array($parts) ? strtolower($parts['scheme'] ?? '') : '';
        if (!in_array($scheme, ['ws', 'wss'], true) || empty($parts['host'])) {
            throw new \InvalidArgumentException('invalid websocket url: ' . $url);
        }

        $this->ssl  = $scheme === 'wss';
        $this->host = $parts['host'];
        $this->port = $parts['port'] ?? ($this->ssl ? 443 : 80);

        $path = $parts['path'] ?? '';
        if ($path === '') {
            $path = '/';
        }
        if (isset($parts['query']) && $parts['query'] !== '') {
            $path .= '?' . $parts['query'];
        }
        $this->path = $path;

        if ($desc) {
            $this->desc = 'websocket ' . $desc;
        }
        output($this->host, $this->port, $this->ssl, $this->path);
    }

    /**
     * Sends data over the current connection, or logs an error when there is none.
     *
     * @param mixed $data Payload or frame object handed to the Swoole client's push().
     */
    public function push($data)
    {
        if ($this->client) {
            $this->client->push($data);
        } else {
            output('push error, client is null');
        }
    }

    /**
     * Cancels every pending timer owned by this object and closes the connection.
     */
    public function close()
    {
        $this->clearTimer($this->timerPing);
        $this->clearTimer($this->timerCheck);
        $this->clearTimer($this->timerOpen);
        $this->clearTimer($this->timerReconnect);
        if ($this->client) {
            $this->client->close();
        }
    }

    /**
     * Closes any existing connection and opens a new one in a fresh coroutine.
     *
     * The callbacks are remembered so that automatic reconnects reuse them.
     *
     * @param callable|null $onOpen    Called with the client shortly after a successful upgrade.
     * @param callable|null $onMessage Called with the payload of every text frame.
     * @param callable|null $onClose   Called without arguments when the upgrade fails
     *                                 or the receive coroutine exits.
     * @param callable|null $onPing    Called with the payload of every ping frame.
     * @param callable|null $onPong    Called with the payload of every pong frame.
     */
    public function connect(?callable $onOpen = null, ?callable $onMessage = null,
                            ?callable $onClose = null, $onPing = null, $onPong = null)
    {
        $this->close();
        $this->pingCount = 0;
        $this->onOpen    = $onOpen;
        $this->onMessage = $onMessage;
        $this->onClose   = $onClose;
        $this->onPing    = $onPing;
        $this->onPong    = $onPong;

        Coroutine::create(function () use ($onOpen, $onMessage, $onClose, $onPing, $onPong) {
            // Keep a local handle: $this->client may be replaced by a later
            // connect() call while this coroutine is suspended.
            $client       = new Client($this->host, $this->port, $this->ssl);
            $this->client = $client;
            $client->set(['timeout' => self::CONNECT_TIMEOUT]);

            if (!$client->upgrade($this->path)) {
                $this->handleUpgradeFailure($client, $onClose);
                return;
            }

            output($this->desc, "连接成功");

            // Registered exactly once, so the cleanup and onClose run a single
            // time whichever way the receive loop ends.
            Coroutine::defer(function () use ($client, $onClose) {
                output($this->desc, "协程退出");
                // Never drop a newer connection that has replaced this one.
                if ($this->client === $client) {
                    $this->client = null;
                }
                if ($onClose) {
                    call_user_func($onClose);
                }
            });

            // Tracked so that close()/connect() can cancel it.
            $this->timerOpen = swoole_timer_after(self::OPEN_DELAY_MS, function () use ($onOpen) {
                $this->timerOpen = 0;
                if ($onOpen && $this->client) {
                    call_user_func($onOpen, $this->client);
                }
                $this->sendPing();
            });

            $this->receiveLoop($client, $onMessage, $onPing, $onPong);
        });
    }

    /**
     * Builds a pong frame, echoing the given payload when there is one.
     *
     * @param string|null $data Payload of the ping being answered.
     *
     * @return Frame
     */
    static public function createPongData($data = null)
    {
        $frame = new Frame();
        // Compare explicitly so that a payload of '0' is still echoed.
        if ($data !== null && $data !== '') {
            $frame->data = $data;
        }
        $frame->opcode = WEBSOCKET_OPCODE_PONG;
        return $frame;
    }

    /**
     * Builds a ping frame whose payload is the value returned by getMicrotime().
     *
     * @return Frame
     */
    static public function createPingData()
    {
        $frame         = new Frame();
        $frame->opcode = WEBSOCKET_OPCODE_PING;
        $frame->data   = getMicrotime();
        return $frame;
    }

    /**
     * Reports a failed upgrade and schedules a retry with the stored callbacks.
     *
     * @param Client        $client  The client whose upgrade failed.
     * @param callable|null $onClose Close callback of the failed attempt.
     */
    private function handleUpgradeFailure(Client $client, ?callable $onClose): void
    {
        if ($onClose) {
            call_user_func($onClose);
        }
        output($this->desc, "升级websocket连接失败,1s后重连");

        // A newer connect() call already took over; it owns reconnecting.
        if ($this->client !== $client) {
            return;
        }
        $this->clearTimer($this->timerReconnect);
        $this->timerReconnect = swoole_timer_after(self::RECONNECT_DELAY_MS, function () {
            $this->timerReconnect = 0;
            $this->connect($this->onOpen, $this->onMessage, $this->onClose, $this->onPing, $this->onPong);
        });
    }

    /**
     * Receives frames from $client until the connection ends or is replaced.
     *
     * @param Client        $client    Connection owned by the calling coroutine.
     * @param callable|null $onMessage Receives the payload of text frames.
     * @param callable|null $onPing    Receives the payload of ping frames.
     * @param callable|null $onPong    Receives the payload of pong frames.
     */
    private function receiveLoop(Client $client, ?callable $onMessage, $onPing, $onPong): void
    {
        while ($this->client === $client) {
            $frame = $client->recv($this->recvTimeout);
            // A receive timeout is not an error; anything else ends the loop.
            if (!$frame && !in_array((int) $client->errCode, self::TIMEOUT_ERR_CODES, true)) {
                output($this->desc, "错误数据", $frame);
                break;
            }
            $this->lastRecvTime = time();
            if (!($frame instanceof Frame)) {
                continue;
            }

            switch ($frame->opcode) {
                case WEBSOCKET_OPCODE_PING:
                    // Answer on the connection the ping arrived on.
                    $client->push(self::createPongData($frame->data));
                    if ($onPing) {
                        call_user_func($onPing, $frame->data);
                    }
                    break;
                case WEBSOCKET_OPCODE_PONG:
                    $this->recvPongData($frame->data);
                    if ($onPong) {
                        call_user_func($onPong, $frame->data);
                    }
                    break;
                case WEBSOCKET_OPCODE_TEXT:
                    if ($onMessage) {
                        call_user_func($onMessage, $frame->data);
                    }
                    break;
                case WEBSOCKET_OPCODE_CLOSE:
                    output($this->desc, "服务器主动关闭连接,连接关闭");
                    return;
            }
        }
    }

    /**
     * Sends a ping and arms the timer that reconnects when no pong arrives in time.
     */
    private function sendPing()
    {
        $this->push(self::createPingData());
        $this->pingState = false;

        // Never leave an older check timer running next to the new one.
        $this->clearTimer($this->timerCheck);
        $this->timerCheck = swoole_timer_after(self::PONG_TIMEOUT_MS, function () {
            $this->timerCheck = 0;
            if (!$this->pingState) {
                output($this->desc, 'ping pong 超时,重新连接');
                $this->connect($this->onOpen, $this->onMessage, $this->onClose, $this->onPing, $this->onPong);
            }
        });
    }

    /**
     * Records a received pong, cancels the timeout check and schedules the next ping.
     *
     * @param string $data Payload of the pong frame (currently unused).
     */
    private function recvPongData($data)
    {
        $this->pingState = true;
        $this->pingCount += 1;
        if ($this->pingCount % 100 == 0) {
            output($this->desc, '连接稳定,已连接时间:', $this->pingCount * $this->pingDelay / 1000, '秒');
        }
        $this->clearTimer($this->timerCheck);

        // Replace any pending ping timer so repeated pongs cannot multiply pings.
        $this->clearTimer($this->timerPing);
        $this->timerPing = swoole_timer_after($this->pingDelay, function () {
            $this->timerPing = 0;
            $this->sendPing();
        });
    }

    /**
     * Cancels the timer stored in $timerId, if any, and resets the slot to 0.
     *
     * @param int|bool $timerId Timer id slot, passed by reference.
     */
    private function clearTimer(&$timerId): void
    {
        if ($timerId) {
            swoole_timer_clear($timerId);
        }
        $timerId = 0;
    }
}