Skip to content

Instantly share code, notes, and snippets.

@nmeri17
Created July 13, 2022 18:02
Show Gist options
  • Save nmeri17/082d6ca1f82b048c13b25902d250584d to your computer and use it in GitHub Desktop.
Save nmeri17/082d6ca1f82b048c13b25902d250584d to your computer and use it in GitHub Desktop.
<?php
namespace Tilwa\Modules;
use Tilwa\Contracts\{Presentation\BaseRenderer, Queues\Adapter as QueueAdapter};
use Tilwa\Adapters\Queues\BoltDbQueue;
use Spiral\RoadRunner\{Worker, Http\PSR7Worker, Environment\Mode};
use Nyholm\Psr7\Factory\Psr17Factory;
use Symfony\Bridge\PsrHttpMessage\Factory\PsrHttpFactory;
use Symfony\Component\HttpFoundation\Response as SymfonyResponse;
use Psr\Http\Message\{ServerRequestInterface, ResponseInterface};
use Throwable;
/**
 * Drives one RoadRunner worker process. RoadRunner will spin this up multiple times, once for each worker it has to create to service a request type
 *
 * Intended call order: setWorkerMode(), buildIdentifier(), setActiveWorker(), openEventLoop()
 */
class ModuleWorkerAccessor {

	private $handlerIdentifier, $httpWorker, $queueWorker, $mode,

	$psrFactory, $responseConverter; // created on first use, then reused for every request this process serves

	/**
	 * @param ModuleHandlerIdentifier $handlerIdentifier Booted in buildIdentifier() and consulted for every incoming HTTP request
	 */
	public function __construct (ModuleHandlerIdentifier $handlerIdentifier) {

		$this->handlerIdentifier = $handlerIdentifier;
	}

	/**
	 * @param string $mode Worker mode; it is compared against Mode::MODE_JOBS to decide between queue and HTTP handling
	 */
	public function setWorkerMode (string $mode):void {

		$this->mode = $mode;
	}

	/**
	 * @return bool true when the configured mode is exactly Mode::MODE_JOBS
	 */
	protected function isTaskMode ():bool {

		return $this->mode === Mode::MODE_JOBS;
	}

	/**
	 * Prepares the worker matching the current mode: the QueueAdapter taken from the first container in task mode, otherwise a PSR7Worker reading from RoadRunner
	 *
	 * @return self for chaining
	 */
	public function setActiveWorker ():self {

		if ($this->isTaskMode())

			$this->queueWorker = $this->handlerIdentifier->firstContainer()

			->getClass(QueueAdapter::class);

		else {

			$psrFactory = $this->getPsrFactory(); // the same object fills all three factory roles

			$this->httpWorker = new PSR7Worker(

				Worker::create(), $psrFactory, $psrFactory, $psrFactory
			);
		}

		return $this;
	}

	/**
	 * Boots the modules, then calls extractFromContainer() on the handler identifier
	 *
	 * @return self for chaining
	 */
	public function buildIdentifier ():self {

		$this->handlerIdentifier->bootModules();

		$this->handlerIdentifier->extractFromContainer();

		return $this;
	}

	/**
	 * Hands control to the worker prepared by setActiveWorker(). In HTTP mode, this only returns once processHttpTasks() leaves its loop
	 */
	public function openEventLoop ():void {

		if ($this->isTaskMode())

			$this->queueWorker->processTasks();

		else $this->processHttpTasks();
	}

	/**
	 * Serves HTTP requests until waitRequest() yields null.
	 *
	 * Receiving a request and handling it are guarded separately, so that a payload which cannot be turned into a request is answered with a 400 instead of terminating this worker
	 */
	protected function processHttpTasks ():void {

		while (true) {

			try {

				$newRequest = $this->httpWorker->waitRequest();
			}
			catch (Throwable $exception) { // the request itself could not be read/built. Reject it and keep serving

				$this->httpWorker->respond(

					$this->getPsrFactory()->createResponse(400)
				);

				continue;
			}

			if ($newRequest === null) break; // nothing further to serve

			try {

				$this->flushResponse($newRequest);
			}
			catch (Throwable $exception) { // only roadRunner specific errors are expected here, since our own errors are fully handled internally

				$this->httpWorker->getWorker()

				->error($exception->getMessage());
			}
		}
	}

	/**
	 * Routes one request through the handler identifier and sends the rendered result back through the HTTP worker
	 *
	 * @param ServerRequestInterface|null $incomingRequest The type permits null, but the request is dereferenced immediately; processHttpTasks() never passes null
	 */
	protected function flushResponse (?ServerRequestInterface $incomingRequest):void {

		$this->handlerIdentifier->setRequestPath(

			$incomingRequest->getRequestTarget()
		); // this depends on stdInputReader, so it's assumed that headers are equally set, possibly from here

		$this->handlerIdentifier->diffuseSetResponse(false); // NOTE(review): meaning of the `false` flag is defined by ModuleHandlerIdentifier -- confirm there

		$psrResponse = $this->getPsrResponse(

			$this->handlerIdentifier->underlyingRenderer()
		);

		$this->httpWorker->respond($psrResponse);
	}

	/**
	 * Wraps the renderer's body, status code and headers in a Symfony response, then converts that to PSR-7
	 *
	 * @param BaseRenderer $renderer Source of the body (render()), status code and headers
	 *
	 * @return ResponseInterface Response ready to be passed to the HTTP worker
	 */
	protected function getPsrResponse (BaseRenderer $renderer):ResponseInterface {

		$symfonyResponse = new SymfonyResponse(

			$renderer->render(), $renderer->getStatusCode(),

			$renderer->getHeaders()
		);

		return $this->getResponseConverter()->createResponse($symfonyResponse);
	}

	/**
	 * Lazily creates the PSR-17 factory shared by the HTTP worker, the response converter and the 400 fallback
	 */
	private function getPsrFactory ():Psr17Factory {

		if ($this->psrFactory === null)

			$this->psrFactory = new Psr17Factory;

		return $this->psrFactory;
	}

	/**
	 * Lazily creates the Symfony -> PSR-7 converter, so it is built once per process rather than once per request
	 */
	private function getResponseConverter ():PsrHttpFactory {

		if ($this->responseConverter === null) {

			$psrFactory = $this->getPsrFactory();

			$this->responseConverter = new PsrHttpFactory(

				$psrFactory, $psrFactory, $psrFactory, $psrFactory
			);
		}

		return $this->responseConverter;
	}
}
?>
version: "2.7"
rpc:
# TCP address:port for listening.
#
# Default: "tcp://127.0.0.1:6001"
listen: tcp://127.0.0.1:6001
server:
command: "php suphle-test-worker.php" # This file should be identical to dev-rr.yaml, with the exception of this line
http:
address: localhost:8080
pool:
num_workers: 4
debug: true
# Automatically detect PHP file changes and reload connected services (docs:
# https://roadrunner.dev/docs/beep-beep-reload). Remove this section to disable this feature.
reload:
# Sync interval.
#
# Default: "1s"
interval: 1s
# Global patterns to sync.
#
# Default: [".php"]
patterns: [ ".php" ]
# List of services included for sync (this is a map, where each key is a plugin name).
#
# Default: <empty map>
services:
http:
# Directories to sync. If recursive is set to true, recursive sync will be applied only to the directories in
# "dirs" section. Dot (.) means "current working directory".
#
# Default: []
dirs: [ "." ]
# Recursive search for file patterns to add.
#
# Default: false
recursive: true
# Ignored folders.
#
# Default: []
ignore: [ "vendor" ]
# Service specific file patterns to sync.
#
# Default: []
patterns: [ ".php", ".go", ".md" ]
# Any number of sections can be defined here.
kv:
# User defined name of the section
#
# Default: none
boltdb-cache:
# Driver which should be used for the storage
#
# This option is required.
driver: boltdb
# Local configuration section
#
# This option is required to use the local section, otherwise (boltdb-cache) global configuration will be used.
config:
# File name for the DB
#
# Default: "rr.db"
file: "rr-cache.db"
# Access permission for the DB file.
#
# Default: "0777"
permissions: 0777
# TTL keys check interval in seconds. It's safe to use 1 second here, but can be a little costly to performance.
#
# Default: "60" seconds
interval: 40
# User defined name of the section
#
# Default: none
# jobs:
# # Number of threads which will try to obtain the job from the priority queue
# #
# # Default: number of the logical CPU cores
# num_pollers: 32
# # Size of the internal priority queue
# #
# # Default: 1_000_000
# pipeline_size: 100000
# timeout: 60
# # worker pool configuration
# pool:
# command: ""
# max_jobs: 0
# num_workers: 10
# allocate_timeout: 60s
# destroy_timeout: 60s
# # List of broker pipelines associated with the drivers.
# #
# # This option is not required since you can declare pipelines in the runtime. Pipeline driver should exist.
# pipelines:
# # Pipeline name
# #
# # This option is required when defining pipelines via configuration.
# boltdb-queue:
# # Driver associated with the pipeline
# #
# # This option is required. Possible values: amqp, memory, sqs, beanstalk, boltdb
# driver: boltdb
# # Driver's configuration
# #
# # Should not be empty
# config:
# # BoltDB file to create or DB to use
# #
# # Default: "rr.db"
# file: "rr-queues.db"
# # Pipeline priority
# #
# # If the job has priority set to 0, it will inherit the pipeline's priority. Default: 10.
# priority: 10
# # Number of jobs to prefetch from the driver.
# #
# # Default: 100_000.
# prefetch: 10000
# # list of pipelines to be consumed by the server automatically at the start, keep empty if you want to start consuming manually
# consume:
# [
# "boltdb-queue",
# #"test-local-1",
# ]
<?php
use Tilwa\Modules\ModuleWorkerAccessor;
use Spiral\RoadRunner\Environment;
use Tilwa\Tests\Mocks\PublishedTestModules;
require_once "vendor/autoload.php";

// One accessor per worker process, wired to the published test modules
$accessor = new ModuleWorkerAccessor(new PublishedTestModules);

// The mode is read from the environment of this worker process
$workerMode = Environment::fromGlobals()->getMode();

$accessor->setWorkerMode($workerMode);

// Same sequence as the fluent chain: boot modules, pick the worker for the mode, then start serving
$accessor->buildIdentifier();

$accessor->setActiveWorker();

$accessor->openEventLoop();
?>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment