summaryrefslogtreecommitdiffstats
path: root/vendor/ipl/scheduler/src/Scheduler.php
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--vendor/ipl/scheduler/src/Scheduler.php323
1 files changed, 323 insertions, 0 deletions
diff --git a/vendor/ipl/scheduler/src/Scheduler.php b/vendor/ipl/scheduler/src/Scheduler.php
new file mode 100644
index 0000000..25ad3a1
--- /dev/null
+++ b/vendor/ipl/scheduler/src/Scheduler.php
@@ -0,0 +1,323 @@
+<?php
+
+namespace ipl\Scheduler;
+
+use DateTime;
+use InvalidArgumentException;
+use ipl\Scheduler\Common\Promises;
+use ipl\Scheduler\Common\Timers;
+use ipl\Scheduler\Contract\Frequency;
+use ipl\Scheduler\Contract\Task;
+use ipl\Stdlib\Events;
+use React\EventLoop\Loop;
+use React\Promise;
+use React\Promise\ExtendedPromiseInterface;
+use SplObjectStorage;
+use Throwable;
+
+class Scheduler
+{
+ use Events;
+ use Timers;
+ use Promises;
+
+ /**
+ * Event raised when a {@link Task task} is canceled
+ *
+ * The task and its pending operations as an array of canceled {@link ExtendedPromiseInterface promise}s
+ * are passed as parameters to the event callbacks.
+ *
+ * **Example usage:**
+ *
+ * ```php
+ * $scheduler->on($scheduler::ON_TASK_CANCEL, function (Task $task, array $_) use ($logger) {
+ * $logger->info(sprintf('Task %s cancelled', $task->getName()));
+ * });
+ * ```
+ */
+ public const ON_TASK_CANCEL = 'task-cancel';
+
+ /**
+ * Event raised when an operation of a {@link Task task} is done
+ *
+ * The task and the operation result are passed as parameters to the event callbacks.
+ *
+ * **Example usage:**
+ *
+ * ```php
+ * $scheduler->on($scheduler::ON_TASK_DONE, function (Task $task, $result) use ($logger) {
+ * $logger->info(sprintf('Operation of task %s done: %s', $task->getName(), $result));
+ * });
+ * ```
+ */
+ public const ON_TASK_DONE = 'task-done';
+
+ /**
+ * Event raised when an operation of a {@link Task task} failed
+ *
+ * The task and the {@link Throwable reason} why the operation failed
+ * are passed as parameters to the event callbacks.
+ *
+ * **Example usage:**
+ *
+ * ```php
+ * $scheduler->on($scheduler::ON_TASK_FAILED, function (Task $task, Throwable $e) use ($logger) {
+ * $logger->error(
+ * sprintf('Operation of task %s failed: %s', $task->getName(), $e),
+ * ['exception' => $e]
+ * );
+ * });
+ * ```
+ */
+ public const ON_TASK_FAILED = 'task-failed';
+
+ /**
+ * Event raised when a {@link Task task} operation is scheduled
+ *
+ * The task and the {@link DateTime time} when it should run
+ * are passed as parameters to the event callbacks.
+ *
+ * **Example usage:**
+ *
+ * ```php
+ * $scheduler->on($scheduler::ON_TASK_SCHEDULED, function (Task $task, DateTime $dateTime) use ($logger) {
+ * $logger->info(sprintf(
+ * 'Scheduling task %s to run at %s',
+ * $task->getName(),
+ * IntlDateFormatter::formatObject($dateTime)
+ * ));
+ * });
+ * ```
+ */
+ public const ON_TASK_SCHEDULED = 'task-scheduled';
+
+ /**
+ * Event raised upon operation of a {@link Task task}
+ *
+ * The task and the possibly not yet completed result of the operation as a {@link ExtendedPromiseInterface promise}
+ * are passed as parameters to the event callbacks.
+ *
+ * **Example usage:**
+ *
+ * ```php
+ * $scheduler->on($scheduler::ON_TASK_OPERATION, function (Task $task, ExtendedPromiseInterface $_) use ($logger) {
+ * $logger->info(sprintf('Task %s operating', $task->getName()));
+ * });
+ * ```
+ */
+ public const ON_TASK_RUN = 'task-run';
+
+ /**
+ * Event raised when a {@see Task task} is expired
+ *
+ * The task and the {@see DateTime expire time} are passed as parameters to the event callbacks.
+ * Note that the expiration time is the first time that is considered expired based on the frequency
+ * of the task and can be later than the specified end time.
+ *
+ * **Example usage:**
+ *
+ * ```php
+ * $scheduler->on(Scheduler::ON_TASK_EXPIRED, function (Task $task, DateTime $dateTime) use ($logger) {
+ * $logger->info(sprintf('Removing expired task %s at %s', $task->getName(), $dateTime->format('Y-m-d H:i:s')));
+ * });
+ * ```
+ */
+ public const ON_TASK_EXPIRED = 'task-expired';
+
+ /** @var SplObjectStorage<Task, null> The scheduled tasks of this scheduler */
+ protected $tasks;
+
+ public function __construct()
+ {
+ $this->tasks = new SplObjectStorage();
+
+ $this->promises = new SplObjectStorage();
+ $this->timers = new SplObjectStorage();
+
+ $this->init();
+ }
+
+ /**
+ * Initialize this scheduler
+ */
+ protected function init(): void
+ {
+ }
+
+ /**
+ * Remove and cancel the given task
+ *
+ * @param Task $task
+ *
+ * @return $this
+ *
+ * @throws InvalidArgumentException If the given task isn't scheduled
+ */
+ public function remove(Task $task): self
+ {
+ if (! $this->hasTask($task)) {
+ throw new InvalidArgumentException(sprintf('Task %s not scheduled', $task->getName()));
+ }
+
+ $this->cancelTask($task);
+
+ $this->tasks->detach($task);
+
+ return $this;
+ }
+
+ /**
+ * Remove and cancel all tasks
+ *
+ * @return $this
+ */
+ public function removeTasks(): self
+ {
+ foreach ($this->tasks as $task) {
+ $this->cancelTask($task);
+ }
+
+ $this->tasks = new SplObjectStorage();
+
+ return $this;
+ }
+
+ /**
+ * Get whether the specified task is scheduled
+ *
+ * @param Task $task
+ *
+ * @return bool
+ */
+ public function hasTask(Task $task): bool
+ {
+ return $this->tasks->contains($task);
+ }
+
+ /**
+ * Schedule the given task based on the specified frequency
+ *
+ * @param Task $task
+ * @param Frequency $frequency
+ *
+ * @return $this
+ */
+ public function schedule(Task $task, Frequency $frequency): self
+ {
+ $now = new DateTime();
+ if ($frequency->isExpired($now)) {
+ return $this;
+ }
+
+ if ($frequency->isDue($now)) {
+ Loop::futureTick(function () use ($task): void {
+ $promise = $this->runTask($task);
+ $this->emit(static::ON_TASK_RUN, [$task, $promise]);
+ });
+ $this->emit(static::ON_TASK_SCHEDULED, [$task, $now]);
+
+ if ($frequency instanceof OneOff) {
+ return $this;
+ }
+ }
+
+ $loop = function () use (&$loop, $task, $frequency): void {
+ $promise = $this->runTask($task);
+ $this->emit(static::ON_TASK_RUN, [$task, $promise]);
+
+ $now = new DateTime();
+ $nextDue = $frequency->getNextDue($now);
+ if ($frequency instanceof OneOff || $frequency->isExpired($nextDue)) {
+ $removeTask = function () use ($task, $nextDue): void {
+ $this->remove($task);
+ $this->emit(static::ON_TASK_EXPIRED, [$task, $nextDue]);
+ };
+
+ if ($this->promises->contains($task->getUuid())) {
+ $pendingPromises = (array) $this->promises->offsetGet($task->getUuid());
+ Promise\all($pendingPromises)->always($removeTask);
+ } else {
+ $removeTask();
+ }
+
+ return;
+ }
+
+ $this->attachTimer(
+ $task->getUuid(),
+ Loop::addTimer($nextDue->getTimestamp() - $now->getTimestamp(), $loop)
+ );
+ $this->emit(static::ON_TASK_SCHEDULED, [$task, $nextDue]);
+ };
+
+ $nextDue = $frequency->getNextDue($now);
+ $this->attachTimer(
+ $task->getUuid(),
+ Loop::addTimer($nextDue->getTimestamp() - $now->getTimestamp(), $loop)
+ );
+ $this->emit(static::ON_TASK_SCHEDULED, [$task, $nextDue]);
+
+ $this->tasks->attach($task);
+
+ return $this;
+ }
+
+ public function isValidEvent(string $event): bool
+ {
+ $events = array_flip([
+ static::ON_TASK_CANCEL,
+ static::ON_TASK_DONE,
+ static::ON_TASK_EXPIRED,
+ static::ON_TASK_FAILED,
+ static::ON_TASK_RUN,
+ static::ON_TASK_SCHEDULED
+ ]);
+
+ return isset($events[$event]);
+ }
+
+ /**
+ * Cancel the timer of the task and all pending operations
+ *
+ * @param Task $task
+ */
+ protected function cancelTask(Task $task): void
+ {
+ Loop::cancelTimer($this->detachTimer($task->getUuid()));
+
+ /** @var ExtendedPromiseInterface[] $promises */
+ $promises = $this->detachPromises($task->getUuid());
+ if (! empty($promises)) {
+ /** @var Promise\CancellablePromiseInterface $promise */
+ foreach ($promises as $promise) {
+ $promise->cancel();
+ }
+ $this->emit(self::ON_TASK_CANCEL, [$task, $promises]);
+ }
+ }
+
+ /**
+ * Runs the given task immediately and registers handlers for the returned promise
+ *
+ * @param Task $task
+ *
+ * @return ExtendedPromiseInterface
+ */
+ protected function runTask(Task $task): ExtendedPromiseInterface
+ {
+ $promise = $task->run();
+ $this->addPromise($task->getUuid(), $promise);
+
+ return $promise->then(
+ function ($result) use ($task): void {
+ $this->emit(self::ON_TASK_DONE, [$task, $result]);
+ },
+ function (Throwable $reason) use ($task): void {
+ $this->emit(self::ON_TASK_FAILED, [$task, $reason]);
+ }
+ )->always(function () use ($task, $promise): void {
+ // Unregister the promise without canceling it as it's already resolved
+ $this->removePromise($task->getUuid(), $promise);
+ });
+ }
+}