on('data', $this->expectCallableOnceWith('hello')); * * $through->write('hello'); * ``` * * Similarly, the [`end()` method](#end) will end the stream and emit an * [`end` event](#end-event) and then [`close()`](#close-1) the stream. * The [`close()` method](#close-1) will close the stream and emit a * [`close` event](#close-event). * Accordingly, this can also be used in a [`pipe()`](#pipe) context like this: * * ```php * $through = new ThroughStream(); * $source->pipe($through)->pipe($dest); * ``` * * Optionally, its constructor accepts any callable function which will then be * used to *filter* any data written to it. This function receives a single data * argument as passed to the writable side and must return the data as it will be * passed to its readable end: * * ```php * $through = new ThroughStream('strtoupper'); * $source->pipe($through)->pipe($dest); * ``` * * Note that this class makes no assumptions about any data types. This can be * used to convert data, for example for transforming any structured data into * a newline-delimited JSON (NDJSON) stream like this: * * ```php * $through = new ThroughStream(function ($data) { * return json_encode($data) . PHP_EOL; * }); * $through->on('data', $this->expectCallableOnceWith("[2,true]\n")); * * $through->write(array(2, true)); * ``` * * The callback function is allowed to throw an `Exception`. In this case, * the stream will emit an `error` event and then [`close()`](#close-1) the stream. 
*
 * ```php
 * $through = new ThroughStream(function ($data) {
 *     if (!is_string($data)) {
 *         throw new \UnexpectedValueException('Only strings allowed');
 *     }
 *     return $data;
 * });
 * $through->on('error', $this->expectCallableOnce());
 * $through->on('close', $this->expectCallableOnce());
 * $through->on('data', $this->expectCallableNever());
 *
 * $through->write(2);
 * ```
 *
 * @see WritableStreamInterface::write()
 * @see WritableStreamInterface::end()
 * @see DuplexStreamInterface::close()
 * @see WritableStreamInterface::pipe()
 */
final class ThroughStream extends EventEmitter implements DuplexStreamInterface
{
    /** @var bool whether the readable side is still open */
    private $readable = true;

    /** @var bool whether the writable side still accepts data */
    private $writable = true;

    /** @var bool set once close() has run, guards against closing twice */
    private $closed = false;

    /** @var bool whether the consumer asked to be throttled via pause() */
    private $paused = false;

    /** @var bool whether a write() was rejected while paused, so resume() owes a `drain` event */
    private $drain = false;

    /** @var callable|null optional transformation applied to every written chunk */
    private $callback;

    /**
     * @param callable|null $callback optional function that receives each written
     *     chunk and returns the value to emit on the readable side
     * @throws InvalidArgumentException if a non-null, non-callable value is given
     */
    public function __construct($callback = null)
    {
        if ($callback !== null && !\is_callable($callback)) {
            throw new InvalidArgumentException('Invalid transformation callback given');
        }

        $this->callback = $callback;
    }

    /**
     * Marks the stream as paused so that subsequent write() calls return false.
     *
     * @return void
     */
    public function pause()
    {
        $this->paused = true;
    }

    /**
     * Clears the paused state and emits a pending `drain` event, if any.
     *
     * @return void
     */
    public function resume()
    {
        // Clear the paused flag BEFORE emitting `drain`: a listener that writes
        // synchronously from within the `drain` event would otherwise still see
        // the stream as paused, get `false` back from write() and re-arm the
        // drain flag without any further `drain` event being emitted.
        $this->paused = false;

        // only emit `drain` if a previous write() was throttled while paused
        if ($this->drain) {
            $this->drain = false;
            $this->emit('drain');
        }
    }

    /**
     * Pipes this stream into the given destination by delegating to Util::pipe().
     *
     * @param WritableStreamInterface $dest
     * @param array                   $options passed through to Util::pipe() unchanged
     * @return mixed whatever Util::pipe() returns
     */
    public function pipe(WritableStreamInterface $dest, array $options = array())
    {
        return Util::pipe($this, $dest, $options);
    }

    /**
     * @return bool true until the stream has been ended or closed
     */
    public function isReadable()
    {
        return $this->readable;
    }

    /**
     * @return bool true until the stream has been ended or closed
     */
    public function isWritable()
    {
        return $this->writable;
    }

    /**
     * Passes the given data through the optional callback and emits it as a `data` event.
     *
     * If the callback throws an `Exception`, an `error` event is emitted with
     * that exception, the stream is closed and no `data` event is emitted.
     *
     * @param mixed $data
     * @return bool false if the stream is not writable, the callback threw, or
     *     the stream is paused after emitting; true otherwise
     */
    public function write($data)
    {
        if (!$this->writable) {
            return false;
        }

        if ($this->callback !== null) {
            try {
                $data = \call_user_func($this->callback, $data);
            } catch (\Exception $e) {
                $this->emit('error', array($e));
                $this->close();

                return false;
            }
        }

        $this->emit('data', array($data));

        // checked after emitting because a `data` listener may have paused us
        if ($this->paused) {
            // remember to emit `drain` on the next resume()
            $this->drain = true;

            return false;
        }

        return true;
    }

    /**
     * Optionally writes a final chunk, then emits `end` and closes the stream.
     *
     * Does nothing if the stream is no longer writable.
     *
     * @param mixed|null $data optional final chunk; `null` means "no data"
     * @return void
     */
    public function end($data = null)
    {
        if (!$this->writable) {
            return;
        }

        if (null !== $data) {
            $this->write($data);

            // return if write() already caused the stream to close
            if (!$this->writable) {
                return;
            }
        }

        // flip all state before emitting so listeners observe an ended stream
        $this->readable = false;
        $this->writable = false;
        $this->paused = true;
        $this->drain = false;

        $this->emit('end');
        $this->close();
    }

    /**
     * Closes the stream, emits `close` and removes all listeners.
     *
     * Calling this more than once has no further effect.
     *
     * @return void
     */
    public function close()
    {
        if ($this->closed) {
            return;
        }

        $this->readable = false;
        $this->writable = false;
        $this->closed = true;
        $this->paused = true;
        $this->drain = false;
        // drop the reference to the callback once the stream is closed
        $this->callback = null;

        $this->emit('close');
        $this->removeAllListeners();
    }
}