base.py 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. #!/usr/bin/env python3
  2. #
  3. # Copyright (c) 2022 Project CHIP Authors
  4. # All rights reserved.
  5. #
  6. # Licensed under the Apache License, Version 2.0 (the "License");
  7. # you may not use this file except in compliance with the License.
  8. # You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS,
  14. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. # See the License for the specific language governing permissions and
  16. # limitations under the License.
  17. #
  18. import datetime
  19. # Commissioning test.
  20. import queue
  21. import subprocess
  22. import sys
  23. import threading
  24. import time
  25. import typing
  26. from colorama import Fore, Style
  27. def EnqueueLogOutput(fp, tag, q):
  28. for line in iter(fp.readline, b''):
  29. timestamp = time.time()
  30. if len(line) > len('[1646290606.901990]') and line[0:1] == b'[':
  31. try:
  32. timestamp = float(line[1:18].decode())
  33. line = line[19:]
  34. except Exception:
  35. pass
  36. sys.stdout.buffer.write(
  37. (f"[{datetime.datetime.fromtimestamp(timestamp).isoformat(sep=' ')}]").encode() + tag + line)
  38. sys.stdout.flush()
  39. fp.close()
  40. def RedirectQueueThread(fp, tag, queue) -> threading.Thread:
  41. log_queue_thread = threading.Thread(target=EnqueueLogOutput, args=(
  42. fp, tag, queue))
  43. log_queue_thread.start()
  44. return log_queue_thread
  45. def DumpProgramOutputToQueue(thread_list: typing.List[threading.Thread], tag: str, process: subprocess.Popen, queue: queue.Queue):
  46. thread_list.append(RedirectQueueThread(process.stdout,
  47. (f"[{tag}][{Fore.YELLOW}STDOUT{Style.RESET_ALL}]").encode(), queue))
  48. thread_list.append(RedirectQueueThread(process.stderr,
  49. (f"[{tag}][{Fore.RED}STDERR{Style.RESET_ALL}]").encode(), queue))