Source code for pysys.process.user

#!/usr/bin/env python
# PySys System Test Framework, Copyright (C) 2006-2022 M.B. Grieve

# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.

# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA


"""
Contains `pysys.process.user.ProcessUser` which supports using processes from PySys, and provides the 
shared functionality of subclasses `pysys.basetest.BaseTest` and `pysys.baserunner.BaseRunner`. 
"""

import time, collections, inspect, locale, fnmatch, sys
import threading
import shutil
import contextlib
import importlib
import concurrent.futures

from pysys import log, process_lock
from pysys.constants import *
from pysys.exceptions import *
from pysys.utils.filegrep import getmatches
from pysys.utils.logutils import BaseLogFormatter, stripANSIEscapeCodes
from pysys.config.project import Project
from pysys.utils.allocport import TCPPortOwner
from pysys.utils.fileutils import mkdir, deletedir, deletefile, pathexists, toLongPathSafe, fromLongPathSafe
from pysys.utils.pycompat import *
import pysys.utils.threadutils
import pysys.utils.safeeval
from pysys.mappers import applyMappers

if IS_WINDOWS:
	import win32api, win32con, win32event
else:
	import fcntl

[docs]class STDOUTERR_TUPLE(collections.namedtuple('stdouterr', ['stdout', 'stderr'])): """ Returned by `ProcessUser.allocateUniqueStdOutErr` to hold a pair of ``(stdout,stderr)`` names. """ __slots__ = () # to save memory as per the python docs @property def key(self): """ Returns the key (the prefix without the ``.out`` or ``.err``), to which you can append your own extension. For example to create a log file with the same base name as the stdout/err files:: self.startProcess(..., arguments=['--logfile', stdouterr.key+'.log'], ...) .. versionadded: 1.6.1 """ assert self.stdout.endswith('.out') return self.stdout[:-4]
[docs]class ProcessUser(object): """ ProcessUser provides support for safely using processes in PySys, including starting processes (with an appropriate output directory), waiting for them to do what they're supposed to do, and finally ensuring all processes and ports are cleaned up when each test (or the overall runner) terminates. As the common base class of both `pysys.basetest.BaseTest` and `pysys.baserunner.BaseRunner`, ProcessUser also holds shared functionality such as dynamic port allocation, copying files, creating directories etc, and keeps track of the "outcome" generated by some methods (although the outcome value is only used when it is subclassed by BaseTest). Each ProcessUser instance is responsible for managing the lifetime of the processes started using its `startProcess` method, and ensuring they are all terminated when the `cleanup` method is invoked. Additional functions to be executed during cleanup can be registered using `addCleanupFunction`, for example to delete large output directories, or terminate non-process resources such as docker containers or remote servers. Apart from the `addOutcome` method this class is not thread-safe, so if you need to access it from multiple threads be sure to add your own locking around use of its fields and methods, including any cleanup functions. :ivar str ~.input: Full path to the directory containing input files (e.g. ``testdir/Input``) :ivar str ~.output: Full path to the directory that output files should be written to (e.g. ``testdir/Output/<platformname>``) :ivar logging.Logger ~.log: The Python ``Logger`` instance that should be used to record progress and status information. :ivar pysys.config.project.Project ~.project: A reference to the singleton project instance containing the configuration of this PySys test project as defined by ``pysysproject.xml``. The project can be used to access information such as the project properties which are shared across all tests (e.g. for hosts and credentials). :ivar bool ~.disableCoverage: Set to True to disable all code coverage collection for processes started from this instance. This is automatically set for any tests marked with the "disableCoverage" group in their ``pysystest.*`` file. The built-in Python code coverage functionality in L{startPython} checks this flag. It is recommended that any other languages supporting code coverage also check the self.disableCoverage flag. :ivar bool ~.isCleanupInProgress: Set to True after the cleanup phase for this object begins. Additional variables that affect only the behaviour of a single method are documented in the associated method. """ # Runner abort status. The fields live here since we need to check this flag mostly from subclasses of ProcessUser # and want to do that both efficiently and conveniently. isRunnerAborting = False """ This static boolean field is set to True if this entire process/test run is in the process of terminating early due to an interrupt from the keyboard or a signal. .. versionadded:: 2.2 """ isRunnerAbortingEvent = win32event.CreateEvent(None, True, False, None) if IS_WINDOWS else None """ A Windows (pywin32) event that will be set/signalled when PySys is requested to terminate, or ``None`` if not supported on this platform. Use this with the pywin32 function ``win32event.WaitForMultipleObjects`` to perform waits that are aborted if a termination request happens. .. versionadded:: 2.2 """ isRunnerAbortingHandle = None """ A file handle that will have bytes available for reading when PySys is requested to terminate, or ``None`` if not supported on this platform. Not available on Windows. Use this with ``select`` to perform waits that are aborted if a termination request happens. Do not under any circumstances actually read from this handle. .. versionadded:: 2.2 """ def __init__(self): self.log = log """The logger instance that should be used to log from this class. """ self.project = Project.getInstance() """The `pysys.config.project.Project` instance containing settings for this PySys project.""" if self.project is None: assert 'doctest' in sys.argv[0], 'Project was not loaded yet' # allow it only during doctest-ing else: self.defaultAbortOnError = self.project.getProperty('defaultAbortOnError', True) self.defaultIgnoreExitStatus = self.project.getProperty('defaultIgnoreExitStatus', False) self.input = self.project.testRootDir self.output = None # must be set by subclass self.processList = [] self.processCount = {} self.__cleanupFunctions = [] # (fn, ignoreErrors) self.outcome = [] # internal, do NOT use directly self.__outcomeReason = '' self.__outcomeLocation = (None,None) self.__uniqueProcessKeys = {} self.__pythonCoverageFile = 0 self.disableCoverage = False self.lock = threading.RLock() """ A recursive lock that can be used for protecting the fields of this instance from access by background threads, as needed. Do NOT hold this lock while performing long operations or acquiring other locks. """ self.isCleanupInProgress = False # variables affecting a specific method (documented there rather than above) self.logFileContentsDefaultExcludes = [] self.threadPoolMaxWorkers = None # real initialization happens in _initThreadPoolMaxWorkers after runner __init__ has been called, unless a value is overridden here self.grepWarnIfLineLongerThan = 10000 self.grepTruncateIfLineLongerThan = 0 """ Set this to a number of characters to automatically truncate long lines during `waitForGrep` (after all mappers have completed) to the specified length. This prevents warnings or slow regular expression in large/long log files. .. versionadded:: 2.2 """ def _initThreadPoolMaxWorkers(self, pysysThreads): # In theory allow this to be influenced by pysysThreads, but for now we pick a single value since regardless of the number of pysys threads # a smaller number of threads make the pooling useless for I/O bound operations like HTTP requests (the main use case) and # a larger (or more machine-scalable) number could cause an explosive overload to the machine or Python GIL if many/all of the PySys workers/tests # each had their own pool return 6
[docs] def allocateUniqueStdOutErr(self, processKey): """Allocate unique filenames of the form ``processKey[.n].out/.err`` which can be used for the `startProcess` ``stdouterr`` parameter. The first time this is called it will return names like ``('outdir/myprocess.out', 'outdir/myprocess.err')``, the second time it will return ``('outdir/myprocess.1.out', 'outdir/myprocess.1.err')``, then ``('outdir/myprocess.2.out', 'outdir/myprocess.2.err')`` etc. :param str processKey: A user-defined identifier that will form the prefix onto which ``[.n].out`` is appended :return: A STDOUTERR_TUPLE named tuple of (stdout, stderr), where each is an absolute path. :rtype: STDOUTERR_TUPLE """ newval = self.__uniqueProcessKeys.get(processKey, -1)+1 self.__uniqueProcessKeys[processKey] = newval suffix = '.%d'%(newval) if newval > 0 else '' return STDOUTERR_TUPLE( os.path.join(self.output, processKey+suffix+'.out'), os.path.join(self.output, processKey+suffix+'.err'), )
[docs] def getInstanceCount(self, displayName): """(Deprecated) Return the number of processes started within the testcase matching the supplied displayName. :deprecated: The recommended way to allocate unique names is now L{allocateUniqueStdOutErr} The ProcessUser class maintains a reference count of processes started within the class instance via the L{startProcess()} method. The reference count is maintained against a logical name for the process, which is the C{displayName} used in the method call to L{startProcess()}, or the basename of the command if no displayName was supplied. The method returns the number of processes started with the supplied logical name, or 0 if no processes have been started. :param displayName: The process display name :return: The number of processes started matching the command basename :rtype: int """ if displayName in self.processCount: return self.processCount[displayName] else: return 0
def setKeywordArgs(self, xargs): """Set the xargs as data attributes of the class. For internal use by BaseTest/BaseRunner only. :meta private: Values in the xargs dictionary are set as data attributes using the builtin C{setattr} method. Thus an xargs dictionary of the form C{{'foo': 'bar'}} will result in a data attribute of the form C{self.foo} with C{value bar}. This is used so that subclasses can define default values of data attributes, which can be overriden on instantiation e.g. using the -X options to the runTest.py launch executable. If an existing attribute is present on this test class (typically a static class variable) and it has a type of bool, int, float or list, then any -X options will be automatically converted from string to that type. This facilitates providing default values for parameters such as iteration count or timeouts as static class variables with the possibility of overriding on the command line, for example `-Xiterations=123`. :param xargs: A dictionary of the user defined extra arguments """ pysys.utils.misc.setInstanceVariablesFromDict(self, xargs)
[docs] def getBoolProperty(self, propertyName, default=False): """ Get a True/False indicating whether the specified attribute is set on this object (typically as a result of specifying -X on the command line), or else from the project configuration. See also `pysys.baserunner.getXArg()` and `pysys.config.project.Project.getProperty()`. :param propertyName: The name of a property set on the command line or project configuration. """ val = getattr(self, propertyName, None) if val is None: val = getattr(self.project, propertyName, None) if val is None: return default if val is True or val is False: return val return val.lower()=='true'
[docs] def startPython(self, arguments, disableCoverage=False, **kwargs): """ Start a Python process with the specified arguments. Uses the same Python process the tests are running under. If PySys was run with the argument ``-XcodeCoverage`` or ``-XpythonCoverage`` then `startPython` will add the necessary arguments to enable generation of code coverage. Note that this required the coverage.py library to be installed. If a project property called `pythonCoverageArgs` exists then its value will be added as (space-delimited) arguments to the coverage tool. :param arguments: The arguments to pass to the Python executable. Typically the first one be either the name of a Python script to execute, or ``-m`` followed by a module name. :param kwargs: See L{startProcess} for detail on available arguments. :param disableCoverage: Disables code coverage for this specific process. Coverage can also be disabled by setting ``self.disableCoverage==True`` on this test instance. :return: The process handle of the process. :rtype: pysys.process.Process """ args = arguments if 'environs' in kwargs: environs = kwargs['environs'] else: environs = kwargs.setdefault('environs', self.getDefaultEnvirons(command=sys.executable)) coverageWriter = [writer for writer in getattr(self, 'runner', self).writers if isinstance(writer, pysys.writer.PythonCoverageWriter)] if coverageWriter and not disableCoverage and not self.disableCoverage: args = ['-m', 'coverage', 'run']+coverageWriter[0].getCoverageArgsList()+args if 'COVERAGE_FILE' not in environs: kwargs['environs'] = dict(environs) with self.lock: self.__pythonCoverageFile += 1 kwargs['environs']['COVERAGE_FILE'] = self.output+'/.coverage.python.%02d'%(self.__pythonCoverageFile) return self.startProcess(sys.executable, arguments=args, **kwargs)
[docs] def startProcess(self, command, arguments, environs=None, workingDir=None, state=None, timeout=TIMEOUTS['WaitForProcess'], stdout=None, stderr=None, displayName=None, abortOnError=None, expectedExitStatus='==0', ignoreExitStatus=None, onError=None, quiet=False, stdouterr=None, background=False, info={}, processFactory=None): """Start a process running in the foreground or background, and return the `pysys.process.Process` object. Typical use is:: myexecutable = self.startProcess('path_to_my_executable', arguments=['myoperation', 'arg1','arg2'], environs=self.createEnvirons(addToLibPath=['my_ld_lib_path']), # if a customized environment is needed stdouterr=self.allocateUniqueStdOutErr('myoperation'), # for stdout/err files, pick a suitable logical name for what it's doing background=True # or remove for default behaviour of executing in foreground ) The method allows spawning of new processes in a platform independent way. The command, arguments, environment and working directory to run the process in can all be specified in the arguments to the method, along with the filenames used for capturing the stdout and stderr of the process. Processes may be started in the foreground, in which case the method does not return until the process has completed or a time out occurs, or in the background in which case the method returns immediately to the caller returning a handle to the process to allow manipulation at a later stage, typically with L{waitProcess}. All processes started in the background are automatically killed on completion of the test via the L{cleanup()} destructor. To wait for background processes to complete during `execute` and/or before `verify` commences, call `waitForBackgroundProcesses()`. This is especially useful when you have a test that needs to execute lots of processes asynchronously, since having them all execute concurrently in the background and then calling `waitForBackgroundProcesses()` will be a lot quicker than executing them serially in the foreground. When starting a process that will listen on a server socket, use `getNextAvailableTCPPort` to allocate a free port before calling this method. Note that although is is possible to use this command to execute OS shell commands, that should only used for testing of shell scripts - other logic such as file system operations can be executed more easily and robustly using built-in Python (``os`` module) or PySys (e.g. `BaseTest.copy`) functions. .. versionchanged:: 1.6.0 Added onError parameter and default behaviour of logging stderr/out when there's a failure. Added info parameter. .. versionchanged:: 2.0 Added processFactory parameter. :param str command: The path to the executable to be launched (should include the full path) :param list[str] arguments: A list of arguments to pass to the command. Any non-string values in the list are converted to strings automatically. :param dict(str,str) environs: A dictionary specifying the environment to run the process in. If a None or empty dictionary is passed, L{getDefaultEnvirons} will be invoked to produce a suitable clean default environment for this `command`, containing a minimal set of variables. If you wish to specify a customized environment, L{createEnvirons()} is a great way to create it. :param str workingDir: The working directory for the process to run in (defaults to the testcase output subdirectory) :param bool background: Set to True to start the process in the background. By default processes are started in the foreground, meaning execution of the test will continue only once the process has terminated. :param state: Alternative way to set ``background=True``. Run the process either in the C{FOREGROUND} or C{BACKGROUND} (defaults to C{FOREGROUND}). Setting state=BACKGROUND is equivalent to setting background=True; in new tests using background=True is the preferred way to do this. :param int timeout: The number of seconds after which to terminate processes running in the foreground. For processes that complete in a few seconds or less, it is best to avoid overriding this and stick with the default. However for long-running foreground processes it will be necessary to set a larger number, for example if running a soak test where the process needs to run for up to 2 hours you could set ``timeout=2*60*60``. :param str stdouterr: The filename prefix to use for the stdout and stderr of the process (`.out`/`.err` will be appended), or a tuple of (stdout,stderr) as returned from L{allocateUniqueStdOutErr}. The stdouterr prefix is also used to form a default display name for the process if none is explicitly provided. The files are created relative to the test output directory. The filenames can be accessed from the returned process object using ``.stdout/err`` from `pysys.process.Process`. :param str stdout: The filename used to capture the stdout of the process. It is usually simpler to use `stdouterr` instead of this. :param str stderr: The filename used to capture the stderr of the process. It is usually simpler to use `stdouterr` instead of this. :param str displayName: Logical name of the process used for display in log messages, and the str(...) representation of the returned process object (defaults to a string generated from the stdouterr and/or the command). :param bool abortOnError: If true abort the test on any error outcome (defaults to the defaultAbortOnError project setting) :param str expectedExitStatus: The condition string used to determine whether the exit status/code returned by the process is correct. The default is '==0', as an exit code of zero usually indicates success, but if you are expecting a non-zero exit status (for example because you are testing correct handling of a failure condition) this could be set to '!=0' or a specific value such as '==5'. :param bool ignoreExitStatus: If False, a BLOCKED outcome is added if the process terminates with an exit code that doesn't match expectedExitStatus (or if the command cannot be run at all). This can be set to True in cases where you do not care whether the command succeeds or fails, or wish to handle the exit status separately with more complicated logic. The default value of ignoreExitStatus=None means the value will be taken from the project property defaultIgnoreExitStatus, which can be configured in the project XML (the recommended default property value is defaultIgnoreExitStatus=False), or is set to True for compatibility with older PySys releases if no project property is set. :param Callable[pysys.process.Process]->str onError: A function that will be called if the process times out or returns an unexpected exit status (unless ignoreExitStatus=True), before any abort exception is raised. This provides a convenient place to add logging of diagnostic information (perhaps using the stdout/err of the process) and/or extracting and returning an error message from the output, for example: ``onError=lambda process: self.logFileContents(process.stderr, tail=True) or self.logFileContents(process.stdout, tail=True)``. If a string value is returned from it will be added to the failure reason, e.g. ``onError=lambda process: self.logFileContents(process.stderr, tail=True) and self.grepOrNone(process.stderr, 'Error: (.*)')``. If no onError function is specified, the default is to log the last few lines of stderr (or if empty, stdout) when a process fails and abortOnError=True. The ``self.logFileContentsDefaultExcludes`` variable can be used to add regular expressions to exclude unimportant lines of output such as standard startup lines (see `logFileContents`). :param bool quiet: If True, this method will not do any INFO or WARN level logging (only DEBUG level), unless a failure outcome is appended. This parameter can be useful to avoid filling up the log where it is necessary to repeatedly execute a command check for completion of some operation until it succeeds; in such cases you should usually set ignoreExitStatus=True as well since both success and failure exit statuses are valid. :param dict[str,obj] info: A dictionary of user-defined information about this process that will be set as a field on the returned Process instance. This is useful for keeping track of things like server port numbers and log file paths. :param callable[kwargs] processFactory: A callable (such as a class constructor) that returns an instance or subclass of `pysys.process.helper.ProcessImpl`. This can be used either to provide a custom process subclass with extra features, or to make modifications to the arguments or environment that were specified by the code that invoked ``startProcess()``. The signature must consist of a ``**kwargs`` parameter, the members of which will be populated by the parameters listed in the constructor of `pysys.process.Process`, and can be modified by the factory. For example:: def myProcessFactory(**kwargs): kwargs['arguments'] = kwargs['arguments'][0]+['my_extra_arg']+kwargs['arguments'][1:] return pysys.process.helper.ProcessImpl(**kwargs) :return: The process object. :rtype: pysys.process.Process """ if state is None: state = FOREGROUND if background: state = BACKGROUND if ignoreExitStatus == None: ignoreExitStatus = self.defaultIgnoreExitStatus workingDir = os.path.join(self.output, workingDir or '') if abortOnError == None: abortOnError = self.defaultAbortOnError if stdouterr: if stdout or stderr: raise Exception('Cannot specify both stdouterr and stdout/stderr') if isstring(stdouterr): stdout = stdouterr+'.out' stderr = stdouterr+'.err' else: stdout, stderr = stdouterr if not displayName: # Heuristically the name selected by the user for stdout/err usually represents the # logical purpose of the process so makes a great display name. # Also add the command (unless they're the same). # NB: We do not do this if stdout/stderr are used since that could break # behaviour for old tests using getInstanceCount. displayName = os.path.basename(stdout.replace('.out','')) if os.path.basename(command) not in displayName and displayName not in command: displayName = '%s<%s>'%(os.path.basename(command), displayName) # in case stdout/err were given as non-absolute paths, make sure they go to the output dir not the cwd if stdout: stdout = os.path.join(self.output, stdout) if stderr: stderr = os.path.join(self.output, stderr) if not displayName: displayName = os.path.basename(command) if not environs: # a truly empty env isn't really usable, so populate it with a minimal default environment instead environs = self.getDefaultEnvirons(command=command) startTime = time.monotonic() # pass everything as a named parameter, which makes life easier for custom factory methods if processFactory is None: processFactory = pysys.process.helper.ProcessImpl process = processFactory(command=command, arguments=arguments, environs=environs, workingDir=workingDir, state=state, timeout=timeout, stdout=stdout, stderr=stderr, displayName=displayName, expectedExitStatus=expectedExitStatus, info=info, owner=self) def handleErrorAndGetOutcomeSuffix(process): if onError: try: suffix = onError(process) except Exception as ex: self.log.debug('Ignoring exception from onError handler: ', exc_info=True) else: if suffix and isstring(suffix): return ' - '+suffix.strip() elif abortOnError: self.logFileContents(process.stderr, tail=True) or self.logFileContents(process.stdout, tail=True) return '' try: process.start() if state == FOREGROUND: correctExitStatus = pysys.utils.safeeval.safeEval('%d %s'%(process.exitStatus, expectedExitStatus), extraNamespace={'self':self}) logmethod = log.info if correctExitStatus else log.warning if quiet: logmethod = log.debug logmethod("Executed %s, exit status %d%s", displayName, process.exitStatus, ", duration %d secs" % (time.monotonic()-startTime) if (int(time.monotonic()-startTime)) > 10 else "") if not ignoreExitStatus and not correctExitStatus: if not stderr and not quiet: log.warning('Process %s has no stdouterr= specified; providing this parameter will allow PySys to capture the process output that shows why it failed', process) self.addOutcome(BLOCKED, ( ('%s returned non-zero exit code %d'%(process, process.exitStatus)) if expectedExitStatus=='==0' else ('%s returned exit code %d (expected %s)'%(process, process.exitStatus, expectedExitStatus)) )+handleErrorAndGetOutcomeSuffix(process), abortOnError=abortOnError) elif state == BACKGROUND: (log.info if not quiet else log.debug)("Started %s with process id %d", displayName, process.pid) hasFailed = False except ProcessError as e: if not ignoreExitStatus: self.addOutcome(BLOCKED, 'Could not start %s process: %s'%(displayName, e), abortOnError=abortOnError) else: # this wouldn't happen during a polling-until-success use case so is always worth logging even in quiet mode log.info("%s", sys.exc_info()[1], exc_info=0) except ProcessTimeout: (log.warning if not quiet else log.debug)("Process %r timed out after %d seconds, stopping process", process, timeout, extra=BaseLogFormatter.tag(LOG_TIMEOUTS)) process.stop() self.addOutcome(TIMEDOUT, '%s timed out after %d seconds%s'%(process, timeout, handleErrorAndGetOutcomeSuffix(process)), printReason=False, abortOnError=abortOnError) except BaseException: # if we don't do this then we can't cleanup foreground processes interrupted by serious failures like KeyboardInterrupt with self.lock: self.processList.append(process) raise else: with self.lock: self.processList.append(process) if displayName in self.processCount: self.processCount[displayName] = self.processCount[displayName] + 1 else: self.processCount[displayName] = 1 return process
[docs] def getDefaultEnvirons(self, command=None, **kwargs): """ Create a new dictionary of environment variables, suitable for passing to L{startProcess()}, with a minimal clean set of environment variables for this platform, unaffected (as much as possible) by the environment that the tests are being run under. This environment contains a minimal PATH/LD_LIBRARY_PATH but does not attempt to replicate the full set of default environment variables on each OS, and in particular it does not include any that identify the current username or home area. Additional environment variables can be added as needed with L{createEnvirons} overrides. If you don't care about minimizing the risk of your local environment affecting the test processes you start, just use C{environs=os.environ} to allow child processes to inherit the entire parent environment. The L{createEnvirons()} and L{startProcess()} methods use this as the basis for creating a new set of default environment variables. If needed this method can be overridden in subclasses to add common environment variables for every process invoked by startProcess, for example to enable options such as code coverage for Java/Python/etc. This is also a good place to customize behaviour for different operating systems. Some features of this method can be configured by setting project properties: - ``defaultEnvirons.ENV_KEY``: if any properties with this prefix are defined, an environment variable with the ENV_KEY is set by this method (unless the property value is empty). For example, to set a default JVM heap size for all processes with the ``JAVA_TOOL_OPTIONS`` environment variable you could set ``defaultEnvirons.JAVA_TOOL_OPTIONS = -Xmx512M``. - ``defaultEnvironsDefaultLang``: if set to a value such as ``en_US.UTF-8`` the specified value is set for the LANG= variable on Unix; otherwise, the LANG variable is not set (which might result in use of the legacy POSIX/C encoding). - ``defaultEnvironsTempDir``: if set the expression will be passed to Python ``eval()`` and used to set the OS-specific temp directory environment variables. A typical value is `self.output`. - ``defaultEnvironsLegacyMode``: set to true to enable compatibility mode which keeps the behaviour the same as PySys v1.1, 1.2 and 1.3, namely using a completely empty default environment on Unix, and a copy of the entire parent environment on Windows. This is not recommended unless you have a lot of legacy tests that cannot easily be changed to only set minimal required environment variables using `createEnvirons()`. :param command: If known, the full path of the executable for which a default environment is being created (when called from `startProcess` this is always set). This allows default environment variables to be customized for different process types e.g. Java, Python, etc. When using ``command=sys.executable`` to launch another copy of the current Python executable, extra items from this process's path environment variables are added to the returned dictionary so that it can start correctly. On Unix-based systems this includes copying all of the load library path environment variable from the parent process. :param kwargs: Overrides of this method should pass any additional kwargs down to the super implementation, to allow for future extensions. :return: A new dictionary containing the environment variables. """ assert not kwargs, 'Unknown keyword arguments: %s'%kwargs.keys() # this feature is a workaround to maintain compatibility for a bug in PySys v1.1-1.3 # (see https://github.com/pysys-test/pysys-test/issues/9 for details) if getattr(self.project, 'defaultEnvironsLegacyMode','').lower()=='true': if IS_WINDOWS: return dict(os.environ) else: return {} e = {} for k, v in self.project.properties.items(): if k.startswith('defaultEnvirons.') and v: e[k[k.find('.')+1:]] = v # allows setting TEMP to output dir to avoid contamination/filling up of system location; set to blank to do nothing if self.project.getProperty('defaultEnvironsTempDir',''): tempDir = pysys.utils.safeeval.safeEval(self.project.defaultEnvironsTempDir, extraNamespace={'self':self}) self.mkdir(tempDir) if IS_WINDOWS: # pragma: no cover e['TEMP'] = e['TMP'] = os.path.normpath(tempDir) else: e['TMPDIR'] = os.path.normpath(tempDir) inherited = [] # env vars where it is safe and useful to inherit parent values # avoid anything user-specific or that might cause tests to store data # outside the test outpuot directory if IS_WINDOWS: # for windows there are lots; as a matter of policy we set this to a small # minimal set used by a lot of programs. Keeping up with every single env # var Microsoft sets in every Windows OS release would be too painful, # and better to make users explicitly opt-in to the env vars they want inherited.extend(['ComSpec', 'OS', 'PATHEXT', 'SystemRoot', 'SystemDrive', 'windir', 'NUMBER_OF_PROCESSORS', 'PROCESSOR_ARCHITECTURE', 'COMMONPROGRAMFILES', 'COMMONPROGRAMFILES(X86)', 'PROGRAMFILES', 'PROGRAMFILES(X86)', 'SYSTEM', 'SYSTEM32']) for k in inherited: if k in os.environ: e[k] = os.environ[k] # always set PATH/LD_LIB_PATH to clean values from constants.py # note that if someone is using an OS with different defaults they won't # be able to edit constants.py but will be able to provide a custom # implementation of this method e['PATH'] = PATH if LD_LIBRARY_PATH: e['LD_LIBRARY_PATH'] = LD_LIBRARY_PATH if DYLD_LIBRARY_PATH: e['DYLD_FALLBACK_LIBRARY_PATH'] = DYLD_LIBRARY_PATH if not IS_WINDOWS: if getattr(self.project, 'defaultEnvironsDefaultLang',''): e['LANG'] = self.project.defaultEnvironsDefaultLang if command == sys.executable: # Ensure it's possible to run another instance of this Python, by adding it to the start of the path env vars # (but only if full path to the Python executable exactly matches). # Keep it as clean as possible by not passing sys.path/PYTHONPATH # - but it seems we do need to copy the LD_LIBRARY_PATH from the parent process to ensure the required libraries are present. # Do not set PYTHONHOME here, as doesn't work well in virtualenv, and messes up grandchildren # processes that need a different Python version e['PATH'] = os.path.dirname(sys.executable)+os.pathsep+e['PATH'] if LIBRARY_PATH_ENV_VAR != 'PATH': # if it's an os with something like LD_LIBRARY_PATH # It's a shame it's necessary to copy parent environment, but there's no sane way to unpick which libraries are # actually required on Unix. Make sure we don't set this env var to an empty string just in case that # doesn't anything weird. newlibpath = (os.getenv(LIBRARY_PATH_ENV_VAR,'')+os.pathsep+e.get(LIBRARY_PATH_ENV_VAR,'')).strip(os.pathsep) if newlibpath: e[LIBRARY_PATH_ENV_VAR] = newlibpath self.log.debug('getDefaultEnvirons was called with a command matching this Python executable; adding required path environment variables from parent environment, including %s=%s', LIBRARY_PATH_ENV_VAR, os.getenv(LIBRARY_PATH_ENV_VAR,'')) else: self.log.debug('getDefaultEnvirons was called with a command matching this Python executable; adding required path environment variables from parent environment') return e
[docs] def createEnvirons(self, overrides=None, addToLibPath=[], addToExePath=[], command=None, **kwargs): """ Create a new customized dictionary of environment variables suitable for passing to L{startProcess()}'s ``environs=`` argument. As a starting point, this method uses the value returned by L{getDefaultEnvirons()} for this `command`. See the documentation on that method for more details. If you don't care about minimizing the risk of your local environment affecting the test processes you start, just use C{environs=os.environ} to allow child processes to inherit the entire parent environment instead of using this method. :param overrides: A dictionary of environment variables whose values will be used instead of any existing values. You can use `os.getenv('VARNAME','')` if you need to pass selected variables from the current process as part of the overrides list. If the value is set to None then any variable of this name will be deleted. Use unicode strings if possible (byte strings will be converted depending on the platform). A list of dictionaries can be specified, in which case the latest will override the earlier if there are any conflicts. :param addToLibPath: A path or list of paths to be prepended to the default value for the environment variable used to load libraries (or the value specified in overrides, if any), i.e. ``[DY]LD_LIBRARY_PATH`` on Unix or ``PATH`` on Windows. This is usually more convenient than adding it directly to `overrides`. :param addToExePath: A path or list of paths to be prepended to the default value for the environment variable used to locate executables (or the value specified in overrides, if any), i.e. ``PATH`` on both Unix and Windows. This is usually more convenient than adding it directly to ``overrides``. :param command: If known, the full path of the executable for which a default environment is being created (passed to L{getDefaultEnvirons}). :param kwargs: Overrides of this method should pass any additional kwargs down to the super implementation, to allow for future extensions. :return: A new dictionary containing the environment variables. """ assert not kwargs, 'Unknown keyword arguments: %s'%kwargs.keys() e = self.getDefaultEnvirons(command=command) if overrides: if not isinstance(overrides, list): overrides = [overrides] for d in overrides: if d: for k in d: if k.upper() in ['PATH']: k = k.upper() # normalize common ones to avoid chance of duplicates if d[k] is None: e.pop(k, None) # remove else: e[k] = d[k] def preparepath(path): if isstring(path): if os.pathsep not in path: path = os.path.normpath(path) else: path = os.pathsep.join([os.path.normpath(p) for p in path if p]) return path if addToLibPath: e[LIBRARY_PATH_ENV_VAR] = preparepath(addToLibPath)+os.pathsep+e[LIBRARY_PATH_ENV_VAR] if addToExePath: e['PATH'] = preparepath(addToExePath)+os.pathsep+e['PATH'] return e
[docs] def stopProcess(self, process, abortOnError=None): """Stops the specified process, if it is currently running. Does nothing if the process is not running. This is equivalent to calling `pysys.process.Process.stop()`, except it also logs an info message when the process is stopped. :param process: The process handle returned from the L{startProcess} method :param abortOnError: If True abort the test on any error outcome (defaults to the defaultAbortOnError project setting), if False a failure to stop the process will just be logged as a warning. """ if abortOnError == None: abortOnError = self.defaultAbortOnError if process.running(): try: process.stop() log.info("Stopped process %r", process) except ProcessError as e: if not abortOnError: log.warning("Ignoring failure to stop process %r due to: %s", process, e) else: self.abort(BLOCKED, 'Unable to stop process %r'%(process), self.__callRecord())
[docs] def signalProcess(self, process, signal, abortOnError=None): """Send a signal to a running process. This method uses the `pysys.process.Process.signal` method to send a signal to a running process. Should the request to send the signal to the running process fail, a C{BLOCKED} outcome will be added to the outcome list. :param process: The process handle returned from the L{startProcess} method :param signal: The integer value of the signal to send :param abortOnError: If True aborts the test with an exception on any error, if False just log it as a warning. (defaults to the defaultAbortOnError project setting) """ if abortOnError == None: abortOnError = self.defaultAbortOnError if process.running(): try: process.signal(signal) log.info("Sent %d signal to process %r", signal, process) except ProcessError as e: if not abortOnError: log.warning("Ignoring failure to signal process %r due to: %s", process, e) else: self.abort(BLOCKED, 'Unable to signal process %r'%(process), self.__callRecord())
[docs] def waitProcess(self, process, timeout=TIMEOUTS['WaitForProcess'], abortOnError=None, checkExitStatus=False): """Wait for a background process to terminate. For a convenient way to wait for all remaining background processes to complete, see `waitForBackgroundProcesses`. Timeouts will result in an exception and TIMEDOUT outcome unless the project property ``defaultAbortOnError==False`` is set. :param pysys.process.Process process: The process handle returned from the L{startProcess} method :param int timeout: The timeout value in seconds to wait before returning, for example ``timeout=TIMEOUTS['WaitForProcess']``. :param bool abortOnError: If True aborts the test with an exception on any error, if False just log it as a warning. (defaults to the defaultAbortOnError project setting) :param bool checkExitStatus: By default this method does not check the exit status, but set this argument to True to check that the exit status matches the ``expectedExitStatus`` specified in the call to `startProcess` (typically ``==0``). Note that this argument is not affected by the ``defaultIgnoreExitStatus`` project property. The last few lines of the stderr (or stdout) will be logged if the exit status is wrong. :return: The process's ``exitStatus``. This will be None if the process timed out and abortOnError is disabled. """ if abortOnError == None: abortOnError = self.defaultAbortOnError try: t = time.monotonic() process.wait(timeout) # this will log if it takes more than a few seconds if (time.monotonic()-t > 10) or process.exitStatus != 0: log.info("Process %s terminated after %d secs with exit status %d", process, time.monotonic()-t, process.exitStatus) except ProcessTimeout: if not abortOnError: log.warning("Ignoring timeout waiting for process %r after %d secs (as abortOnError=False)", process, time.monotonic() - t, extra=BaseLogFormatter.tag(LOG_TIMEOUTS)) else: self.abort(TIMEDOUT, 'Timed out waiting for process %s after %d secs'%(process, timeout), self.__callRecord()) else: if checkExitStatus: if not pysys.utils.safeeval.safeEval('%d %s'%(process.exitStatus, process.expectedExitStatus), extraNamespace={'self':self}): self.logFileContents(process.stderr, tail=True) or self.logFileContents(process.stdout, tail=True) self.addOutcome(BLOCKED, ('%s returned exit status %d (expected %s)'%(process, process.exitStatus, process.expectedExitStatus)), abortOnError=abortOnError) return process.exitStatus
[docs] def pollWait(self, secs): """ Sleeps the current thread for the specified number of seconds, between polling/checking for some condition to be met. Unlike `pysys.basetest.BaseTest.wait` (which should be used for larger waits inside tests), pollWait does not log anything. Use this method instead of ``time.sleep`` as it provides PySys the chance to abort test execution early when requested, for example as a result of a keyboard interrupt or signal. :param float secs: The time to sleep for, typically a few hundred milliseconds. Do not use this method for really long waits. Cannot be negative. """ # This implementation is designed to be fast for the common case as it's executed frequently on multiple threads if secs > 2: # special (and rare) case: break longer sleeps into smaller chunks in case case someone polls for a long time MAX_SLEEP = 2 self.log.debug('pollWait %s secs', secs) while secs > MAX_SLEEP: if self.isRunnerAborting is True and self.isCleanupInProgress is False: raise KeyboardInterrupt() time.sleep(MAX_SLEEP) secs -= MAX_SLEEP time.sleep(secs) # Perform an early abort if we're terminating, but not once we enter cleanup code for each test since that # may need to execute processes if self.isRunnerAborting is True and self.isCleanupInProgress is False: raise KeyboardInterrupt()
[docs] def waitForBackgroundProcesses(self, includes=[], excludes=[], timeout=TIMEOUTS['WaitForProcess'], abortOnError=None, checkExitStatus=True): """Wait for any running background processes to terminate, then check that all background processes completed with the expected exit status. This can be useful for speeding up a test that needs to run many processes, by executing all its subprocesses in parallel in the background (and then waiting for completion) rather than one-by-one in the foreground. Timeouts will result in a TIMEDOUT outcome and an exception unless the project property ``defaultAbortOnError==False`` is set. :param list[pysys.process.Process] includes: A list of processes to wait for, each returned by `startProcess()`. If none are specified, this method will wait for (and check the exit status of) all background processes. :param list[pysys.process.Process] excludes: A list of processes which are not expected to have terminated yet (this is only useful when not setting includes=[]). :param int timeout: The total time in seconds to wait for all processes to have completed, for example ``timeout=TIMEOUTS['WaitForProcess']``. :param bool abortOnError: If True aborts the test with an exception on any error, if False appends an outcome but does not raise an exception. :param bool checkExitStatus: By default this method not only waits for completion but also checks the exit status of all (included) background processes (regardless of when they completed), but set this argument to False to disable checking that the exit status of the processes matches the ``expectedExitStatus`` specified in the call to `startProcess` (typically ``==0``). The last few lines of the stderr (or stdout) will be logged if the exit status is wrong. """ assert timeout>0, 'timeout must be specified' starttime = time.monotonic() includes = includes or self.processList running = [p for p in includes if p.state==BACKGROUND and p.running() and p not in excludes] self.log.info('Waiting up to %d secs for %d background process(es) to complete', timeout, len(running)) for p in running: try: thistimeout = starttime+timeout-time.monotonic() if thistimeout <= 0: # we've run out of time raise ProcessTimeout('waitForBackgroundProcesses timed out') if p.running(): p.wait(timeout=thistimeout) # may raise ProcessTimeout except ProcessTimeout: stillrunning = [p for p in running if p.state==BACKGROUND and p.running()] if stillrunning: self.addOutcome(TIMEDOUT, 'Timed out waiting for %d processes after %d secs: %s'%(len(stillrunning), timeout, ', '.join(map(str, stillrunning))), abortOnError=abortOnError) return False # now check exit statuses - for all background processes, not only those that happen to have completed recently; # but only after we've confirmed (above) there were no timeouts if checkExitStatus: failures = [] for process in includes: if process.state!=BACKGROUND or process in excludes: continue if not pysys.utils.safeeval.safeEval('%s %s'%(process.exitStatus, process.expectedExitStatus), extraNamespace={'self':self}): self.logFileContents(process.stderr, tail=True) or self.logFileContents(process.stdout, tail=True) failures.append('%s returned exit status %s (expected %s)'%(process, process.exitStatus, process.expectedExitStatus)) if failures: self.addOutcome(BLOCKED, ('%d processes failed: '%len(failures) if len(failures)>1 else 'Process ')+'; '.join(failures), abortOnError=abortOnError) (self.log.info if (time.monotonic()-starttime>10) else self.log.debug)('All processes completed, after waiting %d secs'%(time.monotonic()-starttime))
[docs] def writeProcess(self, process, data, addNewLine=True): """Write binary data to the stdin of a process. This method uses `pysys.process.Process.write` to write binary data to the stdin of a process. This wrapper around the write method of the process helper only adds checking of the process running status prior to the write being performed, and logging to the testcase run log to detail the write. :param pysys.process.Process process: The process handle returned from the L{startProcess()} method :param bytes data: The data to write to the process stdin. As only binary data can be written to a process stdin, if a character string rather than a byte object is passed as the data, it will be automatically converted to a bytes object using the encoding given by PREFERRED_ENCODING. :param bool addNewLine: True if a new line character is to be added to the end of the data string """ if process.running(): process.write(data, addNewLine) log.info("Written to stdin of process %r", process) log.debug(" %s" % data) else: raise Exception("Write to process %r stdin not performed as process is not running", process)
[docs] def waitForSocket(self, port, host='localhost', timeout=TIMEOUTS['WaitForSocket'], abortOnError=None, process=None, socketAddressFamily=socket.AF_INET): """Wait until it is possible to establish a socket connection to a server running on the specified local or remote port. This method blocks until connection to a particular host:port pair can be established. This is useful for test timing where a component under test creates a socket for client server interaction - calling of this method ensures that on return of the method call the server process is running and a client is able to create connections to it. If a connection cannot be made within the specified timeout interval, the method returns to the caller, or aborts the test if abortOnError=True. .. versionchanged:: 1.5.1 Added host and socketAddressFamily parameters. :param port: The port value in the socket host:port pair :param host: The host value in the socket host:port pair :param timeout: The timeout in seconds to wait for connection to the socket :param abortOnError: If true abort the test on any failure (defaults to the defaultAbortOnError project setting) :param process: If a handle to a process is specified, the wait will abort if the process dies before the socket becomes available. It is recommended to set this wherever possible. :param socketAddressFamily: The socket address family e.g. IPv4 vs IPv6. See Python's ``socket`` module for details. """ if abortOnError == None: abortOnError = self.defaultAbortOnError log.debug("Performing wait for socket creation %s:%s", host, port) s = None try: startTime = time.monotonic() while True: if s is None: with process_lock: s = socket.socket(socketAddressFamily, socket.SOCK_STREAM) # the following lines are to prevent handles being inherited by # other processes started while this test is runing if IS_WINDOWS: s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, 0) win32api.SetHandleInformation(s.fileno(), win32con.HANDLE_FLAG_INHERIT, 0) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) else: fcntl.fcntl(s.fileno(), fcntl.F_SETFD, 1) try: s.connect((host, port)) s.shutdown(socket.SHUT_RDWR) log.debug("Wait for socket creation completed successfully") if time.monotonic()-startTime>10: log.info("Wait for socket creation completed after %d secs", time.monotonic()-startTime) return True except socket.error as ex: if process and not process.running(): msg = "Waiting for socket connection aborted due to unexpected process %s termination"%(process) if abortOnError: self.abort(BLOCKED, msg, self.__callRecord()) else: log.warning('%s', msg) return False if timeout: currentTime = time.monotonic() if currentTime > startTime + timeout: msg = "Timed out waiting for creation of socket after %d secs: %s"%(time.monotonic()-startTime, ex) if abortOnError: self.abort(TIMEDOUT, msg, self.__callRecord()) else: log.warning('%s', msg) return False if PLATFORM == 'darwin': # MacOS gives an error if we try to connect to the same socket again after connection refused s.close() s = None self.pollWait(0.15) finally: if s is not None: s.close()
[docs] def waitForFile(self, file, filedir=None, timeout=TIMEOUTS['WaitForFile'], abortOnError=None): """Wait for a file to exist on disk. This method blocks until a file is created on disk. This is useful for test timing where a component under test creates a file (e.g. for logging) indicating it has performed all initialisation actions and is ready for the test execution steps. If a file is not created on disk within the specified timeout interval, the method returns to the caller. :param file: The basename of the file used to wait to be created :param filedir: The dirname of the file (defaults to the testcase output subdirectory) :param timeout: The timeout in seconds to wait for the file to be created :param abortOnError: If true abort the test on any failure (defaults to the defaultAbortOnError project setting) """ if abortOnError == None: abortOnError = self.defaultAbortOnError if filedir is None: filedir = self.output f = os.path.join(filedir, file) log.debug("Performing wait for file creation: %s", f) startTime = time.monotonic() while True: if timeout: currentTime = time.monotonic() if currentTime > startTime + timeout: msg = "Timed out waiting for creation of file %s after %d secs" % (file, time.monotonic()-startTime) if abortOnError: self.abort(TIMEDOUT, msg, self.__callRecord()) else: log.warning(msg) break self.pollWait(0.01) if pathexists(f): log.debug("Wait for '%s' file creation completed successfully", file) return
[docs] def waitForSignal(self, file, filedir=None, expr="", **waitForGrepArgs): """Old alias for `waitForGrep`; please use `waitForGrep` in new tests. All parameters are the same, except that in waitForSignal the (rarely used) ``filedir`` argument can be specified as the 2nd positional argument (after ``file`` and before ``expr``) whereas in waitForGrep it can only be specified as a ``filedir=`` keyword argument. """ return self.waitForGrep(file, expr=expr, filedir=filedir, **waitForGrepArgs)
[docs] def waitForGrep(self, file, expr="", condition=">=1", timeout=TIMEOUTS['WaitForSignal'], poll=0.25, ignores=[], process=None, errorExpr=[], errorIf=None, abortOnError=None, encoding=None, encodingReplaceOnError=False, detailMessage='', filedir=None, reFlags=0, mappers=[], quiet=False): """Wait for a regular expression line to be seen (one or more times) in a text file in the output directory (waitForGrep was formerly known as `waitForSignal`). This method provides some parameters that give helpful fail-fast behaviour with a descriptive outcome reason; use these whenever possible: - ``process=`` to abort if success becomes impossible due to premature termination of the process that's generating the output - ``errorExpr=`` to abort if an error message/expression is written to the file being grepped - ``errorIf=`` to abort if the specified lambda function returns an error string (which can be used if the error messages go to a different file than that being grepped This will generate much clearer outcome reasons, which makes test failures easy to triage, and also avoids wasting time waiting for something that will never happen. Example:: self.waitForGrep('myprocess.log', 'INFO .*Started successfully', encoding='utf-8', process=myprocess, errorExpr=[' (ERROR|FATAL) ', 'Failed to start']) self.waitForGrep('myoutput.log', 'My message', encoding='utf-8', process=myprocess, errorIf=lambda: self.grepOrNone('myprocess.err', ' ERROR .*')) Note that waitForGrep fails the test if the expression is not found (unless abortOnError was set to False, which isn't recommended), so there is no need to add duplication with an `assertGrep <pysys.basetest.BaseTest.assertGrep>` to check for the same expression in your validation logic. You can extract information from the matched expression, optionally perform assertions on it, by using one or more ``(?P<groupName>...)`` named groups in the expression. A common pattern is to unpack the resulting dict using ``**kwargs`` syntax and pass to `BaseTest.assertThat`. For example:: self.assertThat('username == expected', expected='myuser', **self.waitForGrep('myserver.log', r'Successfully authenticated user "(?P<username>[^"]*)"')) If the file is large or contains long lines, it can take a long time for the regular expressions to be evaluated over each line. It is important to avoid the possibility that data is written to the file faster than Python can read it, which would lead to the PySys process running very slowly and likely a timeout. To help detect this situation, this method will log warnings if dangerously long lines are detected. If needed the threshold for these can be configured by setting the ``self.grepWarnIfLineLongerThan`` field. If reading from a file that has long lines, consider adding `pysys.mappers.TruncateLongLines` to the mappers list. .. versionadded:: 1.5.1 :param str file: The path of the file to be searched. Usually this is a name/path relative to the ``self.output`` directory, but alternatively an absolute path can be specified. :param str expr: The regular expression to search for in the text file. :param str condition: The condition to be met for the number of lines matching the regular expression; by default we wait until there is at least one occurrence. :param int timeout: The number of seconds to wait for the regular expression before giving up and aborting the test with `pysys.constants.TIMEDOUT` (unless abortOnError=False in which case execution will continue). :param pysys.process.Process process: The process that is generating the specified file, to allow the wait to fail fast (instead of timing out) if the process dies before the expected signal appears. Can be None if the process is not known or is expected to terminate itself during this period. :param list[str] errorExpr: Optional list of regular expressions, which if found in the file will cause waiting for the main expression to be aborted with a `pysys.constants.BLOCKED` outcome. This is useful to avoid waiting a long time for the expected expression when an ERROR is logged that means it will never happen, and also provides much clearer test failure messages in this case. :param callable->str errorIf: A zero-arg function that returns False/None when there is no error, or a non-empty string if an error is detected which should cause us to abort looking for the grep expression. This function will be executed frequently (every ``poll`` seconds) so avoid doing anything time-consuming here unless you set a large polling interval. See above for an example. Added in PySys 1.6.0. :param list[str] ignores: A list of regular expressions used to identify lines in the files which should be ignored when matching both `expr` and `errorExpr`. :param List[callable[str]->str] mappers: A list of filter functions that will be used to pre-process each line from the file (returning None if the line is to be filtered out). This provides a very powerful capability for filtering the file, for example `pysys.mappers.IncludeLinesBetween` provides the ability to filter in/out sections of a file, and `pysys.mappers.JoinLines` can be used to put exception stack traces onto the same line as the error message. Do not share mapper instances across multiple tests or threads as this can cause race conditions. Added in PySys 1.6.0. :param float poll: The time in seconds between to poll the file looking for the regular expression and to check against the condition :param bool abortOnError: If True abort the test on any error outcome (defaults to the defaultAbortOnError project setting, which for a modern project will be True). :param str encoding: The encoding to use to open the file and convert from bytes to characters. The default value is None which indicates that the decision will be delegated to the L{getDefaultFileEncoding()} method. :param bool encodingReplaceOnError: Set to True to replace erroneous characters that are invalid in the expected encoding (with a backslash escape) rather than throwing an exception. Added in PySys 2.2. :param str detailMessage: An extra string to add to the start of the message logged when waiting to provide extra information about the wait condition. e.g. ``detailMessage='Wait for server startup: '``. Added in v1.5.1. From v2.1+ the detail string is added at the beginning not the end. :param int reFlags: Zero or more flags controlling how the behaviour of regular expression matching, combined together using the ``|`` operator, for example ``reFlags=re.VERBOSE | re.IGNORECASE``. For details see the ``re`` module in the Python standard library. Note that ``re.MULTILINE`` cannot be used because expressions are matched against one line at a time. Added in PySys 1.5.1. :param str filedir: Can be used to provide a directory name to add to the beginning of the ``file`` parameter; however usually it is clearer just to specify that directory in the ``file``. :param bool quiet: Set this to true to suppress INFO-level logging, which can be useful when you wish to print your own progress message in a more high-level fashion. :return list[re.Match]: Usually this returns a list of ``re.Match`` objects found for the ``expr``, or an empty list if there was no match. If the expr contains any ``(?P<groupName>...)`` named groups, and assuming the condition is still the default of ">=1" (i.e. not trying to find multiple matches), then instead of a list, a dict is returned containing ``dict(groupName: str, matchValue: str or None)`` (or an empty ``{}`` dict if there is no match) which allows the result to be passed to `assertThat` for further checking of the matched groups (typically unpacked using the ``**`` operator; see example above). """ assert expr, 'expr= argument must be specified' assert '\n' not in expr, 'expr= cannot contain multiple lines' if abortOnError == None: abortOnError = self.defaultAbortOnError if filedir is None: filedir = self.output f = os.path.join(filedir, file) if errorExpr: assert not isstring(errorExpr), 'errorExpr must be a list of strings not a string' matches = [] msg = "Waiting for {expr} {condition}in {file}".format( expr=quotestring(expr), # performs escaping of embedded quotes, newlines, etc condition=condition.strip()+' ' if condition!='>=1' else '', # only include if non-default file=os.path.basename(file), ) if detailMessage: # prior to v2.1 this was appended at the end but actually it's more useful at the beginning msg = detailMessage.strip(': ')+': '+msg log.debug("Performing wait for grep signal %s %s in file %s with ignores %s", expr, condition, f, ignores) loginfo = (log.debug if quiet else log.info) verboseWaitForSignal = self.getBoolProperty('verboseWaitForSignal', True) and self.getBoolProperty('verboseWaitForGrep', True) if verboseWaitForSignal: # if verbose, log when starting (which is very helpful for debugging hangs); non-verbose users get the message only when it's done loginfo('%s%s', msg, '; timeout=%ss'%timeout if timeout!=TIMEOUTS['WaitForSignal'] else '') encoding = encoding or self.getDefaultFileEncoding(f) # If condition was customized (typically to be more than 1) named groups mode isn't so useful since there would # be multiple matches, so restrict it to 1 which is the common case anyway. compiled = re.compile(expr, flags=reFlags) namedGroupsMode = compiled.groupindex and condition.replace(' ','')=='>=1' starttime = time.monotonic() lineno = [0] # use an array to hold the line counter so we can update this value inside the function def watchdogMapper(line): linelen=len(line)-1# minus one for the (likely) newline, to make the numbers round if linelen > 5000: if self.grepTruncateIfLineLongerThan > 0 and linelen > self.grepTruncateIfLineLongerThan: line = line[:self.grepTruncateIfLineLongerThan]+'\n' linelen = len(line)-1 if linelen > self.grepWarnIfLineLongerThan: self.log.warning(' very long line of %s characters detected in %s during waitForGrep; be careful as some regular expressions take a very long time to run on long input strings: %s ...', linelen, file, line[:1000]) self.grepWarnIfLineLongerThan *= 5 # increase exponentially (for this test) lineno[0] += 1 if lineno[0] % 10000 == 0: # periodically check for interruption or timeout; not too often or we might slow down the normal case if self.isRunnerAborting is True and self.isCleanupInProgress is False: raise KeyboardInterrupt() if time.monotonic()-starttime > timeout: self.log.debug('waitForGrep watchdog signalled timeout after handling %s lines', lineno[0]) raise TimeoutError("Timed out during waitForGrep watchdog") return line mappers = mappers+[watchdogMapper] # putting the watchdog later allows custom mappers that remove long lines if desired while 1: try: if pathexists(f): matches = getmatches(f, expr, encoding=encoding, ignores=ignores, flags=reFlags, mappers=mappers, encodingReplaceOnError=encodingReplaceOnError) if pysys.utils.safeeval.safeEval("%d %s" % (len(matches), condition), extraNamespace={'self':self}): timetaken = time.monotonic()-starttime # Old-style/non-verbose behaviour is to log only after complete, # new/verbose style does the main logging at INFO when starting, and only logs on completion if it took a long time # (this helps people debug tests that sometimes timeout and sometimes "nearly" timeout) if verboseWaitForSignal: (loginfo if timetaken > 30 else log.debug)(" ... found %d matches in %ss", len(matches), int(timetaken)) else: # We use the phrase "grep signal" to avoid misleading anyone, whether people used waitForGrep or the older waitForSignal loginfo("Wait for grep signal in %s completed successfully", file) break if errorExpr: for err in errorExpr: errmatches = getmatches(f, err+'.*', encoding=encoding, ignores=ignores, flags=reFlags, mappers=mappers, encodingReplaceOnError=encodingReplaceOnError) # add .* to capture entire err msg for a better outcome reason if errmatches: err = errmatches[0].group(0).strip() msg = '%s found while %s'%(quotestring(err), msg[0].lower()+msg[1:]) # always report outcome for this case; additionally abort if requested to self.addOutcome(BLOCKED, outcomeReason=msg, abortOnError=abortOnError, callRecord=self.__callRecord()) return {} if namedGroupsMode else matches # end of if exists if time.monotonic() > starttime + timeout: raise TimeoutError() except TimeoutError: # may come from the above check outside the loop, or from the check every 10k lines within the watchdog msg = "%s timed out after %d secs, %s"%(msg, timeout, ("with %d matches"%len(matches)) if pathexists(f) else 'file does not exist') if abortOnError: self.abort(TIMEDOUT, msg, self.__callRecord()) else: log.warning(msg, extra=BaseLogFormatter.tag(LOG_TIMEOUTS)) break if errorIf is not None: errmsg = errorIf() if errmsg: msg = "%s aborted due to errorIf returning %s"%(msg, errmsg) if abortOnError: self.abort(BLOCKED, msg, self.__callRecord()) else: log.warning(msg) break if process and not process.running(): msg = "%s aborted due to process %s termination"%(msg, process) if abortOnError: self.abort(BLOCKED, msg, self.__callRecord()) else: log.warning(msg) break self.pollWait(poll) if namedGroupsMode: return {} if not matches else matches[0].groupdict() return matches
[docs] def addCleanupFunction(self, fn, ignoreErrors=False): """ Registers a function that will be called as part of the `cleanup` of this object. Cleanup functions should have no arguments, and are invoked in reverse order with the most recently added first (LIFO), and before the automatic termination of any remaining processes associated with this object. Typical cleanup tasks are to cleanly shutdown processes (which is sometimes necessary to obtain code coverage information), and to (attempt to) delete large files/directories created by the test:: self.addCleanupFunction(lambda: self.cleanlyShutdownMyProcess(params)) self.addCleanupFunction(lambda: self.deleteDir('my-large-dir'), ignoreErrors=True) self.addCleanupFunction(lambda: self.deleteFile('my-large-file.log'), ignoreErrors=True) :param Callable[] fn: The cleanup function. :param bool ignoreErrors: By default, errors from cleanup functions will result in a test failure; set this to True to log them but not produce a test failure. This parameter was added in 1.6.0. """ with self.lock: if fn and (fn,ignoreErrors) not in self.__cleanupFunctions: self.__cleanupFunctions.append( (fn, ignoreErrors) )
[docs] def cleanup(self): """ Tear down function that frees resources managed by this object, for example terminating processes it has started. Should be called exactly once by the owner of this object when is no longer needed. Do not override this method, instead use `addCleanupFunction`. """ exceptions = [] try: # although we don't yet state this method is thread-safe, make it # as thread-safe as possible by using swap operations with self.lock: self.isCleanupInProgress = True # lock probably not required for this assignment but might as well cleanupfunctions, self.__cleanupFunctions = self.__cleanupFunctions, [] if cleanupfunctions: log.info('') log.info('cleanup:') for fn, ignoreErrors in reversed(cleanupfunctions): try: log.debug('Running registered cleanup function: %r'%fn) fn() except Exception as e: (log.warning if ignoreErrors else log.error)('Error while running cleanup function%s: ', ' (ignoreErrors=True)' if ignoreErrors else '', exc_info=True) if not ignoreErrors: exceptions.append('Cleanup function failed: %s (%s)'%(e, type(e).__name__)) finally: with self.lock: processes, self.processList = self.processList, [] for process in processes: try: if process.running(): log.debug("Stopping process during cleanup: %r", process) process.stop() except Exception as e: # this is pretty unlikely to fail, but we'd like to know if it does log.warning("Caught %s: %s", sys.exc_info()[0].__name__, sys.exc_info()[1], exc_info=1) exceptions.append('Failed to stop process %s: %s'%(process, e)) self.processCount = {} log.debug('ProcessUser cleanup function done.') if exceptions: raise UserError('Cleanup failed%s: %s'%(' with %d errors'%len(exceptions), '; '.join(exceptions)))
[docs] def handleRunnerAbort(self, **kwargs): """ Called from a background thread when the entire test run is aborting, to perform quick operations to help this test to terminate as quickly as possible. The default implementation attempts to immediately stop any currently running processes (in case they are holding open resources such as server sockets that the test may be blocking on). Subclasses may override this method, either to perform additional steps (such as closing server sockets that clients may be blocking on) or to avoid stopping processes that need to be terminated in a more orderly way. Unlike the `cleanup` method, this will be called from a background thread so avoid using methods that are not thread-safe. NB: Logging performed during this method will NOT be included in the test's ``run.log`` output. The test's `cleanup` method will usually be called to perform a fuller cleanup (later, or concurrently). .. versionadded:: 2.2 """ with self.lock: processes = list(self.processList) # we leave the final checking to cleanup(), so do NOT remove these from process list if self.isCleanupInProgress: # pragma: no cover # This should prevent us trying to stop processes started during cleanup() which may be important # for performing orderly cleanup self.log.debug('handleRunnerAbort is skipping %s because cleanup is already in progress', self) return for process in processes: try: if process.running(): log.info("Stopping %s process during runner abort: %r", self, process) process.stop(hard=True) except Exception as e: # pragma: no cover - this is pretty unlikely to fail, but we'd like to know if it does log.info("Failed to stop %s process %r during runner abort %s: %s", self, process, e)
[docs] def addOutcome(self, outcome, outcomeReason='', printReason=True, abortOnError=False, callRecord=None, override=False): """Add a validation outcome (and optionally a reason string) to the validation list. The method provides the ability to add a validation outcome to the internal data structure storing the list of validation outcomes. Multiple validations may be performed, the current supported validation outcomes of which are described in :ref:`assertions-and-outcomes`. The outcomes are considered to have a precedence order, as defined by the order of the outcomes listed above. Thus a C{pysys.constants.BLOCKED} outcome has a higher precedence than a C{pysys.constants.PASSED} outcome. The outcomes are defined in L{pysys.constants}. This method is thread-safe. Although this method exists on all subclasses of `pysys.process.user.ProcessUser`, in practice only `pysys.basetest.BaseTest` subclasses actually do anything with the resulting outcome. :param pysys.constants.Outcome outcome: The outcome to add, e.g. `pysys.constants.FAILED`. :param str outcomeReason: A string summarizing the reason for the outcome, for example "Grep on x.log contains 'ERROR: server failed'". :param bool printReason: If True the specified outcomeReason will be printed :param bool abortOnError: If true abort the test on any error outcome. This should usually be set to False for assertions, or the configured `self.defaultAbortOnError` setting (typically True) for operations that involve waiting. :param list[str] callRecord: An array of strings of the form absolutepath:lineno indicating the call stack that lead to this outcome. This will be appended to the log output for better test triage. :param bool override: Remove any existing test outcomes when adding this one, ensuring that this outcome is the one and only one reported even if an existing outcome has higher precedence. """ assert outcome in OUTCOMES, outcome # ensure outcome type is known, and that numeric not string constant was specified! with self.lock: if abortOnError == None: abortOnError = self.defaultAbortOnError if outcomeReason is None: outcomeReason = '' else: outcomeReason = stripANSIEscapeCodes(outcomeReason).strip().replace(u'\t', u' ').replace('\r','').replace('\n', ' ; ') if override: log.debug('addOutcome is removing existing outcome(s): %s with reason "%s"', self.outcome, self.__outcomeReason) del self.outcome[:] self.__outcomeReason = None old = self.getOutcome() if (old == NOTVERIFIED and not self.__outcomeReason): old = None self.outcome.append(outcome) # only bother populating the callrecord (which is a bit costly) when there's a failure if outcome.isFailure() and callRecord is None: callRecord = self.__callRecord() def parseLocation(cr): if not cr or ':' not in cr: return None return cr[:cr.rfind(':')], cr[cr.rfind(':')+1:] #store the reason of the highest precedent outcome # although we should print whatever is passed in, store a version with control characters stripped # out so that it's easier to read (e.g. coloring codes from third party tools) if self.getOutcome() != old: self.__outcomeReason = re.sub(u'[\x00-\x08\x0b\x0c\x0e-\x1F]', '', outcomeReason) self.__outcomeLocation = None, None if callRecord: locations = [parseLocation(loc) for loc in callRecord] if len(locations)>1: # if possible, try to prioritize locations under the testDir above shared framework classes locations = [loc for loc in locations if loc[0].lower().startswith(self.descriptor.testDir.lower()) ] if len(locations) == 0: locations = [parseLocation(loc) for loc in callRecord] loc = locations[0] self.__outcomeLocation = (os.path.normpath(os.path.join(self.descriptor.testDir, fromLongPathSafe(loc[0]))), loc[1]) if outcome.isFailure() and abortOnError: if callRecord==None: callRecord = self.__callRecord() self.abort(outcome, outcomeReason, callRecord) if outcomeReason and printReason: if outcome.isFailure(): def maybebasename(p): if self.project.getProperty('pysysLogAbsolutePaths', False): return p return os.path.basename(p) log.warning(u'%s ... %s %s', outcomeReason, str(outcome).lower(), u'[%s]'%', '.join( u'%s:%s'%(maybebasename(loc[0]), loc[1]) for loc in map(parseLocation,callRecord)) if callRecord!=None else u'', extra=BaseLogFormatter.tag(str(outcome).lower(),1)) else: log.info(u'%s ... %s', outcomeReason, str(outcome).lower(), extra=BaseLogFormatter.tag(str(outcome).lower(),1))
[docs] def abort(self, outcome, outcomeReason, callRecord=None): """Raise an AbortExecution exception with the specified outcome and reason, to abort the current test. See also L{skipTest}. :param outcome: The outcome, which will NOT override any existing outcomes previously recorded, unless the abort outcome is a non-failure (such as SKIPPED). If you want overriding behaviour for failure outcomes, instead of this function use:: self.addOutcome(outcome, outcomeReason, abortOnError=True, override=True) :param outcomeReason: A string summarizing the reason for the outcome. .. versionchanged:: 2.2 Previous outcomes are no longer overridden when aborting. """ raise AbortExecution(outcome, outcomeReason, callRecord)
[docs] def skipTest(self, outcomeReason, callRecord=None): """Raise an AbortException that will set the test outcome to SKIPPED and ensure that the rest of the execute() and validate() methods do not execute. This is useful when a test should not be executed in the current mode or platform. :param outcomeReason: A string summarizing the reason the test is being skipped, for example "Feature X is not supported on Windows". """ raise AbortExecution(SKIPPED, outcomeReason, callRecord)
[docs] def getOutcome(self): """Get the final outcome for this test, based on the precedence order defined in `pysys.constants.OUTCOMES`. To find out whether this test has failed:: if self.getOutcome().isFailure(): ... :return pysys.constants.Outcome: The overall outcome. Use ``%s`` or ``str()`` to convert to a display name. """ with self.lock: if len(self.outcome) == 0: return NOTVERIFIED return sorted(self.outcome, key=lambda x: OUTCOMES.index(x))[0]
[docs] def getOutcomeReason(self): """Get the reason string for the current overall outcome (if specified). :return: The overall test outcome reason or '' if not specified :rtype: string """ with self.lock: fails = len([o for o in self.outcome if o.isFailure()]) if self.__outcomeReason and (fails > 1): return u'%s (+%d other failures)'%(self.__outcomeReason, fails-1) return self.__outcomeReason
[docs] def getOutcomeLocation(self): """Get the location in the Python source file where this outcome was added. .. versionadded:: 1.6.0 :return (str,str): The absolute filename, and the line number. Returns (None,None) if not known. """ with self.lock: return self.__outcomeLocation
[docs] def getNextAvailableTCPPort(self, hosts=['', 'localhost'], socketAddressFamily=socket.AF_INET): """Allocate a free TCP port which can be used for starting a server on this machine. The port is taken from the pool of available server (non-ephemeral) ports on this machine, and will not be available for use by any other code in the current PySys process until this object's `cleanup` method is called to return it to the pool of available ports. For advanced options such as port exclusions see `pysys.utils.allocport`. To allocate an IPv4 port for use only on this host:: port = self.getNextAvailableTCPPort(hosts=['localhost']) .. versionchanged:: 1.5.1 Added hosts and socketAddressFamily parameters. :param list(Str) hosts: A list of the host names or IP addresses to check when establishing that a potential allocated port isn't already in use by a process outside the PySys framework. By default we check ``""`` (which corresponds to ``INADDR_ANY`` and depending on the OS means either one or all non-localhost IPv4 addresses) and also ``localhost``. Many machines have multiple network cards each with its own host IP address, and typically you'll only be using one of them in your test, most commonly ``localhost``. If you do know which host/IP you'll actually be using, just specify that directly to save time, and avoid needlessly opening remote ports on hosts you're not using. A list of available host addresses can be found from ``socket.getaddrinfo('', None)``. :param socketAddressFamily: The socket address family e.g. IPv4 vs IPv6. See Python's ``socket`` module for details. """ o = TCPPortOwner() self.addCleanupFunction(lambda: o.cleanup()) return o.port
def __callRecord(self): """Retrieve a call record outside of this module, up to the execute or validate method of the test case. """ stack=[] from pysys.basetest import BaseTest if isinstance(self, BaseTest): testmodule = os.path.splitext(os.path.join(self.descriptor.testDir, self.descriptor.module))[0] if self.descriptor.module != 'PYTHONPATH' else None for record in inspect.stack(): info = inspect.getframeinfo(record[0]) if (self.__skipFrame(info.filename, ProcessUser) ): continue if (self.__skipFrame(info.filename, BaseTest) ): continue stack.append( '%s:%s' % (info.filename.strip(), info.lineno) ) if (testmodule is None or os.path.splitext(info.filename)[0] == testmodule) and (info.function in ['execute', 'validate']): return stack return None def __skipFrame(self, file, clazz): """Private method to check if a file is that for a particular class. :param file: The filepatch to check :param clazz: The class to check against """ return os.path.splitext(file)[0] == os.path.splitext(sys.modules[clazz.__module__].__file__)[0]
[docs] def grep(self, path, expr, encoding=None, reFlags=0, mappers=[], **kwargs): r"""Returns the first occurrence of a regular expression in the specified file, or raises an exception if not found. See also `grepOrNone` and `grepAll` or no-error-on-missing and return-all behaviour. If you want to use a grep to set the outcome of the test, use `pysys.basetest.BaseTest.assertThatGrep` or `pysys.basetest.BaseTest.assertGrep` instead. The documentation for assertGrep also provides some helpful examples of regular expressions that could also be used with this method, and tips for escaping in regular expressions. If you have a complex expression with multiple values to extract, you can use ``(?P<groupName>...)`` named groups in which case a dictionary is returned providing access to the individual elements:: authInfoDict = self.grep('myserver.log', expr=r'Successfully authenticated user "(?P<username>[^"]*)" in (?P<authSecs>[^ ]+) seconds\.')) For extracting a single value you can use an unnamed group using ``(expr)`` syntax, in which case that group is returned:: myKey = self.grep('test.txt', r'myKey="(.*)"') # on a file containing 'myKey="foobar"' would return "foobar" .. versionadded: 2.0 :param str path: file to search (located in the output dir unless an absolute path is specified) :param str expr: the regular expression, optionally containing named groups. Remember to escape regular expression special characters such as ``.``, ``(``, ``[``, ``{`` and ``\`` if you want them to be treated as literal values. If you have a string with regex backslashes, it's best to use a 'raw' Python string so that you don't need to double-escape them, e.g. ``expr=r'function[(]"str", 123[.]4, (\d+), .*[)]'``. :param str encoding: The encoding to use to open the file. The default value is None which indicates that the decision will be delegated to the L{getDefaultFileEncoding()} method. :param List[callable[str]->str] mappers: A list of filter functions that will be used to pre-process each line from the file (returning None if the line is to be filtered out). This provides a very powerful capability for filtering the file, for example `pysys.mappers.IncludeLinesBetween` provides the ability to filter in/out sections of a file. Do not share mapper instances across multiple tests or threads as this can cause race conditions. Added in PySys 1.6.0. :param int reFlags: Zero or more flags controlling how the behaviour of regular expression matching, combined together using the ``|`` operator, for example ``reFlags=re.VERBOSE | re.IGNORECASE``. For details see the ``re`` module in the Python standard library. Note that ``re.MULTILINE`` cannot be used because expressions are matched against one line at a time. Added in PySys 1.5.1. :return: A str containing the matching expression, or if the expr contains any ``(?P<groupName>...)`` named groups a dict[str,str] is returned where the keys are the groupNames. """ return self.getExprFromFile(path=path, expr=expr, returnAll=False, returnNoneIfMissing=False, encoding=encoding, reFlags=reFlags, mappers=mappers, **kwargs)
[docs] def grepOrNone(self, path, expr, encoding=None, reFlags=0, mappers=[], mustExist=True, **kwargs): r"""Returns the first occurrence of a regular expression in the specified file, or None if not found. See also `grep` and `grepAll` for error-on-missing and return-all behaviour. If you want to use a grep to set the outcome of the test, use `pysys.basetest.BaseTest.assertThatGrep` or `pysys.basetest.BaseTest.assertGrep` instead. The documentation for assertGrep also provides some helpful examples of regular expressions that could also be used with this method, and tips for escaping in regular expressions. If you have a complex expression with multiple values to extract, you can use ``(?P<groupName>...)`` named groups in which case a dictionary is returned providing access to the individual elements:: authInfoDict = self.grepOrNone('myserver.log', expr=r'Successfully authenticated user "(?P<username>[^"]*)" in (?P<authSecs>[^ ]+) seconds\.') ) or {'username':'myuser', 'authSecs': '0.0'} For extracting a single value you can use an unnamed group using ``(expr)`` syntax, in which case that group is returned:: myKey = self.grepOrNone('test.txt', r'myKey="(.*)"') or 'mydefault' # on a file containing 'myKey="foobar"' would return "foobar" .. versionadded: 2.0 :param str path: file to search (located in the output dir unless an absolute path is specified) :param str expr: the regular expression, optionally containing named groups. Remember to escape regular expression special characters such as ``.``, ``(``, ``[``, ``{`` and ``\`` if you want them to be treated as literal values. If you have a string with regex backslashes, it's best to use a 'raw' Python string so that you don't need to double-escape them, e.g. ``expr=r'function[(]"str", 123[.]4, (\d+), .*[)]'``. :param str encoding: The encoding to use to open the file. The default value is None which indicates that the decision will be delegated to the L{getDefaultFileEncoding()} method. :param List[callable[str]->str] mappers: A list of filter functions that will be used to pre-process each line from the file (returning None if the line is to be filtered out). This provides a very powerful capability for filtering the file, for example `pysys.mappers.IncludeLinesBetween` provides the ability to filter in/out sections of a file. Do not share mapper instances across multiple tests or threads as this can cause race conditions. Added in PySys 1.6.0. :param bool mustExist: Set this to False to tolerate the file not existing and treat a missing file like an empty file. Added in PySys 2.2. :param int reFlags: Zero or more flags controlling how the behaviour of regular expression matching, combined together using the ``|`` operator, for example ``reFlags=re.VERBOSE | re.IGNORECASE``. For details see the ``re`` module in the Python standard library. Note that ``re.MULTILINE`` cannot be used because expressions are matched against one line at a time. Added in PySys 1.5.1. :return: A str containing the matching expression, None if there are no matches, or if the expr contains any ``(?P<groupName>...)`` named groups a dict[str,str] is returned where the keys are the groupNames. """ return self.getExprFromFile(path=path, expr=expr, returnAll=False, returnNoneIfMissing=True, encoding=encoding, reFlags=reFlags, mappers=mappers, mustExist=mustExist, **kwargs)
[docs] def grepAll(self, path, expr, encoding=None, reFlags=0, mappers=[], mustExist=True, **kwargs): r"""Returns a list of all the occurrences of a regular expression in the specified file. See also `grep` and `grepOrNone` for return-first-only behaviour. If you have a complex expression with multiple values to extract, you can use ``(?P<groupName>...)`` named groups in which case each item in the returned list is a dictionary is returned providing access to the individual elements:: authInfoDictList = self.grepAll('myserver.log', expr=r'Successfully authenticated user "(?P<username>[^"]*)" in (?P<authSecs>[^ ]+) seconds\.')) For extracting a single value you can use an unnamed group using ``(expr)`` syntax, in which case that group is returned:: myKey = self.grepAll('test.txt', r'myKey="(.*)"') # on a file containing 'myKey="foobar"' would return ["foobar"] .. versionadded: 2.0 :param str path: file to search (located in the output dir unless an absolute path is specified) :param str expr: the regular expression, optionally containing named groups. Remember to escape regular expression special characters such as ``.``, ``(``, ``[``, ``{`` and ``\`` if you want them to be treated as literal values. If you have a string with regex backslashes, it's best to use a 'raw' Python string so that you don't need to double-escape them, e.g. ``expr=r'function[(]"str", 123[.]4, (\d+), .*[)]'``. :param str encoding: The encoding to use to open the file. The default value is None which indicates that the decision will be delegated to the L{getDefaultFileEncoding()} method. :param List[callable[str]->str] mappers: A list of filter functions that will be used to pre-process each line from the file (returning None if the line is to be filtered out). This provides a very powerful capability for filtering the file, for example `pysys.mappers.IncludeLinesBetween` provides the ability to filter in/out sections of a file. Do not share mapper instances across multiple tests or threads as this can cause race conditions. Added in PySys 1.6.0. :param bool mustExist: Set this to False to tolerate the file not existing and treat a missing file like an empty file. Added in PySys 2.2. :param int reFlags: Zero or more flags controlling how the behaviour of regular expression matching, combined together using the ``|`` operator, for example ``reFlags=re.VERBOSE | re.IGNORECASE``. For details see the ``re`` module in the Python standard library. Note that ``re.MULTILINE`` cannot be used because expressions are matched against one line at a time. Added in PySys 1.5.1. :return: A list where each item is a str containing the matching expression, or if the expr contains any ``(?P<groupName>...)`` named groups each item is a dict[str,str] where the keys are the groupNames. """ return self.getExprFromFile(path=path, expr=expr, returnAll=True, returnNoneIfMissing=False, encoding=encoding, reFlags=reFlags, mappers=mappers, mustExist=mustExist, **kwargs)
[docs] def getExprFromFile(self, path, expr, groups=[1], returnAll=False, returnNoneIfMissing=False, mustExist=True, encoding=None, encodingReplaceOnError=False, reFlags=0, mappers=[]): r""" Searches for a regular expression in the specified file, and returns it. Use of this function is discouraged - consider using `grep` / `grepOrNone` / `grepAll` instead. If the regex contains unnamed groups using ``(expr)`` syntax, the specified group is returned. If the expression is not found, an exception is raised, unless returnAll=True or returnNoneIfMissing=True. For example:: myKey = self.getExprFromFile('test.txt', r'myKey="(.*)"') # on a file containing 'myKey="foobar"' would return "foobar" err = self.getExprFromFile('test.txt', r'ERROR .*') # on a file containing 'ERROR It went wrong' would return "that entire string" If you have a complex expression with multiple values to extract, it is usually clearer to use ``(?P<groupName>...)`` named groups rather than unnamed groups referenced by index. This produces a dictionary:: authInfo = self.getExprFromFile('myserver.log', expr=r'Successfully authenticated user "(?P<username>[^"]*)" in (?P<authSecs>[^ ]+) seconds\.')) allAuthList = self.getExprFromFile('myserver.log', expr=r'Successfully authenticated user "(?P<username>[^"]*)" in (?P<authSecs>[^ ]+) seconds\.', returnAll=True)) See also `pysys.basetest.BaseTest.assertThatGrep` which should be used when instead of just finding out what's in the file you want to assert that a specific expression is matched. The documentation for assertGrep also provides some helpful examples of regular expressions that could also be applied to this method, and tips for dealing with escaping in regular expressions. .. versionchanged:: 1.6.0 Support for named groups was added in 1.6.0. :param str path: file to search (located in the output dir unless an absolute path is specified) :param str expr: the regular expression, optionally containing the regex group operator ``(...)`` Remember to escape regular expression special characters such as ``.``, ``(``, ``[``, ``{`` and ``\`` if you want them to be treated as literal values. If you have a string with regex backslashes, it's best to use a 'raw' Python string so that you don't need to double-escape them, e.g. ``expr=r'function[(]"str", 123[.]4, (\d+), .*[)]'``. :param List[int] groups: which numeric regex group numbers (as indicated by brackets in the regex) should be returned; default is ``[1]`` meaning the first group. If more than one group is specified, the result will be a tuple of group values, otherwise the result will be the value of the group at the specified index as a str. This parameter is ignored if the regular expression contains any ``(?P<groupName>...)`` named groups. :param bool returnAll: returns a list containing all matching lines if True, the first matching line otherwise. :param bool returnNoneIfMissing: True to return None instead of raising an exception if the regex is not found in the file (not needed when returnAll is used). :param bool mustExist: Set to False to tolerate the file not existing and treat a missing file like an empty file. :param str encoding: The encoding to use to open the file. The default value is None which indicates that the decision will be delegated to the L{getDefaultFileEncoding()} method. :param bool encodingReplaceOnError: Set to True to replace erroneous characters that are invalid in the expected encoding (with a backslash escape) rather than throwing an exception. Added in PySys 2.2. :param List[callable[str]->str] mappers: A list of filter functions that will be used to pre-process each line from the file (returning None if the line is to be filtered out). This provides a very powerful capability for filtering the file, for example `pysys.mappers.IncludeLinesBetween` provides the ability to filter in/out sections of a file. Do not share mapper instances across multiple tests or threads as this can cause race conditions. Added in PySys 1.6.0. :param int reFlags: Zero or more flags controlling how the behaviour of regular expression matching, combined together using the ``|`` operator, for example ``reFlags=re.VERBOSE | re.IGNORECASE``. For details see the ``re`` module in the Python standard library. Note that ``re.MULTILINE`` cannot be used because expressions are matched against one line at a time. Added in PySys 1.5.1. :return: For a regular expression with one unnamed group, the match value is a str; if there are multiple unnamed numeric groups it is List[str] (with values corresponding to the groups= argument); if it contains any ``(?P<groupName>...)`` named groups a dict[str,str] is returned where the keys are the groupNames. If returnAll=True, the return value is a list of all the match values, with types as above. """ namedGroupsMode = False compiled = re.compile(expr, flags=reFlags) namedGroupsMode = compiled.groupindex path = os.path.join(self.output, path) assert not os.path.isdir(path), 'Cannot grep directory: %s'%path matches = [] if mustExist is False and not os.path.exists(path): pass else: with openfile(path, 'r', encoding=encoding or self.getDefaultFileEncoding(path), errors='backslashreplace' if encodingReplaceOnError else None) as f: for l in applyMappers(f, mappers): match = compiled.search(l) if not match: continue if namedGroupsMode: val = match.groupdict() elif match.groups(): val = match.group(*groups) else: val = match.group(0) if returnAll: matches.append(val) else: return val if returnAll: return matches if returnNoneIfMissing: return None if os.path.getsize(path) == 0: # can happen due to race conditions in file system writing; maybe they need a waitForGrep raise Exception('Could not find expression %s in %s because file is empty'%(quotestring(expr), os.path.basename(path))) raise Exception('Could not find expression %s in %s'%(quotestring(expr), os.path.basename(path)))
[docs] def logFileContents(self, path, includes=None, excludes=None, maxLines=20, tail=False, encoding=None, logFunction=None, reFlags=0, stripWhitespace=True, mappers=[], color=True, message=None): """ Logs some or all of the lines from the specified file. If the file does not exist or cannot be opened, does nothing. The method is useful for providing key diagnostic information (e.g. error messages from tools executed by the test) directly in run.log, or to make test failures easier to triage quickly. :param str path: May be an absolute, or relative to the test output directory. :param list[str] includes: Optional list of regex strings. If specified, only matches of these regexes will be logged. :param list[str] excludes: Optional list of regex strings. If specified, no line containing these will be logged. The variable ``self.logFileContentsDefaultExcludes`` (= ``[]`` by default) is used when this method is called with the default argument of ``excludes=None``, and can be used to provide a global set of default exclusion lines shared by all your tests, which is particularly useful if some processes always log some unimportant text to stderr (or stdout) that would be distracting to log out. Added in PySys 1.6.0. :param List[callable[str]->str] mappers: A list of filter functions that will be used to pre-process each line from the file (returning None if the line is to be filtered out). This provides a very powerful capability for filtering the file, for example `pysys.mappers.IncludeLinesBetween` provides the ability to filter in/out sections of a file. Do not share mapper instances across multiple tests or threads as this can cause race conditions. Added in PySys 2.0. :param int maxLines: Upper limit on the number of lines from the file that will be logged. Set to zero for unlimited :param bool tail: Prints the _last_ 'maxLines' in the file rather than the first 'maxLines'. :param str encoding: The encoding to use to open the file. The default value is None which indicates that the decision will be delegated to the L{getDefaultFileEncoding()} method. Any character encoding errors will result in backslash escape sequences being logged instead. :param Callable[[line],None] logFunction: The function that will be used to log individual lines from the file. Usually this is ``self.log.info(u' %s', line, extra=BaseLogFormatter.tag(LOG_FILE_CONTENTS))`` but a custom implementation can be provided, for example to provide a different color using `pysys.utils.logutils.BaseLogFormatter.tag`. Added in PySys 1.5.1. :param int reFlags: Zero or more flags controlling how the behaviour of regular expression matching, combined together using the ``|`` operator, for example ``reFlags=re.VERBOSE | re.IGNORECASE``. For details see the ``re`` module in the Python standard library. Note that ``re.MULTILINE`` cannot be used because expressions are matched against one line at a time. Added in PySys 1.5.1. :param bool stripWhitespace: By default blank lines are removed; set this to False to disable that behaviour. Added in PySys 2.0. :param bool color: By default logged lines are colored blue to distinguish from the rest of the log contents. Set this to False to disable coloring. Added in PySys 2.1. :param str message: The introductory message to log before the file content, with ``@PATH@`` as a placeholder for the path. If not specified the default is equivalent to ``"Contents of @PATH@: "``. Added in PySys 2.2 :return: True if anything was logged, False if not. """ if excludes is None: excludes = self.logFileContentsDefaultExcludes if not path: return False actualpath= os.path.join(self.output, path) try: # always open with a specific encoding not in bytes mode, since otherwise we can't reliably pass the read lines to the logger f = openfile(actualpath, 'r', encoding=encoding or self.getDefaultFileEncoding(actualpath) or PREFERRED_ENCODING, errors='backslashreplace') except Exception as e: self.log.debug('logFileContents cannot open file "%s": %s', actualpath, e) return False try: lineno = 0 def matchesany(s, regexes): assert not isstring(regexes), 'must be a list of strings not a string' for x in regexes: m = re.search(x, s, flags=reFlags) if m: return m.group(0) return None tolog = [] for l in applyMappers(f, mappers): if stripWhitespace: l = l.rstrip() if len(l) == 0: continue else: l = l.strip('\n\r') if includes: l = matchesany(l, includes) if not l: continue if excludes and matchesany(l, excludes): continue lineno +=1 tolog.append(l) if maxLines: if not tail and len(tolog) == maxLines: tolog.append('...') break if tail and len(tolog)==maxLines+1: del tolog[0] finally: f.close() if not tolog: return False logextra = BaseLogFormatter.tag(LOG_FILE_CONTENTS if color else None, suppress_prefix=True) if logFunction is None: def logFunction(line): self.log.info(u' %s', l, # special-case printing of lines that already have coloring - don't add any extra colors to such lines extra=BaseLogFormatter.tag(None, suppress_prefix=True) if color and '\033[' in line else logextra) path = os.path.normpath(path) if path.startswith(self.output): path = path[len(self.output)+1:] self.log.info('%s', message.replace('@PATH@', fromLongPathSafe(path)) if message else u'Contents of %s%s: '%(fromLongPathSafe(path), ' (filtered)' if includes or excludes else ''), extra=BaseLogFormatter.tag(LOG_FILE_CONTENTS, suppress_prefix=False)) for l in tolog: logFunction(l) self.log.info(' -----', extra=logextra) self.log.info('', extra=logextra) return True
[docs] def listDirContents(self, path, recurse=True): r""" Recursively scans the specified directory and returns a sorted list of the file/directory paths under it suitable for diffing. The contents are returned in a normalized form suitable for diffing: relative to the scanned path, with forward slashes on all platforms, a trailing slash for directories, and sorted to ensure deterministic results. Symbolic links are not searched. For example this can be used with `pysys.basetest.BaseTest.assertDiff` like this:: self.assertDiff( self.write_text('MyDir-contents.txt', '\\n'.join( self.listDirContents('MyDir') ))) :param str path: The path to search, either absolute or relative to the output directory. :param bool recurse: Set this to False to just include the specified directory but not any children. :return: A list of strings with the relative paths found, e.g. ``["mysubdir/myfile.txt", "mysubdir/mysubsubdir/"]``. .. versionadded:: 2.2 """ return pysys.utils.fileutils.listDirContents(os.path.join(self.output, path), recurse=recurse)
[docs] def mkdir(self, path): """ Create a directory, with recursive creation of any parent directories. This function does nothing (does not raise an except) if the directory already exists. :param path: The path to be created. This can be an absolute path or relative to the testcase output directory. :return: the absolute path of the new directory, to facilitate fluent-style method calling. """ path = os.path.join(self.output, path) mkdir(path) return path
def deletedir(self, path, **kwargs): return self.deleteDir(path, **kwargs)
[docs] def deleteDir(self, path, **kwargs): """ Recursively delete the specified directory. Does nothing if it does not exist. Raises an exception if the deletion fails. :param path: The path to be deleted. This can be an absolute path or relative to the testcase output directory. :param kwargs: Any additional arguments such as ``retries`` and ``ignore_errors`` are passed to L{pysys.utils.fileutils.deletedir()}. """ return deletedir(os.path.join(self.output, path), **kwargs)
[docs] def deleteFile(self, path, **kwargs): """ Delete the specified file, with optional retries and ignoring of errors. Does nothing if it does not exist. Raises an exception if the deletion fails. :param path: The path to be deleted. This can be an absolute path or relative to the testcase output directory. :param kwargs: Any additional arguments such as ``retries`` and ``ignore_errors`` are passed to L{pysys.utils.fileutils.deletefile()}. """ return deletefile(os.path.join(self.output, path), **kwargs)
[docs] def getDefaultFileEncoding(self, file, **xargs): """ Specifies what encoding should be used to read or write the specified text file. This method is used to select the appropriate encoding whenever PySys needs to open a file, for example to wait for a signal, for a file-based assertion, or to write a file with replacements. Many methods allow the encoding to be overridden for just that call, but getDefaultFileEncoding exists to allow global defaults to be specified based on the filename. For example, this method could be overridden to specify that utf-8 encoding is to be used for opening filenames ending in .xml, .json and .yaml. The default implementation of this method uses pysysproject.xml configuration rules such as:: <default-file-encoding pattern="*.xml" encoding="utf-8"/> A return value of None indicates default behaviour, which is to use the default OS encoding, as specified by `pysys.constants.PREFERRED_ENCODING`. :param file: The filename to be read or written. This may be an absolute path or a relative path. :param xargs: Ensure that an ``**xargs`` argument is specified so that additional information can be passed to this method in future releases. :return: The encoding to use for this file, or None if default behaviour is to be used. For example, ``utf-8`` or (for UTF-8 with a Byte Order Mark), ``utf-8-sig``. """ file = file.replace('\\','/').lower() # normalize slashes and ignore case for e in self.project.defaultFileEncodings: # first match wins if fnmatch.fnmatchcase(file, e['pattern'].lower()) or fnmatch.fnmatchcase(os.path.basename(file), e['pattern'].lower()): return e['encoding'] return None
[docs] @staticmethod def compareVersions(v1, v2): """ Compares two alphanumeric dotted version strings to see which is more recent. Example usage:: if self.compareVersions(thisversion, '1.2.alpha-3') > 0: ... # thisversion is newer than 1.2.alpha-3 The comparison algorithm ignores case, and normalizes separators ./-/_ so that ``'1.alpha2'=='1Alpha2'``. Any string components are compared lexicographically with other strings, and compared to numbers strings are always considered greater. >>> ProcessUser.compareVersions('10-alpha5.dev10', '10alpha-5-dEv_10') == 0 # normalization of case and separators True >>> ProcessUser.compareVersions(b'1....alpha.2', u'1Alpha2') == 0 # ascii byte and unicode strings both supported True >>> ProcessUser.compareVersions('1.2.0', '1.2') 0 >>> ProcessUser.compareVersions('1.02', '1.2') 0 >>> ProcessUser().compareVersions('1.2.3', '1.2') > 0 True >>> ProcessUser.compareVersions('1.2', '1.2.3') -1 >>> ProcessUser.compareVersions('10.2', '1.2') 1 >>> ProcessUser.compareVersions('1.2.text', '1.2.0') # letters are > numbers 1 >>> ProcessUser.compareVersions('1.2.text', '1.2') # letters are > numbers 1 >>> ProcessUser.compareVersions('10.2alpha1', '10.2alpha') 1 >>> ProcessUser.compareVersions('10.2dev', '10.2alpha') # letters are compared lexicographically 1 >>> ProcessUser.compareVersions('', '') 0 >>> ProcessUser.compareVersions('1', '') 1 :param v1: A string containing a version number, with any number of components. :param v2: A string containing a version number, with any number of components. :return: an integer > 0 if v1>v2, an integer < 0 if v1<v2, or 0 if they are semantically the same. """ return pysys.utils.misc.compareVersions(v1, v2)
[docs] def write_text(self, file, text, encoding=None): r""" Writes the specified characters to a file in the output directory. :param file: The path of the file to write, either an absolute path or relative to the `self.output` directory. :param text: The string to write to the file, with ``\n`` for newlines (do not use `os.linesep` as the file will be opened in text mode so platform line separators will be added automatically). This must be a character string. :param encoding: The encoding to use to open the file. The default value is None which indicates that the decision will be delegated to the L{getDefaultFileEncoding()} method. :return str: Return the absolute path of the generated file. """ # This method provides similar functionality to the Python3 pathlib write_text method. out = os.path.join(self.output, file) with openfile(os.path.join(self.output, file), 'w', encoding=encoding or self.getDefaultFileEncoding(file)) as f: f.write(text) return out
[docs] @contextlib.contextmanager def disableLogging(self): """ Temporarily stops logging for the current thread. For example:: with self.disableLogging(): self.startProcess(...) Note that this method will do nothing if the log level if pysys is run with ``-vDEBUG`` or ``-vdisabledLogging=DEBUG``. .. versionadded:: 1.6.0 """ logger = logging.getLogger('pysys.disabledLogging') saved = pysys.internal.initlogging.pysysLogHandler.getLogHandlersForCurrentThread() if logger.isEnabledFor(logging.DEBUG): logger.debug('Ignoring disableLogging() request as debug logging is enabled') else: pysys.internal.initlogging.pysysLogHandler.setLogHandlersForCurrentThread([]) try: yield None finally: pysys.internal.initlogging.pysysLogHandler.setLogHandlersForCurrentThread(saved)
[docs] def unpackArchive(self, archive, dest=None, autoCleanup=True): """ Unpacks the specified file(s) from an archive to a directory. Supports archive format such as zip/tar.gz/gz/tar.xz/xz. It is a good idea to store large textual Input/ assets (such as log files, which usually compress very well) as compressed archives to reduce disk space in your version control system. By default this method will automatically delete the extracted file/dir during test cleanup so it doesn't sit around on disk (or in CI uploaded failure archives) consuming space; if the file/dir is mutated by the test you may wish to disable this so you can manually inspect them by setting ``autoCleanup=False``. For example:: unpacked = self.unpackArchive('mybigfile.log.xz') # do something with "unpacked"... Note that ``.xz`` (for single files) and ``.tar.xz`` (for multiple files) are recommended for optimal compression, and these (and ``.gz``) are *significantly* better than zip, which performs poorly when compressing multiple similar text files into one archive. Don't use more than one single archive per testcase (if possible) to ensure you benefit from similarities between the various files. Files are decompressed in binary mode, so if you require platform-native line endings you should use `copy` to post-process them after decompressing. :param str archive: The path of the archive to unpack, by default from the test input directory. Alternatively you could provide an absolute path using ``self.project.testRootDir`` or similar if an archive is shared across multiple test cases. :param str dest: The directory in which the contents of the archive will be written; by default this is the test output directory for archive types that are always single-file, and a subdirectory named after the archive if not. This dir will be created if needed. :param bool autoCleanup: Automatically deletes the unpackaged file/directory during test cleanup to save disk space (even if the test fails). :return: The full path to the decompressed file or directory. """ self.log.info('Unpacking archive from %s to %s%s', archive, dest or '<test output dir>', ' (with auto-delete during test cleanup)' if autoCleanup else '') dest = toLongPathSafe(os.path.join(self.output, dest or self.output)).rstrip('/\\') self.mkdir(dest) archive = os.path.join(self.input, archive) archivebasename = os.path.basename(archive).lower() if '.tar.' not in archivebasename: # shutil can't do individual non-tar'd files for ext, module in [('.xz', 'lzma'), ('.gz', 'gzip')]: if archivebasename.endswith(ext): dest = os.path.join(dest, os.path.basename(archive)[:-len(ext)]) module = importlib.import_module(module) # some compression modules are not available on all systems, so only do this on demand with module.open(archive) as src, open(dest, 'wb') as decompressed: shutil.copyfileobj(src, decompressed) if autoCleanup: self.addCleanupFunction(lambda: self.deleteFile(dest, ignore_errors=True)) return dest # don't unpack multiple files to the output dir directly as then we can't delete them if os.path.samefile(dest, toLongPathSafe(self.output)): dest = os.path.join(dest, os.path.basename(archive)[:os.path.basename(archive).find('.')]) if autoCleanup: self.addCleanupFunction(lambda: self.deleteDir(dest, ignore_errors=True)) # this takes care of multi-file archives such as tar.XXX/zip etc unpackargs = {} if sys.version_info[:2] == (3, 12) and not '.zip' in archive: # avoid deprecation warning in 3.12 by explicitly requesting safe tar unpacking; unfortunately not accepted by .zip # see https://github.com/python/cpython/issues/102950#issuecomment-1745101949 # for 3.13+ we may have to extend this hack, but maybe there will be a better fix by then unpackargs['filter'] = 'data' shutil.unpack_archive(archive, dest, **unpackargs) return dest
[docs] def copy(self, src, dest, mappers=[], encoding=None, symlinks=False, ignoreIf=None, skipMappersIf=None, overwrite=None): r"""Copy a directory or a single text or binary file, optionally tranforming the contents by filtering each line through a list of mapping functions. If any `pysys.mappers` are provided, the file is copied in text mode and each mapper is given the chance to modify or omit each line, or even reorder the lines of the file. If no mappers are provided, the file is copied in binary mode. For example:: self.copy('output-raw.txt', 'output-processed.txt', encoding='utf-8', mappers=[ lambda line: None if ('Timestamp: ' in line) else line, lambda line: line.replace('foo', 'bar'), pysys.mappers.IncludeLinesBetween('Error message .*:', stopBefore='^$'), pysys.mappers.RegexReplace(pysys.mappers.RegexReplace.DATETIME_REGEX, '<timestamp>'), ]) In addition to the file contents the attributes such as modification time and executable permission will be copied where possible. This function is useful for creating a modified version of an output file that's more suitable for later validation steps such as diff-ing, and also for copying required files from the input to the output directory. It can also be used for copying a whole directory, similar to ``shutil.copytree`` but with the advantages of support for long paths on Windows, better error safety, and that relative paths are evaluated relative to the self.output directory (which is both convenient, and safer than shutil's evaluation relative to the current working directory). For example:: self.copy('src.txt', 'dest.txt') # copies to outputdir/dest.txt self.copy('src.txt', self.output) # copies to outputdir/src.txt, since self.output is an existing directory self.copy('src.txt', 'foo/') # copies to outputdir/foo/src.txt since destination ends with a slash self.copy('srcdirname', 'foo/') # copies to outputdir/foo/srcdirname since destination ends with a slash For more information about pre-defined mappers, see `pysys.mappers`. Custom mappers can be specified as simple functions or lambdas, however for advanced use cases you can additionally provide ``mapper.fileStarted([self,] srcPath, destPath, srcFile, destFile)`` and/or ``mapper.fileFinished(...)`` methods to allow stateful operations, or to perform extra read/write operations before lines are read/written. For example:: class CustomLineMapper(object): def fileStarted(self, srcPath, destPath, srcFile, destFile): self.src = os.path.basename(srcPath) def __call__(self, line): return '"'+self.src+'": '+line def fileFinished(self, srcPath, destPath, srcFile, destFile): destFile.write('\n' + 'footer added by CustomLineMapper') self.copy('src.txt', 'dest.txt', mappers=[CustomLineMapper()]) .. versionchanged:: 1.6.0 Ability to copy directories was added, along with the ``overwrite=``, ``symlinks=``, ``ignoreIf=`` and ``skipMappersIf=`` arguments. :param str src: The source filename or directory, which can be an absolute path, or a path relative to the ``self.output`` directory. Use ``src=self.input+'/myfile'`` if you wish to copy a file from the test input directory. :param str dest: The destination file or directory name, which can be an absolute path, or a path relative to the `self.output` directory. Destination file(s) are overwritten if the dest already exists. The dest and src can be the same for file copies (but not directory copies). As a convenience to avoid repeating the same text in the src and destination, if the dest ends with a slash, or the src is a file and the dest is an existing directory, the dest is taken as a parent directory into which the src will be copied in retaining its current name. It is best to avoid copies where the src dir already contains the dest (which would be recursive) such as copying the test dir (possibly configured as ``self.input``) to destination ``self.output``, however if this is attempted PySys will log a warning and copy everything else except the recursive part. :param bool overwrite: If True, source files will be allowed to overwrite destination files, if False an exception will be raised if a destination file already exists. By default overwrite=None which means it's enabled for single file copy() but disabled for directory copies. :param List[callable[str]->str] mappers: A list of filter functions that will be applied, in order, to map each line from source to destination. Each function accepts a string for the current line as input and returns either a string to write or None if the line is to be omitted. Any ``None`` items in the mappers list will be ignored. Mappers must always preserve the final ``\n`` of each line (if present). See `pysys.mappers` for some useful predefined mappers such as `pysys.mappers.IncludeLinesBetween`, `pysys.mappers.RegexReplace` and `pysys.mappers.SortLines`. If present the ``mapper.fileStarted(...)`` and/or ``mapper.fileFinished(...)`` methods will be called on each mapper in the list at the start and end of each file; see above for an example. Do not share mapper instances across multiple tests or threads as this can cause race conditions. :param str encoding: The encoding to use to open the file (only used if mappers are provided; if not, it is opened in binary mode). The default value is None which indicates that the decision will be delegated to the L{getDefaultFileEncoding()} method. :param bool symlinks: Set to True if symbolic links in the source tree should be represented as symbolic links in the destination (rather than just being copied). :param callable[str]->bool ignoreIf: A callable that accepts a source path and returns True if this file/directory should be omitted from a directory copy. For example: ``ignoreIf=lambda src: src.endswith(('.tmp', '.log')))`` :param callable[str]->bool skipMappersIf: A callable that accepts a source path and returns True if this file should be copied in binary mode (as if no mappers had been specified). For example: ``skipMappersIf=lambda src: not src.endswith(('.xml', '.properties')))`` :return str: the absolute path of the destination file. """ origdest = dest src = toLongPathSafe(os.path.join(self.output, src)) srcIsDir = os.path.isdir(src) dest = toLongPathSafe(os.path.join(self.output, dest)).rstrip('/\\') if origdest.endswith((os.sep, '/', '\\')) or (not srcIsDir and os.path.isdir(dest)): dest = toLongPathSafe(dest+os.sep+os.path.basename(src)) self.log.debug('Copying %s to %s', src, dest) if src == dest and not srcIsDir: dest = src+'__pysys_copy.tmp' renameDestAtEnd = True else: renameDestAtEnd = False assert src != dest, 'Source and destination directory cannot be the same' if overwrite is None: overwrite = not srcIsDir if srcIsDir: destexists = os.path.isdir(dest) if not destexists: self.mkdir(dest) with os.scandir(src) as iterator: for e in iterator: path = e.path if ignoreIf is not None and ignoreIf(path): continue if dest.lower().startswith(path.lower()+os.sep): self.log.warning(f'Copy will ignore {dest[len(src):]} while copying from {src} to avoid recursive copy; it is best to avoid having a source path that is a parent dir of the destination') continue if e.is_symlink() and symlinks: linkdest = dest+os.sep+os.path.basename(path) os.symlink(os.readlink(path), linkdest) shutil.copystat(path, linkdest, follow_symlinks=False) continue self.copy(path, dest+os.sep+os.path.basename(path), # must pass all other arguments through mappers=mappers, encoding=encoding, symlinks=symlinks, ignoreIf=ignoreIf, skipMappersIf=skipMappersIf, overwrite=overwrite) return dest if skipMappersIf is not None and skipMappersIf(src): mappers = None if not overwrite and os.path.exists(dest): raise Exception('copy() will not overwrite an existing file unless overwrite=True: %s'%dest) if not mappers: # simple binary copy shutil.copyfile(src, dest) else: if None in mappers: mappers = [m for m in mappers if m] with openfile(src, 'r', encoding=encoding or self.getDefaultFileEncoding(src)) as srcf: with openfile(dest, 'w', encoding=encoding or self.getDefaultFileEncoding(dest)) as destf: # give mappers a change to setup initial state and/or read/write from the source and dest files for mapper in mappers: fn = getattr(mapper, 'fileStarted', None) if fn: fn(src, dest, srcf, destf) for line in applyMappers(srcf, mappers): destf.write(line) for mapper in mappers: fn = getattr(mapper, 'fileFinished', None) if fn: fn(src, dest, srcf, destf) shutil.copystat(src, dest) if renameDestAtEnd: os.remove(src) try: os.rename(dest, src) except Exception: # pragma: no cover - work around windows file locking issues self.pollWait(20) os.rename(dest, src) return src return dest
[docs] def createThreadPoolExecutor(self, maxWorkers=None) -> concurrent.futures.ThreadPoolExecutor: """ Create a PySys-friendly instance of Python's ThreadPoolExecutor, configured with support for PySys loggers, cleanup, and a recommended default number of workers. During cleanup, the thread pool is shutdown, with any unstarted futures cancelled (requires Python 3.9+), and a wait until all in-progress futures have completed. This is useful for tests that need to perform many latency-bound operations such as making HTTP requests. Do not use it for starting lots of processes in parallel and waiting for them, since that is more easily achieved using `waitForBackgroundProcesses()`. If you use ``submit`` (rather than ``map``), then any exceptions during the submitted job will not be logged anywhere unless you you wait for the submitted job (or add an exception handler). Note that this class is not thread-safe (apart from ``addOutcome``, ``startProcess`` and the reading of fields like ``self.output`` that don't change) so if you need to use its fields or methods from background threads, be sure to add your own locking to the foreground and background threads in your test, including any custom cleanup functions. Example usage:: tp = self.createThreadPoolExecutor() results = tp.map(items, makeHTTPRequest) # Or for more complex use cases: submittedFutures = [] submittedFutures.append(tp.submit(...)) concurrent.futures.wait(submittedFutures) .. versionadded:: 2.2 :param int maxWorkers: Overrides the maximum number of worker threads that can be created by this pool. Only override this if needed. The default maxWorkers is configured in ``self.threadPoolMaxWorkers`` and is currently set at 6 since a higher number might overload a machine (or Python, due to the GIL) if pools are used by many/all of the PySys workers/tests running in parallel. The default may change in future. If overriding this value, use a small number of workers for Python-based logic that will hold the Python Global Interpreter Lock, or a larger/more scalable number of workers for heavily I/O-bound operations with little Python logic. :return: A ``concurrent.futures.ThreadPoolExecutor`` instance. """ if not maxWorkers: maxWorkers = self.threadPoolMaxWorkers log.info('Created thread pool with %s workers', maxWorkers) # be explicit, since number of workers could affect test race conditions pool = concurrent.futures.ThreadPoolExecutor(max_workers=maxWorkers, initializer=pysys.utils.threadutils.createThreadInitializer(self)) def cleanupThreadPool(): log.info('Shutting down thread pool') starttime = time.monotonic() if sys.version_info[:2] >= (3, 9): pool.shutdown(wait=True, cancel_futures=True) else: pool.shutdown(wait=True) (log.info if time.monotonic()-starttime>5 else log.debug)('Completed shutdown of thread pool after %0.1f seconds', time.monotonic()-starttime) self.addCleanupFunction(cleanupThreadPool) return pool
if not IS_WINDOWS: ProcessUser.isRunnerAbortingHandle, ProcessUser._isRunnerAbortingWriteHandle = os.pipe() import pysys.process.helper