MongoPool.php 2.0 KB
<?php
namespace Jiaoyin;

use MongoDB\Client;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;

class MongoPool
{
    private $pool;
    private $maxSize;
    private $currentSize;
    private $mongoUri;
    private $options;

    public function __construct($mongoUri, $maxSize=20, $options = [])
    {
        $this->mongoUri = $mongoUri;
        $this->maxSize = $maxSize;
        $this->currentSize = 0;
        $this->options = $options;
        $this->pool = new Channel($maxSize);

        // 初始化连接池
        for ($i = 0; $i < $maxSize; $i++) {
            $this->pool->push($this->createConnection());
        }
    }

    private function createConnection()
    {
        $manager = new Client($this->mongoUri, $this->options);
        return $manager;
    }

    public function get()
    {
        while ($this->pool->isEmpty()){
//            output('Mongodb连接池为空,等待释放连接');
            Coroutine::sleep(0.1);
        }
        $connection = $this->pool->pop();
        if (!$connection) {
            if ($this->currentSize < $this->maxSize) {
                // 如果连接池未满,则创建新连接
                $connection = $this->createConnection();
                $this->currentSize++;
            } else {
                // 连接池已满,等待其他协程释放连接
                $connection = $this->pool->pop();
            }
        }
        return $connection;
    }

    public function push($connection)
    {
        // 检查连接是否有效,这里仅做示例,实际应用中可能需要更复杂的逻辑
        if (is_object($connection) && get_class($connection) === Client::class) {
            $this->pool->push($connection);
        }else{
            $this->currentSize -= 1;
        }
    }

    public function close()
    {
        while (!$this->pool->isEmpty()) {
            $connection = $this->pool->pop();
            if (is_resource($connection)) {
                $connection->close();
            }
        }
        $this->currentSize = 0;
    }
}