harness_api.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. #
  2. # Copyright (C) 2019 Intel Corporation. All rights reserved.
  3. # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
  4. #
  5. import os
  6. import shutil
  7. import subprocess
  8. import json
  9. import time
  10. from framework import test_api
  11. from framework.test_utils import *
# Name of the file handed to host_tool through its "-o" option; the helpers
# in this module append operation markers to it and parse results from it.
output = "output.txt"
  13. def start_env():
  14. os.system("./start.sh")
  15. def stop_env():
  16. os.system("./stop.sh")
  17. time.sleep(0.5)
  18. os.chdir("../") #reset path for other cases in the same suite
  19. def check_is_timeout():
  20. line_num = 0
  21. ft = open(output, 'r')
  22. lines = ft.readlines()
  23. for line in reversed(lines):
  24. if (line[0:36] == "--------one operation begin.--------"):
  25. break
  26. line_num = line_num + 1
  27. ft.close()
  28. if (lines[-(line_num)] == "operation timeout"):
  29. return True
  30. else:
  31. return False
  32. def parse_ret(file):
  33. ft = open(file, 'a')
  34. ft.writelines("\n")
  35. ft.writelines("--------one operation finish.--------")
  36. ft.writelines("\n")
  37. ft.close()
  38. ft = open(file, 'r')
  39. for line in reversed(ft.readlines()):
  40. if (line[0:16] == "response status "):
  41. ret = line[16:]
  42. ft.close()
  43. return int(ret)
  44. def run_host_tool(cmd, file):
  45. ft = open(file, 'a')
  46. ft.writelines("--------one operation begin.--------")
  47. ft.writelines("\n")
  48. ft.close()
  49. os.system(cmd + " -o" + file)
  50. if (check_is_timeout() == True):
  51. return -1
  52. return parse_ret(file)
  53. def install_app(app_name, file_name):
  54. return run_host_tool("./host_tool -i " + app_name + " -f ../test-app/" + file_name, output)
  55. def uninstall_app(app_name):
  56. return run_host_tool("./host_tool -u " + app_name, output)
  57. def query_app():
  58. return run_host_tool("./host_tool -q ", output)
  59. def send_request(url, action, payload):
  60. if (payload is None):
  61. return run_host_tool("./host_tool -r " + url + " -A " + action, output)
  62. else:
  63. return run_host_tool("./host_tool -r " + url + " -A " + action + " -p " + payload, output)
  64. def register(url, timeout, alive_time):
  65. return run_host_tool("./host_tool -s " + url + " -t " + str(timeout) + " -a " + str(alive_time), output)
  66. def deregister(url):
  67. return run_host_tool("./host_tool -d " + url, output)
  68. def get_response_payload():
  69. line_num = 0
  70. ft = open(output, 'r')
  71. lines = ft.readlines()
  72. for line in reversed(lines):
  73. if (line[0:16] == "response status "):
  74. break
  75. line_num = line_num + 1
  76. payload_lines = lines[-(line_num):-1]
  77. ft.close()
  78. return payload_lines
  79. def check_query_apps(expected_app_list):
  80. if (check_is_timeout() == True):
  81. return False
  82. json_lines = get_response_payload()
  83. json_str = " ".join(json_lines)
  84. json_dict = json.loads(json_str)
  85. app_list = []
  86. for key, value in json_dict.items():
  87. if key[0:6] == "applet":
  88. app_list.append(value)
  89. if (sorted(app_list) == sorted(expected_app_list)):
  90. return True
  91. else:
  92. return False
  93. def check_response_payload(expected_payload):
  94. if (check_is_timeout() == True):
  95. return False
  96. json_lines = get_response_payload()
  97. json_str = " ".join(json_lines)
  98. if (json_str.strip() != ""):
  99. json_dict = json.loads(json_str)
  100. else:
  101. json_dict = {}
  102. if (json_dict == expected_payload):
  103. return True
  104. else:
  105. return False
  106. def check_get_event():
  107. line_num = 0
  108. ft = open(output, 'r')
  109. lines = ft.readlines()
  110. for line in reversed(lines):
  111. if (line[0:16] == "response status "):
  112. break
  113. line_num = line_num + 1
  114. payload_lines = lines[-(line_num):-1]
  115. ft.close()
  116. if (payload_lines[1][0:17] == "received an event"):
  117. return True
  118. else:
  119. return False