作者 karlet

feat:修复web socket内存溢出

1 <?php 1 <?php
  2 +
2 namespace Jiaoyin; 3 namespace Jiaoyin;
3 4
4 use Swoole\Coroutine; 5 use Swoole\Coroutine;
@@ -7,25 +8,25 @@ use Swoole\WebSocket\Frame; @@ -7,25 +8,25 @@ use Swoole\WebSocket\Frame;
7 8
8 class Websocket 9 class Websocket
9 { 10 {
10 - private string $host = '';  
11 - private string $path = '';  
12 - private int $port = 443;  
13 - private bool $ssl = true; 11 + private string $host;
  12 + private string $path;
  13 + private int $port;
  14 + private bool $ssl;
14 private int $recvTimeout = 60; 15 private int $recvTimeout = 60;
15 public ?Client $client = null; 16 public ?Client $client = null;
16 public string $url = ''; 17 public string $url = '';
17 - private $lastRecvTime = 0; 18 + private int $lastRecvTime = 0;
18 private $onOpen = null; 19 private $onOpen = null;
19 private $onMessage = null; 20 private $onMessage = null;
20 private $onClose = null; 21 private $onClose = null;
21 private $onPing = null; 22 private $onPing = null;
22 private $onPong = null; 23 private $onPong = null;
23 - private $pingState = true;  
24 - private $timerCheck = 0;  
25 - private $timerPing = 0;  
26 - private $desc = 'websocket未命名';  
27 - private $pingCount = 0;  
28 - private $pingDelay = 10000;//ping tick时间 10s 24 + private bool $pingState = true;
  25 + private ?int $timerCheck = 0;
  26 + private int $timerPing = 0;
  27 + private string $desc = 'websocket未命名';
  28 + private int $pingCount = 0;
  29 + private int $pingDelay = 10000;//ping tick时间 10s
29 30
30 // 例如:wss://fstream.binance.com/stream、ws://175.178.36.217:9528/spot/stream 31 // 例如:wss://fstream.binance.com/stream、ws://175.178.36.217:9528/spot/stream
31 public function __construct($url, $desc = null) 32 public function __construct($url, $desc = null)
@@ -44,10 +45,10 @@ class Websocket @@ -44,10 +45,10 @@ class Websocket
44 if ($desc) { 45 if ($desc) {
45 $this->desc = 'websocket ' . $desc; 46 $this->desc = 'websocket ' . $desc;
46 } 47 }
47 - output($this->host,$this->port,$this->ssl,$this->path); 48 + output($this->host, $this->port, $this->ssl, $this->path);
48 } 49 }
49 50
50 - public function push($data) 51 + public function push($data): void
51 { 52 {
52 if ($this->client) { 53 if ($this->client) {
53 $this->client->push($data); 54 $this->client->push($data);
@@ -56,17 +57,15 @@ class Websocket @@ -56,17 +57,15 @@ class Websocket
56 } 57 }
57 } 58 }
58 59
59 - public function close() 60 + public function close(): void
60 { 61 {
61 swoole_timer_clear($this->timerPing); 62 swoole_timer_clear($this->timerPing);
62 swoole_timer_clear($this->timerCheck); 63 swoole_timer_clear($this->timerCheck);
63 - if ($this->client) {  
64 - $this->client->close();  
65 - } 64 + $this->client?->close();
66 } 65 }
67 66
68 public function connect(callable $onOpen = null, callable $onMessage = null, 67 public function connect(callable $onOpen = null, callable $onMessage = null,
69 - callable $onClose = null, $onPing = null, $onPong = null) 68 + callable $onClose = null, $onPing = null, $onPong = null): void
70 { 69 {
71 $this->close(); 70 $this->close();
72 $this->pingCount = 0; 71 $this->pingCount = 0;
@@ -88,8 +87,8 @@ class Websocket @@ -88,8 +87,8 @@ class Websocket
88 $this->sendPing(); 87 $this->sendPing();
89 }); 88 });
90 while ($this->client) { 89 while ($this->client) {
91 - $frame = $this->client->recv(60);  
92 - if (!$frame && $this->client->errCode!=60) { 90 + $frame = $this->client->recv($this->recvTimeout);
  91 + if (!$frame && $this->client->errCode != 60) {
93 output($this->desc, "错误数据", $frame); 92 output($this->desc, "错误数据", $frame);
94 break; 93 break;
95 } 94 }
@@ -103,7 +102,7 @@ class Websocket @@ -103,7 +102,7 @@ class Websocket
103 } 102 }
104 if ($frame->opcode == WEBSOCKET_OPCODE_PONG) { 103 if ($frame->opcode == WEBSOCKET_OPCODE_PONG) {
105 $this->recvPongData($frame->data); 104 $this->recvPongData($frame->data);
106 - if($onPong){ 105 + if ($onPong) {
107 call_user_func($onPong, $frame->data); 106 call_user_func($onPong, $frame->data);
108 } 107 }
109 } 108 }
@@ -115,27 +114,27 @@ class Websocket @@ -115,27 +114,27 @@ class Websocket
115 break; 114 break;
116 } 115 }
117 } 116 }
118 - \Swoole\Coroutine::defer(function () use ($onClose) { 117 + }
  118 + Coroutine::defer(function () use ($onClose) {
119 output($this->desc, "协程退出"); 119 output($this->desc, "协程退出");
120 $this->client = null; 120 $this->client = null;
121 if ($onClose) { 121 if ($onClose) {
122 call_user_func($onClose); 122 call_user_func($onClose);
123 } 123 }
124 }); 124 });
125 - }  
126 } else { 125 } else {
127 - if($onClose){ 126 + if ($onClose) {
128 call_user_func($onClose); 127 call_user_func($onClose);
129 } 128 }
130 output($this->desc, "升级websocket连接失败,1s后重连"); 129 output($this->desc, "升级websocket连接失败,1s后重连");
131 - swoole_timer_after(1000,function (){ 130 + swoole_timer_after(1000, function () {
132 $this->connect($this->onOpen, $this->onMessage, $this->onClose, $this->onPing, $this->onPong); 131 $this->connect($this->onOpen, $this->onMessage, $this->onClose, $this->onPing, $this->onPong);
133 }); 132 });
134 } 133 }
135 }); 134 });
136 } 135 }
137 136
138 - static public function createPongData($data = null) 137 + static public function createPongData($data = null): Frame
139 { 138 {
140 $frame = new Frame(); 139 $frame = new Frame();
141 if ($data) { 140 if ($data) {
@@ -145,7 +144,7 @@ class Websocket @@ -145,7 +144,7 @@ class Websocket
145 return $frame; 144 return $frame;
146 } 145 }
147 146
148 - static public function createPingData() 147 + static public function createPingData(): Frame
149 { 148 {
150 $frame = new Frame(); 149 $frame = new Frame();
151 $frame->opcode = WEBSOCKET_OPCODE_PING; 150 $frame->opcode = WEBSOCKET_OPCODE_PING;
@@ -153,25 +152,24 @@ class Websocket @@ -153,25 +152,24 @@ class Websocket
153 return $frame; 152 return $frame;
154 } 153 }
155 154
156 - private function sendPing() 155 + private function sendPing(): void
157 { 156 {
158 $this->push(self::createPingData()); 157 $this->push(self::createPingData());
159 $this->pingState = false; 158 $this->pingState = false;
160 $this->timerCheck = swoole_timer_after(5000, function () { 159 $this->timerCheck = swoole_timer_after(5000, function () {
161 if (!$this->pingState) { 160 if (!$this->pingState) {
162 //未收到pong且10内未获取数据。假定已断开连接(存在未收到pong,但在传数据的情况。) 161 //未收到pong且10内未获取数据。假定已断开连接(存在未收到pong,但在传数据的情况。)
163 - if($this->lastRecvTime<time()-10){ 162 + if ($this->lastRecvTime < time() - 10) {
164 output($this->desc, 'ping pong 超时且未收到数据,重新连接'); 163 output($this->desc, 'ping pong 超时且未收到数据,重新连接');
165 $this->connect($this->onOpen, $this->onMessage, $this->onClose, $this->onPing, $this->onPong); 164 $this->connect($this->onOpen, $this->onMessage, $this->onClose, $this->onPing, $this->onPong);
166 - }else{  
167 - output($this->desc, 'ping pong 超时,再次ping'); 165 + } else {
168 $this->sendPing(); 166 $this->sendPing();
169 } 167 }
170 } 168 }
171 }); 169 });
172 } 170 }
173 171
174 - private function recvPongData($data) 172 + private function recvPongData($data): void
175 { 173 {
176 $this->pingState = true; 174 $this->pingState = true;
177 $this->pingCount += 1; 175 $this->pingCount += 1;
@@ -181,7 +179,7 @@ class Websocket @@ -181,7 +179,7 @@ class Websocket
181 swoole_timer_clear($this->timerCheck); 179 swoole_timer_clear($this->timerCheck);
182 $this->timerPing = swoole_timer_after($this->pingDelay, function () { 180 $this->timerPing = swoole_timer_after($this->pingDelay, function () {
183 //防止重复ping 181 //防止重复ping
184 - if($this->pingState){ 182 + if ($this->pingState) {
185 $this->sendPing(); 183 $this->sendPing();
186 } 184 }
187 }); 185 });