# Source code for pysys.process.monitor

#!/usr/bin/env python
# PySys System Test Framework, Copyright (C) 2006-2022 M.B. Grieve

# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.

# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""
Contains the process monitoring API used by L{pysys.basetest.BaseTest.startProcessMonitor}.

The main components are the L{pysys.process.monitor.BaseProcessMonitor} class, L{ProcessMonitorKey} constants for identifying 
columns and the default L{ProcessMonitorTextFileHandler} class for writing monitoring information to a file. 
"""

__all__ = ['BaseProcessMonitor', 'BaseProcessMonitorHandler', 'ProcessMonitorTextFileHandler', 'ProcessMonitorKey']

import os, sys, string, time, threading, logging, multiprocessing
from pysys import process_lock
from pysys.constants import *
from pysys.utils.pycompat import *


if PLATFORM=='win32' and 'sphinx' not in sys.modules:
	import win32api, win32pdh, win32con, win32process # pragma: no cover

log = logging.getLogger('pysys.processmonitor')

class ProcessMonitorKey(object):
	"""Constants identifying the keys in each sample of data produced by a process monitor. 
	
	These constants also provide the display names used for column headings etc. 
	Not every key is currently available on every platform. 
	
	For measuring CPU usage, L{CPU_CORE_UTILIZATION} is usually the most useful key; 
	for memory usage it is L{MEMORY_RESIDENT_KB}. 
	"""
	
	CPU_CORE_UTILIZATION = 'CPU core %'
	"""CPU utilization % scaled by the number of cores, so 100% means one core fully 
	used, 200% means two cores fully used, etc, up to a maximum of 100 multiplied by 
	the number of CPU cores (as given by `multiprocessing.cpu_count()`). 
	
	The type is C{int}. 
	"""
	
	CPU_TOTAL_UTILIZATION = 'CPU total %'
	"""Total utilization % across all available CPU cores, with a maximum value of 100%. 
	
	For example, with 2 cores of which one is 50% utilized, this value would be 25%. 
	
	The type is C{int}. 
	"""
	
	MEMORY_RESIDENT_KB = 'Resident memory kB'
	"""Resident / working set memory usage, i.e. the non-swapped physical memory. 
	
	Usually the best statistic for detecting memory leaks and excessive memory usage. 
	Equivalent to `rss`/`RES` on Unix, and to the working set memory usage on Windows. 
	
	The type is C{int}. 
	"""
	
	MEMORY_VIRTUAL_KB = 'Virtual memory kB'
	"""Virtual memory of the process, including memory that is swapped out. 
	
	Equivalent to `vsz`/`VIRT` on Unix, and to the current page file usage on Windows. 
	
	The type is C{int}. 
	"""
	
	DATE_TIME = 'Time'
	"""String representation of the date and local time of this sample, 
	in yyyy-mm-dd HH:MM:SS format. 
	"""
	
	DATE_TIME_LEGACY = 'Time (legacy)'
	"""String representation of the date and local time of this sample in a format 
	compatible with v1.3.0 and earlier versions of PySys: %d/%m/%y %H:%M:%S on 
	Windows and %m/%d/%y %H:%M:%S on Unix. 
	
	@deprecated: Use L{DATE_TIME} if possible. 
	"""
	
	SAMPLE = 'Sample'
	"""Counter starting at 1 and incremented for each new set of sample data. 
	
	The type is C{int}. 
	"""
class BaseProcessMonitorHandler(object):
	"""Interface for receiving samples of data from a L{BaseProcessMonitor}. 
	
	Implement this interface to record or process monitoring data in a custom way. 
	"""
	
	def handleData(self, data, **kwargs):
		"""Handle a new sample of monitoring data; invoked on a background thread 
		each time one becomes available. 
		
		:param data: a dictionary whose keys are from L{ProcessMonitorKey} (or any 
			new keys added by custom process monitor implementations), and whose 
			values are of type C{int}, C{float} or C{string}. 
		:param kwargs: Reserved for future use. 
		"""
		raise NotImplementedError('Not implemented yet')
	
	def cleanup(self):
		"""Perform cleanup for this handler, e.g. closing file handles; invoked on 
		a background thread. 
		"""
		pass
class ProcessMonitorTextFileHandler(BaseProcessMonitorHandler):
	"""Handles process monitor data by writing the data as delimited values to a file, 
	usually with tab separated values (.tsv). 
	
	A new line is written every time the process monitor polls for a new sample of 
	data. By default a header line starting with `#` is included at the start of the 
	file to indicate the column headings. 
	
	If any value cannot be retrieved or is unavailable on this operating system, a 
	-1 value will be written instead. 
	
	Uses default values set on the instance unless keyword overrides are provided; 
	see L{setDefaults}. 
	
	:param str file: An absolute path string or open file handle to which process 
		monitor data lines will be written. 
	:param list[str] columns: An ordered list of the columns from L{ProcessMonitorKey} 
		that should be included in the file. If not specified, the columns specified 
		by L{DEFAULT_COLUMNS} will be used. 
	:param str delimiter: The delimiter string used between each column. If not 
		specified, the string specified by L{DEFAULT_DELIMITER} will be used. 
	:param bool writeHeaderLine: Determines whether a header line prefixed by `#` 
		will be written at the start of the file. If not overridden, the default is 
		taken from L{DEFAULT_WRITE_HEADER_LINE}. 
	"""
	
	DEFAULT_COLUMNS = [
		ProcessMonitorKey.DATE_TIME,
		ProcessMonitorKey.CPU_CORE_UTILIZATION,
		ProcessMonitorKey.MEMORY_RESIDENT_KB,
		ProcessMonitorKey.MEMORY_VIRTUAL_KB,
		]
	"""The default columns to write to the file, with values from L{ProcessMonitorKey} 
	when using a process monitor that comes with PySys. 
	
	Additional columns may be added to the end of this list in future releases. 
	See L{setDefaults} if you wish to change the defaults. 
	"""
	
	DEFAULT_WRITE_HEADER_LINE = True
	"""Determines whether a header line prefixed by `#` will be written at the start 
	of the file. See L{setDefaults}. 
	"""
	
	DEFAULT_DELIMITER = u'\t'
	"""The delimiter used between each column. Usually a tab character. 
	See L{setDefaults}. 
	"""
	
	@staticmethod
	def setDefaults(columns, delimiter=None, writeHeaderLine=None):
		"""Static helper method for setting the default columns, delimiter or 
		C{writeHeaderLine} setting for all tests that use the 
		C{ProcessMonitorTextFileHandler}. 
		
		This method could be called from a custom runner's 
		L{pysys.baserunner.BaseRunner.setup} method in order to take effect for all 
		tests. Do not call this from within an individual testcase since that could 
		cause unwanted interference between different testcases. 
		
		:param columns: A list of the columns to be included, using values from 
			L{ProcessMonitorKey}. Since additional columns may be added to the end of 
			L{DEFAULT_COLUMNS} in future releases, when calling this method you should 
			specify all the columns you want explicitly, including the current 
			defaults, rather than writing `DEFAULT_COLUMNS+[...]`. 
		:param delimiter: The delimiter string used between each column. 
		:param writeHeaderLine: Specifies whether a header line beginning with `#` 
			should be written at the start of the file. 
		"""
		if columns: ProcessMonitorTextFileHandler.DEFAULT_COLUMNS = list(columns)
		if writeHeaderLine is not None: ProcessMonitorTextFileHandler.DEFAULT_WRITE_HEADER_LINE = writeHeaderLine
		if delimiter is not None: ProcessMonitorTextFileHandler.DEFAULT_DELIMITER = delimiter
	
	def __init__(self, file, columns=None, delimiter=None, writeHeaderLine=None):
		self.columns = columns or self.DEFAULT_COLUMNS
		self.delimiter = delimiter or self.DEFAULT_DELIMITER
		assert file, 'file must be specified'
		if isstring(file):
			# only absolute paths are safe here, since the process monitor has no 
			# concept of a test output directory to resolve relative paths against
			assert os.path.isabs(file), 'File must be an absolute path: %s'%file
			self.stream = openfile(file, 'w', encoding='utf-8')
			self.__closeStream = True # we opened it, so we close it in cleanup()
		else:
			assert hasattr(file, 'write')
			self.stream = file
			self.__closeStream = False # caller owns the stream; don't close it
		if writeHeaderLine is None: writeHeaderLine = self.DEFAULT_WRITE_HEADER_LINE
		if writeHeaderLine:
			# spaces in column names are replaced so each heading is a single token
			self.stream.write(u'#%s\n'%self.delimiter.join(self.columns).replace(u' ',u'_'))
		self.stream.flush()
	
	def handleData(self, data, **kwargs):
		# fix: accept **kwargs to match the BaseProcessMonitorHandler.handleData 
		# signature, which reserves keyword arguments for future use; previously a 
		# future keyword argument would have raised a TypeError here
		values = [data.get(k,None) for k in self.columns]
		# write -1 for any column whose value is unavailable on this OS/sample
		line = self.delimiter.join([(str(d) if d is not None else u'-1') for d in values])
		self.stream.write(line)
		self.stream.write(u'\n')
		self.stream.flush() # flush per sample so the file can be tailed live
	
	def cleanup(self):
		if self.__closeStream: self.stream.close()
class BaseProcessMonitor(object):
	"""Process monitor that uses a background thread to gather statistics such as 
	CPU and memory usage from a running process. 
	
	For detail on the statistic keys available from the standard process monitor 
	classes see L{ProcessMonitorKey}. 
	
	The most convenient way to start the default process monitor for this operating 
	system is L{pysys.basetest.BaseTest.startProcessMonitor}. 
	
	You can create a custom subclass if you need to add support for a new OS or 
	return additional monitoring statistics, for example a Java-specific monitor 
	returning heap size and thread count, or a thin wrapper around a Python library 
	such as psutil that provides a wider set of statistics. The fields and methods 
	starting with a single underscore `_` can be used or overridden by custom 
	subclasses. 
	
	To start the process monitor from a custom class, pass the owner basetest to its 
	constructor and then call the L{start} method. Monitors are automatically 
	terminated during cleanup at the end of a test, or can be manually stopped 
	before that using the L{stop} method. 
	
	:param owner: The BaseTest owning this monitor. 
	:param process: The process wrapper object. A numeric pid can be specified 
		instead but with reduced functionality, so use a process object if you 
		have one. 
	:param interval: The interval in seconds between polling for each data sample. 
	:param pmargs: Keyword arguments to allow parameterization of the returned 
		data. An exception will be raised for any arguments not expected by this 
		class. 
	"""
	
	def __init__(self, owner, process, interval, handlers, **pmargs):
		# subclasses can build on this to support different platforms and/or 
		# provide extra data keys
		self.interval = interval
		
		# "numProcessors" is deprecated/undocumented since 1.4.0, but still 
		# accepted for compatibility with older tests
		self.__numProcessors = int(pmargs.pop("numProcessors", 1))
		assert not pmargs, 'Unknown process monitor options: %s'%pmargs.keys()
		
		pidOnly = isinstance(process, int)
		
		self.process = None if pidOnly else process
		"""The process object. Can be None if a pid was passed directly. """
		
		self.pid = process if pidOnly else process.pid
		"""The pid to be monitored. """
		
		self.owner = owner
		"""The test object that owns this monitor. """
		
		assert handlers, 'Must have at least one handler for the data generated by the process monitor'
		self.handlers = handlers
		"""The list of handlers that will be notified each time the process is 
		polled for new data. """
		
		self.thread = None
		"""The background thread responsible for monitoring the process; set by 
		L{start}. """
		
		self._stopping = None
		"""The C{threading.Event} that is set when the background thread should 
		begin stopping. Call C{wait} on this object instead of sleeping to get 
		fast wake-ups during cleanup. """
		
		try:
			self._cpuCount = multiprocessing.cpu_count()
			"""The count of available CPU cores on this host, used for scaling up 
			the CPU_TOTAL_UTILIZATION. """
		except Exception as ex: # pragma: no cover
			log.debug('Failed to get multiprocessing.cpu_count: %s', ex)
			self._cpuCount = 1
	def start(self):
		"""Called on the main test thread to start monitoring in the background. 
		
		Performs any required initialization of data structures then starts the 
		background thread. 
		
		:return: This instance. 
		:rtype: L{BaseProcessMonitor}
		"""
		# executed on main thread - the best place to perform initial setup so we 
		# get an immediate error if it fails
		self.thread = self.owner.startBackgroundThread(str(self), self.__backgroundThread)
		return self
def __str__(self): return 'ProcessMonitor<%s>'%('%s pid=%d'%(self.process,self.pid) if self.process else self.pid)
[docs] def _preprocessData(self, data): """ Called in the background thread with the data dictionary from each poll of the process, to allow OS-agnostic pre-processing and addition of derived data keys such as the date and time. :param data: The dictionary of process monitoring data. This method may add or modify the contents of this dictionary. """ datetime = time.localtime(time.time()) data[ProcessMonitorKey.DATE_TIME] = time.strftime("%Y-%m-%d %H:%M:%S", datetime) data[ProcessMonitorKey.DATE_TIME_LEGACY] = time.strftime( "%d/%m/%y %H:%M:%S" if IS_WINDOWS else "%m/%d/%y %H:%M:%S", datetime) if data.get(ProcessMonitorKey.CPU_CORE_UTILIZATION,None) is not None: data[ProcessMonitorKey.CPU_TOTAL_UTILIZATION] = int(data[ProcessMonitorKey.CPU_CORE_UTILIZATION] / self._cpuCount) if self.__numProcessors > 1: # undocumented, for compatibility only data[ProcessMonitorKey.CPU_CORE_UTILIZATION] = data[ProcessMonitorKey.CPU_CORE_UTILIZATION] / self.__numProcessors
def __backgroundThread(self, log, stopping): self._stopping = stopping # for use by other process monitor methods sample = 1 try: while not stopping.is_set(): d = self._getData(sample) assert d, 'No data returned' d[ProcessMonitorKey.SAMPLE] = sample self._preprocessData(d) for h in self.handlers: h.handleData(d) sample += 1 stopping.wait(self.interval) except Exception as ex: if self.process and not self.process.running(): log.debug('Ignoring process monitor error as the monitored process %s has already terminated: %s', self.process, ex) else: raise finally: log.debug('Calling cleanup on process monitor handler(s)') try: for l in self.handlers: if hasattr(l, 'cleanup'): l.cleanup() finally: self._cleanup()
[docs] def running(self): """Return the running status of the process monitor. :return: True if the process monitor background thread is still running. :rtype: bool """ return self.thread.is_alive()
[docs] def stop(self, timeout=None): """Request the process monitor thread to terminate. Waits for the termination to complete if possible, but does not throw an exception if it fails. :param float timeout: Overrides the maximum time we will wait for the monitor to stop. By default this is ``['WaitForProcessStop']`` from `constants.TIMEOUTS`. """ self.thread.stop() self.thread.join(timeout=timeout, abortOnError=False)
# for implementation by subclasses
[docs] def _getData(self, sample): """Implement gathering of latest monitoring data. Called on the background monitoring thread regularly. :param sample: An integer starting at 1 and incrementing each time this method is called. :return: A dictionary of (typically numeric) values, keyed by L{ProcessMonitorKey}. :rtype: dict """ raise NotImplementedError('_getData must be implemented by subclass')
[docs] def _cleanup(self): """Perform implementation-specific cleanup. Called on the background monitoring thread when the monitor is stopping. """ pass