PHP Goroutine 实现示例(基于 Fiber)

基于 PHP 8.1+ Fiber 和事件循环的非阻塞 IO 设计,包含一个 Goroutine 调度器和一个 go 函数。与 Golang 类似,它不需要依赖生成器函数和 yield 来显式切换上下文,不会改变原函数的返回值(没有函数颜色),并且可以直接获取协程的返回值。

<?php

/**
 * Handle for the eventual outcome of an asynchronous job.
 *
 * A task starts out pending. It is settled either with a value
 * (`setResult()`) or with a throwable (`setError()`); `getResult()` hands
 * that outcome back to whoever is waiting for it.
 */
class AsyncTask
{
    /** Value stored by setResult(); only read once the task is settled. */
    private mixed $result;

    /** Throwable stored by setError(), or null when none was recorded. */
    private ?Throwable $error = null;

    /** Flips to true as soon as either setter has been called. */
    private bool $settled = false;

    /**
     * Settles the task with a value.
     */
    public function setResult($result): void
    {
        $this->settled = true;
        $this->result = $result;
    }

    /**
     * Settles the task with a failure.
     */
    public function setError(Throwable $error): void
    {
        $this->settled = true;
        $this->error = $error;
    }

    /**
     * Returns the stored value, or rethrows the stored throwable.
     *
     * While the task is still pending, the calling fiber is suspended over
     * and over; each time it is resumed the state is checked again.
     */
    public function getResult()
    {
        for (;;) {
            if ($this->settled) {
                break;
            }

            // Still pending: hand control back to whoever resumed this fiber.
            Fiber::suspend();
        }

        if ($this->error === null) {
            return $this->result;
        }

        throw $this->error;
    }
}

/**
 * A simple goroutine scheduler.
 *
 * Every job runs in its own Fiber. The scheduler loop polls all registered
 * streams with a zero-timeout `stream_select()`, resumes the fibers bound to
 * the streams that are ready, and then gives every other suspended fiber a
 * turn. The loop ends once no unfinished job is left.
 *
 * Only one scheduler can run at a time; it exists for the duration of a
 * `Goroutine::main()` call.
 */
class Goroutine
{
    private static ?Goroutine $instance = null;

    /** @var array<int, Fiber> Fibers of the jobs not yet observed as terminated. */
    private array $jobs = [];

    /** @var array<int, array{0: resource, 1: Fiber}> Watched streams and their waiting fiber, keyed by resource id. */
    private array $streams = [];

    private bool $isRunning = false;

    private function __construct() // Prevent instantiation from outside
    {
    }

    /**
     * Starts the scheduler and runs the main function in a goroutine.
     *
     * Returns when the main function and every job started with `go()` have
     * finished. The scheduler is torn down afterwards (also when the run ends
     * with an exception), so `main()` may be called again later.
     *
     * @throws Exception when a scheduler is already running.
     */
    public static function main(callable $fn): void
    {
        if (self::$instance !== null) {
            throw new Exception("Goroutine already started", 1);
        }

        self::$instance = new Goroutine();
        try {
            self::$instance->run($fn);
        } finally {
            // Without this reset a finished (or failed) run would block every
            // later main() call and let go() queue jobs nobody resumes.
            self::$instance = null;
        }
    }

    /**
     * Runs the given job in a goroutine.
     *
     * The job is started immediately and runs until it first suspends. Its
     * return value, or the throwable it raised, is stored in the returned
     * task.
     *
     * @throws Exception when no scheduler is running.
     */
    public static function go(callable $job): AsyncTask
    {
        if (self::$instance === null) {
            throw new Exception("Goroutine not started, make sure the application runs inside the Goroutine::main()", 1);
        }

        $task = new AsyncTask();
        self::$instance->addJob(function () use ($job, $task) {
            try {
                $task->setResult($job());
            } catch (Throwable $e) {
                $task->setError($e);
            }
        });
        return $task;
    }

    /**
     * Waits for a stream to be ready for reading or writing. When the stream is
     * ready to be read, the function returns `1`. When the stream is ready to be
     * written, the function returns `2`. When an error occurs, the function
     * returns `3`.
     *
     * Must be called from code running inside a goroutine. A stream stays
     * bound to the first fiber that waits on it until it is closed or that
     * fiber terminates.
     *
     * @param resource $stream
     * @throws Exception when called without a running scheduler or outside a fiber.
     */
    public static function wait($stream): int
    {
        $ins = self::$instance;
        $fiber = Fiber::getCurrent();
        if ($ins === null || $fiber === null) {
            throw new Exception("Goroutine::wait() must be called from a goroutine running inside Goroutine::main()", 1);
        }

        $fd = (int)$stream;
        $owner = $ins->streams[$fd][1] ?? null;
        if ($owner === null || $owner->isTerminated()) {
            // The current fiber is by definition started and not terminated,
            // so it can be registered unconditionally.
            $ins->streams[$fd] = [$stream, $fiber];
        }

        while (true) {
            [$state, $ready] = Fiber::suspend();

            // A fiber can be bound to multiple streams, and is also resumed
            // with `[0, null]` on idle ticks, so only return when the active
            // stream is the one we are waiting for.
            if ($stream === $ready) {
                return $state;
            }
        }
    }

    /**
     * Wraps the job in a fiber, records it and runs it up to its first suspension.
     */
    private function addJob(callable $job): void
    {
        $fiber = new Fiber($job);
        $this->jobs[] = $fiber;
        $fiber->start();
    }

    /**
     * Forgets streams that have been closed, so they are never handed to
     * `stream_select()` again.
     */
    private function pruneClosedStreams(): void
    {
        foreach ($this->streams as $fd => [$stream]) {
            if (!is_resource($stream)) {
                unset($this->streams[$fd]);
            }
        }
    }

    /**
     * Resumes the fiber bound to each ready stream with `[$state, $stream]`
     * and unbinds the stream when that fiber finishes.
     *
     * @param array<int, resource> $ready streams reported by stream_select()
     * @param int $state 1 = readable, 2 = writable, 3 = exceptional
     */
    private function dispatch(array $ready, int $state): void
    {
        foreach ($ready as $stream) {
            $fd = (int)$stream;
            $fiber = $this->streams[$fd][1] ?? null;
            if ($fiber !== null && $fiber->isSuspended()) {
                $fiber->resume([$state, $stream]);
                if ($fiber->isTerminated()) {
                    unset($this->streams[$fd]);
                }
            }
        }
    }

    /**
     * The scheduler loop: starts `$main` as the first job and keeps ticking
     * until every job has terminated.
     */
    private function run(callable $main): void
    {
        if ($this->isRunning) {
            return;
        }
        $this->isRunning = true;
        $this->addJob($main);

        while (true) {
            // A job may have closed a stream during any earlier resume; a
            // closed resource must not reach stream_select().
            $this->pruneClosedStreams();

            // stream_select() rewrites its arguments, so each set is a copy.
            $read = array_map(static fn (array $item) => $item[0], $this->streams);
            $write = $read;
            $except = $read;
            $hasStreamsEvents = false;

            // Zero timeout: poll without blocking.
            if ($read !== [] && stream_select($read, $write, $except, 0) > 0) {
                $hasStreamsEvents = true;
                $this->dispatch($read, 1);
                $this->dispatch($write, 2);
                $this->dispatch($except, 3);
            }

            // Give every suspended job a turn on every tick. Fibers parked in
            // wait() ignore the `[0, null]` payload and suspend again; fibers
            // waiting for something else (e.g. an AsyncTask) would otherwise
            // starve for as long as any stream keeps reporting events.
            foreach ($this->jobs as $i => $fiber) {
                if (!$fiber->isTerminated() && $fiber->isSuspended()) {
                    $fiber->resume([0, null]);
                }
                if ($fiber->isTerminated()) {
                    unset($this->jobs[$i]);
                }
            }

            if (empty($this->jobs)) {
                break;
            } elseif (!$hasStreamsEvents) {
                usleep(1000); // Prevent CPU hogging
            }
        }
    }
}

/**
 * A shorthand of `Goroutine::go()`.
 *
 * Forwards `$fn` to `Goroutine::go()` and returns the task it hands back.
 */
function go(callable $fn): AsyncTask
{
    return Goroutine::go($fn);
}

/**
 * Non-blocking HTTP GET request.
 *
 * Opens a TCP connection to the URL's host, sends a `GET` request with
 * `Connection: close` and returns everything the peer sends until it closes
 * the connection (status line and headers included). While the socket is not
 * ready, the calling goroutine is parked in `Goroutine::wait()`.
 *
 * The URL scheme is not inspected: the request is always sent as plain HTTP,
 * to port 80 unless the URL names another port.
 *
 * @param string $url absolute URL; must contain a host
 * @return string the raw response
 * @throws Exception when the URL has no host, the connection cannot be
 *                   opened, or the request cannot be sent.
 */
function httpGet($url): string
{
    $parts = parse_url($url);
    if (!is_array($parts) || !isset($parts['host'])) {
        throw new Exception("Invalid URL: $url");
    }

    $host = $parts['host'];
    $port = $parts['port'] ?? 80;
    $path = $parts['path'] ?? '/';
    if (isset($parts['query'])) {
        // The query string is part of the request target.
        $path .= '?' . $parts['query'];
    }
    $request = "GET $path HTTP/1.1\r\nHost: $host\r\nConnection: close\r\n\r\n";

    $socket = stream_socket_client(
        "tcp://$host:$port",
        $errno,
        $errstr,
        30,
        STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT
    );
    if (!$socket) {
        throw new Exception("Unable to connect to $host:$port. Error: $errstr ($errno)");
    }

    try {
        stream_set_blocking($socket, false); # Non-blocking

        $response = '';
        $pending = $request; // Part of the request that still has to be sent

        while (!feof($socket)) {
            // Wait for the socket to be ready for reading or writing.
            $state = Goroutine::wait($socket);

            if ($pending !== '' && $state === 2) {
                // A non-blocking write may accept only part of the buffer.
                $sent = fwrite($socket, $pending);
                if ($sent === false) {
                    throw new Exception("Unable to send request to $host:$port");
                }
                $pending = substr($pending, $sent);
            } elseif ($pending === '' && $state === 1) {
                $response .= fread($socket, 8192);
            }
        }

        return $response;
    } finally {
        // Also release the socket when the loop is left by an exception.
        fclose($socket);
    }
}


// Demo entry point: everything below runs inside the scheduler.
Goroutine::main(function () {
    // These two requests are sequential, but they are non-blocking.
    // httpGet() ignores the URL scheme and sends plain HTTP (port 80 by
    // default), so the URLs are written as http:// to match what is sent.
    $res1 = httpGet('http://example.com');
    $res2 = httpGet('http://ayon.li/');
    echo substr($res1, 0, 200) . "\n\n" . substr($res2, 0, 200) . "\n\n";

    // These two tasks are concurrent and non-blocking.
    $task1 = go(static fn () => substr(httpGet('http://example.com'), 0, 200));
    $task2 = go(static fn () => substr(httpGet('http://ayon.li/'), 0, 200));
    echo $task1->getResult() . "\n\n" . $task2->getResult() . "\n\n";

    // No more tasks to run, the program will exit here.
});
PHP

Leave a comment