input = $input;

        // NOTE(review): the start of this file (class header and constructor
        // signature) is not visible here; only the constructor tail is shown.
        // Wire the wrapped input stream's events to this object's handlers.
        $this->input->on('data', array($this, 'handleData'));
        $this->input->on('end', array($this, 'handleEnd'));
        $this->input->on('error', array($this, 'handleError'));
        $this->input->on('close', array($this, 'close'));
    }

    /**
     * Reports whether this stream can still produce data.
     *
     * @return bool false once this stream has been closed, otherwise whatever
     *              the wrapped input stream reports (coerced to bool)
     */
    public function isReadable()
    {
        if ($this->closed) {
            return false;
        }

        return (bool) $this->input->isReadable();
    }

    /**
     * Pauses the wrapped input stream.
     */
    public function pause()
    {
        $this->input->pause();
    }

    /**
     * Resumes the wrapped input stream.
     */
    public function resume()
    {
        $this->input->resume();
    }

    /**
     * Pipes this stream into the given destination via Util::pipe().
     *
     * @param WritableStreamInterface $dest    destination to pipe into
     * @param array                   $options options handed through to Util::pipe()
     * @return mixed whatever Util::pipe() returns
     */
    public function pipe(WritableStreamInterface $dest, array $options = array())
    {
        $result = Util::pipe($this, $dest, $options);

        return $result;
    }

    /**
     * Closes this stream and the wrapped input stream.
     *
     * Calling this more than once is a no-op after the first call.
     */
    public function close()
    {
        if (!$this->closed) {
            // Set the flag before touching the input: the input's 'close'
            // event is wired back to this method, so a re-entrant call must
            // find the stream already marked as closed.
            $this->closed = true;

            $this->input->close();

            // Listeners must still be attached when 'close' is emitted, so
            // they are only dropped afterwards.
            $this->emit('close');
            $this->removeAllListeners();
        }
    }

    /**
     * Re-emits a piece of input data wrapped as a single chunk.
     *
     * The emitted frame is the payload length in hexadecimal, CRLF, the
     * payload itself, and a trailing CRLF. Empty strings are dropped; the
     * zero-length chunk is only emitted by handleEnd() as the terminator.
     *
     * @internal
     * @param string $data raw data received from the input stream
     */
    public function handleData($data)
    {
        if ($data === '') {
            return;
        }

        $sizeHeader = \dechex(\strlen($data));
        $frame = $sizeHeader . "\r\n" . $data . "\r\n";

        $this->emit('data', array($frame));
    }

    /**
     * Forwards an error from the input stream, then closes this stream.
     *
     * @internal
     * @param \Exception $e error reported by the input stream
     */
    public function handleError(\Exception $e)
    {
        $this->emit('error', array($e));
        $this->close();
    }

    /**
     * Emits the terminating zero-length chunk, then ends and closes.
     *
     * @internal
     */
    public function handleEnd()
    {
        $this->emit('data', array("0\r\n\r\n"));

        // The closed flag is checked only after the final chunk went out;
        // presumably a 'data' listener may have closed this stream while
        // handling it, in which case 'end' must not be emitted.
        if ($this->closed) {
            return;
        }

        $this->emit('end');
        $this->close();
    }
}