summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_s3select_private.h
blob: fa595b0da599fa2dc8a78f17e98e652638b389b3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
//
#pragma once

#include <errno.h>
#include <array>
#include <string.h>
#include <string_view>

#include "common/ceph_crypto.h"
#include "common/split.h"
#include "common/Formatter.h"
#include "common/utf8.h"
#include "common/ceph_json.h"
#include "common/safe_io.h"
#include "common/errno.h"
#include "auth/Crypto.h"
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/tokenizer.hpp>
#define BOOST_BIND_GLOBAL_PLACEHOLDERS
#ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wimplicit-const-int-float-conversion"
#endif
#ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
#pragma clang diagnostic pop
#endif
#undef BOOST_BIND_GLOBAL_PLACEHOLDERS

#include <liboath/oath.h>


#pragma GCC diagnostic push
#pragma clang diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated"
#pragma clang diagnostic ignored "-Wdeprecated"
#include <s3select/include/s3select.h>
#pragma GCC diagnostic pop
#pragma clang diagnostic pop

#include "rgw_rest_s3.h"
#include "rgw_s3select.h"

class aws_response_handler
{

private:
  std::string sql_result;
  req_state* s;
  uint32_t header_size;
  // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum
  boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true> crc32;
  RGWOp* m_rgwop;
  std::string m_buff_header;
  uint64_t total_bytes_returned;
  uint64_t processed_size;

  enum class header_name_En {
    EVENT_TYPE,
    CONTENT_TYPE,
    MESSAGE_TYPE,
    ERROR_CODE,
    ERROR_MESSAGE
  };

  enum class header_value_En {
    RECORDS,
    OCTET_STREAM,
    EVENT,
    CONT,
    PROGRESS,
    END,
    XML,
    STATS,
    ENGINE_ERROR,
    ERROR_TYPE
  };

  const char* PAYLOAD_LINE= "\n<Payload>\n<Records>\n<Payload>\n";
  const char* END_PAYLOAD_LINE= "\n</Payload></Records></Payload>";
  const char* header_name_str[5] =  {":event-type", ":content-type", ":message-type", ":error-code", ":error-message"};
  const char* header_value_str[10] = {"Records", "application/octet-stream", "event", "Cont", "Progress", "End", "text/xml", "Stats", "s3select-engine-error", "error"};
  static constexpr size_t header_crc_size = 12;

  void push_header(const char* header_name, const char* header_value);

  int create_message(u_int32_t header_len);

public:
  aws_response_handler(req_state* ps, RGWOp* rgwop) : s(ps), m_rgwop(rgwop), total_bytes_returned{0}, processed_size{0}
  {}

  aws_response_handler() : s(nullptr), m_rgwop(nullptr), total_bytes_returned{0}, processed_size{0}
  {}

  bool is_set()
  {
    if(s==nullptr || m_rgwop == nullptr){
      return false;
    } 
    return true;
  }

  void set(req_state* ps, RGWOp* rgwop)
  {
    s = ps;
    m_rgwop = rgwop;
  }

  std::string& get_sql_result();

  uint64_t get_processed_size();

  void update_processed_size(uint64_t value);

  uint64_t get_total_bytes_returned();

  void update_total_bytes_returned(uint64_t value);

  int create_header_records();

  int create_header_continuation();

  int create_header_progress();

  int create_header_stats();

  int create_header_end();

  int create_error_header_records(const char* error_message);

  void init_response();

  void init_success_response();

  void send_continuation_response();

  void init_progress_response();

  void init_end_response();

  void init_stats_response();

  void init_error_response(const char* error_message);

  void send_success_response();

  void send_progress_response();

  void send_stats_response();

  void send_error_response(const char* error_code,
                           const char* error_message,
                           const char* resource_id);

}; //end class aws_response_handler

class RGWSelectObj_ObjStore_S3 : public RGWGetObj_ObjStore_S3
{

private:
  s3selectEngine::s3select s3select_syntax;
  std::string m_s3select_query;
  std::string m_s3select_input;
  std::string m_s3select_output;
  s3selectEngine::csv_object m_s3_csv_object;
#ifdef _ARROW_EXIST
  s3selectEngine::parquet_object m_s3_parquet_object;
#endif
  s3selectEngine::json_object m_s3_json_object;
  std::string m_column_delimiter;
  std::string m_quot;
  std::string m_row_delimiter;
  std::string m_compression_type;
  std::string m_escape_char;
  std::unique_ptr<char[]>  m_buff_header;
  std::string m_header_info;
  std::string m_sql_query;
  std::string m_enable_progress;
  std::string output_column_delimiter;
  std::string output_quot;
  std::string output_escape_char;
  std::string output_quote_fields;
  std::string output_row_delimiter;
  std::string m_start_scan;
  std::string m_end_scan;
  bool m_scan_range_ind;
  int64_t m_start_scan_sz;
  int64_t m_end_scan_sz;
  int64_t m_object_size_for_processing;
  aws_response_handler m_aws_response_handler;
  bool enable_progress;

  //parquet request
  bool m_parquet_type;
  //json request
  std::string m_json_datatype;
  bool m_json_type;
#ifdef _ARROW_EXIST
  s3selectEngine::rgw_s3select_api m_rgw_api;
#endif
  //a request for range may statisfy by several calls to send_response_date;
  size_t m_request_range;
  std::string requested_buffer;
  std::string range_req_str;
  std::function<int(std::string&)> fp_result_header_format;
  std::function<int(std::string&)> fp_s3select_result_format;
  std::function<void(const char*)> fp_debug_mesg;
  std::function<void(void)> fp_chunked_transfer_encoding;
  int m_header_size;

public:
  unsigned int chunk_number;
  size_t m_requested_range;
  size_t m_scan_offset;
  bool m_skip_next_chunk;
  bool m_is_trino_request;

  RGWSelectObj_ObjStore_S3();
  virtual ~RGWSelectObj_ObjStore_S3();

  virtual int send_response_data(bufferlist& bl, off_t ofs, off_t len) override;

  virtual int get_params(optional_yield y) override;

  virtual void execute(optional_yield) override;

private:

  int csv_processing(bufferlist& bl, off_t ofs, off_t len);

  int parquet_processing(bufferlist& bl, off_t ofs, off_t len);

  int json_processing(bufferlist& bl, off_t ofs, off_t len);

  int run_s3select_on_csv(const char* query, const char* input, size_t input_length);

  int run_s3select_on_parquet(const char* query);

  int run_s3select_on_json(const char* query, const char* input, size_t input_length);

  int extract_by_tag(std::string input, std::string tag_name, std::string& result);

  void convert_escape_seq(std::string& esc);

  int handle_aws_cli_parameters(std::string& sql_query);

  int range_request(int64_t start, int64_t len, void*, optional_yield);

  size_t get_obj_size();
  std::function<int(int64_t, int64_t, void*, optional_yield*)> fp_range_req;
  std::function<size_t(void)> fp_get_obj_size;

  void shape_chunk_per_trino_requests(const char*, off_t& ofs, off_t& len);
};