Pārlūkot izejas kodu

ci: lift restriction on pygdbmi in panic test

Aleksei Apaseev 3 gadi atpakaļ
vecāks
revīzija
01f1aba2d0

+ 24 - 13
tools/test_apps/system/panic/conftest.py

@@ -3,7 +3,6 @@
 
 # pylint: disable=W0621  # redefined-outer-name
 
-import hashlib
 import logging
 import os
 import subprocess
@@ -14,17 +13,11 @@ import pexpect
 import pytest
 from _pytest.fixtures import FixtureRequest
 from _pytest.monkeypatch import MonkeyPatch
-from pygdbmi.gdbcontroller import GdbController, GdbTimeoutError, NoGdbProcessError
+from pygdbmi.gdbcontroller import GdbController
 from pytest_embedded_idf.app import IdfApp
 from pytest_embedded_idf.dut import IdfDut
 from pytest_embedded_idf.serial import IdfSerial
-
-
-def sha256(file: str) -> str:
-    res = hashlib.sha256()
-    with open(file, 'rb') as fr:
-        res.update(fr.read())
-    return res.hexdigest()
+from utils import NoGdbProcessError, attach_logger, quote_string, sha256, verify_valid_gdb_subprocess
 
 
 class PanicTestDut(IdfDut):
@@ -157,11 +150,21 @@ class PanicTestDut(IdfDut):
         Runs GDB and connects it to the "serial" port of the DUT.
         After this, the DUT expect methods can no longer be used to capture output.
         """
-        self.gdb = GdbController(gdb_path=self.toolchain_prefix + 'gdb')
+        gdb_path = self.toolchain_prefix + 'gdb'
+        try:
+            from pygdbmi.constants import GdbTimeoutError
+            default_gdb_args = ['--nx', '--quiet', '--interpreter=mi2']
+            gdb_command = [gdb_path] + default_gdb_args
+            self.gdb = GdbController(command=gdb_command)
+            pygdbmi_logger = attach_logger()
+        except ImportError:
+            # fallback for pygdbmi<0.10.0.0.
+            from pygdbmi.gdbcontroller import GdbTimeoutError
+            self.gdb = GdbController(gdb_path=gdb_path)
+            pygdbmi_logger = self.gdb.logger
 
         # pygdbmi logs to console by default, make it log to a file instead
         pygdbmi_log_file_name = os.path.join(self.logdir, 'pygdbmi_log.txt')
-        pygdbmi_logger = self.gdb.logger
         pygdbmi_logger.setLevel(logging.DEBUG)
         while pygdbmi_logger.hasHandlers():
             pygdbmi_logger.removeHandler(pygdbmi_logger.handlers[0])
@@ -170,15 +173,23 @@ class PanicTestDut(IdfDut):
             logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
         )
         pygdbmi_logger.addHandler(log_handler)
+        try:
+            gdb_command = self.gdb.command
+        except AttributeError:
+            # fallback for pygdbmi < 0.10
+            gdb_command = self.gdb.cmd
 
-        logging.info('Running command: %s', self.gdb.get_subprocess_cmd())
+        logging.info(f'Running command: "{" ".join(quote_string(c) for c in gdb_command)}"')
         for _ in range(10):
             try:
                 # GdbController creates a process with subprocess.Popen(). Is it really running? It is probable that
                 # an RPI under high load will get non-responsive during creating a lot of processes.
+                if not hasattr(self.gdb, 'verify_valid_gdb_subprocess'):
+                    # for pygdbmi >= 0.10.0.0
+                    verify_valid_gdb_subprocess(self.gdb.gdb_process)
                 resp = self.gdb.get_gdb_response(
                     timeout_sec=10
-                )  # calls verify_valid_gdb_subprocess() internally
+                )  # calls verify_valid_gdb_subprocess() internally for pygdbmi < 0.10.0.0
                 # it will be interesting to look up this response if the next GDB command fails (times out)
                 logging.info('GDB response: %s', resp)
                 break  # success

+ 59 - 0
tools/test_apps/system/panic/utils.py

@@ -0,0 +1,59 @@
+# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
+# SPDX-License-Identifier: Apache-2.0
+
+import hashlib
+import logging
+import re
+import time
+from subprocess import Popen
+
+
+class NoGdbProcessError(ValueError):
+    """Raise when trying to interact with gdb subprocess, but it does not exist.
+    It may have been killed and removed, or failed to initialize for some reason."""
+
+    pass
+
+
+def sha256(file: str) -> str:
+    res = hashlib.sha256()
+    with open(file, 'rb') as fr:
+        res.update(fr.read())
+    return res.hexdigest()
+
+
+def quote_string(string: str) -> str:
+    """Return a shell-escaped version of the string *string*."""
+    _find_unsafe = re.compile(r'[^\w@%+=:,./-]', re.ASCII).search
+    if not string:
+        return "''"
+    if _find_unsafe(string) is None:
+        return string
+
+    # use single quotes, and put single quotes into double quotes
+    # the string $'b is then quoted as '$'"'"'b'
+    return "'" + string.replace("'", "'\"'\"'") + "'"
+
+
+def verify_valid_gdb_subprocess(gdb_process: Popen) -> None:
+    """Verify there is a process object, and that it is still running.
+    Raise NoGdbProcessError if either of the above are not true."""
+    if not gdb_process:
+        raise NoGdbProcessError('gdb process is not attached')
+
+    elif gdb_process.poll() is not None:
+        raise NoGdbProcessError(
+            'gdb process has already finished with return code: %s'
+            % str(gdb_process.poll())
+        )
+
+
+def attach_logger() -> logging.Logger:
+    handler = logging.StreamHandler()
+    handler.setFormatter(logging.Formatter('%(message)s'))
+    unique_number = time.time()
+    logger = logging.getLogger(__name__ + '.' + str(unique_number))
+    logger.propagate = False
+    logger.setLevel(logging.ERROR)
+    logger.addHandler(handler)
+    return logger