基于 PHP 8.1+ Fiber 和事件循环的非阻塞 IO 设计,包含一个 Goroutine 调度器 和一个 go
函数,和 Golang 类似,不需要依赖生成器函数和 yield
显式切换上下文,不会改变原函数的返回值(没有函数颜色),并且可以直接获取协程返回值。
<?php
/**
* An AsyncTask represents a stateful asynchronous task.
*/
class AsyncTask
{
private mixed $result;
private Throwable | null $error = null;
private $resolved = false;
public function setResult($result): void
{
$this->result = $result;
$this->resolved = true;
}
public function setError(Throwable $error): void
{
$this->error = $error;
$this->resolved = true;
}
public function getResult()
{
while (!$this->resolved) {
// Yield control to the scheduler
Fiber::suspend();
}
if ($this->error) {
throw $this->error;
} else {
return $this->result;
}
}
}
/**
* A simple goroutine scheduler.
*/
class Goroutine
{
private static ?Goroutine $instance = null;
private array $jobs = [];
private array $streams = [];
private bool $isRunning = false;
private function __construct() // Prevent instantiation from outside
{
}
/**
* Starts the scheduler and runs the main function in a goroutine.
*/
public static function main(callable $fn): void
{
if (self::$instance !== null) {
throw new Exception("Goroutine already started", 1);
}
self::$instance = new Goroutine();
self::$instance->run($fn);
}
/**
* Runs the given job in a goroutine.
*/
public static function go(callable $job): AsyncTask
{
if (self::$instance === null) {
throw new Exception("Goroutine not started, make sure the application runs inside the Goroutine::main()", 1);
}
$task = new AsyncTask();
self::$instance->addJob(function () use ($job, $task) {
try {
$result = $job();
$task->setResult($result);
} catch (Throwable $e) {
$task->setError($e);
}
});
return $task;
}
/**
* Waits for a stream to be ready for reading or writing. When the stream is
* ready to be read, the function returns `1`. When the stream is ready to be
* written, the function returns `2`. When an error occurs, the function
* returns `3`.
*/
public static function wait($stream): int
{
$ins = self::$instance;
$fd = (int)$stream;
if (!array_key_exists($fd, $ins->streams)) {
$fiber = Fiber::getCurrent();
if (!$fiber->isStarted()) {
$fiber->start();
}
if (!$fiber->isTerminated()) {
$ins->streams[$fd] = [$stream, $fiber];
}
}
while (true) {
$pair = Fiber::suspend();
$state = $pair[0];
$_stream = $pair[1];
// A fiber can be bound to multiple streams, so we need to check if
// the current active stream is the one we are waiting for.
if ($stream === $_stream) {
return $state;
}
}
}
private function addJob(callable $job): void
{
$fiber = new Fiber($job);
array_push($this->jobs, $fiber);
$fiber->start();
}
private function run(callable $main): void
{
if ($this->isRunning) {
return;
}
$this->isRunning = true;
$this->addJob($main);
while (true) {
// Prepare streams for stream_select
$read = array_map(fn ($item) => $item[0], $this->streams);
$write = [...$read];
$except = [...$read];
$timeout = 0; // No blocking
$hasStreamsEvents = false;
// Use stream_select to monitor streams
if ((!empty($read) || !empty($write) || !empty($except)) &&
stream_select($read, $write, $except, $timeout) > 0
) {
$hasStreamsEvents = true;
foreach ($read as $stream) {
$fd = (int)$stream;
$fiber = array_key_exists($fd, $this->streams) ? $this->streams[$fd][1] : null;
if ($fiber && $fiber->isSuspended()) {
$fiber->resume([1, $stream]);
if ($fiber->isTerminated()) {
unset($this->streams[$fd]);
}
}
}
foreach ($write as $stream) {
$fd = (int)$stream;
$fiber = array_key_exists($fd, $this->streams) ? $this->streams[$fd][1] : null;
if ($fiber && $fiber->isSuspended()) {
$fiber->resume([2, $stream]);
if ($fiber->isTerminated()) {
unset($this->streams[$fd]);
}
}
}
foreach ($except as $stream) {
$fd = (int)$stream;
$fiber = array_key_exists($fd, $this->streams) ? $this->streams[$fd][1] : null;
if ($fiber && $fiber->isSuspended()) {
$fiber->resume([3, $stream]);
if ($fiber->isTerminated()) {
unset($this->streams[$fd]);
}
}
}
foreach ($this->streams as $fd => [$stream, $fiber]) {
if (!is_resource($stream)) { // Stream closed
unset($this->streams[$fd]);
}
}
}
foreach ($this->jobs as $i => $fiber) {
if ($fiber->isTerminated()) {
unset($this->jobs[$i]);
} elseif (!$hasStreamsEvents && $fiber->isSuspended()) {
$fiber->resume([0, null]);
}
}
if (empty($this->jobs)) {
break;
} elseif (!$hasStreamsEvents) {
usleep(1000); // Prevent CPU hogging
}
}
}
}
/**
* A shorthand of `Goroutine::go()`.
*/
function go(callable $fn)
{
return Goroutine::go($fn);
}
/**
* Non-blocking HTTP GET request.
*/
function httpGet($url): string
{
$parts = parse_url($url);
$host = $parts['host'];
$port = isset($parts['port']) ? $parts['port'] : 80;
$path = isset($parts['path']) ? $parts['path'] : '/';
$request = "GET $path HTTP/1.1\r\nHost: $host\r\nConnection: close\r\n\r\n";
$socket = stream_socket_client(
"tcp://$host:$port",
$errno,
$errstr,
30,
STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT
);
if (!$socket) {
throw new Exception("Unable to connect to $host:$port. Error: $errstr ($errno)");
}
stream_set_blocking($socket, false); # Non-blocking
$response = '';
$written = false;
while (!feof($socket)) {
// Wait for the socket to be ready for reading or writing.
$state = Goroutine::wait($socket);
if (!$written && $state === 2) {
if (fwrite($socket, $request)) {
$written = true;
}
} elseif ($written && $state === 1) {
$response .= fread($socket, 8192);
}
}
fclose($socket);
return $response;
}
Goroutine::main(function () {
// These tow requests are sequential, but they are non-blocking.
$res1 = httpGet('https://example.com');
$res2 = httpGet('https://ayon.li/');
echo substr($res1, 0, 200) . "\n\n" . substr($res2, 0, 200) . "\n\n";
// These two tasks are concurrent and non-blocking.
$task1 = go(function () {
$res1 = httpGet('https://example.com');
return substr($res1, 0, 200);
});
$task2 = go(function () {
$res2 = httpGet('https://ayon.li/');
return substr($res2, 0, 200);
});
echo $task1->getResult() . "\n\n" . $task2->getResult() . "\n\n";
// No more tasks to run, the program will exit here.
});
PHP