diff options
Diffstat (limited to 'src/arrow/cpp/src/arrow/flight/client.h')
-rw-r--r-- | src/arrow/cpp/src/arrow/flight/client.h | 330 |
1 files changed, 330 insertions, 0 deletions
diff --git a/src/arrow/cpp/src/arrow/flight/client.h b/src/arrow/cpp/src/arrow/flight/client.h new file mode 100644 index 000000000..0a35b6d10 --- /dev/null +++ b/src/arrow/cpp/src/arrow/flight/client.h @@ -0,0 +1,330 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// \brief Implementation of Flight RPC client using gRPC. API should be +// considered experimental for now + +#pragma once + +#include <chrono> +#include <memory> +#include <string> +#include <utility> +#include <vector> + +#include "arrow/ipc/options.h" +#include "arrow/ipc/reader.h" +#include "arrow/ipc/writer.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/cancel.h" +#include "arrow/util/variant.h" + +#include "arrow/flight/types.h" // IWYU pragma: keep +#include "arrow/flight/visibility.h" + +namespace arrow { + +class RecordBatch; +class Schema; + +namespace flight { + +class ClientAuthHandler; +class ClientMiddleware; +class ClientMiddlewareFactory; + +/// \brief A duration type for Flight call timeouts. +typedef std::chrono::duration<double, std::chrono::seconds::period> TimeoutDuration; + +/// \brief Hints to the underlying RPC layer for Arrow Flight calls. +class ARROW_FLIGHT_EXPORT FlightCallOptions { + public: + /// Create a default set of call options. + FlightCallOptions(); + + /// \brief An optional timeout for this call. Negative durations + /// mean an implementation-defined default behavior will be used + /// instead. This is the default value. + TimeoutDuration timeout; + + /// \brief IPC reader options, if applicable for the call. + ipc::IpcReadOptions read_options; + + /// \brief IPC writer options, if applicable for the call. + ipc::IpcWriteOptions write_options; + + /// \brief Headers for client to add to context. + std::vector<std::pair<std::string, std::string>> headers; + + /// \brief A token to enable interactive user cancellation of long-running requests. + StopToken stop_token; +}; + +/// \brief Indicate that the client attempted to write a message +/// larger than the soft limit set via write_size_limit_bytes. +class ARROW_FLIGHT_EXPORT FlightWriteSizeStatusDetail : public arrow::StatusDetail { + public: + explicit FlightWriteSizeStatusDetail(int64_t limit, int64_t actual) + : limit_(limit), actual_(actual) {} + const char* type_id() const override; + std::string ToString() const override; + int64_t limit() const { return limit_; } + int64_t actual() const { return actual_; } + + /// \brief Extract this status detail from a status, or return + /// nullptr if the status doesn't contain this status detail. + static std::shared_ptr<FlightWriteSizeStatusDetail> UnwrapStatus( + const arrow::Status& status); + + private: + int64_t limit_; + int64_t actual_; +}; + +struct ARROW_FLIGHT_EXPORT FlightClientOptions { + /// \brief Root certificates to use for validating server + /// certificates. + std::string tls_root_certs; + /// \brief Override the hostname checked by TLS. Use with caution. + std::string override_hostname; + /// \brief The client certificate to use if using Mutual TLS + std::string cert_chain; + /// \brief The private key associated with the client certificate for Mutual TLS + std::string private_key; + /// \brief A list of client middleware to apply. + std::vector<std::shared_ptr<ClientMiddlewareFactory>> middleware; + /// \brief A soft limit on the number of bytes to write in a single + /// batch when sending Arrow data to a server. + /// + /// Used to help limit server memory consumption. Only enabled if + /// positive. When enabled, FlightStreamWriter.Write* may yield a + /// IOError with error detail FlightWriteSizeStatusDetail. + int64_t write_size_limit_bytes = 0; + + /// \brief Generic connection options, passed to the underlying + /// transport; interpretation is implementation-dependent. + std::vector<std::pair<std::string, util::Variant<int, std::string>>> generic_options; + + /// \brief Use TLS without validating the server certificate. Use with caution. + bool disable_server_verification = false; + + /// \brief Get default options. + static FlightClientOptions Defaults(); +}; + +/// \brief A RecordBatchReader exposing Flight metadata and cancel +/// operations. +class ARROW_FLIGHT_EXPORT FlightStreamReader : public MetadataRecordBatchReader { + public: + /// \brief Try to cancel the call. + virtual void Cancel() = 0; + using MetadataRecordBatchReader::ReadAll; + /// \brief Consume entire stream as a vector of record batches + virtual Status ReadAll(std::vector<std::shared_ptr<RecordBatch>>* batches, + const StopToken& stop_token) = 0; + /// \brief Consume entire stream as a Table + Status ReadAll(std::shared_ptr<Table>* table, const StopToken& stop_token); +}; + +// Silence warning +// "non dll-interface class RecordBatchReader used as base for dll-interface class" +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable : 4275) +#endif + +/// \brief A RecordBatchWriter that also allows sending +/// application-defined metadata via the Flight protocol. +class ARROW_FLIGHT_EXPORT FlightStreamWriter : public MetadataRecordBatchWriter { + public: + /// \brief Indicate that the application is done writing to this stream. + /// + /// The application may not write to this stream after calling + /// this. This differs from closing the stream because this writer + /// may represent only one half of a readable and writable stream. + virtual Status DoneWriting() = 0; +}; + +#ifdef _MSC_VER +#pragma warning(pop) +#endif + +/// \brief A reader for application-specific metadata sent back to the +/// client during an upload. +class ARROW_FLIGHT_EXPORT FlightMetadataReader { + public: + virtual ~FlightMetadataReader(); + /// \brief Read a message from the server. + virtual Status ReadMetadata(std::shared_ptr<Buffer>* out) = 0; +}; + +/// \brief Client class for Arrow Flight RPC services (gRPC-based). +/// API experimental for now +class ARROW_FLIGHT_EXPORT FlightClient { + public: + ~FlightClient(); + + /// \brief Connect to an unauthenticated flight service + /// \param[in] location the URI + /// \param[out] client the created FlightClient + /// \return Status OK status may not indicate that the connection was + /// successful + static Status Connect(const Location& location, std::unique_ptr<FlightClient>* client); + + /// \brief Connect to an unauthenticated flight service + /// \param[in] location the URI + /// \param[in] options Other options for setting up the client + /// \param[out] client the created FlightClient + /// \return Status OK status may not indicate that the connection was + /// successful + static Status Connect(const Location& location, const FlightClientOptions& options, + std::unique_ptr<FlightClient>* client); + + /// \brief Authenticate to the server using the given handler. + /// \param[in] options Per-RPC options + /// \param[in] auth_handler The authentication mechanism to use + /// \return Status OK if the client authenticated successfully + Status Authenticate(const FlightCallOptions& options, + std::unique_ptr<ClientAuthHandler> auth_handler); + + /// \brief Authenticate to the server using basic HTTP style authentication. + /// \param[in] options Per-RPC options + /// \param[in] username Username to use + /// \param[in] password Password to use + /// \return Arrow result with bearer token and status OK if client authenticated + /// sucessfully + arrow::Result<std::pair<std::string, std::string>> AuthenticateBasicToken( + const FlightCallOptions& options, const std::string& username, + const std::string& password); + + /// \brief Perform the indicated action, returning an iterator to the stream + /// of results, if any + /// \param[in] options Per-RPC options + /// \param[in] action the action to be performed + /// \param[out] results an iterator object for reading the returned results + /// \return Status + Status DoAction(const FlightCallOptions& options, const Action& action, + std::unique_ptr<ResultStream>* results); + Status DoAction(const Action& action, std::unique_ptr<ResultStream>* results) { + return DoAction({}, action, results); + } + + /// \brief Retrieve a list of available Action types + /// \param[in] options Per-RPC options + /// \param[out] actions the available actions + /// \return Status + Status ListActions(const FlightCallOptions& options, std::vector<ActionType>* actions); + Status ListActions(std::vector<ActionType>* actions) { + return ListActions({}, actions); + } + + /// \brief Request access plan for a single flight, which may be an existing + /// dataset or a command to be executed + /// \param[in] options Per-RPC options + /// \param[in] descriptor the dataset request, whether a named dataset or + /// command + /// \param[out] info the FlightInfo describing where to access the dataset + /// \return Status + Status GetFlightInfo(const FlightCallOptions& options, + const FlightDescriptor& descriptor, + std::unique_ptr<FlightInfo>* info); + Status GetFlightInfo(const FlightDescriptor& descriptor, + std::unique_ptr<FlightInfo>* info) { + return GetFlightInfo({}, descriptor, info); + } + + /// \brief Request schema for a single flight, which may be an existing + /// dataset or a command to be executed + /// \param[in] options Per-RPC options + /// \param[in] descriptor the dataset request, whether a named dataset or + /// command + /// \param[out] schema_result the SchemaResult describing the dataset schema + /// \return Status + Status GetSchema(const FlightCallOptions& options, const FlightDescriptor& descriptor, + std::unique_ptr<SchemaResult>* schema_result); + Status GetSchema(const FlightDescriptor& descriptor, + std::unique_ptr<SchemaResult>* schema_result) { + return GetSchema({}, descriptor, schema_result); + } + + /// \brief List all available flights known to the server + /// \param[out] listing an iterator that returns a FlightInfo for each flight + /// \return Status + Status ListFlights(std::unique_ptr<FlightListing>* listing); + + /// \brief List available flights given indicated filter criteria + /// \param[in] options Per-RPC options + /// \param[in] criteria the filter criteria (opaque) + /// \param[out] listing an iterator that returns a FlightInfo for each flight + /// \return Status + Status ListFlights(const FlightCallOptions& options, const Criteria& criteria, + std::unique_ptr<FlightListing>* listing); + + /// \brief Given a flight ticket and schema, request to be sent the + /// stream. Returns record batch stream reader + /// \param[in] options Per-RPC options + /// \param[in] ticket The flight ticket to use + /// \param[out] stream the returned RecordBatchReader + /// \return Status + Status DoGet(const FlightCallOptions& options, const Ticket& ticket, + std::unique_ptr<FlightStreamReader>* stream); + Status DoGet(const Ticket& ticket, std::unique_ptr<FlightStreamReader>* stream) { + return DoGet({}, ticket, stream); + } + + /// \brief Upload data to a Flight described by the given + /// descriptor. The caller must call Close() on the returned stream + /// once they are done writing. + /// + /// The reader and writer are linked; closing the writer will also + /// close the reader. Use \a DoneWriting to only close the write + /// side of the channel. + /// + /// \param[in] options Per-RPC options + /// \param[in] descriptor the descriptor of the stream + /// \param[in] schema the schema for the data to upload + /// \param[out] stream a writer to write record batches to + /// \param[out] reader a reader for application metadata from the server + /// \return Status + Status DoPut(const FlightCallOptions& options, const FlightDescriptor& descriptor, + const std::shared_ptr<Schema>& schema, + std::unique_ptr<FlightStreamWriter>* stream, + std::unique_ptr<FlightMetadataReader>* reader); + Status DoPut(const FlightDescriptor& descriptor, const std::shared_ptr<Schema>& schema, + std::unique_ptr<FlightStreamWriter>* stream, + std::unique_ptr<FlightMetadataReader>* reader) { + return DoPut({}, descriptor, schema, stream, reader); + } + + Status DoExchange(const FlightCallOptions& options, const FlightDescriptor& descriptor, + std::unique_ptr<FlightStreamWriter>* writer, + std::unique_ptr<FlightStreamReader>* reader); + Status DoExchange(const FlightDescriptor& descriptor, + std::unique_ptr<FlightStreamWriter>* writer, + std::unique_ptr<FlightStreamReader>* reader) { + return DoExchange({}, descriptor, writer, reader); + } + + private: + FlightClient(); + class FlightClientImpl; + std::unique_ptr<FlightClientImpl> impl_; +}; + +} // namespace flight +} // namespace arrow |