作者 karlet

feat:更新常用类

<?php
namespace Jiaoyin;
use Jiaoyin;
use Jiaoyin\MongoPool;
class MongoCli
{
private $pool;
private $database;
public function __construct($host,$port,$username,$password,$database, $num = 20)
{
$this->database = $database;
$userInfo = $username?"{$username}:{$password}@":"";
$dsn = "mongodb://{$userInfo}{$host}:{$port}/{$database}";
$this->pool = new MongoPool($dsn,$num);
}
/*
* 创建集合
*/
public function createCollection($table,$createIndex=[]){
try {
$mongodb = $this->pool->get();
$database = $mongodb->selectDatabase($this->database);
$collections = $database->listCollections(['filter' => ['name' => $table]]);
$collectionExists = iterator_count($collections) > 0;
if (empty($collectionExists)) {
// 集合不存在,创建集合并添加索引
$database->createCollection($table);
$collection = $database->selectCollection($table);
$collection->createIndex($createIndex);
}
$this->pool->push($mongodb);
} catch (\Exception $e) {
output($e->getMessage());
}
}
/*
* 检查集合是否存在
* */
public function checkCollection($table): bool
{
try {
$mongodb = $this->pool->get();
$database = $mongodb->selectDatabase($this->database);
$collections = $database->listCollections(['filter' => ['name' => $table]]);
$collectionExists = iterator_count($collections) > 0;
$this->pool->push($mongodb);
if (empty($collectionExists)) {
// 集合不存在
return false;
}
return true;
} catch (\Exception $e) {
output($e->getMessage());
return false;
}
}
/*
* 写入数据
* $collection->insertOne(['name' => 'John Doe']);
* */
public function insertOne($table,$data = [])
{
try {
$mongodb = $this->pool->get();
$database = $mongodb->selectDatabase($this->database);
$collection = $database->selectCollection($table);
$result = $collection->insertOne($data);
$this->pool->push($mongodb);
return $result;
} catch (\Exception $e) {
output($e->getMessage());
return false;
}
}
/*
* 批量写入数据
* $collection->insertMany([['name' => 'John Doe'],['name' => 'John Doe']]);
* */
public function insertAll($table,$data = [])
{
try {
$mongodb = $this->pool->get();
$database = $mongodb->selectDatabase($this->database);
$collection = $database->selectCollection($table);
$result = $collection->insertMany($data);
$this->pool->push($mongodb);
return $result;
} catch (\Exception $e) {
output($e->getMessage());
return [];
}
}
/*
* 查询单条数据
* $collection->findOne(['name' => 'John Doe']);
* */
public function findOne($table,$where = [])
{
try {
$mongodb = $this->pool->get();
$database = $mongodb->selectDatabase($this->database);
$collection = $database->selectCollection($table);
$result = $collection->findOne($where);
$this->pool->push($mongodb);
return $result;
} catch (\Exception $e) {
output($e->getMessage());
return [];
}
}
/*
* 查询所有数据
* $collection->findOne(['name' => 'John Doe']);
* ['t' => ['$gt' => 1]]; // 查询大于1的数据
* ['t' => ['$lt' => 1]]; // 查询小于1的数据
* ['t' => ['$gte' => 1]]; // 查询大于等于1的数据
* ['t' => ['$lte' => 1]]; // 查询小于等于1的数据
* $options = [
* 'sort' => ['field_name' => 1], // 1 表示升序,-1 表示降序,
* 'limit' => 100,
* ];
* */
public function findAll($table,$where = [], $options=[])
{
try {
$mongodb = $this->pool->get();
$database = $mongodb->selectDatabase($this->database);
$collection = $database->selectCollection($table);
$result = $collection->find($where,$options);
$this->pool->push($mongodb);
// 遍历结果集
$data = [];
if ($result){
$arr = $result->toArray();
foreach ($arr as $item) {
$data[] = $item->getArrayCopy();
}
return $data;
}
return [];
} catch (\Exception $e) {
output($e->getMessage());
return [];
}
}
/*
* 更新数据
* $collection->updateOne(['name' => 'John Doe'], ['$set' => ['age' => 31]]);
* */
public function updateOne($table,$where = [],$upData = [])
{
try {
$mongodb = $this->pool->get();
$database = $mongodb->selectDatabase($this->database);
$collection = $database->selectCollection($table);
$result = $collection->updateOne($where,$upData);
$this->pool->push($mongodb);
return $result;
} catch (\Exception $e) {
output($e->getMessage());
return [];
}
}
/*
* 删除数据
* $collection->deleteOne(['name' => 'John Doe']);
* */
public function deleteOne($table,$where = [])
{
try {
$mongodb = $this->pool->get();
$database = $mongodb->selectDatabase($this->database);
$collection = $database->selectCollection($table);
$result = $collection->deleteOne($where);
$this->pool->push($mongodb);
return $result;
} catch (\Exception $e) {
output($e->getMessage());
return [];
}
}
/*
* 批量删除数据
* $collection->deleteMany(['name' => 'John Doe']);
* */
public function deleteBatch($table,$where = [])
{
try {
$mongodb = $this->pool->get();
$database = $mongodb->selectDatabase($this->database);
$collection = $database->selectCollection($table);
$result = $collection->deleteMany($where);
$this->pool->push($mongodb);
return $result;
} catch (\Exception $e) {
output($e->getMessage());
return [];
}
}
}
\ No newline at end of file
... ...
<?php
namespace Jiaoyin;
use Jiaoyin;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use MongoDB\Client;
class MongoPool
{
private $pool;
private $maxSize;
private $currentSize;
private $mongoUri;
private $options;
public function __construct($mongoUri, $maxSize=20, $options = [])
{
$this->mongoUri = $mongoUri;
$this->maxSize = $maxSize;
$this->currentSize = 0;
$this->options = $options;
$this->pool = new Channel($maxSize);
// 初始化连接池
for ($i = 0; $i < $maxSize; $i++) {
$this->pool->push($this->createConnection());
}
}
private function createConnection()
{
$manager = new Client($this->mongoUri, $this->options);
return $manager;
}
public function get()
{
while ($this->pool->isEmpty()){
// output('Mongodb连接池为空,等待释放连接');
Coroutine::sleep(0.1);
}
$connection = $this->pool->pop();
if (!$connection) {
if ($this->currentSize < $this->maxSize) {
// 如果连接池未满,则创建新连接
$connection = $this->createConnection();
$this->currentSize++;
} else {
// 连接池已满,等待其他协程释放连接
$connection = $this->pool->pop();
}
}
return $connection;
}
public function push($connection)
{
// 检查连接是否有效,这里仅做示例,实际应用中可能需要更复杂的逻辑
if (is_object($connection) && get_class($connection) === Client::class) {
$this->pool->push($connection);
}else{
$this->currentSize -= 1;
}
}
public function close()
{
while (!$this->pool->isEmpty()) {
$connection = $this->pool->pop();
if (is_resource($connection)) {
$connection->close();
}
}
$this->currentSize = 0;
}
}
\ No newline at end of file
... ...
<?php
namespace Jiaoyin;
class MysqlCli{
public function start(){
echo 'mysqlCli test';
use Jiaoyin;
use Swoole\Database\MysqliConfig;
use Swoole\Database\MysqliPool;
class MysqlCli
{
private $pool = null;
private $prefix = '';// 前缀
private int $reconnectCount = 0; //period时间内重连次数
private int $period = 300;
private int $lastReconnectTime = 0;
private $connectConfig = null;
public function __construct($host,$port,$database,$username,$password,$charset='utf8',$prefix='', $connectCount = 20)
{
$this->prefix = $prefix;
$this->connectConfig = [
'host' => $host,
'port' => $port,
'database' => $database,
'username' => $username,
'password' => $password,
'charset' => $charset,
'prefix' => $prefix,
'connectCount' => $connectCount,
];
$this->poolConnect();
}
private function poolConnect()
{
if($this->pool){
$this->pool->close();
}
$this->pool = new MysqliPool((new MysqliConfig)
->withHost($this->connectConfig['host'])
->withPort($this->connectConfig['port'])
->withDbName($this->connectConfig['database'])
->withCharset($this->connectConfig['charset'])
->withUsername($this->connectConfig['username'])
->withPassword($this->connectConfig['password'])
,$this->connectConfig['connectCount']
);
$this->prefix = $this->connectConfig['prefix'];
}
//执行sql
public function execute($action, $sql)
{
$ret = false;
$db = $this->pool->get();
try {
$stmt = $db->query($sql);
if ($stmt == false) {
output($db->errno, $db->error);
} else {
if ($action == 'SELECT') {
$ret = $stmt->fetch_all();
} else {
$ret = $stmt;
}
}
} catch (\Exception $e) {
$errorMsg = $e->getMessage();
output($sql);
output('sql错误', $errorMsg);
if ($errorMsg == 'MySQL server has gone away') {
if ($this->reconnectCount >= 3 && time() - $this->lastReconnectTime < $this->period) {
//period内重连超过3次,不继续重连但电话通知异常
output('数据库连接失效,并重连3次失败');
} else {
//重连
output('重新创建mysql连接池', $this->reconnectCount);
$this->pool->put($db);
$this->poolConnect();
if (time() - $this->lastReconnectTime < $this->period) {
$this->reconnectCount += 1;
} else {
$this->reconnectCount = 0;
}
$this->lastReconnectTime = time();
return $this->execute($action, $sql);
}
} else {
output('数据库操作异常');
}
}
$this->pool->put($db);
return $ret;
}
//插入数据
public function insert($table, $data)
{
$sql = $this->parseInsert($table, $data);
if (!$sql) {
return false;
}
return $this->execute('INSERT', $sql);
}
//select数据
public function select($table, $where = [], $col = [], $orderBy = '', $limit = '')
{
$sql = $this->parseSelect($table, $where, $col, $orderBy, $limit);
if (!$sql) {
return false;
}
$data = $this->execute('SELECT', $sql);
$newData = [];
if (empty($col)) {
$newsql = 'SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME="' . $this->parseTable($table) . '" ORDER BY `ORDINAL_POSITION` ASC';
$coldata = $this->execute('SELECT', $newsql);
if (!$coldata) {
return false;
} else {
foreach ($coldata as $key => $value) {
$col[] = $value[0];
}
}
}
foreach ($data as $key => $value) {
$tmp = [];
foreach ($col as $k => $v) {
$tmp[$v] = $value[$k];
}
$data[$key] = $tmp;
}
return $data;
}
public function update($table, $data, $where = [])
{
$sql = $this->parseUpdate($table, $where, $data);
if (!$sql) {
return false;
}
return $this->execute('UPDATE', $sql);
}
//查找符合条件的一条记录
public function find($table, $where, $col = [], $orderBy = '')
{
$data = $this->select($table, $where, $col, $orderBy, 1);
if ($data && isset($data[0])) {
return $data['0'];
}
return $data;
}
//解析insert
private function parseInsert($table, $data)
{
$table = $this->parseTable($table);
if (!is_array($data)) {
output('insert data 不合法');
return false;
}
$keys = array_keys($data);
$values = array_values($data);
foreach ($values as $key => $value) {
$values[$key] = $this->transferValue($value);
}
$keysFormat = implode(',', $keys);
$valuesFormat = implode(',', $values);
$sql = "insert into {$table}({$keysFormat}) values({$valuesFormat})";
return $sql;
}
//解析select
private function parseSelect($table, $where, $col, $orderBy, $limit)
{
$table = $this->parseTable($table);
if (!empty($col) && !is_array($col)) {
output('select where col 不合法');
return false;
}
$colTxt = empty($col) ? '*' : implode(',', $col);
$whereTxt = $this->parseWhere($where);
$orderByTxt = !empty($orderBy) && isset($orderBy[0]) && isset($orderBy[1]) ? 'order by ' . $orderBy[0] . ' ' . $orderBy[1] : '';
$limitTxt = !empty($limit) ? 'limit ' . $limit : '';
$sql = "select {$colTxt} from {$table} {$whereTxt} {$orderByTxt} {$limitTxt}";
return $sql;
}
//解析where
private function parseWhere($where)
{
if (empty($where)) {
return '';
}
$whereTxt = 'where ';
if (is_string($where)) {
$whereTxt .= $where;
} else if (is_array($where)) {
$tmp = '';
foreach ($where as $key => $value) {
if (!is_array($value) && !isset($value[1])) {
output('where 不合法');
var_dump($where);
return false;
}
$value[1] = $this->transferValue($value[1]);
$symbol = isset($value[2]) ? $value[2] : '=';
$tmp .= $value[0] . ' ' . $symbol . ' ' . $value[1];
$tmp .= $key < count($where) - 1 ? ' and ' : '';
}
$whereTxt .= $tmp;
} else {
output('where 不合法');
var_dump($where);
return false;
}
return $whereTxt;
}
//解析udpate
private function parseUpdate($table, $where, $data)
{
$table = $this->parseTable($table);
if (!is_array($data) || empty($data)) {
output('更新数据不能为空');
return false;
}
$setTxt = 'set ';
$count = 0;
foreach ($data as $key => $value) {
$setTxt .= $key . '=';
$setTxt .= $this->transferValue($value);
if ($count < count($data) - 1) {
$setTxt .= ',';
}
$count += 1;
}
$whereTxt = $this->parseWhere($where);
$sql = 'update ' . $table . ' ' . $setTxt . ' ' . $whereTxt;
return $sql;
}
private function parseTable($table)
{
return $this->prefix . $table;
}
//值转义
private function transferValue($value)
{
$valueNew = null;
if (is_string($value)) {
$valueNew = '"' . $value . '"';
} else if ($value === null) {
$valueNew = 'NULL';
} else {
if (is_numeric($value) && strpos($value . '', 'E') !== false) {
$value = number_format($value, 8);
while (substr($value, -1) === '0') {
$value = substr($value, 0, -1);
}
}
$valueNew = $value;
}
return $valueNew;
}
}
\ No newline at end of file
... ...
<?php
namespace Jiaoyin;
use Jiaoyin;
use Swoole\Database\RedisConfig;
use Swoole\Database\RedisPool;
class RedisCli
{
private $pool;
public function __construct($host,$port,$password,$dbIndex, $num = 20)
{
$this->pool = new RedisPool((new RedisConfig)
->withHost($host)
->withPort($port)
->withAuth($password)
->withDbIndex($dbIndex)
->withTimeout(5)
,$num //默认64个连接池
);
}
//集合存数据
public function sAdd($key,$member){
$redis = $this->pool->get();
try {
$res = $redis->sAdd($key,$member);
} catch (\RedisException $e) {
return false;
}
$this->pool->put($redis);
return $res;
}
//获取集合
public function sMembers($key){
$redis = $this->pool->get();
try {
$res = $redis->sMembers($key);
} catch (\RedisException $e) {
return false;
}
$this->pool->put($redis);
return $res;
}
//移除集合成员
public function sRem($key,$member){
$redis = $this->pool->get();
try {
$res = $redis->sRem($key,$member);
}catch (\RedisException $e){
return false;
}
$this->pool->put($redis);
return $res;
}
//哈希 键 字段 值 设置
public function hSet($key,$field,$value){
$redis = $this->pool->get();
try {
$res = $redis->hSet($key, $field, $value);
} catch (\RedisException $e) {
return false;
}
$this->pool->put($redis);
return $res;
}
//获取 键对应字段的值
public function hGet($key,$field){
$redis = $this->pool->get();
try {
$res = $redis->hGet($key, $field);
} catch (\RedisException $e) {
return false;
}
$this->pool->put($redis);
return $res;
}
//获取键所有字段和值
public function hGetAll($key){
$redis = $this->pool->get();
try {
$res = $redis->hGetAll($key);
} catch (\RedisException $e) {
return false;
}
$this->pool->put($redis);
return $res;
}
//删除一个值
public function hDel($key,$field){
$redis = $this->pool->get();
try {
$res = $redis->hDel($key, $field);
} catch (\RedisException $e) {
return false;
}
$this->pool->put($redis);
return $res;
}
/*
* 发布订阅等
*/
//将信息发送到指定的频道
public function publish($channel,$message){
$redis = $this->pool->get();
try {
$res = $redis->publish($channel, $message);
} catch (\RedisException $e) {
return false;
}
$this->pool->put($redis);
return $res;
}
public function subscribe($channels,callable $onMessage)
{
$redis = $this->pool->get();
try {
$res = $redis->subscribe($channels,function($redis, $channel, $message)use($onMessage){
call_user_func($onMessage,$redis, $channel, $message);
});
} catch (\RedisException $e) {
return false;
}
$this->pool->put($redis);
return $res;
}
}
\ No newline at end of file
... ...
<?php
namespace Jiaoyin;
use Jiaoyin;
use Jiaoyin\Curl;
/*
binance合约接口
*/
class BinanceFutures
{
static private string $host = 'https://fapi.binance.com';
private string $apikey = '';
private string $secret = '';
public function __construct($apikey='', $secret='',$host='')
{
$this->apikey = $apikey;
$this->secret = $secret;
if(!empty($host)){
self::$host = $host;
}
}
static private function createUrl($path): string
{
return self::$host . $path;
}
static private function createHeader($apikey): array
{
return ["X-MBX-APIKEY:" . $apikey];
}
static private function createSign($secret, $param)
{
$len = count($param);
if ($len == 0) {
output('param 为空,签名失败');
return $param;
}
$paramStr = http_build_query($param);
return hash_hmac('sha256', $paramStr, $secret);
}
private function requestAccount($method, $path, $param)
{
if(empty($this->apikey) || empty($this->secret)){
output('api 或 secret 为空');
return ['code'=>-1,'msg'=>'apikey or secret is empty'];
}
$url = $this->createUrl($path);
$param['timestamp'] = getMicrotime();
$param['signature'] = $this->createSign($this->secret, $param);
$header = $this->createHeader($this->apikey);
if (!in_array(strtoupper($method), ['GET', 'POST', 'DELETE'])) {
output('请求方法错误', $method, $path, $param);
return 0;
}
$data = json_encode([]);
if (strtoupper($method) == 'POST') {
$data = Curl::httpPost($url, $param, $header);
}
if (strtoupper($method) == 'GET') {
$data = Curl::httpGet($url, $param, $header);
}
if (strtoupper($method) == 'DELETE') {
$data = Curl::httpDelete($url, $param, $header);
}
return json_decode($data, true);
}
static private function requestMarket($method, $path, $param)
{
$url = self::createUrl($path);
if (!in_array(strtoupper($method), ['GET', 'POST', 'DELETE'])) {
output('请求方法错误', $method, $path, $param);
return 0;
}
$data = json_encode([]);
if (strtoupper($method) == 'POST') {
$data = Curl::httpPost($url, $param);
}
if (strtoupper($method) == 'GET') {
$data = Curl::httpGet($url, $param);
}
if (strtoupper($method) == 'DELETE') {
$data = Curl::httpDelete($url, $param);
}
return json_decode($data, true);
}
public function cancelOrderById($param)
{
$path = '/fapi/v1/order';
$method = 'DELETE';
return $this->requestAccount($method, $path, $param);
}
//下单
public function order($param)
{
$path = '/fapi/v1/order';
$method = 'POST';
return $this->requestAccount($method, $path, $param);
}
//获取或者延长listenKey
public function listenKey($param = [])
{
$path = '/fapi/v1/listenKey';
$method = 'POST';
return $this->requestAccount($method, $path, $param);
}
//获取仓位
public function account($param = [])
{
$path = '/fapi/v2/account';
$method = 'GET';
return $this->requestAccount($method, $path, $param);
}
//查询历史订单
public function allOrders($param)
{
$path = '/fapi/v1/allOrders';
$method = 'GET';
return $this->requestAccount($method, $path, $param);
}
//查询持仓模式
public function positionSideDual($param = [])
{
$path = '/fapi/v1/positionSide/dual';
$method = 'GET';
return $this->requestAccount($method, $path, $param);
}
//设置持仓模式
public function setPositionSideDual($param)
{
$path = '/fapi/v1/positionSide/dual';
$method = 'POST';
return $this->requestAccount($method, $path, $param);
}
//查询资产
public function balance($param = [])
{
$path = '/fapi/v2/balance';
$method = 'GET';
return $this->requestAccount($method, $path, $param);
}
//查询杠杆层级
public function leverageBracket($param = [])
{
$path = '/fapi/v1/leverageBracket';
$method = 'GET';
return $this->requestAccount($method, $path, $param);
}
//查询用户持仓风险,杠杆倍数
public function positionRisk($param = [])
{
$path = '/fapi/v2/positionRisk';
$method = 'GET';
return $this->requestAccount($method, $path, $param);
}
//设置杠杆
public function leverage($param)
{
$path = '/fapi/v1/leverage';
$method = 'POST';
return $this->requestAccount($method, $path, $param);
}
//查询资金流水
public function income($param)
{
$path = '/fapi/v1/income';
$method = 'GET';
return $this->requestAccount($method, $path, $param);
}
/*
* ==================================
* 以下公共接口,无限apikey
* ==================================
*/
//24小时成交数据
static public function ticker24hr($param = [])
{
$path = '/fapi/v1/ticker/24hr';
$method = 'GET';
return self::requestMarket($method, $path, $param);
}
//获取交易对和交易规则
static public function exchangeInfo($param = [])
{
$path = '/fapi/v1/exchangeInfo';
$method = 'GET';
return self::requestMarket($method, $path, $param);
}
//获取k线数据
static public function klines($param)
{
$path = '/fapi/v1/klines';
$method = 'GET';
return self::requestMarket($method, $path, $param);
}
}
?>
\ No newline at end of file
... ...
<?php
namespace Jiaoyin;
use Jiaoyin;
use Jiaoyin\Curl;
/*
binance现货接口
*/
class BinanceSpot
{
static private string $host = 'https://api.binance.com';
private string $apikey = '';
private string $secret = '';
public function __construct($apikey='', $secret='',$host='')
{
$this->apikey = $apikey;
$this->secret = $secret;
if(!empty($host)){
self::$host = $host;
}
}
static private function createUrl($path): string
{
return self::$host . $path;
}
static private function createHeader($apikey): array
{
return ["X-MBX-APIKEY:" . $apikey];
}
static private function createSign($secret, $param)
{
$len = count($param);
if ($len == 0) {
output('param 为空,签名失败');
return $param;
}
$paramStr = http_build_query($param);
return hash_hmac('sha256', $paramStr, $secret);
}
private function requestAccount($method, $path, $param)
{
$url = $this->createUrl($path);
$param['timestamp'] = getMicrotime();
$param['signature'] = $this->createSign($this->secret, $param);
$header = $this->createHeader($this->apikey);
if (!in_array(strtoupper($method), ['GET', 'POST', 'DELETE'])) {
output('请求方法错误', $method, $path, $param);
return 0;
}
$data = json_encode([]);
if (strtoupper($method) == 'POST') {
$data = Curl::httpPost($url, $param, $header);
}
if (strtoupper($method) == 'GET') {
$data = Curl::httpGet($url, $param, $header);
}
if (strtoupper($method) == 'DELETE') {
$data = Curl::httpDelete($url, $param, $header);
}
return json_decode($data, true);
}
static private function requestMarket($method, $path, $param)
{
$url = self::createUrl($path);
if (!in_array(strtoupper($method), ['GET', 'POST', 'DELETE'])) {
output('请求方法错误', $method, $path, $param);
return 0;
}
$data = json_encode([]);
if (strtoupper($method) == 'POST') {
$data = network\Curl::httpPost($url, $param);
}
if (strtoupper($method) == 'GET') {
$data = network\Curl::httpGet($url, $param);
}
if (strtoupper($method) == 'DELETE') {
$data = network\Curl::httpDelete($url, $param);
}
return json_decode($data, true);
}
//查询用户万向划转历史
public function getTransfer($param)
{
$path = '/sapi/v1/asset/transfer';
$method = 'GET';
return $this->requestAccount($method, $path, $param);
}
//查询子账户划转历史 (仅适用子账户)
public function subUserHistory($param)
{
$path = '/sapi/v1/sub-account/transfer/subUserHistory';
$method = 'GET';
return $this->requestAccount($method, $path, $param);
}
//合约资金划转
public function futuresTransfer($param)
{
$path = '/sapi/v1/futures/transfer';
$method = 'POST';
return $this->requestAccount($method, $path, $param);
}
//子账户向子账户划转
public function subToSub($param)
{
$path = '/sapi/v1/sub-account/transfer/subToSub';
$method = 'POST';
return $this->requestAccount($method, $path, $param);
}
//获取用户资产
public function getUserAsset($param)
{
$path = '/sapi/v3/asset/getUserAsset';
$method = 'POST';
return $this->requestAccount($method, $path, $param);
}
/*
* ==================================
* 以下公共接口,无限apikey
* ==================================
*/
//24小时成交数据
// static public function ticker24hr($param = [])
// {
// $path = '/fapi/v1/ticker/24hr';
// $method = 'GET';
// return self::requestMarket($method, $path, $param);
// }
}
?>
\ No newline at end of file
... ...
<?php
namespace Jiaoyin;
class Test {
public function sayHello() {
return "Hello from My Private Library!";
}
}
<?php
namespace Jiaoyin;
//格式化输出
function output(){
$args = func_get_args();
$outStr = '['.timeFormat('ms').']:';
foreach($args as $key => $value){
$value = is_array($value) ? json_encode($value) : $value;
if(is_bool($value)){
$value = $value ? 'bool:true':'bool:false';
}
$outStr .= count($args) - $key > 1 ? $value.' ' : $value;
}
echo $outStr.PHP_EOL;
}
//格式化时间
function timeFormat($type='s',$format='Y-m-d H:i:s'){
date_default_timezone_set('Asia/Shanghai');
$microTime = microtime();
list($msTime,$sTime) = explode(' ',$microTime);
$timeStr = date($format,$sTime);
if($type == 'ms'){
$timeStr .= '.'.sprintf("%03d",floor($msTime*1000));
}
return $timeStr;
}
// 获取当前时间的微秒数
function getMicrotime()
{
list($uSec, $sec) = explode(' ', microtime());
return $sec*1000+round($uSec*1000);
}
//获取精度
function getPrecision($number): int
{
$count = 0;
while($number < 1){
$number *= 10;
$count += 1;
}
return $count;
}
function getIntervalUnit($interval,$type='s'){
$unitTime = 0;
if($interval == '1m'){
$unitTime = 60;
}
if($interval == '5m'){
$unitTime = 60*5;
}
if($interval == '15m'){
$unitTime = 60*15;
}
if($interval == '1h'){
$unitTime = 60*60;
}
if($interval == '4h'){
$unitTime = 60*60*4;
}
if($interval == '1d'){
$unitTime = 60*60*24;
}
return $type == 's' ? $unitTime : $unitTime*1000;
}
... ...
<?php
namespace Jiaoyin;
use Jiaoyin;
class Curl{
// get 获取数据
static public function httpGet($url, $param=[], $header=[]){
if(empty($url)){
return false;
}
if(count($param) > 0){
$url = $url.'?'.http_build_query($param);
}
$ch = curl_init();
$ch = self::curlSet($ch,$url, $header);
$output = curl_exec($ch);
if($output === false){
echo 'Curl error: ' . curl_error($ch);
}
curl_close($ch);
return $output;
}
//post 获取数据
static public function httpPost($url, $param=[], $header=[]){
if(empty($url)){
return false;
}
if(count($param) > 0){
$url = $url.'?'.http_build_query($param);
}
$ch = curl_init();
$ch = self::curlSet($ch, $url, $header);
curl_setopt($ch, CURLOPT_POST, 1);
$output = curl_exec($ch);
if($output === false){
echo 'Curl error: ' . curl_error($ch);
}
curl_close($ch);
return $output;
}
//delete 请求
static public function httpDelete($url, $param=[], $header=[]){
if(empty($url)){
return false;
}
if(count($param) > 0){
$url = $url.'?'.http_build_query($param);
}
$ch = curl_init();
$ch = self::curlSet($ch, $url, $header);
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'DELETE');
$output = curl_exec($ch);
if($output === false){
echo 'Curl error: ' . curl_error($ch);
}
curl_close($ch);
return $output;
}
//curl 参数设置
static private function curlSet($ch,$url, $header){
curl_setopt($ch, CURLOPT_FOLLOWLOCATION, true) ;
curl_setopt($ch, CURLOPT_URL, $url);
//参数为1表示传输数据,为0表示直接输出显示。
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
//参数为0表示不带头文件,为1表示带头文件
curl_setopt($ch, CURLOPT_HEADER,0);
if(!empty($header)){
curl_setopt($ch, CURLOPT_HTTPHEADER,$header);
}
// 关闭SSL验证
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, 0);
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, 0);
return $ch;
}
}
\ No newline at end of file
... ...
<?php
namespace Jiaoyin;
use Jiaoyin;
use Swoole\Coroutine;
use Swoole\Coroutine\Http\Client;
use Swoole\WebSocket\Frame;
class Websocket
{
private string $host = '';
private string $path = '';
private int $port = 443;
private bool $ssl = true;
private int $recvTimeout = 60;
public ?Client $client = null;
public string $url = '';
private $lastRecvTime = 0;
private $onOpen = null;
private $onMessage = null;
private $onClose = null;
private $onPing = null;
private $onPong = null;
private $pingState = true;
private $timerCheck = 0;
private $timerPing = 0;
private $desc = 'websocket未命名';
private $pingCount = 0;
private $pingDelay = 10000;//ping tick时间 10s
// 例如:wss://fstream.binance.com/stream、ws://175.178.36.217:9528/spot/stream
public function __construct($url, $desc = null)
{
$this->url = $url;
$pattern = '/(ws{1,2}):\/\/([\w.a-zA-Z]+):*(\d*)([\/\-\wa-zA-Z?=.]*)/';
preg_match($pattern, $url, $result);
$this->ssl = $result[1] == 'wss';
$this->host = $result[2];
if (empty($result[3])) {
$this->port = $this->ssl ? 443 : 80;
} else {
$this->port = $result[3];
}
$this->path = empty($result[4]) ? '/' : $result[4];
if ($desc) {
$this->desc = 'websocket ' . $desc;
}
output($this->host,$this->port,$this->ssl,$this->path);
}
public function push($data)
{
if ($this->client) {
$this->client->push($data);
} else {
output('push error, client is null');
}
}
public function close()
{
swoole_timer_clear($this->timerPing);
swoole_timer_clear($this->timerCheck);
if ($this->client) {
$this->client->close();
}
}
public function connect(callable $onOpen = null, callable $onMessage = null,
callable $onClose = null, $onPing = null, $onPong = null)
{
$this->close();
$this->pingCount = 0;
$this->onOpen = $onOpen;
$this->onMessage = $onMessage;
$this->onClose = $onClose;
$this->onPing = $onPing;
$this->onPong = $onPong;
Coroutine::create(function () use ($onOpen, $onMessage, $onClose, $onPing, $onPong) {
$this->client = new Client($this->host, $this->port, $this->ssl);
$this->client->set(['timeout' => 5]);
$ret = $this->client->upgrade($this->path);
if ($ret) {
output($this->desc, "连接成功");
swoole_timer_after(50, function () use ($onOpen) {
if ($onOpen) {
call_user_func($onOpen, $this->client);
}
$this->sendPing();
});
while ($this->client) {
$frame = $this->client->recv(60);
if (!$frame && $this->client->errCode!=60) {
output($this->desc, "错误数据", $frame);
break;
}
$this->lastRecvTime = time();
if (is_object($frame) && get_class($frame) === Frame::class) {
if ($frame->opcode == WEBSOCKET_OPCODE_PING) {
$this->push(self::createPongData($frame->data));
if ($onPing) {
call_user_func($onPing, $frame->data);
}
}
if ($frame->opcode == WEBSOCKET_OPCODE_PONG) {
$this->recvPongData($frame->data);
if($onPong){
call_user_func($onPong, $frame->data);
}
}
if ($frame->opcode == WEBSOCKET_OPCODE_TEXT) {
call_user_func($onMessage, $frame->data);
}
if ($frame->opcode == WEBSOCKET_OPCODE_CLOSE) {
output($this->desc, "服务器主动关闭连接,连接关闭");
break;
}
}
\Swoole\Coroutine::defer(function () use ($onClose) {
output($this->desc, "协程退出");
$this->client = null;
if ($onClose) {
call_user_func($onClose);
}
});
}
} else {
if($onClose){
call_user_func($onClose);
}
output($this->desc, "升级websocket连接失败,1s后重连");
swoole_timer_after(1000,function (){
$this->connect($this->onOpen, $this->onMessage, $this->onClose, $this->onPing, $this->onPong);
});
}
});
}
static public function createPongData($data = null)
{
$frame = new Frame();
if ($data) {
$frame->data = $data;
}
$frame->opcode = WEBSOCKET_OPCODE_PONG;
return $frame;
}
static public function createPingData()
{
$frame = new Frame();
$frame->opcode = WEBSOCKET_OPCODE_PING;
$frame->data = getMicrotime();
return $frame;
}
private function sendPing()
{
$this->push(self::createPingData());
$this->pingState = false;
$this->timerCheck = swoole_timer_after(5000, function () {
if (!$this->pingState) {
output($this->desc, 'ping pong 超时,重新连接');
$this->connect($this->onOpen, $this->onMessage, $this->onClose, $this->onPing, $this->onPong);
}
});
}
private function recvPongData($data)
{
$this->pingState = true;
$this->pingCount += 1;
if ($this->pingCount % 100 == 0) {
output($this->desc, '连接稳定,已连接时间:', $this->pingCount * $this->pingDelay / 1000, '秒');
}
swoole_timer_clear($this->timerCheck);
$this->timerPing = swoole_timer_after($this->pingDelay, function () {
$this->sendPing();
});
}
}
... ...
<?php
namespace Jiaoyin;
use Jiaoyin;
class Feishu{
//发送预警通知
static public function send($content)
{
$url = 'https://open.feishu.cn/open-apis/bot/v2/hook/f30bbb72-091c-499d-a577-43c45dceb158';
$content = [
'msg_type'=>"text",
'content'=>[
'text'=>$content
]
];
$con = json_encode($content);
return self::send_post_json($url,$con);
}
static public function sendNoError($content)
{
$url = 'https://open.feishu.cn/open-apis/bot/v2/hook/a344e313-4d5d-4aa4-912b-dda37b2e3ee8';
$content = [
'msg_type'=>"text",
'content'=>[
'text'=>$content
]
];
$con = json_encode($content);
return self::send_post_json($url,$con);
}
static public function notice($content,$channel="warning")
{
$urls = [
'follow' => 'https://open.feishu.cn/open-apis/bot/v2/hook/f30bbb72-091c-499d-a577-43c45dceb158',
'warning' => 'https://open.feishu.cn/open-apis/bot/v2/hook/d48c2e4a-0ef9-4684-88d4-e62d878fdaca',
'error' => 'https://open.feishu.cn/open-apis/bot/v2/hook/a344e313-4d5d-4aa4-912b-dda37b2e3ee8',
];
$url = $urls[$channel] ?? $urls['warning'];
$content = [
'msg_type'=>"text",
'content'=>[
'text'=>$content
]
];
$con = json_encode($content);
return self::send_post_json($url,$con);
}
static public function send_post_json($url, $jsonStr)
{
$ch = curl_init();
curl_setopt($ch, CURLOPT_POST, 1);
curl_setopt($ch,CURLOPT_SSL_VERIFYPEER,false);
curl_setopt($ch,CURLOPT_SSL_VERIFYPEER,false);
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_POSTFIELDS, $jsonStr);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_HTTPHEADER, array(
'Content-Type: application/json; charset=utf-8',
'Content-Length: ' . strlen($jsonStr)
)
);
$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
return array($httpCode, $response);
}
}
... ...