summaryrefslogtreecommitdiffstats
path: root/examples/rgw/lua/nats_adapter.lua
blob: 38264dd46859fc208d482c3457c758ba0ab796a7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
  local json = require ("lunajson")
  local nats = require ("nats")

  -- Create a NATS client for the given server and open the connection.
  --
  -- nats_host: server host; when nil, the global `nats_host` is used instead.
  -- nats_port: server port; when nil, the global `nats_port` is used instead.
  --
  -- The resulting client is stored in the global `client`, which the rest of
  -- the script uses for publishing.
  function nats_connect(nats_host, nats_port)
      -- The parameters shadow the globals of the same name, so when the
      -- caller passes nothing the configured values must be read through _G.
      -- Without this fallback a bare nats_connect() hands nil host/port to
      -- the NATS library and the configured endpoint is silently ignored.
      if nats_host == nil then
          nats_host = _G.nats_host
      end
      if nats_port == nil then
          nats_port = _G.nats_port
      end

      local nats_params = {
          host = nats_host,
          port = nats_port,
      }
      client = nats.connect(nats_params)
      client:connect()
  end

  -- Build the notification table for a single request.
  --
  -- request:    table-like object read for ZoneGroup, Time, User, Id, RGWId,
  --             Bucket, Object, HTTP.Metadata and Tags
  -- eventName:  value stored under "eventName"
  -- opaqueData: value stored under "opaqueData"
  -- configure:  value stored under s3."configurationId"
  --
  -- Returns a table whose only key, "Records", holds the record.
  -- Side effect: sets the global `supported_event` to true.
  function toJson(request, eventName, opaqueData, configure)
      supported_event = true

      local zonegroup_name = request.ZoneGroup.Name
      local bucket_name = request.Bucket.Name

      local bucket_section = {
          name = bucket_name,
          ownerIdentity = { principalId = request.Bucket.User.Id },
          arn = "arn:aws:s3:" .. zonegroup_name .. "::" .. bucket_name,
          id = request.Bucket.Id,
      }

      local object_section = {
          key = request.Object.Name,
          size = request.Object.Size,
          eTag = "", -- eTag is not supported yet
          versionId = request.Object.Instance,
          -- hexadecimal rendering of the current os.time() value
          sequencer = ("%x"):format(os.time()),
          -- metadata and tags are each a one-element array holding the
          -- JSON-encoded string of the corresponding request field
          metadata = { json.encode(request.HTTP.Metadata) },
          tags = { json.encode(request.Tags) },
      }

      local record = {
          eventVersion = "2.1",
          eventSource = "ceph:s3",
          awsRegion = zonegroup_name,
          eventTime = request.Time,
          eventName = eventName,
          userIdentity = { principalId = request.User.Id },
          requestParameters = { sourceIPAddress = "" },
          responseElements = {
              ["x-amz-request-id"] = request.Id,
              ["x-amz-id-2"] = request.RGWId,
          },
          s3 = {
              s3SchemaVersion = "1.0",
              configurationId = configure,
              bucket = bucket_section,
              object = object_section,
          },
          eventId = "",
          opaqueData = opaqueData,
      }

      return { Records = record }
  end
  
  -- Script configuration and entry point.
  --
  -- supported_event is flipped to true by toJson() whenever a notification
  -- is built, i.e. whenever the request's operation is one handled below.
  supported_event = false
  configure = "mynotif1"          -- stored as s3.configurationId in the record
  opaqueData = "me@example.com"   -- stored as opaqueData in the record
  topic = "Bucket_Notification"   -- NATS subject the payload is published to
  bucket_name = "mybucket"        -- only requests on this bucket are reported
  nats_host = '0.0.0.0'
  nats_port = 4222
  
  if bucket_name == Request.Bucket.Name then
    --Object Created
    if Request.RGWOp == "put_obj" then
        notification = toJson(Request ,'ObjectCreated:Put', opaqueData, configure)
    elseif Request.RGWOp == "post_obj" then
        notification = toJson(Request ,'ObjectCreated:Post', opaqueData, configure)
    
    elseif Request.RGWOp == "copy_obj" then
        notification = toJson(Request ,'ObjectCreated:Copy', opaqueData, configure)
    
    --Object Removed
    elseif Request.RGWOp == "delete_obj" then
        notification = toJson(Request ,'ObjectRemoved:Delete', opaqueData, configure)
    end
    
    if supported_event == true then
        -- Pass the configured endpoint explicitly: nats_connect's parameters
        -- shadow the globals, so a bare nats_connect() would connect with
        -- nil host/port and the log line below would report an endpoint
        -- that was never actually used.
        nats_connect(nats_host, nats_port)
        local payload = json.encode(notification)
        client:publish(topic, payload) 
        RGWDebugLog("bucket notification sent to nats://" .. nats_host .. ":" .. nats_port .. "/" .. topic)
    end
  end