// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab ft=cpp #pragma once #include "rgw_http_client.h" class RGWGetDataCB; class RGWHTTPSimpleRequest : public RGWHTTPClient { protected: int http_status; int status; using unique_lock = std::unique_lock<std::mutex>; std::mutex out_headers_lock; std::map<std::string, std::string> out_headers; param_vec_t params; bufferlist::iterator *send_iter; size_t max_response; /* we need this as we don't stream out response */ bufferlist response; virtual int handle_header(const std::string& name, const std::string& val); void get_params_str(std::map<std::string, std::string>& extra_args, std::string& dest); public: RGWHTTPSimpleRequest(CephContext *_cct, const std::string& _method, const std::string& _url, param_vec_t *_headers, param_vec_t *_params) : RGWHTTPClient(_cct, _method, _url), http_status(0), status(0), send_iter(NULL), max_response(0) { set_headers(_headers); set_params(_params); } void set_headers(param_vec_t *_headers) { if (_headers) headers = *_headers; } void set_params(param_vec_t *_params) { if (_params) params = *_params; } int receive_header(void *ptr, size_t len) override; int receive_data(void *ptr, size_t len, bool *pause) override; int send_data(void *ptr, size_t len, bool* pause=nullptr) override; bufferlist& get_response() { return response; } void get_out_headers(std::map<std::string, std::string> *pheaders); /* modifies out_headers */ int get_http_status() { return http_status; } int get_status(); }; class RGWRESTSimpleRequest : public RGWHTTPSimpleRequest { std::optional<std::string> api_name; public: RGWRESTSimpleRequest(CephContext *_cct, const std::string& _method, const std::string& _url, param_vec_t *_headers, param_vec_t *_params, std::optional<std::string> _api_name) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params), api_name(_api_name) {} int forward_request(const DoutPrefixProvider *dpp, const RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl, optional_yield y, std::string service=""); }; class RGWWriteDrainCB { public: RGWWriteDrainCB() = default; virtual ~RGWWriteDrainCB() = default; virtual void notify(uint64_t pending_size) = 0; }; class RGWRESTGenerateHTTPHeaders : public DoutPrefix { CephContext *cct; RGWEnv *new_env; req_info *new_info; std::string region; std::string service; std::string method; std::string url; std::string resource; public: RGWRESTGenerateHTTPHeaders(CephContext *_cct, RGWEnv *_env, req_info *_info); void init(const std::string& method, const std::string& host, const std::string& resource_prefix, const std::string& url, const std::string& resource, const param_vec_t& params, std::optional<std::string> api_name); void set_extra_headers(const std::map<std::string, std::string>& extra_headers); int set_obj_attrs(const DoutPrefixProvider *dpp, std::map<std::string, bufferlist>& rgw_attrs); void set_http_attrs(const std::map<std::string, std::string>& http_attrs); void set_policy(RGWAccessControlPolicy& policy); int sign(const DoutPrefixProvider *dpp, RGWAccessKey& key, const bufferlist *opt_content); const std::string& get_url() { return url; } }; class RGWHTTPStreamRWRequest : public RGWHTTPSimpleRequest { public: class ReceiveCB; private: ceph::mutex lock = ceph::make_mutex("RGWHTTPStreamRWRequest"); ceph::mutex write_lock = ceph::make_mutex("RGWHTTPStreamRWRequest::write_lock"); ReceiveCB *cb{nullptr}; RGWWriteDrainCB *write_drain_cb{nullptr}; bufferlist in_data; size_t chunk_ofs{0}; size_t ofs{0}; uint64_t write_ofs{0}; bool read_paused{false}; bool send_paused{false}; bool stream_writes{false}; bool write_stream_complete{false}; protected: bufferlist outbl; int handle_header(const std::string& name, const std::string& val) override; public: int send_data(void *ptr, size_t len, bool *pause) override; int receive_data(void *ptr, size_t len, bool *pause) override; class ReceiveCB { protected: uint64_t extra_data_len{0}; public: ReceiveCB() = default; virtual ~ReceiveCB() = default; virtual int handle_data(bufferlist& bl, bool *pause = nullptr) = 0; virtual void set_extra_data_len(uint64_t len) { extra_data_len = len; } }; RGWHTTPStreamRWRequest(CephContext *_cct, const std::string& _method, const std::string& _url, param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params) { } RGWHTTPStreamRWRequest(CephContext *_cct, const std::string& _method, const std::string& _url, ReceiveCB *_cb, param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params), cb(_cb) { } virtual ~RGWHTTPStreamRWRequest() override {} void set_outbl(bufferlist& _outbl) { outbl.swap(_outbl); } void set_in_cb(ReceiveCB *_cb) { cb = _cb; } void set_write_drain_cb(RGWWriteDrainCB *_cb) { write_drain_cb = _cb; } void unpause_receive(); void add_send_data(bufferlist& bl); void set_stream_write(bool s); uint64_t get_pending_send_size(); /* finish streaming writes */ void finish_write(); virtual int send(RGWHTTPManager *mgr); int complete_request(optional_yield y, std::string *etag = nullptr, real_time *mtime = nullptr, uint64_t *psize = nullptr, std::map<std::string, std::string> *pattrs = nullptr, std::map<std::string, std::string> *pheaders = nullptr); }; class RGWRESTStreamRWRequest : public RGWHTTPStreamRWRequest { std::optional<RGWAccessKey> sign_key; std::optional<RGWRESTGenerateHTTPHeaders> headers_gen; RGWEnv new_env; req_info new_info; protected: std::optional<std::string> api_name; HostStyle host_style; public: RGWRESTStreamRWRequest(CephContext *_cct, const std::string& _method, const std::string& _url, RGWHTTPStreamRWRequest::ReceiveCB *_cb, param_vec_t *_headers, param_vec_t *_params, std::optional<std::string> _api_name, HostStyle _host_style = PathStyle) : RGWHTTPStreamRWRequest(_cct, _method, _url, _cb, _headers, _params), new_info(_cct, &new_env), api_name(_api_name), host_style(_host_style) { } virtual ~RGWRESTStreamRWRequest() override {} int send_prepare(const DoutPrefixProvider *dpp, RGWAccessKey *key, std::map<std::string, std::string>& extra_headers, const std::string& resource, bufferlist *send_data = nullptr /* optional input data */); int send_prepare(const DoutPrefixProvider *dpp, RGWAccessKey& key, std::map<std::string, std::string>& extra_headers, const rgw_obj& obj); int send(RGWHTTPManager *mgr) override; int send_request(const DoutPrefixProvider *dpp, RGWAccessKey& key, std::map<std::string, std::string>& extra_headers, const rgw_obj& obj, RGWHTTPManager *mgr); int send_request(const DoutPrefixProvider *dpp, RGWAccessKey *key, std::map<std::string, std::string>& extra_headers, const std::string& resource, RGWHTTPManager *mgr, bufferlist *send_data = nullptr /* optional input data */); void add_params(param_vec_t *params); private: int do_send_prepare(const DoutPrefixProvider *dpp, RGWAccessKey *key, std::map<std::string, std::string>& extra_headers, const std::string& resource, bufferlist *send_data = nullptr /* optional input data */); }; class RGWRESTStreamReadRequest : public RGWRESTStreamRWRequest { public: RGWRESTStreamReadRequest(CephContext *_cct, const std::string& _url, ReceiveCB *_cb, param_vec_t *_headers, param_vec_t *_params, std::optional<std::string> _api_name, HostStyle _host_style = PathStyle) : RGWRESTStreamRWRequest(_cct, "GET", _url, _cb, _headers, _params, _api_name, _host_style) {} }; class RGWRESTStreamHeadRequest : public RGWRESTStreamRWRequest { public: RGWRESTStreamHeadRequest(CephContext *_cct, const std::string& _url, ReceiveCB *_cb, param_vec_t *_headers, param_vec_t *_params, std::optional<std::string> _api_name) : RGWRESTStreamRWRequest(_cct, "HEAD", _url, _cb, _headers, _params, _api_name) {} }; class RGWRESTStreamSendRequest : public RGWRESTStreamRWRequest { public: RGWRESTStreamSendRequest(CephContext *_cct, const std::string& method, const std::string& _url, ReceiveCB *_cb, param_vec_t *_headers, param_vec_t *_params, std::optional<std::string> _api_name, HostStyle _host_style = PathStyle) : RGWRESTStreamRWRequest(_cct, method, _url, _cb, _headers, _params, _api_name, _host_style) {} }; class RGWRESTStreamS3PutObj : public RGWHTTPStreamRWRequest { std::optional<std::string> api_name; HostStyle host_style; RGWGetDataCB *out_cb; RGWEnv new_env; req_info new_info; RGWRESTGenerateHTTPHeaders headers_gen; public: RGWRESTStreamS3PutObj(CephContext *_cct, const std::string& _method, const std::string& _url, param_vec_t *_headers, param_vec_t *_params, std::optional<std::string> _api_name, HostStyle _host_style) : RGWHTTPStreamRWRequest(_cct, _method, _url, nullptr, _headers, _params), api_name(_api_name), host_style(_host_style), out_cb(NULL), new_info(cct, &new_env), headers_gen(_cct, &new_env, &new_info) {} ~RGWRESTStreamS3PutObj() override; void send_init(const rgw_obj& obj); void send_ready(const DoutPrefixProvider *dpp, RGWAccessKey& key, std::map<std::string, bufferlist>& rgw_attrs); void send_ready(const DoutPrefixProvider *dpp, RGWAccessKey& key, const std::map<std::string, std::string>& http_attrs, RGWAccessControlPolicy& policy); void send_ready(const DoutPrefixProvider *dpp, RGWAccessKey& key); void put_obj_init(const DoutPrefixProvider *dpp, RGWAccessKey& key, const rgw_obj& obj, std::map<std::string, bufferlist>& attrs); RGWGetDataCB *get_out_cb() { return out_cb; } };