// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab ft=cpp #pragma once #include #include #include "include/ceph_assert.h" // boost header clobbers our assert.h #include "rgw_coroutine.h" #include "rgw_rest_conn.h" struct rgw_rest_obj { rgw_obj_key key; uint64_t content_len; std::map attrs; std::map custom_attrs; RGWAccessControlPolicy acls; void init(const rgw_obj_key& _key) { key = _key; } }; class RGWReadRawRESTResourceCR : public RGWSimpleCoroutine { bufferlist *result; protected: RGWRESTConn *conn; RGWHTTPManager *http_manager; string path; param_vec_t params; param_vec_t extra_headers; public: boost::intrusive_ptr http_op; RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, RGWHTTPManager *_http_manager, const string& _path, rgw_http_param_pair *params, bufferlist *_result) : RGWSimpleCoroutine(_cct), result(_result), conn(_conn), http_manager(_http_manager), path(_path), params(make_param_list(params)) {} RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, RGWHTTPManager *_http_manager, const string& _path, rgw_http_param_pair *params) : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), path(_path), params(make_param_list(params)) {} RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, RGWHTTPManager *_http_manager, const string& _path, rgw_http_param_pair *params, param_vec_t &hdrs) : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), path(_path), params(make_param_list(params)), extra_headers(hdrs) {} RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, RGWHTTPManager *_http_manager, const string& _path, rgw_http_param_pair *params, std::map *hdrs) : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), path(_path), params(make_param_list(params)), extra_headers(make_param_list(hdrs)) {} ~RGWReadRawRESTResourceCR() override { request_cleanup(); } int send_request(const DoutPrefixProvider *dpp) override { auto op = boost::intrusive_ptr( new RGWRESTReadResource(conn, path, params, &extra_headers, http_manager)); init_new_io(op.get()); int ret = op->aio_read(dpp); if (ret < 0) { log_error() << "failed to send http operation: " << op->to_str() << " ret=" << ret << std::endl; op->put(); return ret; } std::swap(http_op, op); // store reference in http_op on success return 0; } virtual int wait_result() { return http_op->wait(result, null_yield); } int request_complete() override { int ret; ret = wait_result(); auto op = std::move(http_op); // release ref on return if (ret < 0) { error_stream << "http operation failed: " << op->to_str() << " status=" << op->get_http_status() << std::endl; op->put(); return ret; } op->put(); return 0; } void request_cleanup() override { if (http_op) { http_op->put(); http_op = NULL; } } }; template class RGWReadRESTResourceCR : public RGWReadRawRESTResourceCR { T *result; public: RGWReadRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, RGWHTTPManager *_http_manager, const string& _path, rgw_http_param_pair *params, T *_result) : RGWReadRawRESTResourceCR(_cct, _conn, _http_manager, _path, params), result(_result) {} RGWReadRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, RGWHTTPManager *_http_manager, const string& _path, rgw_http_param_pair *params, std::map *hdrs, T *_result) : RGWReadRawRESTResourceCR(_cct, _conn, _http_manager, _path, params, hdrs), result(_result) {} int wait_result() override { return http_op->wait(result, null_yield); } }; template class RGWSendRawRESTResourceCR: public RGWSimpleCoroutine { protected: RGWRESTConn *conn; RGWHTTPManager *http_manager; string method; string path; param_vec_t params; param_vec_t headers; map *attrs; T *result; E *err_result; bufferlist input_bl; bool send_content_length=false; boost::intrusive_ptr http_op; public: RGWSendRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, RGWHTTPManager *_http_manager, const string& _method, const string& _path, rgw_http_param_pair *_params, map *_attrs, bufferlist& _input, T *_result, bool _send_content_length, E *_err_result = nullptr) : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), method(_method), path(_path), params(make_param_list(_params)), headers(make_param_list(_attrs)), attrs(_attrs), result(_result), err_result(_err_result), input_bl(_input), send_content_length(_send_content_length) {} RGWSendRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, RGWHTTPManager *_http_manager, const string& _method, const string& _path, rgw_http_param_pair *_params, map *_attrs, T *_result, E *_err_result = nullptr) : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), method(_method), path(_path), params(make_param_list(_params)), headers(make_param_list(_attrs)), attrs(_attrs), result(_result), err_result(_err_result) {} ~RGWSendRawRESTResourceCR() override { request_cleanup(); } int send_request(const DoutPrefixProvider *dpp) override { auto op = boost::intrusive_ptr( new RGWRESTSendResource(conn, method, path, params, &headers, http_manager)); init_new_io(op.get()); int ret = op->aio_send(dpp, input_bl); if (ret < 0) { ldpp_subdout(dpp, rgw, 0) << "ERROR: failed to send request" << dendl; op->put(); return ret; } std::swap(http_op, op); // store reference in http_op on success return 0; } int request_complete() override { int ret; if (result || err_result) { ret = http_op->wait(result, null_yield, err_result); } else { bufferlist bl; ret = http_op->wait(&bl, null_yield); } auto op = std::move(http_op); // release ref on return if (ret < 0) { error_stream << "http operation failed: " << op->to_str() << " status=" << op->get_http_status() << std::endl; lsubdout(cct, rgw, 5) << "failed to wait for op, ret=" << ret << ": " << op->to_str() << dendl; op->put(); return ret; } op->put(); return 0; } void request_cleanup() override { if (http_op) { http_op->put(); http_op = NULL; } } }; template class RGWSendRESTResourceCR : public RGWSendRawRESTResourceCR { public: RGWSendRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, RGWHTTPManager *_http_manager, const string& _method, const string& _path, rgw_http_param_pair *_params, map *_attrs, S& _input, T *_result, E *_err_result = nullptr) : RGWSendRawRESTResourceCR(_cct, _conn, _http_manager, _method, _path, _params, _attrs, _result, _err_result) { JSONFormatter jf; encode_json("data", _input, &jf); std::stringstream ss; jf.flush(ss); //bufferlist bl; this->input_bl.append(ss.str()); } }; template class RGWPostRESTResourceCR : public RGWSendRESTResourceCR { public: RGWPostRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, RGWHTTPManager *_http_manager, const string& _path, rgw_http_param_pair *_params, S& _input, T *_result, E *_err_result = nullptr) : RGWSendRESTResourceCR(_cct, _conn, _http_manager, "POST", _path, _params, nullptr, _input, _result, _err_result) {} }; template class RGWPutRawRESTResourceCR: public RGWSendRawRESTResourceCR { public: RGWPutRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, RGWHTTPManager *_http_manager, const string& _path, rgw_http_param_pair *_params, bufferlist& _input, T *_result, E *_err_result = nullptr) : RGWSendRawRESTResourceCR(_cct, _conn, _http_manager, "PUT", _path, _params, nullptr, _input, _result, true, _err_result) {} }; template class RGWPostRawRESTResourceCR: public RGWSendRawRESTResourceCR { public: RGWPostRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, RGWHTTPManager *_http_manager, const string& _path, rgw_http_param_pair *_params, map * _attrs, bufferlist& _input, T *_result, E *_err_result = nullptr) : RGWSendRawRESTResourceCR(_cct, _conn, _http_manager, "POST", _path, _params, _attrs, _input, _result, true, _err_result) {} }; template class RGWPutRESTResourceCR : public RGWSendRESTResourceCR { public: RGWPutRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, RGWHTTPManager *_http_manager, const string& _path, rgw_http_param_pair *_params, S& _input, T *_result, E *_err_result = nullptr) : RGWSendRESTResourceCR(_cct, _conn, _http_manager, "PUT", _path, _params, nullptr, _input, _result, _err_result) {} RGWPutRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, RGWHTTPManager *_http_manager, const string& _path, rgw_http_param_pair *_params, map *_attrs, S& _input, T *_result, E *_err_result = nullptr) : RGWSendRESTResourceCR(_cct, _conn, _http_manager, "PUT", _path, _params, _attrs, _input, _result, _err_result) {} }; class RGWDeleteRESTResourceCR : public RGWSimpleCoroutine { RGWRESTConn *conn; RGWHTTPManager *http_manager; string path; param_vec_t params; boost::intrusive_ptr http_op; public: RGWDeleteRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, RGWHTTPManager *_http_manager, const string& _path, rgw_http_param_pair *_params) : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), path(_path), params(make_param_list(_params)) {} ~RGWDeleteRESTResourceCR() override { request_cleanup(); } int send_request(const DoutPrefixProvider *dpp) override { auto op = boost::intrusive_ptr( new RGWRESTDeleteResource(conn, path, params, nullptr, http_manager)); init_new_io(op.get()); bufferlist bl; int ret = op->aio_send(dpp, bl); if (ret < 0) { ldpp_subdout(dpp, rgw, 0) << "ERROR: failed to send DELETE request" << dendl; op->put(); return ret; } std::swap(http_op, op); // store reference in http_op on success return 0; } int request_complete() override { int ret; bufferlist bl; ret = http_op->wait(&bl, null_yield); auto op = std::move(http_op); // release ref on return if (ret < 0) { error_stream << "http operation failed: " << op->to_str() << " status=" << op->get_http_status() << std::endl; lsubdout(cct, rgw, 5) << "failed to wait for op, ret=" << ret << ": " << op->to_str() << dendl; op->put(); return ret; } op->put(); return 0; } void request_cleanup() override { if (http_op) { http_op->put(); http_op = NULL; } } }; class RGWCRHTTPGetDataCB : public RGWHTTPStreamRWRequest::ReceiveCB { ceph::mutex lock = ceph::make_mutex("RGWCRHTTPGetDataCB"); RGWCoroutinesEnv *env; RGWCoroutine *cr; RGWHTTPStreamRWRequest *req; rgw_io_id io_id; bufferlist data; bufferlist extra_data; bool got_all_extra_data{false}; bool paused{false}; bool notified{false}; public: RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, RGWHTTPStreamRWRequest *_req); int handle_data(bufferlist& bl, bool *pause) override; void claim_data(bufferlist *dest, uint64_t max); bufferlist& get_extra_data() { return extra_data; } bool has_data() { return (data.length() > 0); } bool has_all_extra_data() { return got_all_extra_data; } }; class RGWStreamReadResourceCRF { protected: boost::asio::coroutine read_state; public: virtual int init(const DoutPrefixProvider *dpp) = 0; virtual int read(bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */ virtual int decode_rest_obj(map& headers, bufferlist& extra_data) = 0; virtual bool has_attrs() = 0; virtual void get_attrs(std::map *attrs) = 0; virtual ~RGWStreamReadResourceCRF() = default; }; class RGWStreamWriteResourceCRF { protected: boost::asio::coroutine write_state; boost::asio::coroutine drain_state; public: virtual int init() = 0; virtual void send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) = 0; virtual int send() = 0; virtual int write(bufferlist& data, bool *need_retry) = 0; /* reentrant */ virtual int drain_writes(bool *need_retry) = 0; /* reentrant */ virtual ~RGWStreamWriteResourceCRF() = default; }; class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF { CephContext *cct; RGWCoroutinesEnv *env; RGWCoroutine *caller; RGWHTTPManager *http_manager; RGWHTTPStreamRWRequest *req{nullptr}; std::optional in_cb; bufferlist extra_data; bool got_attrs{false}; bool got_extra_data{false}; rgw_io_id io_read_mask; protected: rgw_rest_obj rest_obj; struct range_info { bool is_set{false}; uint64_t ofs; uint64_t size; } range; ceph::real_time mtime; string etag; public: RGWStreamReadHTTPResourceCRF(CephContext *_cct, RGWCoroutinesEnv *_env, RGWCoroutine *_caller, RGWHTTPManager *_http_manager, const rgw_obj_key& _src_key) : cct(_cct), env(_env), caller(_caller), http_manager(_http_manager) { rest_obj.init(_src_key); } ~RGWStreamReadHTTPResourceCRF(); int init(const DoutPrefixProvider *dpp) override; int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */ int decode_rest_obj(map& headers, bufferlist& extra_data) override; bool has_attrs() override; void get_attrs(std::map *attrs) override; bool is_done(); virtual bool need_extra_data() { return false; } void set_req(RGWHTTPStreamRWRequest *r) { req = r; } rgw_rest_obj& get_rest_obj() { return rest_obj; } void set_range(uint64_t ofs, uint64_t size) { range.is_set = true; range.ofs = ofs; range.size = size; } }; class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF { protected: RGWCoroutinesEnv *env; RGWCoroutine *caller; RGWHTTPManager *http_manager; using lock_guard = std::lock_guard; std::mutex blocked_lock; bool is_blocked; RGWHTTPStreamRWRequest *req{nullptr}; struct multipart_info { bool is_multipart{false}; string upload_id; int part_num{0}; uint64_t part_size; } multipart; class WriteDrainNotify : public RGWWriteDrainCB { RGWStreamWriteHTTPResourceCRF *crf; public: explicit WriteDrainNotify(RGWStreamWriteHTTPResourceCRF *_crf) : crf(_crf) {} void notify(uint64_t pending_size) override; } write_drain_notify_cb; public: RGWStreamWriteHTTPResourceCRF(CephContext *_cct, RGWCoroutinesEnv *_env, RGWCoroutine *_caller, RGWHTTPManager *_http_manager) : env(_env), caller(_caller), http_manager(_http_manager), write_drain_notify_cb(this) {} virtual ~RGWStreamWriteHTTPResourceCRF(); int init() override { return 0; } void send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) override; int send() override; int write(bufferlist& data, bool *need_retry) override; /* reentrant */ void write_drain_notify(uint64_t pending_size); int drain_writes(bool *need_retry) override; /* reentrant */ virtual void handle_headers(const std::map& headers) {} void set_req(RGWHTTPStreamRWRequest *r) { req = r; } void set_multipart(const string& upload_id, int part_num, uint64_t part_size) { multipart.is_multipart = true; multipart.upload_id = upload_id; multipart.part_num = part_num; multipart.part_size = part_size; } }; class RGWStreamSpliceCR : public RGWCoroutine { CephContext *cct; RGWHTTPManager *http_manager; string url; std::shared_ptr in_crf; std::shared_ptr out_crf; bufferlist bl; bool need_retry{false}; bool sent_attrs{false}; uint64_t total_read{0}; int ret{0}; public: RGWStreamSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr, std::shared_ptr& _in_crf, std::shared_ptr& _out_crf); ~RGWStreamSpliceCR(); int operate(const DoutPrefixProvider *dpp) override; };