RedisCli.php 4.6 KB
<?php

namespace Jiaoyin;

use Swoole\Database\RedisConfig;
use Swoole\Database\RedisPool;
use function Jiaoyin\output;

class RedisCli
{
    private $pool;
    public $subRedis = null;

    private $host;
    private $port;
    private $dbIndex;

    public function __construct($host, $port, $password, $dbIndex, $num = 40)
    {
        $this->host = $host;
        $this->port = $port;
        $this->dbIndex = $dbIndex;

        $this->pool = new RedisPool(
            (new RedisConfig)
                ->withHost($host)
                ->withPort($port)
                ->withAuth($password)
                ->withDbIndex($dbIndex),
            $num
        );
    }

    private function exec(callable $fn, string $method)
    {
        $redis = $this->pool->get();
        try {
            return $fn($redis);
        } catch (\RedisException $e) {
            $this->logError($method, $e);
            try {
                $redis->close(); // 销毁坏连接
            } catch (\Throwable $t) {
            }
            return false;
        } finally {
            if ($redis && $redis->isConnected()) {
                $this->pool->put($redis);
            }
        }
    }

    // ------------------ 集合 ------------------
    public function sAdd($key, $member)
    {
        return $this->exec(fn($r) => $r->sAdd($key, $member), "sAdd");
    }

    public function sMembers($key)
    {
        return $this->exec(fn($r) => $r->sMembers($key), "sMembers");
    }

    public function sRem($key, $member)
    {
        return $this->exec(fn($r) => $r->sRem($key, $member), "sRem");
    }

    // ------------------ 哈希 ------------------
    public function hSet($key, $field, $value)
    {
        return $this->exec(fn($r) => $r->hSet($key, $field, $value), "hSet");
    }

    public function hMSet($key, $field)
    {
        return $this->exec(fn($r) => $r->hMSet($key, $field), "hMSet");
    }

    public function hMGet($key, $field)
    {
        return $this->exec(fn($r) => $r->hMGet($key, $field), "hMGet");
    }

    public function hGet($key, $field)
    {
        return $this->exec(fn($r) => $r->hGet($key, $field), "hGet");
    }

    public function hGetAll($key)
    {
        return $this->exec(fn($r) => $r->hGetAll($key), "hGetAll");
    }

    public function hDel($key, $field)
    {
        return $this->exec(fn($r) => $r->hDel($key, $field), "hDel");
    }

    // ------------------ 发布 ------------------
    public function publish($channel, $message)
    {
        return $this->exec(fn($r) => $r->publish($channel, $message), "publish");
    }

    // ------------------ 订阅模式(独立连接) ------------------
    public function subscribe($channels, callable $onMessage)
    {
        $redis = $this->pool->get();
        $this->subRedis = $redis;
        try {
            return $redis->subscribe($channels, function ($redis, $channel, $message) use ($onMessage) {
                $onMessage($redis, $channel, $message);
            });
        } catch (\RedisException $e) {
            $this->logError("subscribe", $e);
            return false;
        } finally {
            $this->subRedis = null;
            // 不放回池子,订阅是阻塞连接
        }
    }

    public function psubscribe($channels, callable $onMessage)
    {
        $redis = $this->pool->get();
        $this->subRedis = $redis;
        try {
            return $redis->psubscribe($channels, function ($redis, $pattern, $channel, $message) use ($onMessage) {
                $onMessage($pattern, $redis, $channel, $message);
            });
        } catch (\RedisException $e) {
            $this->logError("psubscribe", $e);
            return false;
        } finally {
            $this->subRedis = null;
            // 不放回池子
        }
    }

    // ------------------ 健康检查 ------------------
    public function ping()
    {
        return $this->exec(fn($r) => $r->ping(), "ping");
    }

    // ------------------ 错误日志 ------------------
    private function logError($method, \RedisException $e)
    {
        if (function_exists('swoole_get_worker_id')) {
            $workerId = swoole_get_worker_id();
            $processInfo = "WorkerID:{$workerId}";
        } else {
            $pid = getmypid();
            $uid = function_exists('posix_getuid') ? posix_getuid() : 'N/A';
            $processInfo = "PID:{$pid}, UID:{$uid}";
        }

        output(sprintf(
            "[RedisCli][%s][%s:%d][db:%d] %s failed: %s",
            $processInfo,
            $this->host,
            $this->port,
            $this->dbIndex,
            $method,
            $e->getMessage()
        ));
    }
}