// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#ifndef CEPH_RGW_PUBSUB_H
#define CEPH_RGW_PUBSUB_H

#include "rgw_common.h"
#include "rgw_tools.h"
#include "rgw_zone.h"
#include "rgw_rados.h"
#include "rgw_notify_event_type.h"
#include "services/svc_sys_obj.h"
#include <boost/container/flat_map.hpp>

class XMLObj;

struct rgw_s3_key_filter {
  std::string prefix_rule;
  std::string suffix_rule;
  std::string regex_rule;

  bool has_content() const;

  bool decode_xml(XMLObj *obj);
  void dump_xml(Formatter *f) const;
  
  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(prefix_rule, bl);
    encode(suffix_rule, bl);
    encode(regex_rule, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(prefix_rule, bl);
    decode(suffix_rule, bl);
    decode(regex_rule, bl);
    DECODE_FINISH(bl);
  }
};
WRITE_CLASS_ENCODER(rgw_s3_key_filter)

using KeyValueList = boost::container::flat_map<std::string, std::string>;

struct rgw_s3_key_value_filter {
  KeyValueList kvl;
  
  bool has_content() const;
  
  bool decode_xml(XMLObj *obj);
  void dump_xml(Formatter *f) const;
  
  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(kvl, bl);
    ENCODE_FINISH(bl);
  }
  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(kvl, bl);
    DECODE_FINISH(bl);
  }
};
WRITE_CLASS_ENCODER(rgw_s3_key_value_filter)

struct rgw_s3_filter {
  rgw_s3_key_filter key_filter;
  rgw_s3_key_value_filter metadata_filter;
  rgw_s3_key_value_filter tag_filter;

  bool has_content() const;
  
  bool decode_xml(XMLObj *obj);
  void dump_xml(Formatter *f) const;
  
  void encode(bufferlist& bl) const {
    ENCODE_START(2, 1, bl);
    encode(key_filter, bl);
    encode(metadata_filter, bl);
    encode(tag_filter, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(2, bl);
    decode(key_filter, bl);
    decode(metadata_filter, bl);
    if (struct_v >= 2) {
        decode(tag_filter, bl);
    }
    DECODE_FINISH(bl);
  }
};
WRITE_CLASS_ENCODER(rgw_s3_filter)

using OptionalFilter = std::optional<rgw_s3_filter>;

struct rgw_pubsub_topic_filter;
/* S3 notification configuration
 * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
<NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <TopicConfiguration>
    <Filter>
      <S3Key>
        <FilterRule>
          <Name>suffix</Name>
          <Value>jpg</Value>
        </FilterRule>
      </S3Key>
      <S3Metadata>
        <FilterRule>
          <Name></Name>
          <Value></Value>
        </FilterRule>
      </S3Metadata>
      <S3Tags>
        <FilterRule>
          <Name></Name>
          <Value></Value>
        </FilterRule>
      </S3Tags>
    </Filter>
    <Id>notification1</Id>
    <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
    <Event>s3:ObjectCreated:*</Event>
    <Event>s3:ObjectRemoved:*</Event>
  </TopicConfiguration>
</NotificationConfiguration>
*/
struct rgw_pubsub_s3_notification {
  // notification id
  std::string id;
  // types of events
  rgw::notify::EventTypeList events;
  // topic ARN
  std::string topic_arn;
  // filter rules
  rgw_s3_filter filter;

  bool decode_xml(XMLObj *obj);
  void dump_xml(Formatter *f) const;

  rgw_pubsub_s3_notification() = default;
  // construct from rgw_pubsub_topic_filter (used by get/list notifications)
  explicit rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter);
};

// return true if the key matches the prefix/suffix/regex rules of the key filter
bool match(const rgw_s3_key_filter& filter, const std::string& key);
// return true if the key matches the metadata/tags rules of the metadata/tags filter
bool match(const rgw_s3_key_value_filter& filter, const KeyValueList& kvl);
// return true if the event type matches (equal or contained in) one of the events in the list
bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event);

struct rgw_pubsub_s3_notifications {
  std::list<rgw_pubsub_s3_notification> list;
  bool decode_xml(XMLObj *obj);
  void dump_xml(Formatter *f) const;
};

/* S3 event records structure
 * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
{  
"Records":[  
  {
    "eventVersion":""
    "eventSource":"",
    "awsRegion":"",
    "eventTime":"",
    "eventName":"",
    "userIdentity":{  
      "principalId":""
    },
    "requestParameters":{
      "sourceIPAddress":""
    },
    "responseElements":{
      "x-amz-request-id":"",
      "x-amz-id-2":""
    },
    "s3":{
      "s3SchemaVersion":"1.0",
      "configurationId":"",
      "bucket":{
        "name":"",
        "ownerIdentity":{
          "principalId":""
        },
        "arn":""
        "id": ""
      },
      "object":{
        "key":"",
        "size": ,
        "eTag":"",
        "versionId":"",
        "sequencer": "",
        "metadata": ""
        "tags": ""
      }
    },
    "eventId":"",
  }
]
}*/

struct rgw_pubsub_s3_record {
  constexpr static const char* const json_type_plural = "Records";
  std::string eventVersion = "2.2";
  // aws:s3
  std::string eventSource = "ceph:s3";
  // zonegroup
  std::string awsRegion;
  // time of the request
  ceph::real_time eventTime;
  // type of the event
  std::string eventName;
  // user that sent the request
  std::string userIdentity;
  // IP address of source of the request (not implemented)
  std::string sourceIPAddress;
  // request ID (not implemented)
  std::string x_amz_request_id;
  // radosgw that received the request
  std::string x_amz_id_2;
  std::string s3SchemaVersion = "1.0";
  // ID received in the notification request
  std::string configurationId;
  // bucket name
  std::string bucket_name;
  // bucket owner
  std::string bucket_ownerIdentity;
  // bucket ARN
  std::string bucket_arn;
  // object key
  std::string object_key;
  // object size
  uint64_t object_size = 0;
  // object etag
  std::string object_etag;
  // object version id bucket is versioned
  std::string object_versionId;
  // hexadecimal value used to determine event order for specific key
  std::string object_sequencer;
  // this is an rgw extension (not S3 standard)
  // used to store a globally unique identifier of the event
  // that could be used for acking or any other identification of the event
  std::string id;
  // this is an rgw extension holding the internal bucket id
  std::string bucket_id;
  // meta data
  KeyValueList x_meta_map;
  // tags
  KeyValueList tags;
  // opaque data received from the topic
  // could be used to identify the gateway
  std::string opaque_data;

  void encode(bufferlist& bl) const {
    ENCODE_START(4, 1, bl);
    encode(eventVersion, bl);
    encode(eventSource, bl);
    encode(awsRegion, bl);
    encode(eventTime, bl);
    encode(eventName, bl);
    encode(userIdentity, bl);
    encode(sourceIPAddress, bl);
    encode(x_amz_request_id, bl);
    encode(x_amz_id_2, bl);
    encode(s3SchemaVersion, bl);
    encode(configurationId, bl);
    encode(bucket_name, bl);
    encode(bucket_ownerIdentity, bl);
    encode(bucket_arn, bl);
    encode(object_key, bl);
    encode(object_size, bl);
    encode(object_etag, bl);
    encode(object_versionId, bl);
    encode(object_sequencer, bl);
    encode(id, bl);
    encode(bucket_id, bl);
    encode(x_meta_map, bl);
    encode(tags, bl);
    encode(opaque_data, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(4, bl);
    decode(eventVersion, bl);
    decode(eventSource, bl);
    decode(awsRegion, bl);
    decode(eventTime, bl);
    decode(eventName, bl);
    decode(userIdentity, bl);
    decode(sourceIPAddress, bl);
    decode(x_amz_request_id, bl);
    decode(x_amz_id_2, bl);
    decode(s3SchemaVersion, bl);
    decode(configurationId, bl);
    decode(bucket_name, bl);
    decode(bucket_ownerIdentity, bl);
    decode(bucket_arn, bl);
    decode(object_key, bl);
    decode(object_size, bl);
    decode(object_etag, bl);
    decode(object_versionId, bl);
    decode(object_sequencer, bl);
    decode(id, bl);
    if (struct_v >= 2) {
      decode(bucket_id, bl);
      decode(x_meta_map, bl);
    }
    if (struct_v >= 3) {
      decode(tags, bl);
    }
    if (struct_v >= 4) {
      decode(opaque_data, bl);
    }
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_s3_record)

struct rgw_pubsub_event {
  constexpr static const char* const json_type_plural = "events";
  std::string id;
  std::string event_name;
  std::string source;
  ceph::real_time timestamp;
  JSONFormattable info;

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(id, bl);
    encode(event_name, bl);
    encode(source, bl);
    encode(timestamp, bl);
    encode(info, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(id, bl);
    decode(event_name, bl);
    decode(source, bl);
    decode(timestamp, bl);
    decode(info, bl);
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_event)

// settign a unique ID for an event/record based on object hash and timestamp
void set_event_id(std::string& id, const std::string& hash, const utime_t& ts);

struct rgw_pubsub_sub_dest {
  std::string bucket_name;
  std::string oid_prefix;
  std::string push_endpoint;
  std::string push_endpoint_args;
  std::string arn_topic;
  bool stored_secret = false;

  void encode(bufferlist& bl) const {
    ENCODE_START(4, 1, bl);
    encode(bucket_name, bl);
    encode(oid_prefix, bl);
    encode(push_endpoint, bl);
    encode(push_endpoint_args, bl);
    encode(arn_topic, bl);
    encode(stored_secret, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(4, bl);
    decode(bucket_name, bl);
    decode(oid_prefix, bl);
    decode(push_endpoint, bl);
    if (struct_v >= 2) {
        decode(push_endpoint_args, bl);
    }
    if (struct_v >= 3) {
        decode(arn_topic, bl);
    }
    if (struct_v >= 4) {
        decode(stored_secret, bl);
    }
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
  void dump_xml(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest)

struct rgw_pubsub_sub_config {
  rgw_user user;
  std::string name;
  std::string topic;
  rgw_pubsub_sub_dest dest;
  std::string s3_id;

  void encode(bufferlist& bl) const {
    ENCODE_START(2, 1, bl);
    encode(user, bl);
    encode(name, bl);
    encode(topic, bl);
    encode(dest, bl);
    encode(s3_id, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(2, bl);
    decode(user, bl);
    decode(name, bl);
    decode(topic, bl);
    decode(dest, bl);
    if (struct_v >= 2) {
      decode(s3_id, bl);
    }
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_sub_config)

struct rgw_pubsub_topic {
  rgw_user user;
  std::string name;
  rgw_pubsub_sub_dest dest;
  std::string arn;
  std::string opaque_data;

  void encode(bufferlist& bl) const {
    ENCODE_START(3, 1, bl);
    encode(user, bl);
    encode(name, bl);
    encode(dest, bl);
    encode(arn, bl);
    encode(opaque_data, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(3, bl);
    decode(user, bl);
    decode(name, bl);
    if (struct_v >= 2) {
      decode(dest, bl);
      decode(arn, bl);
    }
    if (struct_v >= 3) {
      decode(opaque_data, bl);
    }
    DECODE_FINISH(bl);
  }

  string to_str() const {
    return user.to_str() + "/" + name;
  }

  void dump(Formatter *f) const;
  void dump_xml(Formatter *f) const;

  bool operator<(const rgw_pubsub_topic& t) const {
    return to_str().compare(t.to_str());
  }
};
WRITE_CLASS_ENCODER(rgw_pubsub_topic)

struct rgw_pubsub_topic_subs {
  rgw_pubsub_topic topic;
  std::set<std::string> subs;

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(topic, bl);
    encode(subs, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(topic, bl);
    decode(subs, bl);
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs)

struct rgw_pubsub_topic_filter {
  rgw_pubsub_topic topic;
  rgw::notify::EventTypeList events;
  std::string s3_id;
  rgw_s3_filter s3_filter;

  void encode(bufferlist& bl) const {
    ENCODE_START(3, 1, bl);
    encode(topic, bl);
    // events are stored as a vector of strings
    std::vector<std::string> tmp_events;
    const auto converter = s3_id.empty() ? rgw::notify::to_ceph_string : rgw::notify::to_string;
    std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), converter);
    encode(tmp_events, bl);
    encode(s3_id, bl);
    encode(s3_filter, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(3, bl);
    decode(topic, bl);
    // events are stored as a vector of strings
    events.clear();
    std::vector<std::string> tmp_events;
    decode(tmp_events, bl);
    std::transform(tmp_events.begin(), tmp_events.end(), std::back_inserter(events), rgw::notify::from_string);
    if (struct_v >= 2) {
      decode(s3_id, bl);
    }
    if (struct_v >= 3) {
      decode(s3_filter, bl);
    }
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter)

struct rgw_pubsub_bucket_topics {
  std::map<std::string, rgw_pubsub_topic_filter> topics;

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(topics, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(topics, bl);
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)

struct rgw_pubsub_user_topics {
  std::map<std::string, rgw_pubsub_topic_subs> topics;

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(topics, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(topics, bl);
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
  void dump_xml(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_user_topics)

static std::string pubsub_user_oid_prefix = "pubsub.user.";

class RGWUserPubSub
{
  friend class Bucket;

  RGWRados *store;
  rgw_user user;
  RGWSysObjectCtx obj_ctx;

  rgw_raw_obj user_meta_obj;

  std::string user_meta_oid() const {
    return pubsub_user_oid_prefix + user.to_str();
  }

  std::string bucket_meta_oid(const rgw_bucket& bucket) const {
    return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
  }

  std::string sub_meta_oid(const string& name) const {
    return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
  }

  template <class T>
  int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker);

  template <class T>
  int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker);

  int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker);

  int read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker);
  int write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker);

public:
  RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store),
                                                           user(_user),
                                                           obj_ctx(store->svc.sysobj->init_obj_ctx()) {
    get_user_meta_obj(&user_meta_obj);
  }

  class Bucket {
    friend class RGWUserPubSub;
    RGWUserPubSub *ps;
    rgw_bucket bucket;
    rgw_raw_obj bucket_meta_obj;

    // read the list of topics associated with a bucket and populate into result
    // use version tacker to enforce atomicity between read/write
    // return 0 on success or if no topic was associated with the bucket, error code otherwise
    int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker);
    // set the list of topics associated with a bucket
    // use version tacker to enforce atomicity between read/write
    // return 0 on success, error code otherwise
    int write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker);
  public:
    Bucket(RGWUserPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
      ps->get_bucket_meta_obj(bucket, &bucket_meta_obj);
    }

    // read the list of topics associated with a bucket and populate into result
    // return 0 on success or if no topic was associated with the bucket, error code otherwise
    int get_topics(rgw_pubsub_bucket_topics *result);
    // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
    // assigning a notification name is optional (needed for S3 compatible notifications)
    // if the topic already exist on the bucket, the filter event list may be updated
    // for S3 compliant notifications the version with: s3_filter and notif_name should be used
    // return -ENOENT if the topic does not exists
    // return 0 on success, error code otherwise
    int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events);
    int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name);
    // remove a topic and filter from bucket
    // if the topic does not exists on the bucket it is a no-op (considered success)
    // return -ENOENT if the topic does not exists
    // return 0 on success, error code otherwise
    int remove_notification(const string& topic_name);
  };

  // base class for subscription
  class Sub {
    friend class RGWUserPubSub;
  protected:
    RGWUserPubSub* const ps;
    const std::string sub;
    rgw_raw_obj sub_meta_obj;

    int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker);
    int write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker);
    int remove_sub(RGWObjVersionTracker *objv_tracker);
  public:
    Sub(RGWUserPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
      ps->get_sub_meta_obj(sub, &sub_meta_obj);
    }

    virtual ~Sub() = default;

    int subscribe(const string& topic_name, const rgw_pubsub_sub_dest& dest, const std::string& s3_id="");
    int unsubscribe(const string& topic_name);
    int get_conf(rgw_pubsub_sub_config* result);
    
    static const int DEFAULT_MAX_EVENTS = 100;
    // followint virtual methods should only be called in derived
    virtual int list_events(const string& marker, int max_events) {ceph_assert(false);}
    virtual int remove_event(const string& event_id) {ceph_assert(false);}
    virtual void dump(Formatter* f) const {ceph_assert(false);}
  };

  // subscription with templated list of events to support both S3 compliant and Ceph specific events
  template<typename EventType>
  class SubWithEvents : public Sub {
  private:
    struct list_events_result {
      std::string next_marker;
      bool is_truncated{false};
      void dump(Formatter *f) const;
      std::vector<EventType> events;
    } list;

  public:
    SubWithEvents(RGWUserPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {}

    virtual ~SubWithEvents() = default;
    
    int list_events(const string& marker, int max_events) override;
    int remove_event(const string& event_id) override;
    void dump(Formatter* f) const override;
  };

  using BucketRef = std::shared_ptr<Bucket>;
  using SubRef = std::shared_ptr<Sub>;

  BucketRef get_bucket(const rgw_bucket& bucket) {
    return std::make_shared<Bucket>(this, bucket);
  }

  SubRef get_sub(const string& sub) {
    return std::make_shared<Sub>(this, sub);
  }
  
  SubRef get_sub_with_events(const string& sub) {
    auto tmpsub = Sub(this, sub);
    rgw_pubsub_sub_config conf;
    if (tmpsub.get_conf(&conf) < 0) {
      return nullptr;
    }
    if (conf.s3_id.empty()) {
      return std::make_shared<SubWithEvents<rgw_pubsub_event>>(this, sub);
    }
    return std::make_shared<SubWithEvents<rgw_pubsub_s3_record>>(this, sub);
  }
  
  void get_user_meta_obj(rgw_raw_obj *obj) const {
    *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, user_meta_oid());
  }

  void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
    *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, bucket_meta_oid(bucket));
  }

  void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
    *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sub_meta_oid(name));
  }

  // get all topics defined for the user and populate them into "result"
  // return 0 on success or if no topics exist, error code otherwise
  int get_user_topics(rgw_pubsub_user_topics *result);
  // get a topic with its subscriptions by its name and populate it into "result"
  // return -ENOENT if the topic does not exists 
  // return 0 on success, error code otherwise
  int get_topic(const string& name, rgw_pubsub_topic_subs *result);
  // get a topic with by its name and populate it into "result"
  // return -ENOENT if the topic does not exists 
  // return 0 on success, error code otherwise
  int get_topic(const string& name, rgw_pubsub_topic *result);
  // create a topic with a name only
  // if the topic already exists it is a no-op (considered success)
  // return 0 on success, error code otherwise
  int create_topic(const string& name);
  // create a topic with push destination information and ARN
  // if the topic already exists the destination and ARN values may be updated (considered succsess)
  // return 0 on success, error code otherwise
  int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data);
  // remove a topic according to its name
  // if the topic does not exists it is a no-op (considered success)
  // return 0 on success, error code otherwise
  int remove_topic(const string& name);
};

template <class T>
int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker)
{
  bufferlist bl;
  int ret = rgw_get_system_obj(store, obj_ctx,
                               obj.pool, obj.oid,
                               bl,
                               objv_tracker,
                               nullptr, nullptr, nullptr);
  if (ret < 0) {
    return ret;
  }

  auto iter = bl.cbegin();
  try {
    decode(*result, iter);
  } catch (buffer::error& err) {
    return -EIO;
  }

  return 0;
}

template <class T>
int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *objv_tracker)
{
  bufferlist bl;
  encode(info, bl);

  int ret = rgw_put_system_obj(store, obj.pool, obj.oid,
                           bl, false, objv_tracker,
                           real_time());
  if (ret < 0) {
    return ret;
  }

  obj_ctx.invalidate(const_cast<rgw_raw_obj&>(obj));
  return 0;
}

#endif