作者 zed

优化数据库插件。

... ... @@ -6,6 +6,7 @@ use Swoole\Database\MysqliConfig;
use Swoole\Database\MysqliPool;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Jiaoyin\StreamLogger;
class MysqlCli
{
... ... @@ -18,6 +19,7 @@ class MysqlCli
private int $lastReconnectTime = 0;
private $connectConfig = null;
private $database = null;
private $logger = null;
public function __construct($host, $port, $database, $username, $password, $charset = 'utf8', $prefix = '', $connectCount = 20)
{
... ... @@ -34,6 +36,7 @@ class MysqlCli
'connectCount' => $connectCount,
];
$this->poolConnect();
$this->logger = new StreamLogger(); //默认路径
}
private function poolConnect()
... ... @@ -53,12 +56,17 @@ class MysqlCli
$this->prefix = $this->connectConfig['prefix'];
}
//执行sql
public function execute($action, $sql, float $timeout = 5.0, int $retry = 1)
public function execute($action, $sql, float $timeout = 5.0, int $retry = 1, bool $assoc = false)
{
$ret = false;
$db = $this->pool->get();
retry_label:
$db = null;
$start_time = microtime(true);
try {
$db = $this->pool->get();
if (!$db) {
$this->logger->error('mysql', "数据库连接池耗尽,请检查数据库连接配置...");
throw new \RuntimeException("数据库连接池耗尽");
}
$chan = new Channel(1);
Coroutine::create(function () use ($chan, $db, $sql) {
try {
... ... @@ -69,34 +77,103 @@ class MysqlCli
}
});
$result = $chan->pop($timeout); // 超时控制
/** 等待执行结果(支持超时) */
$result = $chan->pop($timeout);
/** ---------- 处理超时 ---------- */
if ($result === false) {
// 超时
output("SQL超时: {$sql}, 超时时间 {$timeout}s");
$this->logger->error('mysql', "SQL超时: {$timeout}s, SQL: {$sql}");
if ($retry > 0) {
output("超时后自动重试一次: {$sql}");
$this->pool->put($db);
return $this->execute($action, $sql, $timeout, $retry - 1);
$this->logger->error('mysql', "执行超时,SQL:{$sql} 正在销毁连接并重试...");
// 1. 永远不要把超时的连接放回池
if ($db) {
@$db->close(); // 强制关闭
// 重要:告诉连接池不要再使用这个连接
$db = null;
}
// . 重建一个新连接补入池中
$retry--;
goto retry_label;
}
} elseif ($result instanceof \Throwable) {
output("执行SQL异常: {$sql}");
output("异常信息: " . $result->getMessage());
} else {
if ($action === 'SELECT') {
$ret = $result->fetch_all();
return false;
}
/** ---------- 处理异常(断开 / gone away / server has gone away) ---------- */
if ($result instanceof \Throwable) {
$msg = $result->getMessage();
$this->logger->error('mysql', "执行异常: {$msg}; SQL: {$sql}");
$needReconnect = false;
if (
stripos($msg, 'gone away') !== false ||
stripos($msg, 'Lost connection') !== false ||
stripos($msg, 'server has gone away') !== false ||
stripos($msg, 'server closed') !== false ||
stripos($msg, 'Connection reset') !== false
) {
$needReconnect = true;
}
if ($needReconnect && $retry > 0) {
$this->logger->error('mysql', "数据库连接断开,SQL:{$sql}自动重连并重试 SQL...");
if ($db) {
try {
$db->close(); // 强制关闭
// 重要:告诉连接池不要再使用这个连接
$db = null;
} catch (\Throwable $e) {
$this->logger->error('mysql', "db close 失败,SQL:{$sql}忽略");
}
}
$this->poolConnect();
$retry--;
goto retry_label;
}
return false;
}
/** ---------- 成功执行 ---------- */
if ($action === 'SELECT') {
if ($assoc) {
$rows = $result->fetch_all(MYSQLI_ASSOC);
} else {
$ret = $result;
$rows = $result->fetch_all();
}
$result->free();
return $rows;
}
return $result;
} catch (\Throwable $e) {
$this->logger->error('mysql', "execute() SQL:{$sql}致命异常: " . $e->getMessage());
if ($retry > 0) {
$this->logger->error('mysql', "致命异常:SQL:{$sql}尝试重连并重试");
if ($db) {
@$db->close(); // 强制关闭
$db = null;
}
$retry--;
goto retry_label;
}
return false;
} finally {
$end_time = microtime(true);
$count_time = $end_time - $start_time;
if ($count_time > 0.5) {
$this->logger->log('mysql', "SQL执行时间: " . $count_time . "s, SQL: {$sql}");
}
if ($db) {
$this->pool->put($db);
try {
$this->pool->put($db);
} catch (\Throwable $e) {
$this->logger->error('mysql', "连接 put 回池失败(可能已断开),SQL:{$sql}忽略");
}
}
}
return $ret;
}
//插入数据
public function insert($table, $data)
{
... ... @@ -114,14 +191,7 @@ class MysqlCli
if (!$sql) {
return false;
}
$data = $this->execute('SELECT', $sql);
// 如果没有数据
if (!$data) return [];
$newData = [];
// ✅ 判断是否包含聚合函数
// 判断是否包含聚合函数
$isAggregate = false;
foreach ($col as $c) {
if (stripos($c, 'sum(') !== false || stripos($c, 'count(') !== false || stripos($c, 'avg(') !== false || stripos($c, 'max(') !== false || stripos($c, 'min(') !== false) {
... ... @@ -133,23 +203,37 @@ class MysqlCli
if ($isAggregate && empty($groupBy)) {
// 没有 group by 的单行聚合查询
$db = $this->pool->get();
$stmt = $db->query($sql);
$result = $stmt->fetch_assoc();
$this->pool->put($db);
return [$result];
try {
$stmt = $db->query($sql);
$result = $stmt->fetch_assoc();
$stmt->free(); // 释放结果集,防止Commands out of sync错误
return [$result];
} finally {
$this->pool->put($db);
}
} elseif ($isAggregate && !empty($groupBy)) {
// 有 group by 的聚合查询,多行返回
$db = $this->pool->get();
$stmt = $db->query($sql);
$results = [];
while ($row = $stmt->fetch_assoc()) {
$results[] = $row;
try {
$stmt = $db->query($sql);
$results = [];
while ($row = $stmt->fetch_assoc()) {
$results[] = $row;
}
$stmt->free(); // 释放结果集,防止Commands out of sync错误
return $results;
} finally {
$this->pool->put($db);
}
$this->pool->put($db);
return $results;
}
// ✅ 普通查询才查字段结构
// 非聚合查询,使用execute方法获取数据
$data = $this->execute('SELECT', $sql);
// 如果没有数据
if (!$data) return [];
// 普通查询才查字段结构
if (empty($col)) {
$newsql = 'SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = "' . $this->database . '" AND TABLE_NAME="' . $this->parseTable($table) . '" ORDER BY `ORDINAL_POSITION` ASC';
$coldata = $this->execute('SELECT', $newsql);
... ... @@ -196,7 +280,7 @@ class MysqlCli
{
$table = $this->parseTable($table);
if (!is_array($data)) {
output('insert data 不合法');
$this->logger->error('mysql', 'insert data 不合法');
return false;
}
$keys = array_keys($data);
... ... @@ -216,7 +300,7 @@ class MysqlCli
$table = $this->parseTable($table);
if (!empty($col) && !is_array($col)) {
output('select where col 不合法');
$this->logger->error('mysql', 'select where col 不合法');
return false;
}
... ... @@ -246,7 +330,7 @@ class MysqlCli
$tmp = '';
foreach ($where as $key => $value) {
if (!is_array($value) && !isset($value[1])) {
output('where 不合法');
$this->logger->error('mysql', 'where 不合法');
var_dump($where);
return false;
}
... ... @@ -265,7 +349,8 @@ class MysqlCli
}
$whereTxt .= $tmp;
} else {
output('where 不合法');
$this->logger->error('mysql', 'where 不合法');
var_dump($where);
return false;
}
... ... @@ -277,7 +362,8 @@ class MysqlCli
{
$table = $this->parseTable($table);
if (!is_array($data) || empty($data)) {
output('更新数据不能为空');
$this->logger->error('mysql', '更新数据不能为空');
return false;
}
$setTxt = 'set ';
... ... @@ -319,4 +405,28 @@ class MysqlCli
}
return $valueNew;
}
// 删除数据
public function delete($table, $where = [])
{
$sql = $this->parseDelete($table, $where);
if (!$sql) {
return false;
}
return $this->execute('DELETE', $sql);
}
// 解析 delete
private function parseDelete($table, $where)
{
$table = $this->parseTable($table);
$whereTxt = $this->parseWhere($where);
if (empty($whereTxt)) {
$this->logger->error('mysql', 'delete 必须指定 where 条件,防止全表删除');
return false;
}
$sql = "delete from {$table} {$whereTxt}";
return $sql;
}
}
... ...