wifi_scan.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. # Copyright 2018 Espressif Systems (Shanghai) PTE LTD
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. #
  15. # APIs for interpreting and creating protobuf packets for Wi-Fi Scanning
  16. from __future__ import print_function
  17. from future.utils import tobytes
  18. import utils
  19. import proto
  20. def print_verbose(security_ctx, data):
  21. if (security_ctx.verbose):
  22. print("++++ " + data + " ++++")
  23. def scan_start_request(security_ctx, blocking=True, passive=False, group_channels=5, period_ms=120):
  24. # Form protobuf request packet for ScanStart command
  25. cmd = proto.wifi_scan_pb2.WiFiScanPayload()
  26. cmd.msg = proto.wifi_scan_pb2.TypeCmdScanStart
  27. cmd.cmd_scan_start.blocking = blocking
  28. cmd.cmd_scan_start.passive = passive
  29. cmd.cmd_scan_start.group_channels = group_channels
  30. cmd.cmd_scan_start.period_ms = period_ms
  31. enc_cmd = security_ctx.encrypt_data(cmd.SerializeToString()).decode('latin-1')
  32. print_verbose(security_ctx, "Client -> Device (Encrypted CmdScanStart) " + utils.str_to_hexstr(enc_cmd))
  33. return enc_cmd
  34. def scan_start_response(security_ctx, response_data):
  35. # Interpret protobuf response packet from ScanStart command
  36. dec_resp = security_ctx.decrypt_data(tobytes(response_data))
  37. resp = proto.wifi_scan_pb2.WiFiScanPayload()
  38. resp.ParseFromString(dec_resp)
  39. print_verbose(security_ctx, "ScanStart status " + str(resp.status))
  40. if resp.status != 0:
  41. raise RuntimeError
  42. def scan_status_request(security_ctx):
  43. # Form protobuf request packet for ScanStatus command
  44. cmd = proto.wifi_scan_pb2.WiFiScanPayload()
  45. cmd.msg = proto.wifi_scan_pb2.TypeCmdScanStatus
  46. enc_cmd = security_ctx.encrypt_data(cmd.SerializeToString()).decode('latin-1')
  47. print_verbose(security_ctx, "Client -> Device (Encrypted CmdScanStatus) " + utils.str_to_hexstr(enc_cmd))
  48. return enc_cmd
  49. def scan_status_response(security_ctx, response_data):
  50. # Interpret protobuf response packet from ScanStatus command
  51. dec_resp = security_ctx.decrypt_data(tobytes(response_data))
  52. resp = proto.wifi_scan_pb2.WiFiScanPayload()
  53. resp.ParseFromString(dec_resp)
  54. print_verbose(security_ctx, "ScanStatus status " + str(resp.status))
  55. if resp.status != 0:
  56. raise RuntimeError
  57. return {"finished": resp.resp_scan_status.scan_finished, "count": resp.resp_scan_status.result_count}
  58. def scan_result_request(security_ctx, index, count):
  59. # Form protobuf request packet for ScanResult command
  60. cmd = proto.wifi_scan_pb2.WiFiScanPayload()
  61. cmd.msg = proto.wifi_scan_pb2.TypeCmdScanResult
  62. cmd.cmd_scan_result.start_index = index
  63. cmd.cmd_scan_result.count = count
  64. enc_cmd = security_ctx.encrypt_data(cmd.SerializeToString()).decode('latin-1')
  65. print_verbose(security_ctx, "Client -> Device (Encrypted CmdScanResult) " + utils.str_to_hexstr(enc_cmd))
  66. return enc_cmd
  67. def scan_result_response(security_ctx, response_data):
  68. # Interpret protobuf response packet from ScanResult command
  69. dec_resp = security_ctx.decrypt_data(tobytes(response_data))
  70. resp = proto.wifi_scan_pb2.WiFiScanPayload()
  71. resp.ParseFromString(dec_resp)
  72. print_verbose(security_ctx, "ScanResult status " + str(resp.status))
  73. if resp.status != 0:
  74. raise RuntimeError
  75. authmode_str = ["Open", "WEP", "WPA_PSK", "WPA2_PSK", "WPA_WPA2_PSK", "WPA2_ENTERPRISE"]
  76. results = []
  77. for entry in resp.resp_scan_result.entries:
  78. results += [{"ssid": entry.ssid.decode('latin-1').rstrip('\x00'),
  79. "bssid": utils.str_to_hexstr(entry.bssid.decode('latin-1')),
  80. "channel": entry.channel,
  81. "rssi": entry.rssi,
  82. "auth": authmode_str[entry.auth]}]
  83. print_verbose(security_ctx, "ScanResult SSID : " + str(results[-1]["ssid"]))
  84. print_verbose(security_ctx, "ScanResult BSSID : " + str(results[-1]["bssid"]))
  85. print_verbose(security_ctx, "ScanResult Channel : " + str(results[-1]["channel"]))
  86. print_verbose(security_ctx, "ScanResult RSSI : " + str(results[-1]["rssi"]))
  87. print_verbose(security_ctx, "ScanResult AUTH : " + str(results[-1]["auth"]))
  88. return results