|
|
|
<?php
|
|
|
|
|
|
|
|
namespace jiaoyin;
|
|
|
|
namespace Jiaoyin;
|
|
|
|
|
|
|
|
use Swoole\Database\RedisConfig;
|
|
|
|
use Swoole\Database\RedisPool;
|
|
|
|
use function Jiaoyin\output;
|
|
|
|
|
|
|
|
class RedisCli
|
|
|
|
{
|
|
|
|
private $pool;
|
|
|
|
public $subRedis = null;
|
|
|
|
public function __construct($host, $port, $password, $dbIndex, $num = 20)
|
|
|
|
|
|
|
|
private $host;
|
|
|
|
private $port;
|
|
|
|
private $dbIndex;
|
|
|
|
|
|
|
|
public function __construct($host, $port, $password, $dbIndex, $num = 40)
|
|
|
|
{
|
|
|
|
$this->pool = new RedisPool((new RedisConfig)
|
|
|
|
$this->host = $host;
|
|
|
|
$this->port = $port;
|
|
|
|
$this->dbIndex = $dbIndex;
|
|
|
|
|
|
|
|
$this->pool = new RedisPool(
|
|
|
|
(new RedisConfig)
|
|
|
|
->withHost($host)
|
|
|
|
->withPort($port)
|
|
|
|
->withAuth($password)
|
|
|
|
->withDbIndex($dbIndex),
|
|
|
|
$num //默认64个连接池
|
|
|
|
$num
|
|
|
|
);
|
|
|
|
}
|
|
|
|
//集合存数据
|
|
|
|
public function sAdd($key, $member)
|
|
|
|
|
|
|
|
private function exec(callable $fn, string $method)
|
|
|
|
{
|
|
|
|
$redis = $this->pool->get();
|
|
|
|
try {
|
|
|
|
$res = $redis->sAdd($key, $member);
|
|
|
|
return $fn($redis);
|
|
|
|
} catch (\RedisException $e) {
|
|
|
|
$this->logError($method, $e);
|
|
|
|
try {
|
|
|
|
$redis->close(); // 销毁坏连接
|
|
|
|
} catch (\Throwable $t) {
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
} finally {
|
|
|
|
if ($redis && $redis->isConnected()) {
|
|
|
|
$this->pool->put($redis);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
$this->pool->put($redis);
|
|
|
|
return $res;
|
|
|
|
}
|
|
|
|
|
|
|
|
//获取集合
|
|
|
|
public function sMembers($key)
|
|
|
|
// ------------------ 集合 ------------------
|
|
|
|
public function sAdd($key, $member)
|
|
|
|
{
|
|
|
|
$redis = $this->pool->get();
|
|
|
|
try {
|
|
|
|
$res = $redis->sMembers($key);
|
|
|
|
} catch (\RedisException $e) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
$this->pool->put($redis);
|
|
|
|
return $res;
|
|
|
|
return $this->exec(fn($r) => $r->sAdd($key, $member), "sAdd");
|
|
|
|
}
|
|
|
|
|
|
|
|
//移除集合成员
|
|
|
|
public function sRem($key, $member)
|
|
|
|
public function sMembers($key)
|
|
|
|
{
|
|
|
|
$redis = $this->pool->get();
|
|
|
|
try {
|
|
|
|
$res = $redis->sRem($key, $member);
|
|
|
|
} catch (\RedisException $e) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
return $this->exec(fn($r) => $r->sMembers($key), "sMembers");
|
|
|
|
}
|
|
|
|
|
|
|
|
$this->pool->put($redis);
|
|
|
|
return $res;
|
|
|
|
public function sRem($key, $member)
|
|
|
|
{
|
|
|
|
return $this->exec(fn($r) => $r->sRem($key, $member), "sRem");
|
|
|
|
}
|
|
|
|
|
|
|
|
//哈希 键 字段 值 设置
|
|
|
|
// ------------------ 哈希 ------------------
|
|
|
|
public function hSet($key, $field, $value)
|
|
|
|
{
|
|
|
|
$redis = $this->pool->get();
|
|
|
|
try {
|
|
|
|
$res = $redis->hSet($key, $field, $value);
|
|
|
|
} catch (\RedisException $e) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
$this->pool->put($redis);
|
|
|
|
return $res;
|
|
|
|
return $this->exec(fn($r) => $r->hSet($key, $field, $value), "hSet");
|
|
|
|
}
|
|
|
|
|
|
|
|
public function hMSet($key, $field)
|
|
|
|
{
|
|
|
|
return $this->exec(fn($r) => $r->hMSet($key, $field), "hMSet");
|
|
|
|
}
|
|
|
|
//获取 键对应字段的值
|
|
|
|
|
|
|
|
public function hMGet($key, $field)
|
|
|
|
{
|
|
|
|
return $this->exec(fn($r) => $r->hMGet($key, $field), "hMGet");
|
|
|
|
}
|
|
|
|
|
|
|
|
public function hGet($key, $field)
|
|
|
|
{
|
|
|
|
$redis = $this->pool->get();
|
|
|
|
try {
|
|
|
|
$res = $redis->hGet($key, $field);
|
|
|
|
} catch (\RedisException $e) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
$this->pool->put($redis);
|
|
|
|
return $res;
|
|
|
|
return $this->exec(fn($r) => $r->hGet($key, $field), "hGet");
|
|
|
|
}
|
|
|
|
//获取键所有字段和值
|
|
|
|
|
|
|
|
public function hGetAll($key)
|
|
|
|
{
|
|
|
|
$redis = $this->pool->get();
|
|
|
|
try {
|
|
|
|
$res = $redis->hGetAll($key);
|
|
|
|
} catch (\RedisException $e) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
$this->pool->put($redis);
|
|
|
|
return $res;
|
|
|
|
return $this->exec(fn($r) => $r->hGetAll($key), "hGetAll");
|
|
|
|
}
|
|
|
|
//删除一个值
|
|
|
|
|
|
|
|
public function hDel($key, $field)
|
|
|
|
{
|
|
|
|
$redis = $this->pool->get();
|
|
|
|
try {
|
|
|
|
$res = $redis->hDel($key, $field);
|
|
|
|
} catch (\RedisException $e) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
$this->pool->put($redis);
|
|
|
|
return $res;
|
|
|
|
return $this->exec(fn($r) => $r->hDel($key, $field), "hDel");
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* 发布订阅等
|
|
|
|
*/
|
|
|
|
//将信息发送到指定的频道
|
|
|
|
// ------------------ 发布 ------------------
|
|
|
|
public function publish($channel, $message)
|
|
|
|
{
|
|
|
|
$redis = $this->pool->get();
|
|
|
|
try {
|
|
|
|
$res = $redis->publish($channel, $message);
|
|
|
|
} catch (\RedisException $e) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
$this->pool->put($redis);
|
|
|
|
return $res;
|
|
|
|
return $this->exec(fn($r) => $r->publish($channel, $message), "publish");
|
|
|
|
}
|
|
|
|
|
|
|
|
// ------------------ 订阅模式(独立连接) ------------------
|
|
|
|
public function subscribe($channels, callable $onMessage)
|
|
|
|
{
|
|
|
|
$redis = $this->pool->get();
|
|
|
|
$this->subRedis = $redis;
|
|
|
|
try {
|
|
|
|
$res = $redis->subscribe($channels, function ($redis, $channel, $message) use ($onMessage) {
|
|
|
|
call_user_func($onMessage, $redis, $channel, $message);
|
|
|
|
return $redis->subscribe($channels, function ($redis, $channel, $message) use ($onMessage) {
|
|
|
|
$onMessage($redis, $channel, $message);
|
|
|
|
});
|
|
|
|
} catch (\RedisException $e) {
|
|
|
|
$this->logError("subscribe", $e);
|
|
|
|
return false;
|
|
|
|
} finally {
|
|
|
|
$this->subRedis = null;
|
|
|
|
// 不放回池子,订阅是阻塞连接
|
|
|
|
}
|
|
|
|
$this->subRedis = null;
|
|
|
|
$this->pool->put($redis);
|
|
|
|
return $res;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function psubscribe($channels, callable $onMessage)
|
|
...
|
...
|
@@ -145,14 +126,44 @@ class RedisCli |
|
|
|
$redis = $this->pool->get();
|
|
|
|
$this->subRedis = $redis;
|
|
|
|
try {
|
|
|
|
$res = $redis->psubscribe($channels, function ($redis, $pattern, $channel, $message) use ($onMessage) {
|
|
|
|
call_user_func($onMessage, $pattern, $redis, $channel, $message);
|
|
|
|
return $redis->psubscribe($channels, function ($redis, $pattern, $channel, $message) use ($onMessage) {
|
|
|
|
$onMessage($pattern, $redis, $channel, $message);
|
|
|
|
});
|
|
|
|
} catch (\RedisException $e) {
|
|
|
|
$this->logError("psubscribe", $e);
|
|
|
|
return false;
|
|
|
|
} finally {
|
|
|
|
$this->subRedis = null;
|
|
|
|
// 不放回池子
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// ------------------ 健康检查 ------------------
|
|
|
|
public function ping()
|
|
|
|
{
|
|
|
|
return $this->exec(fn($r) => $r->ping(), "ping");
|
|
|
|
}
|
|
|
|
|
|
|
|
// ------------------ 错误日志 ------------------
|
|
|
|
private function logError($method, \RedisException $e)
|
|
|
|
{
|
|
|
|
if (function_exists('swoole_get_worker_id')) {
|
|
|
|
$workerId = swoole_get_worker_id();
|
|
|
|
$processInfo = "WorkerID:{$workerId}";
|
|
|
|
} else {
|
|
|
|
$pid = getmypid();
|
|
|
|
$uid = function_exists('posix_getuid') ? posix_getuid() : 'N/A';
|
|
|
|
$processInfo = "PID:{$pid}, UID:{$uid}";
|
|
|
|
}
|
|
|
|
$this->subRedis = null;
|
|
|
|
$this->pool->put($redis);
|
|
|
|
return $res;
|
|
|
|
|
|
|
|
output(sprintf(
|
|
|
|
"[RedisCli][%s][%s:%d][db:%d] %s failed: %s",
|
|
|
|
$processInfo,
|
|
|
|
$this->host,
|
|
|
|
$this->port,
|
|
|
|
$this->dbIndex,
|
|
|
|
$method,
|
|
|
|
$e->getMessage()
|
|
|
|
));
|
|
|
|
}
|
|
|
|
} |
...
|
...
|
|