Skip to content

A modern, type-safe wrapper for PHP's parallel extension API for parallel programming.

License

Notifications You must be signed in to change notification settings

duyler/parallel

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

9 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Parallel Extension Wrapper

A modern, type-safe wrapper for PHP's parallel extension providing a clean object-oriented API for parallel programming.

Requirements

  • PHP 8.4 or higher with ZTS (Zend Thread Safety) enabled
  • ext-parallel installed and enabled

Installation

composer require duyler/parallel

Features

  • Type-safe interfaces for all components
  • Exception handling with custom exception hierarchy
  • Clean OOP API following SOLID principles
  • Full support for Runtime, Future, Channel, and Events
  • Comprehensive test coverage

Quick Start

Basic Runtime Usage

use Duyler\Parallel\Runtime;

$runtime = new Runtime();

$future = $runtime->run(function () {
    return 42;
});

$result = $future->value();
echo $result; // 42

$runtime->close();

Using the Facade

use Duyler\Parallel\Parallel;

$future = Parallel::run(function ($x, $y) {
    return $x + $y;
}, [10, 20]);

echo $future->value(); // 30

Using Runtime Pool

use Duyler\Parallel\RuntimePool;

$pool = new RuntimePool(maxRuntimes: 4);

$futures = [];
for ($i = 0; $i < 10; $i++) {
    $futures[] = $pool->run(fn($n) => $n * $n, [$i]);
}

foreach ($futures as $future) {
    echo $future->value() . PHP_EOL;
}

$pool->closeAll();

Using Workflow Builder

use Duyler\Parallel\WorkflowBuilder;

$result = (new WorkflowBuilder())
    ->addChannel('input', 10)
    ->addTask('task1', fn($ch) => processTask1($ch), [$inputChannel])
    ->addTask('task2', fn($ch) => processTask2($ch), [$inputChannel])
    ->execute();

$results = $result->waitAll();
$result->closeAll();

Working with Channels

use Duyler\Parallel\Channel;
use Duyler\Parallel\Runtime;

$channel = Channel::createBuffered(10);

$runtime = new Runtime();
$future = $runtime->run(function ($ch) {
    for ($i = 0; $i < 5; $i++) {
        $ch->send($i * $i);
    }
}, [$channel]);

for ($i = 0; $i < 5; $i++) {
    echo $channel->recv() . PHP_EOL;
}

$runtime->close();

Event Loop

use Duyler\Parallel\Events;
use Duyler\Parallel\Runtime;

$runtime1 = new Runtime();
$runtime2 = new Runtime();

$future1 = $runtime1->run(function () {
    return 'task1';
});

$future2 = $runtime2->run(function () {
    return 'task2';
});

$events = new Events();
$events->addFuture('f1', $future1);
$events->addFuture('f2', $future2);

while ($event = $events->poll()) {
    echo "Task {$event->source} completed with: {$event->value}" . PHP_EOL;
}

Named Channels

use Duyler\Parallel\Channel;

$channel1 = Channel::make('my-channel', 10);
$channel2 = Channel::open('my-channel');

$channel1->send('Hello from channel 1');
echo $channel2->recv(); // Hello from channel 1

API Documentation

Runtime

Creates and manages a parallel execution context.

$runtime = new Runtime(?string $bootstrap = null);
$future = $runtime->run(Closure $task, array $argv = []): FutureInterface;
$runtime->close(): void;
$runtime->kill(): void;

RuntimePool

Manages a pool of Runtime instances for better performance.

$pool = new RuntimePool(int $maxRuntimes = 4, ?string $bootstrap = null);
$future = $pool->run(Closure $task, array $argv = []): FutureInterface;
$pool->closeAll(): void;
$pool->killAll(): void;
$pool->getSize(): int;
$pool->getMaxSize(): int;

WorkflowBuilder

Builds complex workflows with multiple tasks and channels.

$builder = new WorkflowBuilder();
$builder->withBootstrap(string $bootstrap): WorkflowBuilder;
$builder->withRuntime(RuntimeInterface $runtime): WorkflowBuilder;
$builder->addTask(string $name, Closure $task, array $argv = []): WorkflowBuilder;
$builder->addChannel(string $name, int $capacity = 0): WorkflowBuilder;
$result = $builder->execute(): WorkflowResult;

// WorkflowResult methods
$result->getFuture(string $name): ?FutureInterface;
$result->getChannel(string $name): ?ChannelInterface;
$result->getFutures(): array;
$result->getChannels(): array;
$result->waitAll(): array;
$result->closeAll(): void;

Future

Represents the result of a parallel task.

$value = $future->value(): mixed;
$isDone = $future->done(): bool;
$isCancelled = $future->cancelled(): bool;
$cancelled = $future->cancel(): bool;

Channel

Bidirectional communication channel between tasks.

$channel = Channel::create(): ChannelInterface;
$channel = Channel::createBuffered(int $capacity): ChannelInterface;
$channel = Channel::make(string $name, int $capacity = 0): ChannelInterface;
$channel = Channel::open(string $name): ChannelInterface;

$channel->send(mixed $value): void;
$value = $channel->recv(): mixed;
$channel->close(): void;

Events

Event loop for monitoring multiple Futures and Channels.

$events = new Events();
$events->addFuture(string $name, FutureInterface $future): void;
$events->addChannel(ChannelInterface $channel): void;
$events->remove(string $name): void;
$events->setBlocking(bool $blocking): void;
$events->setTimeout(int $timeout): void;
$event = $events->poll(): ?Event;

Event

Result from Events::poll().

$event->type: Type; // Read, Write, Close, Cancel, Kill, Error
$event->source: string;
$event->object: object; // Future or Channel
$event->value: mixed;

Exception Handling

All exceptions from ext-parallel are wrapped in custom exception classes:

  • ParallelException - Base exception class
  • CancellationException - Task was cancelled
  • ClosedException - Runtime or Channel was closed
  • ForeignException - Exception thrown in parallel task
  • IllegalValueException - Invalid or non-serializable value
  • TimeoutException - Operation timed out
  • BootstrapException - Bootstrap file error
use Duyler\Parallel\Exception\ForeignException;
use Duyler\Parallel\Runtime;

try {
    $runtime = new Runtime();
    $future = $runtime->run(function () {
        throw new \Exception('Error in task');
    });
    $future->value();
} catch (ForeignException $e) {
    echo "Task failed: " . $e->getMessage();
}

Type Safety

All components implement interfaces for better type safety and testability:

  • RuntimeInterface
  • FutureInterface
  • ChannelInterface
  • EventsInterface
  • WrapperInterface
use Duyler\Parallel\Contract\RuntimeInterface;

function processTask(RuntimeInterface $runtime): void {
    $future = $runtime->run(function () {
        return 'result';
    });
    echo $future->value();
}

Testing

Run tests using PHPUnit:

vendor/bin/phpunit

Run static analysis:

vendor/bin/psalm

Run code style fixer:

vendor/bin/php-cs-fixer fix

Important Notes

  1. PHP must be compiled with ZTS (Zend Thread Safety) support
  2. ext-parallel must be installed and enabled
  3. Closures passed to parallel tasks cannot use yield, declare classes, or declare named functions
  4. Values passed between tasks must be serializable
  5. Always close Runtime instances when done to free resources

Checking ZTS Support

php -v | grep ZTS

If ZTS is not shown, you need to recompile PHP with the --enable-zts flag.

License

MIT License. See LICENSE file for details.

Links

About

A modern, type-safe wrapper for PHP's parallel extension API for parallel programming.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages