WorkerFactory
in package
implements
WorkerFactoryInterface, LoopInterface
uses
EventEmitterTrait
WorkerFactory is primary entry point for the temporal application. This class is responsible for the communication with parent RoadRunner process and can be used to create taskQueue workflow and activity workers.
$factory = WorkerFactory::create();
$worker = $factory->newWorker('default');
$worker->registerWorkflowTypes(WorkflowType::class);
$worker->registerActivityImplementations(new MyActivityImplementation());
Table of Contents
Interfaces
- WorkerFactoryInterface
- The interface is responsible for providing an interface for registering all dependencies and creating a global event loop.
- LoopInterface
- The interface is responsible for providing an interface for creating an event loop.
Constants
- ERROR_HEADER_NOT_STRING_TYPE = 'Header "%s" argument type must be a string, but %s given'
- ERROR_HEADERS_TYPE = 'Received headers type must be a string, but %s given'
- ERROR_MESSAGE_TYPE = 'Received message type must be a string, but %s given'
- ERROR_QUEUE_NOT_FOUND = 'Cannot find a worker for task queue "%s"'
- HEADER_TASK_QUEUE = 'taskQueue'
Properties
- $client : ClientInterface
- $codec : CodecInterface
- $converter : DataConverterInterface
- $env : EnvironmentInterface
- $marshaller : MarshallerInterface<string|int, array<string|int, mixed>>
- $queues : RepositoryInterface<string|int, WorkerInterface>
- $reader : ReaderInterface
- $responses : QueueInterface
- $router : RouterInterface
- $rpc : RPCConnectionInterface
- $server : ServerInterface
Methods
- __construct() : mixed
- create() : WorkerFactoryInterface
- getClient() : ClientInterface
- getDataConverter() : DataConverterInterface
- getEnvironment() : EnvironmentInterface
- getMarshaller() : MarshallerInterface<string|int, array<string|int, mixed>>
- getQueue() : QueueInterface
- getReader() : ReaderInterface
- newWorker() : WorkerInterface
- Create a new Temporal Worker with the name of the task queue and register in worker.
- run() : int
- Start processing workflows and activities processing.
- tick() : void
- createClient() : ClientInterface
- createMarshaller() : MarshallerInterface<string|int, array<string|int, mixed>>
- createQueue() : QueueInterface
- createReader() : ReaderInterface
- createRouter() : RouterInterface
- createServer() : ServerInterface
- createTaskQueue() : RepositoryInterface<string|int, WorkerInterface>
- boot() : void
- createCodec() : CodecInterface
- dispatch() : string
- findTaskQueueNameOrFail() : string
- findTaskQueueOrFail() : WorkerInterface
- onRequest() : PromiseInterface
Constants
ERROR_HEADER_NOT_STRING_TYPE
private
mixed
ERROR_HEADER_NOT_STRING_TYPE
= 'Header "%s" argument type must be a string, but %s given'
ERROR_HEADERS_TYPE
private
mixed
ERROR_HEADERS_TYPE
= 'Received headers type must be a string, but %s given'
ERROR_MESSAGE_TYPE
private
mixed
ERROR_MESSAGE_TYPE
= 'Received message type must be a string, but %s given'
ERROR_QUEUE_NOT_FOUND
private
mixed
ERROR_QUEUE_NOT_FOUND
= 'Cannot find a worker for task queue "%s"'
HEADER_TASK_QUEUE
private
mixed
HEADER_TASK_QUEUE
= 'taskQueue'
Properties
$client
protected
ClientInterface
$client
$codec
protected
CodecInterface
$codec
$converter
protected
DataConverterInterface
$converter
$env
protected
EnvironmentInterface
$env
$marshaller
protected
MarshallerInterface<string|int, array<string|int, mixed>>
$marshaller
$queues
protected
RepositoryInterface<string|int, WorkerInterface>
$queues
$reader
protected
ReaderInterface
$reader
$responses
protected
QueueInterface
$responses
$router
protected
RouterInterface
$router
$rpc
protected
RPCConnectionInterface
$rpc
$server
protected
ServerInterface
$server
Methods
__construct()
public
__construct(DataConverterInterface $dataConverter, RPCConnectionInterface $rpc, ServiceCredentials $credentials) : mixed
Parameters
- $dataConverter : DataConverterInterface
- $rpc : RPCConnectionInterface
- $credentials : ServiceCredentials
create()
public
static create([DataConverterInterface|null $converter = null ][, RPCConnectionInterface|null $rpc = null ][, ServiceCredentials|null $credentials = null ]) : WorkerFactoryInterface
Parameters
- $converter : DataConverterInterface|null = null
- $rpc : RPCConnectionInterface|null = null
- $credentials : ServiceCredentials|null = null
Return values
WorkerFactoryInterfacegetClient()
public
getClient() : ClientInterface
Return values
ClientInterfacegetDataConverter()
public
getDataConverter() : DataConverterInterface
Return values
DataConverterInterfacegetEnvironment()
public
getEnvironment() : EnvironmentInterface
Return values
EnvironmentInterfacegetMarshaller()
public
getMarshaller() : MarshallerInterface<string|int, array<string|int, mixed>>
Return values
MarshallerInterface<string|int, array<string|int, mixed>>getQueue()
public
getQueue() : QueueInterface
Return values
QueueInterfacegetReader()
public
getReader() : ReaderInterface
Return values
ReaderInterfacenewWorker()
Create a new Temporal Worker with the name of the task queue and register in worker.
public
newWorker([string $taskQueue = self::DEFAULT_TASK_QUEUE ][, WorkerOptions|null $options = null ][, ExceptionInterceptorInterface|null $exceptionInterceptor = null ][, PipelineProvider|null $interceptorProvider = null ]) : WorkerInterface
Parameters
- $taskQueue : string = self::DEFAULT_TASK_QUEUE
- $options : WorkerOptions|null = null
- $exceptionInterceptor : ExceptionInterceptorInterface|null = null
- $interceptorProvider : PipelineProvider|null = null
Return values
WorkerInterfacerun()
Start processing workflows and activities processing.
public
run([HostConnectionInterface|null $host = null ]) : int
Parameters
- $host : HostConnectionInterface|null = null
Return values
inttick()
public
tick() : void
createClient()
protected
createClient() : ClientInterface
Return values
ClientInterfacecreateMarshaller()
protected
createMarshaller(ReaderInterface $reader) : MarshallerInterface<string|int, array<string|int, mixed>>
Parameters
- $reader : ReaderInterface
Return values
MarshallerInterface<string|int, array<string|int, mixed>>createQueue()
protected
createQueue() : QueueInterface
Return values
QueueInterfacecreateReader()
protected
createReader() : ReaderInterface
Return values
ReaderInterfacecreateRouter()
protected
createRouter(ServiceCredentials $credentials) : RouterInterface
Parameters
- $credentials : ServiceCredentials
Return values
RouterInterfacecreateServer()
protected
createServer() : ServerInterface
Return values
ServerInterfacecreateTaskQueue()
protected
createTaskQueue() : RepositoryInterface<string|int, WorkerInterface>
Return values
RepositoryInterface<string|int, WorkerInterface>boot()
private
boot(ServiceCredentials $credentials) : void
Parameters
- $credentials : ServiceCredentials
createCodec()
private
createCodec() : CodecInterface
Return values
CodecInterfacedispatch()
private
dispatch(string $messages, array<string|int, mixed> $headers) : string
Parameters
- $messages : string
- $headers : array<string|int, mixed>
Return values
stringfindTaskQueueNameOrFail()
private
findTaskQueueNameOrFail(array<string|int, mixed> $headers) : string
Parameters
- $headers : array<string|int, mixed>
Return values
stringfindTaskQueueOrFail()
private
findTaskQueueOrFail(string $taskQueueName) : WorkerInterface
Parameters
- $taskQueueName : string
Return values
WorkerInterfaceonRequest()
private
onRequest(ServerRequestInterface $request, array<string|int, mixed> $headers) : PromiseInterface
Parameters
- $request : ServerRequestInterface
- $headers : array<string|int, mixed>