summaryrefslogtreecommitdiffstats
path: root/src/rgw/driver/rados/rgw_notify.cc
blob: b1835016ec0eef21b9adc01ace1e43c67f74fc50 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
// vim: ts=8 sw=2 smarttab

#include "rgw_notify.h"
#include "cls/2pc_queue/cls_2pc_queue_client.h"
#include "cls/lock/cls_lock_client.h"
#include <memory>
#include <boost/algorithm/hex.hpp>
#include <boost/context/protected_fixedsize_stack.hpp>
#include <spawn/spawn.hpp>
#include "rgw_sal_rados.h"
#include "rgw_pubsub.h"
#include "rgw_pubsub_push.h"
#include "rgw_perf_counters.h"
#include "common/dout.h"
#include <chrono>

#define dout_subsys ceph_subsys_rgw

namespace rgw::notify {

struct event_entry_t {
  rgw_pubsub_s3_event event;
  std::string push_endpoint;
  std::string push_endpoint_args;
  std::string arn_topic;
  
  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(event, bl);
    encode(push_endpoint, bl);
    encode(push_endpoint_args, bl);
    encode(arn_topic, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(event, bl);
    decode(push_endpoint, bl);
    decode(push_endpoint_args, bl);
    decode(arn_topic, bl);
    DECODE_FINISH(bl);
  }
};
WRITE_CLASS_ENCODER(event_entry_t)

using queues_t = std::set<std::string>;

// use mmap/mprotect to allocate 128k coroutine stacks
auto make_stack_allocator() {
  return boost::context::protected_fixedsize_stack{128*1024};
}

const std::string Q_LIST_OBJECT_NAME = "queues_list_object";

class Manager : public DoutPrefixProvider {
  const size_t max_queue_size;
  const uint32_t queues_update_period_ms;
  const uint32_t queues_update_retry_ms;
  const uint32_t queue_idle_sleep_us;
  const utime_t failover_time;
  CephContext* const cct;
  static constexpr auto COOKIE_LEN = 16;
  const std::string lock_cookie;
  boost::asio::io_context io_context;
  boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard;
  const uint32_t worker_count;
  std::vector<std::thread> workers;
  const uint32_t stale_reservations_period_s;
  const uint32_t reservations_cleanup_period_s;
public:
  librados::IoCtx& rados_ioctx;
private:

  CephContext *get_cct() const override { return cct; }
  unsigned get_subsys() const override { return dout_subsys; }
  std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw notify: "; }

  // read the list of queues from the queue list object
  int read_queue_list(queues_t& queues, optional_yield y) {
    constexpr auto max_chunk = 1024U;
    std::string start_after;
    bool more = true;
    int rval;
    while (more) {
      librados::ObjectReadOperation op;
      queues_t queues_chunk;
      op.omap_get_keys2(start_after, max_chunk, &queues_chunk, &more, &rval);
      const auto ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, nullptr, y);
      if (ret == -ENOENT) {
        // queue list object was not created - nothing to do
        return 0;
      }
      if (ret < 0) {
        // TODO: do we need to check on rval as well as ret?
        ldpp_dout(this, 1) << "ERROR: failed to read queue list. error: " << ret << dendl;
        return ret;
      }
      queues.merge(queues_chunk);
    }
    return 0;
  }

  // set m1 to be the minimum between m1 and m2
  static int set_min_marker(std::string& m1, const std::string m2) {
    cls_queue_marker mr1;
    cls_queue_marker mr2;
    if (mr1.from_str(m1.c_str()) < 0 || mr2.from_str(m2.c_str()) < 0) {
      return -EINVAL;
    }
    if (mr2.gen <= mr1.gen && mr2.offset < mr1.offset) {
      m1 = m2;
    }
    return 0;
  }

  using Clock = ceph::coarse_mono_clock;
  using Executor = boost::asio::io_context::executor_type;
  using Timer = boost::asio::basic_waitable_timer<Clock,
        boost::asio::wait_traits<Clock>, Executor>;

  class tokens_waiter {
    const std::chrono::hours infinite_duration;
    size_t pending_tokens;
    Timer timer;
 
    struct token {
      tokens_waiter& waiter;
      token(tokens_waiter& _waiter) : waiter(_waiter) {
        ++waiter.pending_tokens;
      }
      
      ~token() {
        --waiter.pending_tokens;
        if (waiter.pending_tokens == 0) {
          waiter.timer.cancel();
        }   
      }   
    };
  
  public:

    tokens_waiter(boost::asio::io_context& io_context) :
      infinite_duration(1000),
      pending_tokens(0),
      timer(io_context) {}  
 
    void async_wait(yield_context yield) {
      if (pending_tokens == 0) {
        return;
      }
      timer.expires_from_now(infinite_duration);
      boost::system::error_code ec; 
      timer.async_wait(yield[ec]);
      ceph_assert(ec == boost::system::errc::operation_canceled);
    }   
 
    token make_token() {    
      return token(*this);
    }   
  };

  // processing of a specific entry
  // return whether processing was successfull (true) or not (false)
  bool process_entry(const cls_queue_entry& entry, yield_context yield) {
    event_entry_t event_entry;
    auto iter = entry.data.cbegin();
    try {
      decode(event_entry, iter);
    } catch (buffer::error& err) {
      ldpp_dout(this, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;
      return false;
    }
    try {
      // TODO move endpoint creation to queue level
      const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
          RGWHTTPArgs(event_entry.push_endpoint_args, this), 
          cct);
      ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
        " for entry: " << entry.marker << dendl;
      const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield));
      if (ret < 0) {
        ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint 
          << " failed. error: " << ret << " (will retry)" << dendl;
        return false;
      } else {
        ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint 
          << " ok" <<  dendl;
        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
        return true;
      }
    } catch (const RGWPubSubEndpoint::configuration_error& e) {
      ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: " 
          << event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl;
      return false;
    }
  }

  // clean stale reservation from queue
  void cleanup_queue(const std::string& queue_name, yield_context yield) {
    while (true) {
      ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
      const auto now = ceph::coarse_real_time::clock::now();
      const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s);
      librados::ObjectWriteOperation op;
      op.assert_exists();
      rados::cls::lock::assert_locked(&op, queue_name+"_lock", 
        ClsLockType::EXCLUSIVE,
        lock_cookie, 
        "" /*no tag*/);
      cls_2pc_queue_expire_reservations(op, stale_time);
      // check ownership and do reservation cleanup in one batch
      auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
      if (ret == -ENOENT) {
        // queue was deleted
        ldpp_dout(this, 5) << "INFO: queue: " 
          << queue_name << ". was removed. cleanup will stop" << dendl;
        return;
      }
      if (ret == -EBUSY) {
        ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
        return;
      }
      if (ret < 0) {
        ldpp_dout(this, 5) << "WARNING: failed to cleanup stale reservation from queue and/or lock queue: " << queue_name
          << ". error: " << ret << dendl;
      }
      Timer timer(io_context);
      timer.expires_from_now(std::chrono::seconds(reservations_cleanup_period_s));
      boost::system::error_code ec;
	    timer.async_wait(yield[ec]);
    }
  }

  // processing of a specific queue
  void process_queue(const std::string& queue_name, yield_context yield) {
    constexpr auto max_elements = 1024;
    auto is_idle = false;
    const std::string start_marker;

    // start a the cleanup coroutine for the queue
    spawn::spawn(io_context, [this, queue_name](yield_context yield) {
            cleanup_queue(queue_name, yield);
            }, make_stack_allocator());
    
    while (true) {
      // if queue was empty the last time, sleep for idle timeout
      if (is_idle) {
        Timer timer(io_context);
        timer.expires_from_now(std::chrono::microseconds(queue_idle_sleep_us));
        boost::system::error_code ec;
	      timer.async_wait(yield[ec]);
      }

      // get list of entries in the queue
      is_idle = true;
      bool truncated = false;
      std::string end_marker;
      std::vector<cls_queue_entry> entries;
      auto total_entries = 0U;
      {
        librados::ObjectReadOperation op;
        op.assert_exists();
        bufferlist obl;
        int rval;
        rados::cls::lock::assert_locked(&op, queue_name+"_lock", 
          ClsLockType::EXCLUSIVE,
          lock_cookie, 
          "" /*no tag*/);
        cls_2pc_queue_list_entries(op, start_marker, max_elements, &obl, &rval);
        // check ownership and list entries in one batch
        auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield));
        if (ret == -ENOENT) {
          // queue was deleted
          ldpp_dout(this, 5) << "INFO: queue: " 
            << queue_name << ". was removed. processing will stop" << dendl;
          return;
        }
        if (ret == -EBUSY) {
          ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
          return;
        }
        if (ret < 0) {
          ldpp_dout(this, 5) << "WARNING: failed to get list of entries in queue and/or lock queue: " 
            << queue_name << ". error: " << ret << " (will retry)" << dendl;
          continue;
        }
        ret = cls_2pc_queue_list_entries_result(obl, entries, &truncated, end_marker);
        if (ret < 0) {
          ldpp_dout(this, 5) << "WARNING: failed to parse list of entries in queue: " 
            << queue_name << ". error: " << ret << " (will retry)" << dendl;
          continue;
        }
      }
      total_entries = entries.size();
      if (total_entries == 0) {
        // nothing in the queue
        continue;
      }
      // log when queue is not idle
      ldpp_dout(this, 20) << "INFO: found: " << total_entries << " entries in: " << queue_name <<
        ". end marker is: " << end_marker << dendl;
      
      is_idle = false;
      auto has_error = false;
      auto remove_entries = false;
      auto entry_idx = 1U;
      tokens_waiter waiter(io_context);
      for (auto& entry : entries) {
        if (has_error) {
          // bail out on first error
          break;
        }
        // TODO pass entry pointer instead of by-value
        spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &has_error, &waiter, entry](yield_context yield) {
            const auto token = waiter.make_token();
            if (process_entry(entry, yield)) {
              ldpp_dout(this, 20) << "INFO: processing of entry: " << 
                entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " ok" << dendl;
              remove_entries = true;
            }  else {
              if (set_min_marker(end_marker, entry.marker) < 0) {
                ldpp_dout(this, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
              } else {
                ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
              }
              has_error = true;
              ldpp_dout(this, 20) << "INFO: processing of entry: " << 
                entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
            } 
        }, make_stack_allocator());
        ++entry_idx;
      }

      // wait for all pending work to finish
      waiter.async_wait(yield);

      // delete all published entries from queue
      if (remove_entries) {
        librados::ObjectWriteOperation op;
        op.assert_exists();
        rados::cls::lock::assert_locked(&op, queue_name+"_lock", 
          ClsLockType::EXCLUSIVE,
          lock_cookie, 
          "" /*no tag*/);
        cls_2pc_queue_remove_entries(op, end_marker); 
        // check ownership and deleted entries in one batch
        const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); 
        if (ret == -ENOENT) {
          // queue was deleted
          ldpp_dout(this, 5) << "INFO: queue: " 
            << queue_name << ". was removed. processing will stop" << dendl;
          return;
        }
        if (ret == -EBUSY) {
          ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
          return;
        }
        if (ret < 0) {
          ldpp_dout(this, 1) << "ERROR: failed to remove entries and/or lock queue up to: " << end_marker <<  " from queue: " 
            << queue_name << ". error: " << ret << dendl;
        } else {
          ldpp_dout(this, 20) << "INFO: removed entries up to: " << end_marker <<  " from queue: " 
          << queue_name << dendl;
        }
      }
    }
  }

  // lits of owned queues
  using owned_queues_t = std::unordered_set<std::string>;

  // process all queues
  // find which of the queues is owned by this daemon and process it
  void process_queues(yield_context yield) {
    auto has_error = false;
    owned_queues_t owned_queues;

    // add randomness to the duration between queue checking
    // to make sure that different daemons are not synced
    std::random_device seed;
    std::mt19937 rnd_gen(seed());
    const auto min_jitter = 100; // ms
    const auto max_jitter = 500; // ms
    std::uniform_int_distribution<> duration_jitter(min_jitter, max_jitter);

    std::vector<std::string> queue_gc;
    std::mutex queue_gc_lock;
    while (true) {
      Timer timer(io_context);
      const auto duration = (has_error ? 
        std::chrono::milliseconds(queues_update_retry_ms) : std::chrono::milliseconds(queues_update_period_ms)) + 
        std::chrono::milliseconds(duration_jitter(rnd_gen));
      timer.expires_from_now(duration);
      const auto tp = ceph::coarse_real_time::clock::to_time_t(ceph::coarse_real_time::clock::now() + duration);
      ldpp_dout(this, 20) << "INFO: next queues processing will happen at: " << std::ctime(&tp)  << dendl;
      boost::system::error_code ec;
      timer.async_wait(yield[ec]);

      queues_t queues;
      auto ret = read_queue_list(queues, optional_yield(io_context, yield));
      if (ret < 0) {
        has_error = true;
        continue;
      }

      for (const auto& queue_name : queues) {
        // try to lock the queue to check if it is owned by this rgw
        // or if ownershif needs to be taken
        librados::ObjectWriteOperation op;
        op.assert_exists();
        rados::cls::lock::lock(&op, queue_name+"_lock", 
              ClsLockType::EXCLUSIVE,
              lock_cookie, 
              "" /*no tag*/,
              "" /*no description*/,
              failover_time,
              LOCK_FLAG_MAY_RENEW);

        ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
        if (ret == -EBUSY) {
          // lock is already taken by another RGW
          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
          // if queue was owned by this RGW, processing should be stopped, queue would be deleted from list afterwards
          continue;
        }
        if (ret == -ENOENT) {
          // queue is deleted - processing will stop the next time we try to read from the queue
          ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " should not be locked - already deleted" << dendl;
          continue;
        }
        if (ret < 0) {
          // failed to lock for another reason, continue to process other queues
          ldpp_dout(this, 1) << "ERROR: failed to lock queue: " << queue_name << ". error: " << ret << dendl;
          has_error = true;
          continue;
        }
        // add queue to list of owned queues
        if (owned_queues.insert(queue_name).second) {
          ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
          // start processing this queue
          spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](yield_context yield) {
            process_queue(queue_name, yield);
            // if queue processing ended, it measn that the queue was removed or not owned anymore
            // mark it for deletion
            std::lock_guard lock_guard(queue_gc_lock);
            queue_gc.push_back(queue_name);
            ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl;
          }, make_stack_allocator());
        } else {
          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " ownership (lock) renewed" << dendl;
        }
      }
      // erase all queue that were deleted
      {
        std::lock_guard lock_guard(queue_gc_lock);
        std::for_each(queue_gc.begin(), queue_gc.end(), [this, &owned_queues](const std::string& queue_name) {
          owned_queues.erase(queue_name);
          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " removed" << dendl;
        });
        queue_gc.clear();
      }
    }
  }

public:

  ~Manager() {
    work_guard.reset();
    io_context.stop();
    std::for_each(workers.begin(), workers.end(), [] (auto& worker) { worker.join(); });
  }

  // ctor: start all threads
  Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms, 
          uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms, 
          uint32_t _stale_reservations_period_s, uint32_t _reservations_cleanup_period_s,
          uint32_t _worker_count, rgw::sal::RadosStore* store) :
    max_queue_size(_max_queue_size),
    queues_update_period_ms(_queues_update_period_ms),
    queues_update_retry_ms(_queues_update_retry_ms),
    queue_idle_sleep_us(_queue_idle_sleep_us),
    failover_time(std::chrono::milliseconds(failover_time_ms)),
    cct(_cct),
    lock_cookie(gen_rand_alphanumeric(cct, COOKIE_LEN)),
    work_guard(boost::asio::make_work_guard(io_context)),
    worker_count(_worker_count),
    stale_reservations_period_s(_stale_reservations_period_s),
    reservations_cleanup_period_s(_reservations_cleanup_period_s),
    rados_ioctx(store->getRados()->get_notif_pool_ctx())
    {
      spawn::spawn(io_context, [this] (yield_context yield) {
            process_queues(yield);
          }, make_stack_allocator());

      // start the worker threads to do the actual queue processing
      const std::string WORKER_THREAD_NAME = "notif-worker";
      for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
        workers.emplace_back([this]() {
          try {
            io_context.run(); 
          } catch (const std::exception& err) {
            ldpp_dout(this, 10) << "Notification worker failed with error: " << err.what() << dendl;
            throw(err);
          }
        });
        const auto rc = ceph_pthread_setname(workers.back().native_handle(), 
          (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
        ceph_assert(rc == 0);
      }
      ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
    }

  int add_persistent_topic(const std::string& topic_name, optional_yield y) {
    if (topic_name == Q_LIST_OBJECT_NAME) {
      ldpp_dout(this, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;
      return -EINVAL;
    }
    librados::ObjectWriteOperation op;
    op.create(true);
    cls_2pc_queue_init(op, topic_name, max_queue_size);
    auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
    if (ret == -EEXIST) {
      // queue already exists - nothing to do
      ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already exists. nothing to do" << dendl;
      return 0;
    }
    if (ret < 0) {
      // failed to create queue
      ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_name << ". error: " << ret << dendl;
      return ret;
    }
   
    bufferlist empty_bl;
    std::map<std::string, bufferlist> new_topic{{topic_name, empty_bl}};
    op.omap_set(new_topic);
    ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
    if (ret < 0) {
      ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_name << " to queue list. error: " << ret << dendl;
      return ret;
    } 
    ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " added to queue list"  << dendl;
    return 0;
  }
};

// singleton manager
// note that the manager itself is not a singleton, and multiple instances may co-exist
// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
static Manager* s_manager = nullptr;

constexpr size_t MAX_QUEUE_SIZE = 128*1000*1000; // 128MB
constexpr uint32_t Q_LIST_UPDATE_MSEC = 1000*30;     // check queue list every 30seconds
constexpr uint32_t Q_LIST_RETRY_MSEC = 1000;         // retry every second if queue list update failed
constexpr uint32_t IDLE_TIMEOUT_USEC = 100*1000;     // idle sleep 100ms
constexpr uint32_t FAILOVER_TIME_MSEC = 3*Q_LIST_UPDATE_MSEC; // FAILOVER TIME 3x renew time
constexpr uint32_t WORKER_COUNT = 1;                 // 1 worker thread
constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120;   // cleanup reservations that are more than 2 minutes old
constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds

bool init(CephContext* cct, rgw::sal::RadosStore* store, const DoutPrefixProvider *dpp) {
  if (s_manager) {
    return false;
  }
  // TODO: take conf from CephContext
  s_manager = new Manager(cct, MAX_QUEUE_SIZE, 
      Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC, 
      IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC, 
      STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
      WORKER_COUNT,
      store);
  return true;
}

void shutdown() {
  delete s_manager;
  s_manager = nullptr;
}

int add_persistent_topic(const std::string& topic_name, optional_yield y) {
  if (!s_manager) {
    return -EAGAIN;
  }
  return s_manager->add_persistent_topic(topic_name, y);
}

int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_name, optional_yield y) {
  librados::ObjectWriteOperation op;
  op.remove();
  auto ret = rgw_rados_operate(dpp, rados_ioctx, topic_name, &op, y);
  if (ret == -ENOENT) {
    // queue already removed - nothing to do
    ldpp_dout(dpp, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl;
    return 0;
  }
  if (ret < 0) {
    // failed to remove queue
    ldpp_dout(dpp, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl;
    return ret;
  }

  std::set<std::string> topic_to_remove{{topic_name}};
  op.omap_rm_keys(topic_to_remove);
  ret = rgw_rados_operate(dpp, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
  if (ret < 0) {
    ldpp_dout(dpp, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list. error: " << ret << dendl;
    return ret;
  }
  ldpp_dout(dpp, 20) << "INFO: queue: " << topic_name << " removed from queue list"  << dendl;
  return 0;
}

int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
  if (!s_manager) {
    return -EAGAIN;
  }
  return remove_persistent_topic(s_manager, s_manager->rados_ioctx, topic_name, y);
}

rgw::sal::Object* get_object_with_atttributes(
  const reservation_t& res, rgw::sal::Object* obj) {
  // in case of copy obj, the tags and metadata are taken from source
  const auto src_obj = res.src_object ? res.src_object : obj;
  if (src_obj->get_attrs().empty()) {
    if (!src_obj->get_bucket()) {
      src_obj->set_bucket(res.bucket);
    }
    const auto ret = src_obj->get_obj_attrs(res.yield, res.dpp);
    if (ret < 0) {
      ldpp_dout(res.dpp, 20) << "failed to get attributes from object: " << 
        src_obj->get_key() << ". ret = " << ret << dendl;
      return nullptr;
    }
  }
  return src_obj;
}

static inline void filter_amz_meta(meta_map_t& dest, const meta_map_t& src) {
  std::copy_if(src.cbegin(), src.cend(),
               std::inserter(dest, dest.end()),
               [](const auto& m) {
                 return (boost::algorithm::starts_with(m.first, RGW_AMZ_META_PREFIX));
               });
}


static inline void metadata_from_attributes(
  reservation_t& res, rgw::sal::Object* obj) {
  auto& metadata = res.x_meta_map;
  const auto src_obj = get_object_with_atttributes(res, obj);
  if (!src_obj) {
    return;
  }
  res.metadata_fetched_from_attributes = true;
  for (auto& attr : src_obj->get_attrs()) {
    if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) {
      std::string_view key(attr.first);
      key.remove_prefix(sizeof(RGW_ATTR_PREFIX)-1);
      // we want to pass a null terminated version
      // of the bufferlist, hence "to_str().c_str()"
      metadata.emplace(key, attr.second.to_str().c_str());
    }
  }
}

static inline void tags_from_attributes(
  const reservation_t& res, rgw::sal::Object* obj, KeyMultiValueMap& tags) {
  const auto src_obj = get_object_with_atttributes(res, obj);
  if (!src_obj) {
    return;
  }
  const auto& attrs = src_obj->get_attrs();
  const auto attr_iter = attrs.find(RGW_ATTR_TAGS);
  if (attr_iter != attrs.end()) {
    auto bliter = attr_iter->second.cbegin();
    RGWObjTags obj_tags;
    try {
      ::decode(obj_tags, bliter);
    } catch(buffer::error&) {
      // not able to decode tags
      return;
    }
    tags = std::move(obj_tags.get_tags());
  }
}

// populate event from request
static inline void populate_event(reservation_t& res,
        rgw::sal::Object* obj,
        uint64_t size,
        const ceph::real_time& mtime, 
        const std::string& etag, 
        const std::string& version, 
        EventType event_type,
        rgw_pubsub_s3_event& event) {
  event.eventTime = mtime;
  event.eventName = to_event_string(event_type);
  event.userIdentity = res.user_id;    // user that triggered the change
  event.x_amz_request_id = res.req_id; // request ID of the original change
  event.x_amz_id_2 = res.store->getRados()->host_id; // RGW on which the change was made
  // configurationId is filled from notification configuration
  event.bucket_name = res.bucket->get_name();
  event.bucket_ownerIdentity = res.bucket->get_owner() ?
    res.bucket->get_owner()->get_id().id : res.bucket->get_info().owner.id;
  const auto region = res.store->get_zone()->get_zonegroup().get_api_name();
  rgw::ARN bucket_arn(res.bucket->get_key());
  bucket_arn.region = region; 
  event.bucket_arn = to_string(bucket_arn);
  event.object_key = res.object_name ? *res.object_name : obj->get_name();
  event.object_size = size;
  event.object_etag = etag;
  event.object_versionId = version;
  event.awsRegion = region;
  // use timestamp as per key sequence id (hex encoded)
  const utime_t ts(real_clock::now());
  boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), 
          std::back_inserter(event.object_sequencer));
  set_event_id(event.id, etag, ts);
  event.bucket_id = res.bucket->get_bucket_id();
  // pass meta data
  if (!res.metadata_fetched_from_attributes) {
    // either no metadata exist or no metadata filter was used
    metadata_from_attributes(res, obj);
  }
  event.x_meta_map = res.x_meta_map;
  // pass tags
  if (!res.tagset ||
      (*res.tagset).get_tags().empty()) {
    // try to fetch the tags from the attributes
    tags_from_attributes(res, obj, event.tags);
  } else {
    event.tags = (*res.tagset).get_tags();
  }
  // opaque data will be filled from topic configuration
}

static inline bool notification_match(reservation_t& res,
				      const rgw_pubsub_topic_filter& filter,
				      EventType event,
				      const RGWObjTags* req_tags) {
  if (!match(filter.events, event)) { 
    return false;
  }
  const auto obj = res.object;
  if (!match(filter.s3_filter.key_filter, 
        res.object_name ? *res.object_name : obj->get_name())) {
    return false;
  }

  if (!filter.s3_filter.metadata_filter.kv.empty()) {
    // metadata filter exists
    if (res.s) {
      filter_amz_meta(res.x_meta_map, res.s->info.x_meta_map);
    }
    metadata_from_attributes(res, obj);
    if (!match(filter.s3_filter.metadata_filter, res.x_meta_map)) {
      return false;
    }
  }

  if (!filter.s3_filter.tag_filter.kv.empty()) {
    // tag filter exists
    if (req_tags) {
      // tags in the request
      if (!match(filter.s3_filter.tag_filter, req_tags->get_tags())) {
        return false;
      }
    } else if (res.tagset && !(*res.tagset).get_tags().empty()) {
      // tags were cached in req_state
      if (!match(filter.s3_filter.tag_filter, (*res.tagset).get_tags())) {
        return false;
      }
    } else {
      // try to fetch tags from the attributes
      KeyMultiValueMap tags;
      tags_from_attributes(res, obj, tags);
      if (!match(filter.s3_filter.tag_filter, tags)) {
        return false;
      }
    }
  }

  return true;
}

  int publish_reserve(const DoutPrefixProvider* dpp,
		      EventType event_type,
		      reservation_t& res,
		      const RGWObjTags* req_tags)
{
  const RGWPubSub ps(res.store, res.user_tenant);
  const RGWPubSub::Bucket ps_bucket(ps, res.bucket);
  rgw_pubsub_bucket_topics bucket_topics;
  auto rc = ps_bucket.get_topics(res.dpp, bucket_topics, res.yield);
  if (rc < 0) {
    // failed to fetch bucket topics
    return rc;
  }
  for (const auto& bucket_topic : bucket_topics.topics) {
    const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
    const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
    if (!notification_match(res, topic_filter, event_type, req_tags)) {
      // notification does not apply to req_state
      continue;
    }
    ldpp_dout(res.dpp, 20) << "INFO: notification: '" << topic_filter.s3_id <<
        "' on topic: '" << topic_cfg.dest.arn_topic << 
        "' and bucket: '" << res.bucket->get_name() <<
        "' (unique topic: '" << topic_cfg.name <<
        "') apply to event of type: '" << to_string(event_type) << "'" << dendl;

    cls_2pc_reservation::id_t res_id;
    if (topic_cfg.dest.persistent) {
      // TODO: take default reservation size from conf
      constexpr auto DEFAULT_RESERVATION = 4*1024U; // 4K
      res.size = DEFAULT_RESERVATION;
      librados::ObjectWriteOperation op;
      bufferlist obl;
      int rval;
      const auto& queue_name = topic_cfg.dest.arn_topic;
      cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
      auto ret = rgw_rados_operate(
	res.dpp, res.store->getRados()->get_notif_pool_ctx(),
	queue_name, &op, res.yield, librados::OPERATION_RETURNVEC);
      if (ret < 0) {
        ldpp_dout(res.dpp, 1) <<
	  "ERROR: failed to reserve notification on queue: "
			      << queue_name << ". error: " << ret << dendl;
        // if no space is left in queue we ask client to slow down
        return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
      }
      ret = cls_2pc_queue_reserve_result(obl, res_id);
      if (ret < 0) {
        ldpp_dout(res.dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl;
        return ret;
      }
    }
    res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id);
  }
  return 0;
}

int publish_commit(rgw::sal::Object* obj,
		   uint64_t size,
		   const ceph::real_time& mtime,
		   const std::string& etag,
		   const std::string& version,
		   EventType event_type,
		   reservation_t& res,
		   const DoutPrefixProvider* dpp)
{
  for (auto& topic : res.topics) {
    if (topic.cfg.dest.persistent &&
	topic.res_id == cls_2pc_reservation::NO_ID) {
      // nothing to commit or already committed/aborted
      continue;
    }
    event_entry_t event_entry;
    populate_event(res, obj, size, mtime, etag, version, event_type, event_entry.event);
    event_entry.event.configurationId = topic.configurationId;
    event_entry.event.opaque_data = topic.cfg.opaque_data;
    if (topic.cfg.dest.persistent) { 
      event_entry.push_endpoint = std::move(topic.cfg.dest.push_endpoint);
      event_entry.push_endpoint_args =
	std::move(topic.cfg.dest.push_endpoint_args);
      event_entry.arn_topic = topic.cfg.dest.arn_topic;
      bufferlist bl;
      encode(event_entry, bl);
      const auto& queue_name = topic.cfg.dest.arn_topic;
      if (bl.length() > res.size) {
        // try to make a larger reservation, fail only if this is not possible
        ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length()
			  << " exceeded reserved size: " << res.size
			  <<
          " . trying to make a larger reservation on queue:" << queue_name
			  << dendl;
        // first cancel the existing reservation
        librados::ObjectWriteOperation op;
        cls_2pc_queue_abort(op, topic.res_id);
        auto ret = rgw_rados_operate(
	  dpp, res.store->getRados()->get_notif_pool_ctx(),
	  topic.cfg.dest.arn_topic, &op,
	  res.yield);
        if (ret < 0) {
          ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: "
			    << topic.res_id << 
            " when trying to make a larger reservation on queue: " << queue_name
			    << ". error: " << ret << dendl;
          return ret;
        }
        // now try to make a bigger one
	buffer::list obl;
        int rval;
        cls_2pc_queue_reserve(op, bl.length(), 1, &obl, &rval);
        ret = rgw_rados_operate(
	  dpp, res.store->getRados()->get_notif_pool_ctx(),
          queue_name, &op, res.yield, librados::OPERATION_RETURNVEC);
        if (ret < 0) {
          ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: "
			    << queue_name
			    << ". error: " << ret << dendl;
          return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
        }
        ret = cls_2pc_queue_reserve_result(obl, topic.res_id);
        if (ret < 0) {
          ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for "
	    "extra space. error: " << ret << dendl;
          return ret;
        }
      }
      std::vector<buffer::list> bl_data_vec{std::move(bl)};
      librados::ObjectWriteOperation op;
      cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
      const auto ret = rgw_rados_operate(
	dpp, res.store->getRados()->get_notif_pool_ctx(),
	queue_name, &op, res.yield);
      topic.res_id = cls_2pc_reservation::NO_ID;
      if (ret < 0) {
        ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: "
			  << queue_name << ". error: " << ret
			  << dendl;
        return ret;
      }
    } else {
      try {
        // TODO add endpoint LRU cache
        const auto push_endpoint = RGWPubSubEndpoint::create(
	  topic.cfg.dest.push_endpoint,
	  topic.cfg.dest.arn_topic,
	  RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp),
	  dpp->get_cct());
        ldpp_dout(res.dpp, 20) << "INFO: push endpoint created: "
			       << topic.cfg.dest.push_endpoint << dendl;
        const auto ret = push_endpoint->send_to_completion_async(
	  dpp->get_cct(), event_entry.event, res.yield);
        if (ret < 0) {
          ldpp_dout(dpp, 1) << "ERROR: push to endpoint "
			    << topic.cfg.dest.push_endpoint
			    << " failed. error: " << ret << dendl;
          if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
          return ret;
        }
        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
      } catch (const RGWPubSubEndpoint::configuration_error& e) {
        ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint: " 
            << topic.cfg.dest.push_endpoint << ". error: " << e.what() << dendl;
        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
        return -EINVAL;
      }
    }
  }
  return 0;
}

int publish_abort(reservation_t& res) {
  for (auto& topic : res.topics) {
    if (!topic.cfg.dest.persistent ||
	topic.res_id == cls_2pc_reservation::NO_ID) {
      // nothing to abort or already committed/aborted
      continue;
    }
    const auto& queue_name = topic.cfg.dest.arn_topic;
    librados::ObjectWriteOperation op;
    cls_2pc_queue_abort(op, topic.res_id);
    const auto ret = rgw_rados_operate(
      res.dpp, res.store->getRados()->get_notif_pool_ctx(),
      queue_name, &op, res.yield);
    if (ret < 0) {
      ldpp_dout(res.dpp, 1) << "ERROR: failed to abort reservation: "
			    << topic.res_id <<
        " from queue: " << queue_name << ". error: " << ret << dendl;
      return ret;
    }
    topic.res_id = cls_2pc_reservation::NO_ID;
  }
  return 0;
}

reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
			     rgw::sal::RadosStore* _store,
			     const req_state* _s,
			     rgw::sal::Object* _object,
			     rgw::sal::Object* _src_object,
			     const std::string* _object_name,
			     optional_yield y) :
  dpp(_s), store(_store), s(_s), size(0) /* XXX */,
  object(_object), src_object(_src_object), bucket(_s->bucket.get()),
  object_name(_object_name),
  tagset(_s->tagset),
  metadata_fetched_from_attributes(false),
  user_id(_s->user->get_id().id),
  user_tenant(_s->user->get_id().tenant),
  req_id(_s->req_id),
  yield(y)
{
  filter_amz_meta(x_meta_map, _s->info.x_meta_map);
}

reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
			     rgw::sal::RadosStore* _store,
			     rgw::sal::Object* _object,
			     rgw::sal::Object* _src_object,
			     rgw::sal::Bucket* _bucket,
			     const std::string& _user_id,
			     const std::string& _user_tenant,
			     const std::string& _req_id,
			     optional_yield y) :
    dpp(_dpp), store(_store), s(nullptr), size(0) /* XXX */,
    object(_object), src_object(_src_object), bucket(_bucket),
    object_name(nullptr),
    metadata_fetched_from_attributes(false),
    user_id(_user_id),
    user_tenant(_user_tenant),
    req_id(_req_id),
    yield(y)
{}

reservation_t::~reservation_t() {
  publish_abort(*this);
}

} // namespace rgw::notify