connector = $connector; $this->requestData = $requestData; }

    /**
     * Tells whether this request still accepts body data.
     *
     * @return bool true while the request has not reached STATE_END and end() has not been called
     */
    public function isWritable()
    {
        return self::STATE_END > $this->state && !$this->ended;
    }

    /**
     * Starts connecting and, once connected, sends the request head followed
     * by any body data that was buffered while the connection was pending.
     *
     * A connection failure is forwarded to closeError(). Closing the request
     * cancels the pending connection attempt.
     *
     * @return void
     */
    private function writeHead()
    {
        $this->state = self::STATE_WRITING_HEAD;

        // The closure below does not use $this for property access, so the
        // properties it has to modify are handed over by reference.
        $requestData = $this->requestData;
        $streamRef = &$this->stream;
        $stateRef = &$this->state;
        $pendingWrites = &$this->pendingWrites;
        $that = $this;

        $promise = $this->connect();
        $promise->then(
            function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRef, &$pendingWrites, $that) {
                $streamRef = $stream;

                $stream->on('drain', array($that, 'handleDrain'));
                $stream->on('data', array($that, 'handleData'));
                $stream->on('end', array($that, 'handleEnd'));
                $stream->on('error', array($that, 'handleError'));
                $stream->on('close', array($that, 'handleClose'));

                $headers = (string) $requestData;

                // send the head and everything buffered so far in a single write
                $more = $stream->write($headers . $pendingWrites);

                $stateRef = Request::STATE_HEAD_WRITTEN;

                // clear pending writes if non-empty
                if ($pendingWrites !== '') {
                    $pendingWrites = '';

                    // write() returned false for the buffered data, so announce
                    // 'drain' once the connection reports it can take more
                    if ($more) {
                        $that->emit('drain');
                    }
                }
            },
            array($this, 'closeError')
        );

        $this->on('close', function() use ($promise) {
            $promise->cancel();
        });
    }

    /**
     * Writes a chunk of request body data.
     *
     * Before the head has been written the data is only buffered (and the
     * connection attempt is started if it has not been yet); afterwards it is
     * passed straight to the connection.
     *
     * @param string $data
     * @return bool false if the request is not writable or the data was only
     *     buffered, otherwise the return value of the connection's write()
     */
    public function write($data)
    {
        if (!$this->isWritable()) {
            return false;
        }

        // write directly to connection stream if already available
        if (self::STATE_HEAD_WRITTEN <= $this->state) {
            return $this->stream->write($data);
        }

        // otherwise buffer and try to establish connection
        $this->pendingWrites .= $data;
        if (self::STATE_WRITING_HEAD > $this->state) {
            $this->writeHead();
        }

        return false;
    }

    /**
     * Marks the request as ended, optionally writing a final chunk first.
     *
     * If no data is given and the head has not been started yet, this starts
     * sending the head. Does nothing when the request is not writable.
     *
     * @param string|null $data final chunk of body data, or null for none
     * @return void
     */
    public function end($data = null)
    {
        if (!$this->isWritable()) {
            return;
        }

        if (null !== $data) {
            $this->write($data);
        } else if (self::STATE_WRITING_HEAD > $this->state) {
            $this->writeHead();
        }

        $this->ended = true;
    }

    /**
     * Forwards the connection's 'drain' event to listeners of this request.
     *
     * @internal
     */
    public function handleDrain()
    {
        $this->emit('drain');
    }

    /**
     * Collects incoming data until the response head is complete, then parses
     * it and emits the 'response' event.
     *
     * If the head cannot be parsed, the request fails through closeError():
     * the exception is emitted as 'error' and both the request and its
     * connection are closed, just like on every other error path.
     *
     * @internal
     * @param string $data
     */
    public function handleData($data)
    {
        $this->buffer .= $data;

        // buffer until double CRLF (or double LF for compatibility with legacy servers)
        if (false === strpos($this->buffer, "\r\n\r\n") && false === strpos($this->buffer, "\n\n")) {
            return;
        }

        try {
            list($response, $bodyChunk) = $this->parseResponse($this->buffer);
        } catch (\InvalidArgumentException $exception) {
            $this->buffer = null;

            // Do not leave an open connection behind that nobody listens to:
            // emit the error and close the request together with the connection.
            $this->closeError($exception);

            return;
        }

        $this->buffer = null;

        // the head is done; from here on this request no longer consumes connection events
        $this->stream->removeListener('drain', array($this, 'handleDrain'));
        $this->stream->removeListener('data', array($this, 'handleData'));
        $this->stream->removeListener('end', array($this, 'handleEnd'));
        $this->stream->removeListener('error', array($this, 'handleError'));
        $this->stream->removeListener('close', array($this, 'handleClose'));

        // nothing to hand out if the response factory did not produce a response
        if (!isset($response)) {
            return;
        }

        $response->on('close', array($this, 'close'));
        $that = $this;
        $response->on('error', function (\Exception $error) use ($that) {
            $that->closeError(new \RuntimeException(
                "An error occured in the response",
                0,
                $error
            ));
        });

        $this->emit('response', array($response, $this));

        // re-emit the bytes that followed the head as 'data' on the connection,
        // after 'response' listeners had the chance to subscribe
        $this->stream->emit('data', array($bodyChunk));
    }

    /**
     * Fails the request because the connection ended while the response head
     * was still being awaited.
     *
     * @internal
     */
    public function handleEnd()
    {
        $this->closeError(new \RuntimeException(
            "Connection ended before receiving response"
        ));
    }

    /**
     * Fails the request with a RuntimeException wrapping a connection error.
     *
     * @internal
     * @param \Exception $error error reported by the connection
     */
    public function handleError(\Exception $error)
    {
        $this->closeError(new \RuntimeException(
            "An error occurred in the underlying stream",
            0,
            $error
        ));
    }

    /**
     * Closes the request when the connection reports 'close'.
     *
     * @internal
     */
    public function handleClose()
    {
        $this->close();
    }

    /**
     * Emits the given error and closes the request.
     *
     * Does nothing once the request has reached STATE_END.
     *
     * @internal
     * @param \Exception $error
     */
    public function closeError(\Exception $error)
    {
        if (self::STATE_END <= $this->state) {
            return;
        }

        $this->emit('error', array($error));
        $this->close();
    }

    /**
     * Closes the request: moves to STATE_END, drops buffered writes, closes
     * the connection if there is one, emits 'close' and removes all listeners.
     *
     * Calling it again after the request has reached STATE_END has no effect.
     *
     * @return void
     */
    public function close()
    {
        if (self::STATE_END <= $this->state) {
            return;
        }

        $this->state = self::STATE_END;
        $this->pendingWrites = '';

        if ($this->stream) {
            $this->stream->close();
        }

        $this->emit('close');
        $this->removeAllListeners();
    }

    /**
     * Parses raw response data into a response object and the body bytes that
     * were received together with the head.
     *
     * @param string $data raw data containing at least the complete response head
     * @return array two elements: the object built by the response factory and
     *     the already received part of the body as a string
     * @throws \InvalidArgumentException if the data cannot be parsed (this is the
     *     exception type handleData() expects from this method)
     */
    protected function parseResponse($data)
    {
        $psrResponse = gPsr\parse_response($data);

        // headers that carry exactly one value are passed on as that value instead of a one-element array
        $headers = array_map(function($val) {
            if (1 === count($val)) {
                $val = $val[0];
            }

            return $val;
        }, $psrResponse->getHeaders());

        $factory = $this->getResponseFactory();

        $response = $factory(
            'HTTP',
            $psrResponse->getProtocolVersion(),
            $psrResponse->getStatusCode(),
            $psrResponse->getReasonPhrase(),
            $headers
        );

        return array($response, (string)($psrResponse->getBody()));
    }

    /**
     * Opens the connection for this request through the connector.
     *
     * The target is "host:port" taken from the request data; for https the
     * host is prefixed with "tls://".
     *
     * @return mixed whatever the connector's connect() returns, or a promise
     *     rejected with an \InvalidArgumentException when the scheme is
     *     neither "http" nor "https"
     */
    protected function connect()
    {
        $scheme = $this->requestData->getScheme();
        if ($scheme !== 'https' && $scheme !== 'http') {
            return Promise\reject(
                new \InvalidArgumentException('Invalid request URL given')
            );
        }

        $host = $this->requestData->getHost();
        $port = $this->requestData->getPort();

        if ($scheme === 'https') {
            $host = 'tls://' . $host;
        }

        return $this->connector
            ->connect($host . ':' . $port);
    }

    /**
     * Replaces the factory used to create response objects.
     *
     * @param callable $factory called as
     *     $factory($protocol, $version, $code, $reasonPhrase, $headers)
     * @return void
     */
    public function setResponseFactory($factory)
    {
        $this->responseFactory = $factory;
    }

    /**
     * Returns the factory used to create response objects, creating and
     * caching a default one that builds Response instances if none was set.
     *
     * @return callable
     */
    public function getResponseFactory()
    {
        if (null === $factory = $this->responseFactory) {
            // Bind the connection by reference: the factory is cached, so it must
            // use the connection that exists when it is invoked rather than
            // whatever (possibly nothing yet) was assigned when it was created.
            $stream = &$this->stream;

            $factory = function ($protocol, $version, $code, $reasonPhrase, $headers) use (&$stream) {
                return new Response(
                    $stream,
                    $protocol,
                    $version,
                    $code,
                    $reasonPhrase,
                    $headers
                );
            };

            $this->responseFactory = $factory;
        }

        return $factory;
    }
}