diff options
Diffstat (limited to 'src/arrow/cpp/src/arrow/python/flight.h')
-rw-r--r-- | src/arrow/cpp/src/arrow/python/flight.h | 357 |
1 files changed, 357 insertions, 0 deletions
diff --git a/src/arrow/cpp/src/arrow/python/flight.h b/src/arrow/cpp/src/arrow/python/flight.h new file mode 100644 index 000000000..45a090ef4 --- /dev/null +++ b/src/arrow/cpp/src/arrow/python/flight.h @@ -0,0 +1,357 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> +#include <string> +#include <vector> + +#include "arrow/flight/api.h" +#include "arrow/ipc/dictionary.h" +#include "arrow/python/common.h" + +#if defined(_WIN32) || defined(__CYGWIN__) // Windows +#if defined(_MSC_VER) +#pragma warning(disable : 4251) +#else +#pragma GCC diagnostic ignored "-Wattributes" +#endif + +#ifdef ARROW_STATIC +#define ARROW_PYFLIGHT_EXPORT +#elif defined(ARROW_PYFLIGHT_EXPORTING) +#define ARROW_PYFLIGHT_EXPORT __declspec(dllexport) +#else +#define ARROW_PYFLIGHT_EXPORT __declspec(dllimport) +#endif + +#else // Not Windows +#ifndef ARROW_PYFLIGHT_EXPORT +#define ARROW_PYFLIGHT_EXPORT __attribute__((visibility("default"))) +#endif +#endif // Non-Windows + +namespace arrow { + +namespace py { + +namespace flight { + +ARROW_PYFLIGHT_EXPORT +extern const char* kPyServerMiddlewareName; + +/// \brief A table of function pointers for calling from C++ into +/// Python. +class ARROW_PYFLIGHT_EXPORT PyFlightServerVtable { + public: + std::function<Status(PyObject*, const arrow::flight::ServerCallContext&, + const arrow::flight::Criteria*, + std::unique_ptr<arrow::flight::FlightListing>*)> + list_flights; + std::function<Status(PyObject*, const arrow::flight::ServerCallContext&, + const arrow::flight::FlightDescriptor&, + std::unique_ptr<arrow::flight::FlightInfo>*)> + get_flight_info; + std::function<Status(PyObject*, const arrow::flight::ServerCallContext&, + const arrow::flight::FlightDescriptor&, + std::unique_ptr<arrow::flight::SchemaResult>*)> + get_schema; + std::function<Status(PyObject*, const arrow::flight::ServerCallContext&, + const arrow::flight::Ticket&, + std::unique_ptr<arrow::flight::FlightDataStream>*)> + do_get; + std::function<Status(PyObject*, const arrow::flight::ServerCallContext&, + std::unique_ptr<arrow::flight::FlightMessageReader>, + std::unique_ptr<arrow::flight::FlightMetadataWriter>)> + do_put; + std::function<Status(PyObject*, const arrow::flight::ServerCallContext&, + std::unique_ptr<arrow::flight::FlightMessageReader>, + std::unique_ptr<arrow::flight::FlightMessageWriter>)> + do_exchange; + std::function<Status(PyObject*, const arrow::flight::ServerCallContext&, + const arrow::flight::Action&, + std::unique_ptr<arrow::flight::ResultStream>*)> + do_action; + std::function<Status(PyObject*, const arrow::flight::ServerCallContext&, + std::vector<arrow::flight::ActionType>*)> + list_actions; +}; + +class ARROW_PYFLIGHT_EXPORT PyServerAuthHandlerVtable { + public: + std::function<Status(PyObject*, arrow::flight::ServerAuthSender*, + arrow::flight::ServerAuthReader*)> + authenticate; + std::function<Status(PyObject*, const std::string&, std::string*)> is_valid; +}; + +class ARROW_PYFLIGHT_EXPORT PyClientAuthHandlerVtable { + public: + std::function<Status(PyObject*, arrow::flight::ClientAuthSender*, + arrow::flight::ClientAuthReader*)> + authenticate; + std::function<Status(PyObject*, std::string*)> get_token; +}; + +/// \brief A helper to implement an auth mechanism in Python. +class ARROW_PYFLIGHT_EXPORT PyServerAuthHandler + : public arrow::flight::ServerAuthHandler { + public: + explicit PyServerAuthHandler(PyObject* handler, + const PyServerAuthHandlerVtable& vtable); + Status Authenticate(arrow::flight::ServerAuthSender* outgoing, + arrow::flight::ServerAuthReader* incoming) override; + Status IsValid(const std::string& token, std::string* peer_identity) override; + + private: + OwnedRefNoGIL handler_; + PyServerAuthHandlerVtable vtable_; +}; + +/// \brief A helper to implement an auth mechanism in Python. +class ARROW_PYFLIGHT_EXPORT PyClientAuthHandler + : public arrow::flight::ClientAuthHandler { + public: + explicit PyClientAuthHandler(PyObject* handler, + const PyClientAuthHandlerVtable& vtable); + Status Authenticate(arrow::flight::ClientAuthSender* outgoing, + arrow::flight::ClientAuthReader* incoming) override; + Status GetToken(std::string* token) override; + + private: + OwnedRefNoGIL handler_; + PyClientAuthHandlerVtable vtable_; +}; + +class ARROW_PYFLIGHT_EXPORT PyFlightServer : public arrow::flight::FlightServerBase { + public: + explicit PyFlightServer(PyObject* server, const PyFlightServerVtable& vtable); + + // Like Serve(), but set up signals and invoke Python signal handlers + // if necessary. This function may return with a Python exception set. + Status ServeWithSignals(); + + Status ListFlights(const arrow::flight::ServerCallContext& context, + const arrow::flight::Criteria* criteria, + std::unique_ptr<arrow::flight::FlightListing>* listings) override; + Status GetFlightInfo(const arrow::flight::ServerCallContext& context, + const arrow::flight::FlightDescriptor& request, + std::unique_ptr<arrow::flight::FlightInfo>* info) override; + Status GetSchema(const arrow::flight::ServerCallContext& context, + const arrow::flight::FlightDescriptor& request, + std::unique_ptr<arrow::flight::SchemaResult>* result) override; + Status DoGet(const arrow::flight::ServerCallContext& context, + const arrow::flight::Ticket& request, + std::unique_ptr<arrow::flight::FlightDataStream>* stream) override; + Status DoPut(const arrow::flight::ServerCallContext& context, + std::unique_ptr<arrow::flight::FlightMessageReader> reader, + std::unique_ptr<arrow::flight::FlightMetadataWriter> writer) override; + Status DoExchange(const arrow::flight::ServerCallContext& context, + std::unique_ptr<arrow::flight::FlightMessageReader> reader, + std::unique_ptr<arrow::flight::FlightMessageWriter> writer) override; + Status DoAction(const arrow::flight::ServerCallContext& context, + const arrow::flight::Action& action, + std::unique_ptr<arrow::flight::ResultStream>* result) override; + Status ListActions(const arrow::flight::ServerCallContext& context, + std::vector<arrow::flight::ActionType>* actions) override; + + private: + OwnedRefNoGIL server_; + PyFlightServerVtable vtable_; +}; + +/// \brief A callback that obtains the next result from a Flight action. +typedef std::function<Status(PyObject*, std::unique_ptr<arrow::flight::Result>*)> + PyFlightResultStreamCallback; + +/// \brief A ResultStream built around a Python callback. +class ARROW_PYFLIGHT_EXPORT PyFlightResultStream : public arrow::flight::ResultStream { + public: + /// \brief Construct a FlightResultStream from a Python object and callback. + /// Must only be called while holding the GIL. + explicit PyFlightResultStream(PyObject* generator, + PyFlightResultStreamCallback callback); + Status Next(std::unique_ptr<arrow::flight::Result>* result) override; + + private: + OwnedRefNoGIL generator_; + PyFlightResultStreamCallback callback_; +}; + +/// \brief A wrapper around a FlightDataStream that keeps alive a +/// Python object backing it. +class ARROW_PYFLIGHT_EXPORT PyFlightDataStream : public arrow::flight::FlightDataStream { + public: + /// \brief Construct a FlightDataStream from a Python object and underlying stream. + /// Must only be called while holding the GIL. + explicit PyFlightDataStream(PyObject* data_source, + std::unique_ptr<arrow::flight::FlightDataStream> stream); + + std::shared_ptr<Schema> schema() override; + Status GetSchemaPayload(arrow::flight::FlightPayload* payload) override; + Status Next(arrow::flight::FlightPayload* payload) override; + + private: + OwnedRefNoGIL data_source_; + std::unique_ptr<arrow::flight::FlightDataStream> stream_; +}; + +class ARROW_PYFLIGHT_EXPORT PyServerMiddlewareFactory + : public arrow::flight::ServerMiddlewareFactory { + public: + /// \brief A callback to create the middleware instance in Python + typedef std::function<Status( + PyObject*, const arrow::flight::CallInfo& info, + const arrow::flight::CallHeaders& incoming_headers, + std::shared_ptr<arrow::flight::ServerMiddleware>* middleware)> + StartCallCallback; + + /// \brief Must only be called while holding the GIL. + explicit PyServerMiddlewareFactory(PyObject* factory, StartCallCallback start_call); + + Status StartCall(const arrow::flight::CallInfo& info, + const arrow::flight::CallHeaders& incoming_headers, + std::shared_ptr<arrow::flight::ServerMiddleware>* middleware) override; + + private: + OwnedRefNoGIL factory_; + StartCallCallback start_call_; +}; + +class ARROW_PYFLIGHT_EXPORT PyServerMiddleware : public arrow::flight::ServerMiddleware { + public: + typedef std::function<Status(PyObject*, + arrow::flight::AddCallHeaders* outgoing_headers)> + SendingHeadersCallback; + typedef std::function<Status(PyObject*, const Status& status)> CallCompletedCallback; + + struct Vtable { + SendingHeadersCallback sending_headers; + CallCompletedCallback call_completed; + }; + + /// \brief Must only be called while holding the GIL. + explicit PyServerMiddleware(PyObject* middleware, Vtable vtable); + + void SendingHeaders(arrow::flight::AddCallHeaders* outgoing_headers) override; + void CallCompleted(const Status& status) override; + std::string name() const override; + /// \brief Get the underlying Python object. + PyObject* py_object() const; + + private: + OwnedRefNoGIL middleware_; + Vtable vtable_; +}; + +class ARROW_PYFLIGHT_EXPORT PyClientMiddlewareFactory + : public arrow::flight::ClientMiddlewareFactory { + public: + /// \brief A callback to create the middleware instance in Python + typedef std::function<Status( + PyObject*, const arrow::flight::CallInfo& info, + std::unique_ptr<arrow::flight::ClientMiddleware>* middleware)> + StartCallCallback; + + /// \brief Must only be called while holding the GIL. + explicit PyClientMiddlewareFactory(PyObject* factory, StartCallCallback start_call); + + void StartCall(const arrow::flight::CallInfo& info, + std::unique_ptr<arrow::flight::ClientMiddleware>* middleware) override; + + private: + OwnedRefNoGIL factory_; + StartCallCallback start_call_; +}; + +class ARROW_PYFLIGHT_EXPORT PyClientMiddleware : public arrow::flight::ClientMiddleware { + public: + typedef std::function<Status(PyObject*, + arrow::flight::AddCallHeaders* outgoing_headers)> + SendingHeadersCallback; + typedef std::function<Status(PyObject*, + const arrow::flight::CallHeaders& incoming_headers)> + ReceivedHeadersCallback; + typedef std::function<Status(PyObject*, const Status& status)> CallCompletedCallback; + + struct Vtable { + SendingHeadersCallback sending_headers; + ReceivedHeadersCallback received_headers; + CallCompletedCallback call_completed; + }; + + /// \brief Must only be called while holding the GIL. + explicit PyClientMiddleware(PyObject* factory, Vtable vtable); + + void SendingHeaders(arrow::flight::AddCallHeaders* outgoing_headers) override; + void ReceivedHeaders(const arrow::flight::CallHeaders& incoming_headers) override; + void CallCompleted(const Status& status) override; + + private: + OwnedRefNoGIL middleware_; + Vtable vtable_; +}; + +/// \brief A callback that obtains the next payload from a Flight result stream. +typedef std::function<Status(PyObject*, arrow::flight::FlightPayload*)> + PyGeneratorFlightDataStreamCallback; + +/// \brief A FlightDataStream built around a Python callback. +class ARROW_PYFLIGHT_EXPORT PyGeneratorFlightDataStream + : public arrow::flight::FlightDataStream { + public: + /// \brief Construct a FlightDataStream from a Python object and underlying stream. + /// Must only be called while holding the GIL. + explicit PyGeneratorFlightDataStream(PyObject* generator, + std::shared_ptr<arrow::Schema> schema, + PyGeneratorFlightDataStreamCallback callback, + const ipc::IpcWriteOptions& options); + std::shared_ptr<Schema> schema() override; + Status GetSchemaPayload(arrow::flight::FlightPayload* payload) override; + Status Next(arrow::flight::FlightPayload* payload) override; + + private: + OwnedRefNoGIL generator_; + std::shared_ptr<arrow::Schema> schema_; + ipc::DictionaryFieldMapper mapper_; + ipc::IpcWriteOptions options_; + PyGeneratorFlightDataStreamCallback callback_; +}; + +ARROW_PYFLIGHT_EXPORT +Status CreateFlightInfo(const std::shared_ptr<arrow::Schema>& schema, + const arrow::flight::FlightDescriptor& descriptor, + const std::vector<arrow::flight::FlightEndpoint>& endpoints, + int64_t total_records, int64_t total_bytes, + std::unique_ptr<arrow::flight::FlightInfo>* out); + +ARROW_PYFLIGHT_EXPORT +Status DeserializeBasicAuth(const std::string& buf, + std::unique_ptr<arrow::flight::BasicAuth>* out); + +ARROW_PYFLIGHT_EXPORT +Status SerializeBasicAuth(const arrow::flight::BasicAuth& basic_auth, std::string* out); + +/// \brief Create a SchemaResult from schema. +ARROW_PYFLIGHT_EXPORT +Status CreateSchemaResult(const std::shared_ptr<arrow::Schema>& schema, + std::unique_ptr<arrow::flight::SchemaResult>* out); + +} // namespace flight +} // namespace py +} // namespace arrow |