summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_flight.h
blob: bb0a987d0a19e7b351f9611cff9c8558a5552b42 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

/*
 * Ceph - scalable distributed file system
 *
 * Copyright 2023 IBM
 *
 * See file COPYING for licensing information.
 */

#pragma once

#include <atomic>
#include <map>
#include <mutex>
#include <new>

#include "include/common_fwd.h"
#include "common/ceph_context.h"
#include "common/Thread.h"
#include "common/ceph_time.h"
#include "rgw_frontend.h"
#include "arrow/type.h"
#include "arrow/flight/server.h"
#include "arrow/util/string_view.h"

#include "rgw_flight_frontend.h"


// Logging helpers. Each *_F(dp) macro opens an ldpp_dout() stream on the
// address of the given object `dp`, passing the numeric value shown (20 for
// INFO, 10 for STATUS, 0 for both WARNING and ERROR) and starting the
// message with a severity tag followed by the enclosing function's name.
// Because the macros take `&dp`, `dp` must be an object or reference, not
// a pointer.
// NOTE(review): as with other ldpp_dout users, callers presumably have to
// terminate the statement with `dendl` -- confirm.
#define INFO_F(dp)   ldpp_dout(&dp, 20) << "INFO: " << __func__ << ": "
#define STATUS_F(dp) ldpp_dout(&dp, 10) << "STATUS: " << __func__ << ": "
#define WARN_F(dp)   ldpp_dout(&dp,  0) << "WARNING: " << __func__ << ": "
#define ERROR_F(dp)  ldpp_dout(&dp,  0) << "ERROR: " << __func__ << ": "

// Short forms for code that has a suitable object named `dp` in scope
// (for example the `dp` members of the classes declared below).
#define INFO   INFO_F(dp)
#define STATUS STATUS_F(dp)
#define WARN   WARN_F(dp)
#define ERROR  ERROR_F(dp)


// Shorthand aliases for the Arrow and Arrow Flight namespaces; used
// throughout the declarations below.
namespace arw = arrow;
namespace flt = arrow::flight;


// Forward declaration only; nothing else in this header refers to it.
struct req_state;

namespace rgw::flight {

// One hour, expressed as a coarse_real_clock duration. Being `static` in a
// header, every translation unit that includes this file gets its own copy.
// NOTE(review): presumably the lifetime of a stored flight before
// FlightStore::expire_flights() drops it -- the use is not visible here.
static const coarse_real_clock::duration lifespan = std::chrono::hours(1);

// Record describing a single flight: the tenant/bucket/object it refers
// to, its size and record count, and the Arrow schema and key/value
// metadata associated with it.
struct FlightData {
  // Identifier of this flight. It is not one of the constructor's
  // parameters; how it is assigned is not visible in this header.
  FlightKey key;
  // coarse_real_clock::time_point expires;
  std::string uri;
  std::string tenant_name;
  std::string bucket_name;
  rgw_obj_key object_key;
  // NB: what about object's namespace and instance?
  uint64_t num_records;
  uint64_t obj_size;
  std::shared_ptr<arw::Schema> schema;
  std::shared_ptr<const arw::KeyValueMetadata> kv_metadata;

  rgw_user user_id; // TODO: this should be removed when we do
  // proper flight authentication

  // Builds a record from the given values (defined out of line).
  // Note that _schema and _kv_metadata are taken by non-const lvalue
  // reference, so callers must pass named shared_ptr objects rather than
  // temporaries.
  FlightData(const std::string& _uri,
	     const std::string& _tenant_name,
	     const std::string& _bucket_name,
	     const rgw_obj_key& _object_key,
	     uint64_t _num_records,
	     uint64_t _obj_size,
	     std::shared_ptr<arw::Schema>& _schema,
	     std::shared_ptr<const arw::KeyValueMetadata>& _kv_metadata,
	     rgw_user _user_id);
};

// stores flights that have been created and helps expire them
// Abstract interface for a container of FlightData records keyed by
// FlightKey. Concrete stores implement every pure-virtual member below.
class FlightStore {

protected:

  // Held by reference, so the referenced DoutPrefix has to outlive the
  // store. Named `dp` so the INFO/STATUS/WARN/ERROR macros can be used in
  // derived classes.
  const DoutPrefix& dp;

public:

  FlightStore(const DoutPrefix& dp);
  virtual ~FlightStore();

  // Takes the flight by rvalue reference and returns a FlightKey
  // (presumably the key under which it was stored -- confirm in the
  // implementation).
  virtual FlightKey add_flight(FlightData&& flight) = 0;

  // TODO consider returning const shared pointers to FlightData in
  // the following two functions

  // Looks up a flight by key; returns a FlightData by value or an Arrow
  // error status.
  virtual arw::Result<FlightData> get_flight(const FlightKey& key) const = 0;

  // Returns a FlightData by value, or an empty optional.
  // NOTE(review): by its name this looks like "first flight whose key
  // follows `key`", i.e. an iteration helper -- confirm.
  virtual std::optional<FlightData> after_key(const FlightKey& key) const = 0;

  // The meaning of the int results of the next two functions is not
  // visible in this header.
  virtual int remove_flight(const FlightKey& key) = 0;
  virtual int expire_flights() = 0;
};

// FlightStore implementation that keeps its flights in a std::map held in
// this object's memory.
class MemoryFlightStore : public FlightStore {
  std::map<FlightKey, FlightData> map;
  // Declared mutable, which allows it to be locked from the const member
  // functions (get_flight, after_key).
  mutable std::mutex mtx; // for map

public:

  MemoryFlightStore(const DoutPrefix& dp);
  virtual ~MemoryFlightStore();

  // Overrides of the FlightStore interface; all are defined out of line.
  FlightKey add_flight(FlightData&& flight) override;
  arw::Result<FlightData> get_flight(const FlightKey& key) const override;
  std::optional<FlightData> after_key(const FlightKey& key) const override;
  int remove_flight(const FlightKey& key) override;
  int expire_flights() override;
};

// Arrow Flight server for RGW. Derives from flt::FlightServerBase and
// overrides four of its RPC handlers; it is handed a FlightStore at
// construction time.
class FlightServer : public flt::FlightServerBase {

  using Data1 = std::vector<std::shared_ptr<arw::RecordBatch>>;

  RGWProcessEnv& env;
  rgw::sal::Driver* driver;
  // Reference member: the DoutPrefix must outlive the server. Named `dp`
  // so the INFO/STATUS/WARN/ERROR macros can be used in member functions.
  const DoutPrefix& dp;
  // Raw pointer; who owns the store is not visible in this header.
  FlightStore* flight_store;

  // Record batches keyed by a string. What the key denotes and how the
  // map is populated are not visible in this header.
  std::map<std::string, Data1> data;

public:

  static constexpr int default_port = 8077;

  FlightServer(RGWProcessEnv& env,
	       FlightStore* flight_store,
	       const DoutPrefix& dp);
  ~FlightServer() override;

  // Returns the stored FlightStore pointer unchanged.
  FlightStore* get_flight_store() {
    return flight_store;
  }

  // Overrides of the flt::FlightServerBase handlers; each reports its
  // outcome as an arw::Status and delivers its result through the final
  // out-parameter. All are defined out of line.

  arw::Status ListFlights(const flt::ServerCallContext& context,
			  const flt::Criteria* criteria,
			  std::unique_ptr<flt::FlightListing>* listings) override;

  arw::Status GetFlightInfo(const flt::ServerCallContext &context,
			    const flt::FlightDescriptor &request,
			    std::unique_ptr<flt::FlightInfo> *info) override;

  arw::Status GetSchema(const flt::ServerCallContext &context,
			const flt::FlightDescriptor &request,
			std::unique_ptr<flt::SchemaResult> *schema) override;

  arw::Status DoGet(const flt::ServerCallContext &context,
		    const flt::Ticket &request,
		    std::unique_ptr<flt::FlightDataStream> *stream) override;
}; // class FlightServer

class OwningStringView : public arw::util::string_view {

  uint8_t* buffer;
  int64_t capacity;
  int64_t consumed;

  OwningStringView(uint8_t* _buffer, int64_t _size) :
    arw::util::string_view((const char*) _buffer, _size),
    buffer(_buffer),
    capacity(_size),
    consumed(_size)
    { }

  OwningStringView(OwningStringView&& from, int64_t new_size) :
    buffer(nullptr),
    capacity(from.capacity),
    consumed(new_size)
    {
      // should be impossible due to static function check
      ceph_assertf(consumed <= capacity, "new size cannot exceed capacity");

      std::swap(buffer, from.buffer);
      from.capacity = 0;
      from.consumed = 0;
    }

public:

  OwningStringView(OwningStringView&&) = default;
  OwningStringView& operator=(OwningStringView&&) = default;

  uint8_t* writeable_data() {
    return buffer;
  }

  ~OwningStringView() {
    if (buffer) {
      delete[] buffer;
    }
  }

  static arw::Result<OwningStringView> make(int64_t size) {
    uint8_t* buffer = new uint8_t[size];
    if (!buffer) {
      return arw::Status::OutOfMemory("could not allocated buffer of size %" PRId64, size);
    }
    return OwningStringView(buffer, size);
  }

  static arw::Result<OwningStringView> shrink(OwningStringView&& from,
					      int64_t new_size) {
    if (new_size > from.capacity) {
      return arw::Status::Invalid("new size cannot exceed capacity");
    } else {
      return OwningStringView(std::move(from), new_size);
    }
  }

};

// GLOBAL

// Conversions between a FlightKey and an Arrow Flight ticket (both
// defined out of line).
// FlightKeyToTicket returns a ticket built from `key`.
// TicketToFlightKey goes the other way: it reports its outcome as an
// arw::Status and takes `key` by non-const reference, presumably writing
// the decoded key there on success -- confirm in the implementation.
flt::Ticket FlightKeyToTicket(const FlightKey& key);
arw::Status TicketToFlightKey(const flt::Ticket& t, FlightKey& key);

} // namespace rgw::flight