summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/wasm-micro-runtime-WAMR-1.2.2/test-tools/IoT-APP-Store-Demo/wasm_django/server/wasm_server.py
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/wasm-micro-runtime-WAMR-1.2.2/test-tools/IoT-APP-Store-Demo/wasm_django/server/wasm_server.py')
-rwxr-xr-xfluent-bit/lib/wasm-micro-runtime-WAMR-1.2.2/test-tools/IoT-APP-Store-Demo/wasm_django/server/wasm_server.py621
1 files changed, 621 insertions, 0 deletions
diff --git a/fluent-bit/lib/wasm-micro-runtime-WAMR-1.2.2/test-tools/IoT-APP-Store-Demo/wasm_django/server/wasm_server.py b/fluent-bit/lib/wasm-micro-runtime-WAMR-1.2.2/test-tools/IoT-APP-Store-Demo/wasm_django/server/wasm_server.py
new file mode 100755
index 000000000..970ec6f60
--- /dev/null
+++ b/fluent-bit/lib/wasm-micro-runtime-WAMR-1.2.2/test-tools/IoT-APP-Store-Demo/wasm_django/server/wasm_server.py
@@ -0,0 +1,621 @@
+'''
+ /* Copyright (C) 2019 Intel Corporation. All rights reserved.
+ * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+ */
+'''
+import select
+import socket
+import queue
+from time import sleep
+import struct
+import threading
+import time
+from ctypes import *
+import json
+import logging
+import os
+
+attr_type_list = [
+ "ATTR_TYPE_BYTE", # = ATTR_TYPE_INT8
+ "ATTR_TYPE_SHORT",# = ATTR_TYPE_INT16
+ "ATTR_TYPE_INT", # = ATTR_TYPE_INT32
+ "ATTR_TYPE_INT64",
+ "ATTR_TYPE_UINT8",
+ "ATTR_TYPE_UINT16",
+ "ATTR_TYPE_UINT32",
+ "ATTR_TYPE_UINT64",
+ "ATTR_TYPE_FLOAT",
+ "ATTR_TYPE_DOUBLE",
+ "ATTR_NONE",
+ "ATTR_NONE",
+ "ATTR_TYPE_BOOLEAN",
+ "ATTR_TYPE_STRING",
+ "ATTR_TYPE_BYTEARRAY"
+]
+
+
+Phase_Non_Start = 0
+Phase_Leading = 1
+Phase_Type = 2
+Phase_Size = 3
+Phase_Payload = 4
+
+
+
+class imrt_link_message(object):
+ def __init__(self):
+ self.leading = bytes([0x12, 0x34])
+ self.phase = Phase_Non_Start
+ self.size_in_phase = 0
+ self.message_type = bytes()
+ self.message_size = bytes()
+ self.payload = bytes()
+ self.msg = bytes()
+
+ def set_recv_phase(self, phase):
+ self.phase = phase
+
+ def on_imrt_link_byte_arrive(self, ch):
+ self.msg += ch
+ if self.phase == Phase_Non_Start:
+ if ch == b'\x12':
+ self.set_recv_phase(Phase_Leading)
+ else:
+ return -1
+ elif self.phase == Phase_Leading:
+ if ch == b'\x34':
+ self.set_recv_phase(Phase_Type)
+ else:
+ self.set_recv_phase(Phase_Non_Start)
+ return -1
+ elif self.phase == Phase_Type:
+ self.message_type += ch
+ self.size_in_phase += 1
+
+ if self.size_in_phase == 2:
+ (self.message_type, ) = struct.unpack('!H', self.message_type)
+ self.size_in_phase = 0
+ self.set_recv_phase(Phase_Size)
+ elif self.phase == Phase_Size:
+ self.message_size += ch
+ self.size_in_phase += 1
+
+ if self.size_in_phase == 4:
+ (self.message_size, ) = struct.unpack('!I', self.message_size)
+ self.size_in_phase = 0
+ self.set_recv_phase(Phase_Payload)
+
+ if self.message_size == b'\x00':
+ self.set_recv_phase(Phase_Non_Start)
+ return 0
+
+ self.set_recv_phase(Phase_Payload)
+
+ elif self.phase == Phase_Payload:
+ self.payload += ch
+ self.size_in_phase += 1
+
+ if self.size_in_phase == self.message_size:
+ self.set_recv_phase(Phase_Non_Start)
+ return 0
+
+ return 2
+
+ return 1
+
+
+
+def read_file_to_buffer(file_name):
+ file_object = open(file_name, 'rb')
+ buffer = None
+
+ if not os.path.exists(file_name):
+ logging.error("file {} not found.".format(file_name))
+ return "file not found"
+
+ try:
+ buffer = file_object.read()
+ finally:
+ file_object.close()
+
+ return buffer
+
+def decode_attr_container(msg):
+
+ attr_dict = {}
+
+ buf = msg[26 : ]
+ (total_len, tag_len) = struct.unpack('@IH', buf[0 : 6])
+ tag_name = buf[6 : 6 + tag_len].decode()
+ buf = buf[6 + tag_len : ]
+ (attr_num, ) = struct.unpack('@H', buf[0 : 2])
+ buf = buf[2 : ]
+
+ logging.info("parsed attr:")
+ logging.info("total_len:{}, tag_len:{}, tag_name:{}, attr_num:{}"
+ .format(str(total_len), str(tag_len), str(tag_name), str(attr_num)))
+
+ for i in range(attr_num):
+ (key_len, ) = struct.unpack('@H', buf[0 : 2])
+ key_name = buf[2 : 2 + key_len - 1].decode()
+ buf = buf[2 + key_len : ]
+ (type_index, ) = struct.unpack('@c', buf[0 : 1])
+
+ attr_type = attr_type_list[int(type_index[0])]
+ buf = buf[1 : ]
+
+ if attr_type == "ATTR_TYPE_BYTE": # = ATTR_TYPE_INT8
+ (attr_value, ) = struct.unpack('@c', buf[0 : 1])
+ buf = buf[1 : ]
+ # continue
+ elif attr_type == "ATTR_TYPE_SHORT": # = ATTR_TYPE_INT16
+ (attr_value, ) = struct.unpack('@h', buf[0 : 2])
+ buf = buf[2 : ]
+ # continue
+ elif attr_type == "ATTR_TYPE_INT": # = ATTR_TYPE_INT32
+ (attr_value, ) = struct.unpack('@i', buf[0 : 4])
+ buf = buf[4 : ]
+ # continue
+ elif attr_type == "ATTR_TYPE_INT64":
+ (attr_value, ) = struct.unpack('@q', buf[0 : 8])
+ buf = buf[8 : ]
+ # continue
+ elif attr_type == "ATTR_TYPE_UINT8":
+ (attr_value, ) = struct.unpack('@B', buf[0 : 1])
+ buf = buf[1 : ]
+ # continue
+ elif attr_type == "ATTR_TYPE_UINT16":
+ (attr_value, ) = struct.unpack('@H', buf[0 : 2])
+ buf = buf[2 : ]
+ # continue
+ elif attr_type == "ATTR_TYPE_UINT32":
+ (attr_value, ) = struct.unpack('@I', buf[0 : 4])
+ buf = buf[4 : ]
+ # continue
+ elif attr_type == "ATTR_TYPE_UINT64":
+ (attr_value, ) = struct.unpack('@Q', buf[0 : 8])
+ buf = buf[8 : ]
+ # continue
+ elif attr_type == "ATTR_TYPE_FLOAT":
+ (attr_value, ) = struct.unpack('@f', buf[0 : 4])
+ buf = buf[4 : ]
+ # continue
+ elif attr_type == "ATTR_TYPE_DOUBLE":
+ (attr_value, ) = struct.unpack('@d', buf[0 : 8])
+ buf = buf[8 : ]
+ # continue
+ elif attr_type == "ATTR_TYPE_BOOLEAN":
+ (attr_value, ) = struct.unpack('@?', buf[0 : 1])
+ buf = buf[1 : ]
+ # continue
+ elif attr_type == "ATTR_TYPE_STRING":
+ (str_len, ) = struct.unpack('@H', buf[0 : 2])
+ attr_value = buf[2 : 2 + str_len - 1].decode()
+ buf = buf[2 + str_len : ]
+ # continue
+ elif attr_type == "ATTR_TYPE_BYTEARRAY":
+ (byte_len, ) = struct.unpack('@I', buf[0 : 4])
+ attr_value = buf[4 : 4 + byte_len]
+ buf = buf[4 + byte_len : ]
+ # continue
+
+ attr_dict[key_name] = attr_value
+
+ logging.info(str(attr_dict))
+ return attr_dict
+
+class Request():
+ mid = 0
+ url = ""
+ action = 0
+ fmt = 0
+ payload = ""
+ payload_len = 0
+ sender = 0
+
+ def __init__(self, url, action, fmt, payload, payload_len):
+ self.url = url
+ self.action = action
+ self.fmt = fmt
+ # if type(payload) == bytes:
+ # self.payload = bytes(payload, encoding = "utf8")
+ # else:
+ self.payload_len = payload_len
+ if self.payload_len > 0:
+ self.payload = payload
+
+
+ def pack_request(self):
+ url_len = len(self.url) + 1
+ buffer_len = url_len + self.payload_len
+
+ req_buffer = struct.pack('!2BH2IHI',1, self.action, self.fmt, self.mid, self.sender, url_len, self.payload_len)
+ for i in range(url_len - 1):
+ req_buffer += struct.pack('!c', bytes(self.url[i], encoding = "utf8"))
+ req_buffer += bytes([0])
+ for i in range(self.payload_len):
+ req_buffer += struct.pack('!B', self.payload[i])
+
+ return req_buffer, len(req_buffer)
+
+
+ def send(self, conn, is_install):
+ leading = struct.pack('!2B', 0x12, 0x34)
+
+ if not is_install:
+ msg_type = struct.pack('!H', 0x0002)
+ else:
+ msg_type = struct.pack('!H', 0x0004)
+ buff, buff_len = self.pack_request()
+ lenth = struct.pack('!I', buff_len)
+
+ try:
+ conn.send(leading)
+ conn.send(msg_type)
+ conn.send(lenth)
+ conn.send(buff)
+ except socket.error as e:
+ logging.error("device closed")
+ for dev in tcpserver.devices:
+ if dev.conn == conn:
+ tcpserver.devices.remove(dev)
+ return -1
+
+
+def query(conn):
+ req = Request("/applet", 1, 0, "", 0)
+ if req.send(conn, False) == -1:
+ return "fail"
+ time.sleep(0.05)
+ try:
+ receive_context = imrt_link_message()
+ start = time.time()
+ while True:
+ if receive_context.on_imrt_link_byte_arrive(conn.recv(1)) == 0:
+ break
+ elif time.time() - start >= 5.0:
+ return "fail"
+ query_resp = receive_context.msg
+ print(query_resp)
+ except OSError as e:
+ logging.error("OSError exception occur")
+ return "fail"
+
+ res = decode_attr_container(query_resp)
+
+ logging.info('Query device infomation success')
+ return res
+
+def install(conn, app_name, wasm_file):
+ wasm = read_file_to_buffer(wasm_file)
+ if wasm == "file not found":
+ return "failed to install: file not found"
+
+ print("wasm file len:")
+ print(len(wasm))
+ req = Request("/applet?name=" + app_name, 3, 98, wasm, len(wasm))
+ if req.send(conn, True) == -1:
+ return "fail"
+ time.sleep(0.05)
+ try:
+ receive_context = imrt_link_message()
+ start = time.time()
+ while True:
+ if receive_context.on_imrt_link_byte_arrive(conn.recv(1)) == 0:
+ break
+ elif time.time() - start >= 5.0:
+ return "fail"
+ msg = receive_context.msg
+ except OSError as e:
+ logging.error("OSError exception occur")
+ # TODO: check return message
+
+ if len(msg) == 24 and msg[8 + 1] == 65:
+ logging.info('Install application success')
+ return "success"
+ else:
+ res = decode_attr_container(msg)
+ logging.warning('Install application failed: %s' % (str(res)))
+ print(str(res))
+
+ return str(res)
+
+
+def uninstall(conn, app_name):
+ req = Request("/applet?name=" + app_name, 4, 99, "", 0)
+ if req.send(conn, False) == -1:
+ return "fail"
+ time.sleep(0.05)
+ try:
+ receive_context = imrt_link_message()
+ start = time.time()
+ while True:
+ if receive_context.on_imrt_link_byte_arrive(conn.recv(1)) == 0:
+ break
+ elif time.time() - start >= 5.0:
+ return "fail"
+ msg = receive_context.msg
+ except OSError as e:
+ logging.error("OSError exception occur")
+ # TODO: check return message
+
+ if len(msg) == 24 and msg[8 + 1] == 66:
+ logging.info('Uninstall application success')
+ return "success"
+ else:
+ res = decode_attr_container(msg)
+ logging.warning('Uninstall application failed: %s' % (str(res)))
+ print(str(res))
+
+ return str(res)
+
+class Device:
+ def __init__(self, conn, addr, port):
+ self.conn = conn
+ self.addr = addr
+ self.port = port
+ self.app_num = 0
+ self.apps = []
+
+cmd = []
+
+class TCPServer:
+ def __init__(self, server, server_address, inputs, outputs, message_queues):
+ # Create a TCP/IP
+ self.server = server
+ self.server.setblocking(False)
+
+ # Bind the socket to the port
+ self.server_address = server_address
+ print('starting up on %s port %s' % self.server_address)
+ self.server.bind(self.server_address)
+
+ # Listen for incoming connections
+ self.server.listen(10)
+
+ self.cmd_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.cmd_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
+
+ self.cmd_sock.bind(('127.0.0.1', 8889))
+ self.cmd_sock.listen(5)
+
+
+ # Sockets from which we expect to read
+ self.inputs = inputs
+ self.inputs.append(self.cmd_sock)
+
+ # Sockets to which we expect to write
+ # 处理要发送的消息
+ self.outputs = outputs
+ # Outgoing message queues (socket: Queue)
+ self.message_queues = message_queues
+
+ self.devices = []
+ self.conn_dict = {}
+
+ def handler_recever(self, readable):
+ # Handle inputs
+ for s in readable:
+ if s is self.server:
+ # A "readable" socket is ready to accept a connection
+ connection, client_address = s.accept()
+ self.client_address = client_address
+ print('connection from', client_address)
+ # this is connection not server
+ # connection.setblocking(0)
+ self.inputs.append(connection)
+
+ # Give the connection a queue for data we want to send
+ # self.message_queues[connection] = queue.Queue()
+
+ res = query(connection)
+
+ if res != "fail":
+ dev = Device(connection, client_address[0], client_address[1])
+ self.devices.append(dev)
+ self.conn_dict[client_address] = connection
+
+ dev_info = {}
+ dev_info['addr'] = dev.addr
+ dev_info['port'] = dev.port
+ dev_info['apps'] = 0
+
+ logging.info('A new client connected from ("%s":"%s")' % (dev.conn, dev.port))
+
+ elif s is self.cmd_sock:
+ connection, client_address = s.accept()
+ print("web server socket connected")
+ logging.info("Django server connected")
+ self.inputs.append(connection)
+ self.message_queues[connection] = queue.Queue()
+
+ else:
+ data = s.recv(1024)
+ if data != b'':
+ # A readable client socket has data
+ logging.info('received "%s" from %s' % (data, s.getpeername()))
+
+ # self.message_queues[s].put(data)
+ # # Add output channel for response
+
+ # if s not in self.outputs:
+ # self.outputs.append(s)
+
+ if(data.decode().split(':')[0] == "query"):
+ if data.decode().split(':')[1] == "all":
+ resp = []
+ print('start query all devices')
+ for dev in self.devices:
+ dev_info = query(dev.conn)
+ if dev_info == "fail":
+ continue
+ dev_info["addr"] = dev.addr
+ dev_info["port"] = dev.port
+ resp.append(str(dev_info))
+
+ print(resp)
+
+ if self.message_queues[s] is not None:
+ # '*' is used in web server to sperate the string
+ self.message_queues[s].put(bytes("*".join(resp), encoding = 'utf8'))
+ if s not in self.outputs:
+ self.outputs.append(s)
+ else:
+ client_addr = (data.decode().split(':')[1],int(data.decode().split(':')[2]))
+
+ if client_addr in self.conn_dict.keys():
+ print('start query device from (%s:%s)' % (client_addr[0], client_addr[1]))
+ resp = query(self.conn_dict[client_addr])
+ print(resp)
+
+ if self.message_queues[s] is not None:
+ self.message_queues[s].put(bytes(str(resp), encoding = 'utf8'))
+ if s not in self.outputs:
+ self.outputs.append(s)
+ else: # no connection
+ if self.message_queues[s] is not None:
+ self.message_queues[s].put(bytes(str("fail"), encoding = 'utf8'))
+ if s not in self.outputs:
+ self.outputs.append(s)
+ elif(data.decode().split(':')[0] == "install"):
+ client_addr = (data.decode().split(':')[1],int(data.decode().split(':')[2]))
+ app_name = data.decode().split(':')[3]
+ app_file = data.decode().split(':')[4]
+
+ if client_addr in self.conn_dict.keys():
+ print('start install application %s to ("%s":"%s")' % (app_name, client_addr[0], client_addr[1]))
+ res = install(self.conn_dict[client_addr], app_name, app_file)
+ if self.message_queues[s] is not None:
+ logging.info("response {} to cmd server".format(res))
+ self.message_queues[s].put(bytes(res, encoding = 'utf8'))
+ if s not in self.outputs:
+ self.outputs.append(s)
+ elif(data.decode().split(':')[0] == "uninstall"):
+ client_addr = (data.decode().split(':')[1],int(data.decode().split(':')[2]))
+ app_name = data.decode().split(':')[3]
+
+ if client_addr in self.conn_dict.keys():
+ print("start uninstall")
+ res = uninstall(self.conn_dict[client_addr], app_name)
+ if self.message_queues[s] is not None:
+ logging.info("response {} to cmd server".format(res))
+ self.message_queues[s].put(bytes(res, encoding = 'utf8'))
+ if s not in self.outputs:
+ self.outputs.append(s)
+
+
+ # if self.message_queues[s] is not None:
+ # self.message_queues[s].put(data)
+ # if s not in self.outputs:
+ # self.outputs.append(s)
+ else:
+ logging.warning(data)
+
+ # Interpret empty result as closed connection
+ try:
+ for dev in self.devices:
+ if s == dev.conn:
+ self.devices.remove(dev)
+ # Stop listening for input on the connection
+ if s in self.outputs:
+ self.outputs.remove(s)
+ self.inputs.remove(s)
+
+ # Remove message queue
+ if s in self.message_queues.keys():
+ del self.message_queues[s]
+ s.close()
+ except OSError as e:
+ logging.error("OSError raised, unknown connection")
+ return "got it"
+
+ def handler_send(self, writable):
+ # Handle outputs
+ for s in writable:
+ try:
+ message_queue = self.message_queues.get(s)
+ send_data = ''
+ if message_queue is not None:
+ send_data = message_queue.get_nowait()
+ except queue.Empty:
+ self.outputs.remove(s)
+ else:
+ # print "sending %s to %s " % (send_data, s.getpeername)
+ # print "send something"
+ if message_queue is not None:
+ s.send(send_data)
+ else:
+ print("client has closed")
+ # del message_queues[s]
+ # writable.remove(s)
+ # print "Client %s disconnected" % (client_address)
+ return "got it"
+
+ def handler_exception(self, exceptional):
+ # # Handle "exceptional conditions"
+ for s in exceptional:
+ print('exception condition on', s.getpeername())
+ # Stop listening for input on the connection
+ self.inputs.remove(s)
+ if s in self.outputs:
+ self.outputs.remove(s)
+ s.close()
+
+ # Remove message queue
+ del self.message_queues[s]
+ return "got it"
+
+
+def event_loop(tcpserver, inputs, outputs):
+ while inputs:
+ # Wait for at least one of the sockets to be ready for processing
+ print('waiting for the next event')
+ readable, writable, exceptional = select.select(inputs, outputs, inputs)
+ if readable is not None:
+ tcp_recever = tcpserver.handler_recever(readable)
+ if tcp_recever == 'got it':
+ print("server have received")
+ if writable is not None:
+ tcp_send = tcpserver.handler_send(writable)
+ if tcp_send == 'got it':
+ print("server have send")
+ if exceptional is not None:
+ tcp_exception = tcpserver.handler_exception(exceptional)
+ if tcp_exception == 'got it':
+ print("server have exception")
+
+
+ sleep(0.1)
+
+def run_wasm_server():
+ server_address = ('localhost', 8888)
+ server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
+ inputs = [server]
+ outputs = []
+ message_queues = {}
+ tcpserver = TCPServer(server, server_address, inputs, outputs, message_queues)
+
+ task = threading.Thread(target=event_loop,args=(tcpserver,inputs,outputs))
+ task.start()
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.DEBUG,
+ filename='wasm_server.log',
+ filemode='a',
+ format=
+ '%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
+ )
+ server_address = ('0.0.0.0', 8888)
+ server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
+ inputs = [server]
+ outputs = []
+ message_queues = {}
+ tcpserver = TCPServer(server, server_address, inputs, outputs, message_queues)
+ logging.info("TCP Server start at {}:{}".format(server_address[0], "8888"))
+
+ task = threading.Thread(target=event_loop,args=(tcpserver,inputs,outputs))
+ task.start()
+
+ # event_loop(tcpserver, inputs, outputs) \ No newline at end of file