summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_http_client.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rgw/rgw_http_client.h348
1 files changed, 348 insertions, 0 deletions
diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h
new file mode 100644
index 000000000..dbd705a18
--- /dev/null
+++ b/src/rgw/rgw_http_client.h
@@ -0,0 +1,348 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include "common/async/yield_context.h"
+#include "common/Cond.h"
+#include "rgw_common.h"
+#include "rgw_string.h"
+#include "rgw_http_client_types.h"
+
+#include <atomic>
+
+using param_pair_t = std::pair<std::string, std::string>;
+using param_vec_t = std::vector<param_pair_t>;
+
+void rgw_http_client_init(CephContext *cct);
+void rgw_http_client_cleanup();
+
+struct rgw_http_req_data;
+class RGWHTTPManager;
+
+class RGWHTTPClient : public RGWIOProvider,
+ public NoDoutPrefix
+{
+ friend class RGWHTTPManager;
+
+ bufferlist send_bl;
+ bufferlist::iterator send_iter;
+ bool has_send_len;
+ long http_status;
+ bool send_data_hint{false};
+ size_t receive_pause_skip{0}; /* how many bytes to skip next time receive_data is called
+ due to being paused */
+
+ void *user_info{nullptr};
+
+ rgw_http_req_data *req_data;
+
+ bool verify_ssl; // Do not validate self signed certificates, default to false
+
+ std::string ca_path;
+
+ std::string client_cert;
+
+ std::string client_key;
+
+ std::atomic<unsigned> stopped { 0 };
+
+
+protected:
+ CephContext *cct;
+
+ std::string method;
+ std::string url;
+
+ std::string protocol;
+ std::string host;
+ std::string resource_prefix;
+
+ size_t send_len{0};
+
+ param_vec_t headers;
+
+ long req_timeout{0L};
+
+ void init();
+
+ RGWHTTPManager *get_manager();
+
+ int init_request(rgw_http_req_data *req_data);
+
+ virtual int receive_header(void *ptr, size_t len) {
+ return 0;
+ }
+ virtual int receive_data(void *ptr, size_t len, bool *pause) {
+ return 0;
+ }
+
+ virtual int send_data(void *ptr, size_t len, bool *pause=nullptr) {
+ return 0;
+ }
+
+ /* Callbacks for libcurl. */
+ static size_t receive_http_header(void *ptr,
+ size_t size,
+ size_t nmemb,
+ void *_info);
+
+ static size_t receive_http_data(void *ptr,
+ size_t size,
+ size_t nmemb,
+ void *_info);
+
+ static size_t send_http_data(void *ptr,
+ size_t size,
+ size_t nmemb,
+ void *_info);
+
+ ceph::mutex& get_req_lock();
+
+ /* needs to be called under req_lock() */
+ void _set_write_paused(bool pause);
+ void _set_read_paused(bool pause);
+public:
+ static const long HTTP_STATUS_NOSTATUS = 0;
+ static const long HTTP_STATUS_UNAUTHORIZED = 401;
+ static const long HTTP_STATUS_NOTFOUND = 404;
+
+ static constexpr int HTTPCLIENT_IO_READ = 0x1;
+ static constexpr int HTTPCLIENT_IO_WRITE = 0x2;
+ static constexpr int HTTPCLIENT_IO_CONTROL = 0x4;
+
+ virtual ~RGWHTTPClient();
+ explicit RGWHTTPClient(CephContext *cct,
+ const std::string& _method,
+ const std::string& _url);
+
+ std::ostream& gen_prefix(std::ostream& out) const override;
+
+
+ void append_header(const std::string& name, const std::string& val) {
+ headers.push_back(std::pair<std::string, std::string>(name, val));
+ }
+
+ void set_send_length(size_t len) {
+ send_len = len;
+ has_send_len = true;
+ }
+
+ void set_send_data_hint(bool hint) {
+ send_data_hint = hint;
+ }
+
+ long get_http_status() const {
+ return http_status;
+ }
+
+ void set_http_status(long _http_status) {
+ http_status = _http_status;
+ }
+
+ void set_verify_ssl(bool flag) {
+ verify_ssl = flag;
+ }
+
+ // set request timeout in seconds
+ // zero (default) mean that request will never timeout
+ void set_req_timeout(long timeout) {
+ req_timeout = timeout;
+ }
+
+ int process(optional_yield y);
+
+ int wait(optional_yield y);
+ void cancel();
+ bool is_done();
+
+ rgw_http_req_data *get_req_data() { return req_data; }
+
+ std::string to_str();
+
+ int get_req_retcode();
+
+ void set_url(const std::string& _url) {
+ url = _url;
+ }
+
+ void set_method(const std::string& _method) {
+ method = _method;
+ }
+
+ void set_io_user_info(void *_user_info) override {
+ user_info = _user_info;
+ }
+
+ void *get_io_user_info() override {
+ return user_info;
+ }
+
+ void set_ca_path(const std::string& _ca_path) {
+ ca_path = _ca_path;
+ }
+
+ void set_client_cert(const std::string& _client_cert) {
+ client_cert = _client_cert;
+ }
+
+ void set_client_key(const std::string& _client_key) {
+ client_key = _client_key;
+ }
+};
+
+
+class RGWHTTPHeadersCollector : public RGWHTTPClient {
+public:
+ typedef std::string header_name_t;
+ typedef std::string header_value_t;
+ typedef std::set<header_name_t, ltstr_nocase> header_spec_t;
+
+ RGWHTTPHeadersCollector(CephContext * const cct,
+ const std::string& method,
+ const std::string& url,
+ const header_spec_t &relevant_headers)
+ : RGWHTTPClient(cct, method, url),
+ relevant_headers(relevant_headers) {
+ }
+
+ std::map<header_name_t, header_value_t, ltstr_nocase> get_headers() const {
+ return found_headers;
+ }
+
+ /* Throws std::out_of_range */
+ const header_value_t& get_header_value(const header_name_t& name) const {
+ return found_headers.at(name);
+ }
+
+protected:
+ int receive_header(void *ptr, size_t len) override;
+
+private:
+ const std::set<header_name_t, ltstr_nocase> relevant_headers;
+ std::map<header_name_t, header_value_t, ltstr_nocase> found_headers;
+};
+
+
+class RGWHTTPTransceiver : public RGWHTTPHeadersCollector {
+ bufferlist * const read_bl;
+ std::string post_data;
+ size_t post_data_index;
+
+public:
+ RGWHTTPTransceiver(CephContext * const cct,
+ const std::string& method,
+ const std::string& url,
+ bufferlist * const read_bl,
+ const header_spec_t intercept_headers = {})
+ : RGWHTTPHeadersCollector(cct, method, url, intercept_headers),
+ read_bl(read_bl),
+ post_data_index(0) {
+ }
+
+ RGWHTTPTransceiver(CephContext * const cct,
+ const std::string& method,
+ const std::string& url,
+ bufferlist * const read_bl,
+ const bool verify_ssl,
+ const header_spec_t intercept_headers = {})
+ : RGWHTTPHeadersCollector(cct, method, url, intercept_headers),
+ read_bl(read_bl),
+ post_data_index(0) {
+ set_verify_ssl(verify_ssl);
+ }
+
+ void set_post_data(const std::string& _post_data) {
+ this->post_data = _post_data;
+ }
+
+protected:
+ int send_data(void* ptr, size_t len, bool *pause=nullptr) override;
+
+ int receive_data(void *ptr, size_t len, bool *pause) override {
+ read_bl->append((char *)ptr, len);
+ return 0;
+ }
+};
+
+typedef RGWHTTPTransceiver RGWPostHTTPData;
+
+
+class RGWCompletionManager;
+
+enum RGWHTTPRequestSetState {
+ SET_NOP = 0,
+ SET_WRITE_PAUSED = 1,
+ SET_WRITE_RESUME = 2,
+ SET_READ_PAUSED = 3,
+ SET_READ_RESUME = 4,
+};
+
+class RGWHTTPManager {
+ struct set_state {
+ rgw_http_req_data *req;
+ int bitmask;
+
+ set_state(rgw_http_req_data *_req, int _bitmask) : req(_req), bitmask(_bitmask) {}
+ };
+ CephContext *cct;
+ RGWCompletionManager *completion_mgr;
+ void *multi_handle;
+ bool is_started = false;
+ std::atomic<unsigned> going_down { 0 };
+ std::atomic<unsigned> is_stopped { 0 };
+
+ ceph::shared_mutex reqs_lock = ceph::make_shared_mutex("RGWHTTPManager::reqs_lock");
+ std::map<uint64_t, rgw_http_req_data *> reqs;
+ std::list<rgw_http_req_data *> unregistered_reqs;
+ std::list<set_state> reqs_change_state;
+ std::map<uint64_t, rgw_http_req_data *> complete_reqs;
+ int64_t num_reqs = 0;
+ int64_t max_threaded_req = 0;
+ int thread_pipe[2];
+
+ void register_request(rgw_http_req_data *req_data);
+ void complete_request(rgw_http_req_data *req_data);
+ void _complete_request(rgw_http_req_data *req_data);
+ bool unregister_request(rgw_http_req_data *req_data);
+ void _unlink_request(rgw_http_req_data *req_data);
+ void unlink_request(rgw_http_req_data *req_data);
+ void finish_request(rgw_http_req_data *req_data, int r, long http_status = -1);
+ void _finish_request(rgw_http_req_data *req_data, int r);
+ void _set_req_state(set_state& ss);
+ int link_request(rgw_http_req_data *req_data);
+
+ void manage_pending_requests();
+
+ class ReqsThread : public Thread {
+ RGWHTTPManager *manager;
+
+ public:
+ explicit ReqsThread(RGWHTTPManager *_m) : manager(_m) {}
+ void *entry() override;
+ };
+
+ ReqsThread *reqs_thread = nullptr;
+
+ void *reqs_thread_entry();
+
+ int signal_thread();
+
+public:
+ RGWHTTPManager(CephContext *_cct, RGWCompletionManager *completion_mgr = NULL);
+ ~RGWHTTPManager();
+
+ int start();
+ void stop();
+
+ int add_request(RGWHTTPClient *client);
+ int remove_request(RGWHTTPClient *client);
+ int set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state);
+};
+
+class RGWHTTP
+{
+public:
+ static int send(RGWHTTPClient *req);
+ static int process(RGWHTTPClient *req, optional_yield y);
+};