import os
import shutil
import ctypes
import sys
import re
import platform
import subprocess
import threading
import queue
import datetime
import time

import renderdoc as rd

from . import util
from . import testcase
from .logging import log
from pathlib import Path
from rdtest.remoteserver import RemoteServer


def get_tests():
    """Collect every runnable TestCase subclass from all loaded modules.

    Returns the test classes sorted so fast tests come before slow ones
    (``slow_test`` flag), alphabetically by class name within each group.
    """
    testcases = []

    # Snapshot both dicts before iterating: scanning module namespaces must
    # not blow up if an import elsewhere mutates sys.modules mid-walk.
    for m in list(sys.modules.values()):
        for name in list(m.__dict__):
            obj = m.__dict__[name]
            # Only concrete, non-internal TestCase subclasses are tests.
            if isinstance(obj, type) and issubclass(obj, testcase.TestCase) and obj != testcase.TestCase and not obj.internal:
                # The same class object is typically visible through several
                # module namespaces (its own module plus any package that
                # re-imports it) - register it only once so it runs only once.
                if obj not in testcases:
                    testcases.append(obj)

    testcases.sort(key=lambda t: (t.slow_test, t.__name__))

    return testcases


RUNNER_DEBUG = False  # Debug test runner running by printing messages to track it


def _enqueue_output(process: subprocess.Popen, out, q: queue.Queue):
    """Pump lines from a subprocess output stream into *q*.

    Runs on a daemon thread so a blocking readline() can never hang the
    main runner loop.
    """
    try:
        # The test subprocess is opened with universal_newlines=True, so the
        # stream is text-mode: readline() returns str and yields '' at EOF.
        # The sentinel must therefore be '' - a b'' sentinel never compares
        # equal to a str, which made this loop spin at EOF. Reading until EOF
        # also guarantees no buffered tail output is dropped when the
        # process exits.
        for line in iter(out.readline, ''):
            q.put(line)
    except Exception:
        # Deliberate best-effort: the pipe can be torn down under us while
        # the process is killed. The main thread notices exit via poll().
        pass


def _run_test(testclass, runner_timeout, failedcases: list):
    name = testclass.__name__

    # Fork the interpreter to run the test, in case it crashes we can catch it.
# We can re-run with the same parameters args = sys.argv.copy() args.insert(0, sys.executable) # Add parameter to run the test itself args.append('--internal_run_test') args.append(name) test_run = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) output_threads = [] test_stdout = queue.Queue() t = threading.Thread(target=_enqueue_output, args=(test_run, test_run.stdout, test_stdout)) t.daemon = True # thread dies with the program t.start() output_threads.append(t) test_stderr = queue.Queue() t = threading.Thread(target=_enqueue_output, args=(test_run, test_run.stderr, test_stderr)) t.daemon = True # thread dies with the program t.start() output_threads.append(t) if RUNNER_DEBUG: print("Waiting for test runner to complete...") out_pending = "" err_pending = "" while test_run.poll() is None: out = err = "" if RUNNER_DEBUG: print("Checking runner output...") try: out = test_stdout.get(timeout=runner_timeout) while not test_stdout.empty(): out += test_stdout.get_nowait() if test_run.poll() is not None: break except queue.Empty: out = None # No output try: err = None while not test_stderr.empty(): if err is None: err = '' err += test_stderr.get_nowait() if test_run.poll() is not None: break except queue.Empty: err = None # No output if RUNNER_DEBUG: if out is not None: print("Test stdout: {}".format(out)) if err is not None: print("Test stderr: {}".format(err)) else: if out is not None: out_pending += out if err is not None: err_pending += err while True: try: nl = out_pending.index('\n') line = out_pending[0:nl] out_pending = out_pending[nl+1:] line = line.replace('\r', '') sys.stdout.write(line + '\n') sys.stdout.flush() except: break while True: try: nl = err_pending.index('\n') line = err_pending[0:nl] err_pending = err_pending[nl+1:] line = line.replace('\r', '') sys.stderr.write(line + '\n') sys.stderr.flush() except: break if out is None and err is None and test_run.poll() is None: log.error('Timed out, no output 
within {}s elapsed'.format(runner_timeout)) test_run.kill() test_run.communicate() raise subprocess.TimeoutExpired(' '.join(args), runner_timeout) if RUNNER_DEBUG: print("Test runner has finished") # If we couldn't get the return code, something went wrong in the timeout above # and the program never exited. Try once more to kill it then bail if test_run.returncode is None: test_run.kill() test_run.communicate() raise RuntimeError('INTERNAL ERROR: Couldn\'t get test return code') for t in output_threads: t.join(10) if t.is_alive(): raise RuntimeError('INTERNAL ERROR: Subprocess output thread couldn\'t be closed') # Return code of 0 means we exited cleanly, nothing to do if test_run.returncode == 0: pass # Return code of 1 means the test failed, but we have already logged the exception # so we just need to mark this test as failed elif test_run.returncode == 1: failedcases.append(testclass) else: raise RuntimeError('Test did not exit cleanly while running, possible crash. Exit code {}' .format(test_run.returncode)) def fetch_tests(): output = util.run_demo_blocking(['--list-raw']).splitlines() # Skip to just past the header, grab all the remaining lines tests = output[output.index("Name\tAvailable\tAvailMessage")+1:] # Split the TSV values and store split_tests = [ test.split('\t') for test in tests ] return { x[0]: (x[1] == 'True', x[2]) for x in split_tests } def run_tests(test_include: str, test_exclude: str, in_process: bool, slow_tests: bool, debugger: bool, test_timeout: int): start_time = datetime.datetime.now(datetime.timezone.utc) rd.InitialiseReplay(rd.GlobalEnvironment(), []) server: RemoteServer = util.get_remote_server() if server is not None: server.init(in_process) # On windows, disable error reporting if 'windll' in dir(ctypes): ctypes.windll.kernel32.SetErrorMode(1 | 2) # SEM_FAILCRITICALERRORS | SEM_NOGPFAULTERRORBOX # clean up artifacts and temp folder if os.path.exists(util.get_artifact_dir()): shutil.rmtree(util.get_artifact_dir(), 
ignore_errors=True) if os.path.exists(util.get_tmp_dir()): shutil.rmtree(util.get_tmp_dir(), ignore_errors=True) log.add_output(util.get_artifact_path("output.log.html")) for file in ['testresults.css', 'testresults.js']: shutil.copyfile(os.path.join(os.path.dirname(__file__), file), util.get_artifact_path(file)) log.rawprint('' + '
' + '' + '', with_stdout=False) if server is not None: server.shutdown() rd.ShutdownReplay() if len(failedcases) > 0: sys.exit(1) sys.exit(0) def vulkan_register(): rd.UpdateVulkanLayerRegistration(True) def launch_remote_server(): # Fork the interpreter to run the test, in case it crashes we can catch it. # We can re-run with the same parameters args = sys.argv.copy() args.insert(0, sys.executable) # Add parameter to run the remote server itself args.append('--internal_remote_server') # if we're running from renderdoccmd, invoke it properly if 'renderdoccmd' in sys.executable: # run_tests.py # --renderdoc #