summaryrefslogtreecommitdiffstats
path: root/nselib/amqp.lua
blob: 5ee65cd08c803fc5d0f0803348aa3fdc25889bc6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
---
-- The AMQP library provides some basic functionality for retrieving information
-- about an AMQP server's properties.
--
-- Summary
-- -------
-- The library currently supports the AMQP 0-8, 0-9 and 0-9-1 protocol specifications.
--
-- Overview
-- --------
-- The library contains the following classes:
--
--  o AMQP
--    - This class contains the core functions needed to communicate with an AMQP server
--
-- @args amqp.version Can be used to specify the client version to use (currently, 0-8, 0-9 or 0-9-1)
--
-- @copyright Same as Nmap--See https://nmap.org/book/man-legal.html
-- @author Sebastian Dragomir <velorien@gmail.com>

-- Version 0.1

-- Created 05/04/2011 - v0.1 - created by Sebastian Dragomir <velorien@gmail.com>

local match = require "match"
local nmap = require "nmap"
local stdnse = require "stdnse"
local string = require "string"
local table = require "table"
-- Replace the chunk environment with the table returned by stdnse.module, so
-- every global assigned below (i.e. AMQP) is stored in that table, which is
-- what this file returns at the end.
_ENV = stdnse.module("amqp", stdnse.seeall);


--- Reads exactly <code>n</code> bytes from a socket.
--
-- <code>receive_buf</code> can report success while handing back fewer bytes
-- than requested (the handshake relies on this to detect the 8-byte version
-- rejection), so the length is verified here. This keeps callers from feeding
-- a truncated buffer to <code>string.unpack</code>, which would raise.
--
-- @param socket the socket to read from
-- @param n number of bytes to read
-- @return status, true when exactly <code>n</code> bytes were read
-- @return the bytes read on success, an error string otherwise
local function receive_exact(socket, n)
  local status, data = socket:receive_buf(match.numbytes(n), true)
  if ( not(status) ) then
    return false, data
  end
  if ( #data ~= n ) then
    return false, "short read"
  end
  return true, data
end

--- Minimal AMQP client, able to perform the initial handshake and collect the
-- protocol version and the properties announced by the server.
AMQP = {

  -- protocol versions sent by the server, keyed by the 16-bit big-endian value
  -- read from the connection.start frame (see handshake)
  versions = {
    [0x0800] = "0-8",
    [0x0009] = "0-9"
  },

  -- version strings the client supports; the selected one is sent right after
  -- the literal "AMQP" when the handshake starts
  client_version_strings = {
    ["0-8"] = "\x01\x01\x08\x00",
    ["0-9"] = "\x00\x00\x09\x00",
    ["0-9-1"] = "\x00\x00\x09\x01"
  },

  --- Creates a new AMQP instance
  --
  -- The client version is taken from the <code>amqp.version</code> script
  -- argument and falls back to 0-9-1 when the argument is missing or unknown.
  --
  -- @param host table as received by the action function
  -- @param port table as received by the action function
  -- @return o new instance of AMQP
  new = function(self, host, port)
    local o = {}
    setmetatable(o, self)
    self.__index = self
    o.host = host
    o.port = port
    o.amqpsocket = nmap.new_socket()
    o.cli_version = self.client_version_strings[nmap.registry.args['amqp.version']] or self.client_version_strings["0-9-1"]
    o.protover = nil
    o.server_version = nil
    o.server_product = nil
    o.server_properties = nil
    return o
  end,

  --- Connects the AMQP socket
  --
  -- @return status, true on success, false on failure
  -- @return msg error message reported by the socket if status is false
  connect = function(self)
    local status, msg = self.amqpsocket:connect(self.host, self.port, "tcp")
    return status, msg
  end,

  --- Disconnects the AMQP socket
  disconnect = function(self)
    self.amqpsocket:close()
  end,

  --- Decodes a table value in the server properties field.
  --
  -- Entries are appended to <code>tbl</code> as "key: value" strings. A nested
  -- table is appended as two consecutive elements: the string "key: " followed
  -- by the decoded sub-table. The string values of the keys "product" and
  -- "version" are also stored on the instance.
  --
  -- @param tbl the decoded table
  -- @param tsize number, the table size in bytes
  -- @return status, true on success, false on failure
  -- @return error string containing error message if status is false
  -- @return decoded value
  decodeTable = function(self, tbl, tsize)
    local status, err, tmp
    local read = 0

    while read < tsize do
      local key, value

      status, tmp = receive_exact(self.amqpsocket, 1)
      if ( not(status) ) then
        return false, "ERROR: AMQP:handshake connection closed unexpectedly while reading key length", nil
      end
      read = read + 1

      local keylen = string.unpack("B", tmp)
      status, key = receive_exact(self.amqpsocket, keylen)
      if ( not(status) ) then
        return false, "ERROR: AMQP:handshake connection closed unexpectedly while reading key", nil
      end
      read = read + keylen

      status, tmp = receive_exact(self.amqpsocket, 1)
      if ( not(status) ) then
        return false, "ERROR: AMQP:handshake connection closed unexpectedly while reading value type for " .. key, nil
      end
      read = read + 1

      -- Every branch checks the decode status before touching the value: on
      -- failure the value is nil and concatenating it would raise.
      if ( tmp == 'F' ) then -- table type
        status, tmp = receive_exact(self.amqpsocket, 4)
        if ( not(status) ) then
          return false, "ERROR: AMQP:handshake connection closed unexpectedly while reading table size", nil
        end
        read = read + 4

        local size = string.unpack(">I4", tmp)
        status, err, value = self:decodeTable({}, size)
        if ( not(status) ) then
          return status, err, nil
        end
        read = read + size
        table.insert(tbl, key .. ": ")
        table.insert(tbl, value)
      elseif ( tmp == 'S' ) then -- string type
        status, err, value, read = self:decodeString(key, read)
        if ( not(status) ) then
          return status, err, nil
        end
        if ( key == "product" ) then
          self.server_product = value
        elseif ( key == "version" ) then
          self.server_version = value
        end
        table.insert(tbl, key .. ": " .. value)
      elseif ( tmp == 't' ) then -- boolean type
        status, err, value, read = self:decodeBoolean(key, read)
        if ( not(status) ) then
          return status, err, nil
        end
        table.insert(tbl, key .. ": " .. value)
      end
      -- NOTE(review): any other type tag is not decoded and none of its
      -- payload is consumed, so the bytes that follow are parsed as the next
      -- key. Kept as in the original; decoding further types needs their wire
      -- widths confirmed against the protocol specification.

    end

    return true, nil, tbl
  end,

  --- Decodes a string value in the server properties field.
  --
  -- The value is a 4-byte big-endian length followed by that many bytes.
  --
  -- @param key string, the key being read
  -- @param read number, number of bytes already read
  -- @return status, true on success, false on failure
  -- @return error string containing error message if status is false
  -- @return decoded value
  -- @return number of bytes read after decoding this value (0 on failure)
  decodeString = function(self, key, read)
    local status, tmp, value

    status, tmp = receive_exact(self.amqpsocket, 4)
    if ( not(status) ) then
      return false, "ERROR: AMQP:handshake connection closed unexpectedly while reading value size for " .. key, nil, 0
    end
    read = read + 4

    local size = string.unpack(">I4", tmp)
    status, value = receive_exact(self.amqpsocket, size)
    if ( not(status) ) then
      return false, "ERROR: AMQP:handshake connection closed unexpectedly while reading value for " .. key, nil, 0
    end
    read = read + size

    return true, nil, value, read
  end,

  --- Decodes a boolean value in the server properties field.
  --
  -- The value is a single octet; zero is reported as "NO" and any other value
  -- as "YES".
  --
  -- @param key string, the key being read
  -- @param read number, number of bytes already read
  -- @return status, true on success, false on failure
  -- @return error string containing error message if status is false
  -- @return decoded value, the string "YES" or "NO"
  -- @return number of bytes read after decoding this value (0 on failure)
  decodeBoolean = function(self, key, read)
    local status, value = receive_exact(self.amqpsocket, 1)
    if ( not(status) ) then
      return false, "ERROR: AMQP:handshake connection closed unexpectedly while reading value for " .. key, nil, 0
    end

    value = string.unpack("B", value)
    read = read + 1

    return true, nil, value ~= 0 and "YES" or "NO", read
  end,

  --- Performs the AMQP handshake and determines
  -- * The AMQP protocol version
  -- * The server properties/capabilities
  --
  -- If the server answers with an 8-byte "AMQP" + version reply instead of a
  -- frame, and that version is one the client knows but did not propose, the
  -- socket is reconnected and the handshake is retried with that version.
  --
  -- @return status, true on success, false on failure
  -- @return error string containing error message if status is false
  handshake = function(self)
    local status, err, version, tmp, value, properties

    status = self.amqpsocket:send( "AMQP" .. self.cli_version )
    if ( not(status) ) then
      return false, "ERROR: AMQP:handshake failed while sending client version"
    end

    -- Deliberately not receive_exact: a short (8-byte) reply is meaningful here.
    status, tmp = self.amqpsocket:receive_buf(match.numbytes(11), true)
    if ( not(status) ) then
      return status, "ERROR: AMQP:handshake connection closed unexpectedly while reading frame header"
    end

    -- check if the server rejected our proposed version
    if ( #tmp ~= 11 ) then
      -- 0x414D5150 is the ASCII string "AMQP"
      if ( #tmp == 8 and string.unpack(">I4", tmp) == 0x414D5150 ) then
        local server_ver = string.unpack(">I4", tmp, 5)
        local found = false

        -- check if we support the server's version
        for _, v in pairs( self.client_version_strings ) do
          if ( string.unpack(">I4", v) == server_ver ) then
            version = v
            found = true
            break
          end
        end

        -- try again with new version string
        if ( found and version ~= self.cli_version ) then
          self.cli_version = version
          self:disconnect()
          status, err = self:connect()

          if ( not(status) ) then
            return status, err
          end

          return self:handshake()
        end

        -- version unsupported
        local v1, v2, v3, v4 = string.unpack(">BBBB", tmp, 5)
        return false, ("ERROR: AMQP:handshake unsupported version (%d.%d.%d.%d)"):format( v1, v2, v3, v4 )
      else
        return false, ("ERROR: AMQP:handshake server might not be AMQP, received: %s"):format( tmp )
      end
    end

    -- parse frame header: 1-byte frame type, 2-byte channel, 4-byte frame
    -- size, followed by the 4-byte method identifier
    local frametype, chnumber, framesize, method = string.unpack(">BI2I4I4", tmp)
    stdnse.debug1("frametype: %d, chnumber: %d, framesize: %d, method: %d", frametype, chnumber, framesize, method)

    if (frametype ~= 1) then
      return false, ("ERROR: AMQP:handshake expected method (1) frame, but was %d"):format(frametype)
    end

    if (method ~= 0x000A000A) then
      return false, ("ERROR: AMQP:handshake expected connection.start (0x000A000A) method, but was %x"):format(method)
    end

    -- parse protocol version
    status, tmp = receive_exact(self.amqpsocket, 2)
    if ( not(status) ) then
      return false, "ERROR: AMQP:handshake connection closed unexpectedly while reading version"
    end
    version = string.unpack(">I2", tmp)
    self.protover = AMQP.versions[version]

    if ( not(self.protover) ) then
      return false, ("ERROR: AMQP:handshake unsupported version (%x)"):format(version)
    end

    -- parse server properties
    status, tmp = receive_exact(self.amqpsocket, 4)
    if ( not(status) ) then
      return false, "ERROR: AMQP:handshake connection closed unexpectedly while reading server properties size"
    end

    local tablesize = string.unpack(">I4", tmp)
    status, err, properties = self:decodeTable({}, tablesize)

    if ( not(status) ) then
      return status, err
    end

    -- the properties table is followed by two more strings
    status, err, value = self:decodeString("mechanisms", 0)
    if ( not(status) ) then
      return status, err
    end
    table.insert(properties, "mechanisms: " .. value)

    status, err, value = self:decodeString("locales", 0)
    if ( not(status) ) then
      return status, err
    end
    table.insert(properties, "locales: " .. value)

    self.server_properties = properties

    return true
  end,

  --- Returns the protocol version reported by the server
  --
  -- @return string containing the version number
  getProtocolVersion = function( self )
    return self.protover
  end,

  --- Returns the product version reported by the server
  --
  -- @return string containing the version number
  getServerVersion = function( self )
    return self.server_version
  end,

  --- Returns the product name reported by the server
  --
  -- @return string containing the product name
  getServerProduct = function( self )
    return self.server_product
  end,

  --- Returns the properties reported by the server
  --
  -- @return table containing server properties
  getServerProperties = function( self )
    return self.server_properties
  end,
}

-- Hand the module environment (which now holds AMQP) back to the caller of require.
return _ENV;