
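PHP Asynchronous Processing and Message Queues in Practice

Web applications have plenty of slow operations that should not run synchronously inside a request: sending email, generating reports, processing images, and so on. Moving these tasks onto a message queue and handling them asynchronously shortens response times and noticeably improves the user experience. This post covers asynchronous processing options in PHP and how to use message queues.

Let's start with a simple but effective approach: keep executing after the request has finished.

```php
<?php
// Run a task after the response has been sent, using fastcgi_finish_request
function asyncTask(callable $task): void
{
    register_shutdown_function(function () use ($task): void {
        // Under PHP-FPM this flushes the response and closes the connection,
        // so the client does not wait for the task below.
        if (function_exists('fastcgi_finish_request')) {
            fastcgi_finish_request();
        }
        // Without PHP-FPM the task still runs at shutdown,
        // but the client may keep waiting until it finishes.
        $task();
    });
}

echo "Request received, processing...\n";

// Register an asynchronous task
asyncTask(function () {
    sleep(2);
    file_put_contents('/tmp/async.log', date('Y-m-d H:i:s') . " async task finished\n", FILE_APPEND);
});

echo "Response sent\n";
```

A simple message queue backed by MySQL:

```php
<?php
class MySQLQueue
{
    private PDO $pdo;
    private string $table;

    public function __construct(PDO $pdo, string $table = 'jobs')
    {
        $this->pdo = $pdo;
        $this->table = $table;
        $this->initTable();
    }

    private function initTable(): void
    {
        $this->pdo->exec("
            CREATE TABLE IF NOT EXISTS {$this->table} (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                queue VARCHAR(50) NOT NULL DEFAULT 'default',
                payload TEXT NOT NULL,
                status ENUM('pending', 'processing', 'completed', 'failed') DEFAULT 'pending',
                attempts INT DEFAULT 0,
                max_attempts INT DEFAULT 3,
                available_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                completed_at TIMESTAMP NULL,
                INDEX idx_queue_status (queue, status),
                INDEX idx_available (available_at)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        ");
    }

    // Enqueue a job; $delay is in seconds.
    // Note: available_at is computed in PHP but compared with NOW() in MySQL,
    // so PHP and MySQL are assumed to use the same time zone.
    public function push(string $queue, array $data, int $delay = 0): int
    {
        $availableAt = date('Y-m-d H:i:s', time() + $delay);
        $stmt = $this->pdo->prepare("
            INSERT INTO {$this->table} (queue, payload, available_at)
            VALUES (?, ?, ?)
        ");
        $stmt->execute([$queue, json_encode($data), $availableAt]);
        return (int)$this->pdo->lastInsertId();
    }

    // Claim the oldest available job, polling for up to $timeout seconds.
    public function pop(string $queue, int $timeout = 5): ?array
    {
        $start = time();
        while (time() - $start < $timeout) {
            $this->pdo->beginTransaction();
            try {
                // FOR UPDATE locks the row so two workers cannot claim the same job
                $stmt = $this->pdo->prepare("
                    SELECT * FROM {$this->table}
                    WHERE queue = ? AND status = 'pending' AND available_at <= NOW()
                    ORDER BY id ASC
                    LIMIT 1
                    FOR UPDATE
                ");
                $stmt->execute([$queue]);
                $job = $stmt->fetch(PDO::FETCH_ASSOC);

                if ($job) {
                    $stmt = $this->pdo->prepare("
                        UPDATE {$this->table} SET status = 'processing', attempts = attempts + 1
                        WHERE id = ?
                    ");
                    $stmt->execute([$job['id']]);
                    $this->pdo->commit();
                    // Keep the returned row in sync with the incremented counter
                    $job['attempts'] = (int)$job['attempts'] + 1;
                    return $job;
                }

                $this->pdo->commit();
            } catch (Throwable $e) {
                // Leave no open transaction behind before the error propagates
                $this->pdo->rollBack();
                throw $e;
            }
            usleep(200000); // 200ms
        }
        return null;
    }

    public function complete(int $id): void
    {
        $stmt = $this->pdo->prepare("
            UPDATE {$this->table} SET status = 'completed', completed_at = NOW()
            WHERE id = ?
        ");
        $stmt->execute([$id]);
    }

    public function fail(int $id): void
    {
        $stmt = $this->pdo->prepare("UPDATE {$this->table} SET status = 'failed' WHERE id = ?");
        $stmt->execute([$id]);
    }

    // Re-queue failed jobs that still have attempts left; returns the number of rows changed.
    public function retryFailed(string $queue): int
    {
        $stmt = $this->pdo->prepare("
            UPDATE {$this->table} SET status = 'pending', available_at = NOW()
            WHERE queue = ? AND status = 'failed' AND attempts < max_attempts
        ");
        $stmt->execute([$queue]);
        return $stmt->rowCount();
    }

    public function size(string $queue): int
    {
        $stmt = $this->pdo->prepare("
            SELECT COUNT(*) FROM {$this->table}
            WHERE queue = ? AND status = 'pending'
        ");
        $stmt->execute([$queue]);
        return (int)$stmt->fetchColumn();
    }

    // Long-running loop: pop a job, pass its payload to $handler, record the outcome.
    public function worker(string $queue, callable $handler): void
    {
        echo "Worker started, listening on queue: $queue\n";
        while (true) {
            try {
                $job = $this->pop($queue);
                if ($job === null) {
                    sleep(1);
                    continue;
                }

                $id = (int)$job['id'];
                echo "Processing job #{$id}\n";
                $payload = json_decode($job['payload'], true);

                try {
                    $handler($payload);
                    $this->complete($id);
                    echo "Job #{$id} completed\n";
                } catch (Throwable $e) {
                    echo "Job #{$id} failed: {$e->getMessage()}\n";
                    if ($job['attempts'] >= $job['max_attempts']) {
                        $this->fail($id);
                    } else {
                        // Put the job back and retry in 60 seconds
                        $this->release($id, 60);
                    }
                }
            } catch (Throwable $e) {
                echo "Worker error: {$e->getMessage()}\n";
                sleep(5);
            }

            // Exit when memory use passes 128 MB so a supervisor can restart the worker
            if (memory_get_usage(true) > 128 * 1024 * 1024) {
                echo "Memory limit exceeded, exiting\n";
                break;
            }
        }
    }

    private function release(int $id, int $delay): void
    {
        $availableAt = date('Y-m-d H:i:s', time() + $delay);
        $stmt = $this->pdo->prepare("
            UPDATE {$this->table} SET status = 'pending', available_at = ?
            WHERE id = ?
        ");
        $stmt->execute([$availableAt, $id]);
    }
}

$pdo = new PDO('mysql:host=localhost;dbname=test', 'root', '', [
    PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
]);
$queue = new MySQLQueue($pdo);

// Push some jobs
$queue->push('emails', ['to' => 'user@test.com', 'subject' => 'Welcome', 'body' => 'Hello!']);
$queue->push('emails', ['to' => 'admin@test.com', 'subject' => 'New user registered', 'body' => 'A new user has registered']);
$queue->push('reports', ['type' => 'daily', 'date' => '2024-01-15']);

echo "Queue size: emails=" . $queue->size('emails') . ", reports=" . $queue->size('reports') . "\n";
```

A Redis-backed queue performs better, and Redis data structures can also support priorities and delayed messages. The class below implements delayed messages with a sorted set:

```php
<?php
class RedisQueue
{
    private Redis $redis;
    private string $prefix = 'queue:';

    public function __construct(Redis $redis)
    {
        $this->redis = $redis;
    }

    public function push(string $queue, array $data, int $delay = 0): string
    {
        $jobId = uniqid('job_', true);
        $payload = json_encode([
            'id' => $jobId,
            'data' => $data,
            'created_at' => time(),
            'attempts' => 0,
        ]);

        if ($delay > 0) {
            // Delayed jobs wait in a sorted set, scored by the time they become due
            $this->redis->zAdd($this->prefix . "delayed:{$queue}", time() + $delay, $payload);
        } else {
            $this->redis->lPush($this->prefix . $queue, $payload);
        }
        return $jobId;
    }

    public function later(string $queue, array $data, int $delay): string
    {
        return $this->push($queue, $data, $delay);
    }

    public function pop(string $queue, int $timeout = 5): ?array
    {
        // Move due delayed jobs onto the main list before blocking on it
        $this->migrateDelayed($queue);

        $result = $this->redis->brPop([$this->prefix . $queue], $timeout);
        // brPop yields an empty result when the timeout expires
        if (empty($result)) {
            return null;
        }

        $payload = json_decode($result[1], true);
        if ($payload === null) {
            return null;
        }

        $payload['attempts']++;
        return $payload;
    }

    public function size(string $queue): int
    {
        return (int)$this->redis->lLen($this->prefix . $queue);
    }

    private function migrateDelayed(string $queue): int
    {
        $now = time();
        $delayedKey = $this->prefix . "delayed:{$queue}";
        $jobs = $this->redis->zRangeByScore($delayedKey, 0, $now);
        $count = 0;

        foreach ($jobs as $job) {
            // Only the worker that actually removes the entry pushes it,
            // so concurrent workers do not enqueue the same job twice
            if ($this->redis->zRem($delayedKey, $job) > 0) {
                $this->redis->lPush($this->prefix . $queue, $job);
                $count++;
            }
        }
        return $count;
    }
}
```

Message queues let slow operations run asynchronously without blocking the main request. Choosing the right backend matters: a MySQL queue is simple but limited in throughput, a Redis queue is fast and full-featured, and RabbitMQ and Kafka suit large distributed systems. For most PHP projects, a Redis queue is enough.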
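One possible refinement to the retry handling: the MySQL worker releases a failed job with a fixed 60-second delay. A common alternative is exponential backoff, where each retry waits twice as long as the previous one, up to a cap. The sketch below is only an illustration; `backoffDelay` and its `$base` and `$max` parameters are hypothetical names that do not appear in the classes above.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of the queue classes above): seconds to wait
 * before retrying a job that has already been attempted $attempts times.
 */
function backoffDelay(int $attempts, int $base = 60, int $max = 3600): int
{
    // First retry waits $base seconds, then 2 * $base, 4 * $base, ...
    $exponent = max(0, $attempts - 1);
    // Clamp the exponent so the multiplication cannot overflow an int
    $delay = $base * (2 ** min($exponent, 20));
    // Never wait longer than $max seconds
    return (int) min($delay, $max);
}

foreach ([1, 2, 3, 10] as $attempts) {
    echo "attempt {$attempts}: retry in " . backoffDelay($attempts) . "s\n";
}
```

In the worker, the value could be passed in place of the fixed delay, for example `$this->release($id, backoffDelay((int)$job['attempts']))`, assuming `attempts` holds the number of tries made so far.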