summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/wasm-micro-runtime-WAMR-1.2.2/test-tools/component-test/harness/harness_api.py
blob: e35aa6b4cd87995a8120334591dbcc5182b95581 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#
# Copyright (C) 2019 Intel Corporation.  All rights reserved.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#

import os
import shutil
import subprocess
import json
import time

from framework import test_api
from framework.test_utils import *

# Path (relative to the current working directory) of the result file that is
# handed to host_tool via "-o" and read back by the check_*/get_* helpers.
output = "output.txt"

def start_env():
    """Bring the test environment up by running ``./start.sh`` in a shell.

    The script is looked up relative to the current working directory and
    its exit status is discarded.
    """
    launcher = "./start.sh"
    os.system(launcher)

def stop_env():
    """Tear the test environment down and step back to the parent directory.

    Runs ``./stop.sh`` in a shell (exit status discarded), pauses for half a
    second, then changes the working directory one level up.
    """
    stopper = "./stop.sh"
    os.system(stopper)

    time.sleep(0.5)

    # Reset the path for the other cases in the same suite.
    os.chdir("../")

def check_is_timeout(file=None):
    """Tell whether the most recent logged operation reported a timeout.

    The result file is scanned backwards for the last
    "--------one operation begin.--------" marker, and the first line that
    follows it is compared with "operation timeout". The comparison ignores
    the trailing line terminator, so the timeout line is recognised whether
    or not it ends with a newline.

    Args:
        file: Path of the result file to inspect. Defaults to the
            module-level ``output`` file when omitted.

    Returns:
        True when the first line after the last begin marker is
        "operation timeout"; False otherwise, including when nothing was
        logged after the marker or the file is empty. If the file holds no
        begin marker at all, its very first line is the one inspected.
    """
    if file is None:
        file = output

    with open(file, 'r') as ft:
        lines = ft.readlines()

    # Index of the last begin marker; stays at -1 when there is none so that
    # the line inspected below falls back to the first line of the file.
    begin_idx = -1
    for idx in range(len(lines) - 1, -1, -1):
        if lines[idx].startswith("--------one operation begin.--------"):
            begin_idx = idx
            break

    first_idx = begin_idx + 1
    if first_idx >= len(lines):
        # Nothing was logged after the marker (or the file is empty).
        # A negative offset of 0 would wrongly wrap around to lines[0].
        return False

    # readlines() keeps the line terminator, so drop it before comparing.
    return lines[first_idx].rstrip("\r\n") == "operation timeout"

def parse_ret(file):
    """Close the current operation in *file* and return its response status.

    A blank line followed by the "--------one operation finish.--------"
    marker is appended to *file*. The file is then scanned from the end for
    a line starting with "response status "; the text after that prefix is
    converted with int() and returned.

    The scan stops at the "--------one operation begin.--------" marker of
    the current operation, so a status logged by an earlier operation is
    never returned for an operation that produced none.

    Args:
        file: Path of the result file to update and parse.

    Returns:
        The status as an int, or None when no "response status " line is
        found for the current operation.

    Raises:
        ValueError: If the text after "response status " is not an integer.
    """
    with open(file, 'a') as ft:
        ft.write("\n--------one operation finish.--------\n")

    with open(file, 'r') as ft:
        lines = ft.readlines()

    for line in reversed(lines):
        if line.startswith("response status "):
            # int() tolerates the trailing newline kept by readlines().
            return int(line[16:])
        if line.startswith("--------one operation begin.--------"):
            # Start of the current operation reached without a status line;
            # anything further back belongs to previous operations.
            break

    return None

def run_host_tool(cmd, file):
    """Execute one host_tool command and report its outcome.

    The "--------one operation begin.--------" marker is appended to *file*,
    then *cmd* is run through the shell with " -o" + *file* appended to it.

    Args:
        cmd: Shell command line to run (without the "-o" option).
        file: Path of the result file; receives the begin marker and is
            passed to the command through "-o".

    Returns:
        -1 when check_is_timeout() reports a timeout, otherwise whatever
        parse_ret(file) returns.
    """
    with open(file, 'a') as log:
        log.write("--------one operation begin.--------\n")

    command_line = cmd + " -o" + file
    os.system(command_line)

    # NOTE: check_is_timeout() is called without arguments here, so it reads
    # the module-level output file rather than *file*.
    if check_is_timeout():
        return -1

    return parse_ret(file)

def install_app(app_name, file_name):
    """Run ``host_tool -i <app_name> -f ../test-app/<file_name>``.

    Args:
        app_name: Value passed to the "-i" option.
        file_name: File name, looked up under ``../test-app/``, passed to
            the "-f" option.

    Returns:
        The result of run_host_tool() for this command.
    """
    command = "./host_tool -i " + app_name + " -f ../test-app/" + file_name
    return run_host_tool(command, output)

def uninstall_app(app_name):
    """Run ``host_tool -u <app_name>``.

    Args:
        app_name: Value passed to the "-u" option.

    Returns:
        The result of run_host_tool() for this command.
    """
    command = "./host_tool -u " + app_name
    return run_host_tool(command, output)

def query_app():
    """Run ``host_tool -q`` and return the result of run_host_tool()."""
    command = "./host_tool -q "
    return run_host_tool(command, output)

def send_request(url, action, payload):
    """Run ``host_tool -r <url> -A <action>``, optionally with ``-p <payload>``.

    Args:
        url: Value passed to the "-r" option.
        action: Value passed to the "-A" option.
        payload: Value passed to the "-p" option, or None to omit the
            option entirely. It is inserted into the shell command line
            as-is, so any quoting must already be part of the string.

    Returns:
        The result of run_host_tool() for this command.
    """
    command = "./host_tool -r " + url + " -A " + action
    if payload is not None:
        command = command + " -p " + payload
    return run_host_tool(command, output)

def register(url, timeout, alive_time):
    """Run ``host_tool -s <url> -t <timeout> -a <alive_time>``.

    Args:
        url: Value passed to the "-s" option.
        timeout: Value passed to the "-t" option (converted with str()).
        alive_time: Value passed to the "-a" option (converted with str()).

    Returns:
        The result of run_host_tool() for this command.
    """
    command = "./host_tool -s " + url
    command = command + " -t " + str(timeout)
    command = command + " -a " + str(alive_time)
    return run_host_tool(command, output)

def deregister(url):
    """Run ``host_tool -d <url>``.

    Args:
        url: Value passed to the "-d" option.

    Returns:
        The result of run_host_tool() for this command.
    """
    command = "./host_tool -d " + url
    return run_host_tool(command, output)

def get_response_payload(file=None):
    """Return the lines logged after the most recent response status line.

    The result file is scanned backwards for the last line starting with
    "response status ". Every line after it, except the final line of the
    file, is returned with line terminators intact.

    Args:
        file: Path of the result file to read. Defaults to the module-level
            ``output`` file when omitted.

    Returns:
        A list of lines. The list is empty when the status line is the last
        or second-to-last line of the file. When the file contains no status
        line at all, every line except the last one is returned (unchanged
        legacy behaviour).
    """
    if file is None:
        file = output

    with open(file, 'r') as ft:
        lines = ft.readlines()

    # Index of the last status line; -1 when the file has none.
    status_idx = -1
    for idx in range(len(lines) - 1, -1, -1):
        if lines[idx].startswith("response status "):
            status_idx = idx
            break

    # Slice with a forward index: counting from the end breaks when nothing
    # follows the status line, because an offset of -0 means "from line 0".
    return lines[status_idx + 1:-1]

def check_query_apps(expected_app_list):
    """Compare the applets listed in the last response with an expected list.

    The payload returned by get_response_payload() is joined with spaces and
    parsed as JSON. The values of all keys beginning with "applet" are
    collected and compared, order-insensitively, with *expected_app_list*.

    Args:
        expected_app_list: The values expected under the "applet*" keys.

    Returns:
        False when check_is_timeout() reports a timeout; otherwise True if
        the sorted collected values equal the sorted expected list, else
        False.
    """
    if check_is_timeout():
        return False

    payload_text = " ".join(get_response_payload())
    reply = json.loads(payload_text)

    # Collect the value of every key that begins with "applet".
    found_apps = [
        value for key, value in reply.items() if key.startswith("applet")
    ]

    return sorted(found_apps) == sorted(expected_app_list)

def check_response_payload(expected_payload):
    """Check that the payload of the last response equals *expected_payload*.

    The payload returned by get_response_payload() is joined with spaces and
    parsed as JSON; a payload that is empty or only whitespace is treated as
    an empty dict.

    Args:
        expected_payload: The object the decoded payload must equal.

    Returns:
        False when check_is_timeout() reports a timeout; otherwise the
        result of comparing the decoded payload with *expected_payload*.
    """
    if check_is_timeout():
        return False

    payload_text = " ".join(get_response_payload())

    # A blank payload stands for an empty JSON object.
    if payload_text.strip() == "":
        actual_payload = {}
    else:
        actual_payload = json.loads(payload_text)

    return actual_payload == expected_payload

def check_get_event(file=None):
    """Tell whether the last response is followed by a "received an event" line.

    The result file is scanned backwards for the last line starting with
    "response status ". Of the lines after it (the final line of the file
    excluded), the second one must start with "received an event".

    Args:
        file: Path of the result file to read. Defaults to the module-level
            ``output`` file when omitted.

    Returns:
        True when that second line starts with "received an event"; False
        otherwise, including when fewer than two such lines exist.
    """
    if file is None:
        file = output

    with open(file, 'r') as ft:
        lines = ft.readlines()

    # Index of the last status line; -1 when the file has none, in which
    # case the lines are taken from the start of the file.
    status_idx = -1
    for idx in range(len(lines) - 1, -1, -1):
        if lines[idx].startswith("response status "):
            status_idx = idx
            break

    # Forward slicing keeps the result empty when nothing follows the status
    # line (a negative offset of 0 would select from the start of the file).
    payload_lines = lines[status_idx + 1:-1]

    # Without a second payload line there is no event line to look at.
    if len(payload_lines) < 2:
        return False

    return payload_lines[1].startswith("received an event")