Source code for pysys.utils.osutils

#!/usr/bin/env python
# PySys System Test Framework, Copyright (C) 2006-2023 M.B.Grieve

# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.

# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""
Contains operating system helper utilities such as the `getUsableCPUCount` function. 

"""

import sys, os
import threading
import logging
import time
import traceback
import math
import logging

from pysys.constants import *

__all__ = ['getUsableCPUCount']


__USABLE_CPU_COUNT = None

def getUsableCPUCount() -> float:
    """
    The number of CPUs that are usable from this PySys process, as a floating point number.

    This may be less than the total number of CPUs due to restrictions from the operating system
    such as the process affinity mask and container cgroup (``cpu.cfs_quota_us`` / ``cpu.max``) limits.

    The value is computed on first use by ``_initUsableCPUCount`` and then served from the
    module-level cache on subsequent calls.

    :return: The usable CPU count.

    .. versionadded:: 2.2
    """
    # Serve the cached value if the count has already been initialized
    if __USABLE_CPU_COUNT:
        return __USABLE_CPU_COUNT
    # Not yet computed: delegate to the initializer
    return _initUsableCPUCount()
def _initUsableCPUCount():
    """
    Internal, do not use.

    Computes the number of CPUs usable by this process, stores it in the module-level cache and returns it.

    Called after importing BaseRunner (not when this module is imported) so that it's possible to
    monkey-patch it in user code (e.g. when the custom runner is imported) if required e.g. for a new platform.

    The starting point is the size of the process affinity mask (``os.sched_getaffinity``), falling back
    to ``os.cpu_count()`` where that is unavailable. On non-Windows platforms where ``/proc/self/cgroup``
    exists, and unless the ``PYSYS_IGNORE_CGROUPS`` environment variable is ``true``, the result is reduced
    to the lowest of the cgroups v1 quota/period ratio, the cgroups v2 ``cpu.max`` ratio and the starting count.
    Failures while reading the cgroups configuration are logged and otherwise ignored.

    :return: The usable CPU count; an integer, or a float when a cgroup limit reduces it.

    :meta private: Not public API
    """
    log = logging.getLogger('pysys.initUsableCPUCount')
    global __USABLE_CPU_COUNT
    if __USABLE_CPU_COUNT:
        log.debug('Calling _initUsableCPUCount when it was previously called and returned %s', __USABLE_CPU_COUNT)

    try:
        # as recommended in the Python docs, use the CPUs allocated to the current process
        cpus = len(os.sched_getaffinity(0))
    except Exception:  # not always available, e.g. on Windows
        cpus = os.cpu_count()
    assert cpus, cpus

    if (not IS_WINDOWS) and os.getenv('PYSYS_IGNORE_CGROUPS', '').lower() != 'true' and os.path.exists('/proc/self/cgroup'):
        # if https://github.com/python/cpython/issues/80235 is implemented we can defer to Python to calculate this
        cgroupslog = logging.getLogger('pysys.cgroups')
        cgroups = CgroupConfig()
        try:
            cfs_quota_us = int(cgroups.readFile('cpu.cfs_quota_us', v1Controller='cpu') or '0')
            cfs_period_us = int(cgroups.readFile('cpu.cfs_period_us', v1Controller='cpu') or '0')
            shares = int(cgroups.readFile('cpu.shares', v1Controller='cpu') or '0')  # just for information

            cgroupsLimits = []
            if cfs_quota_us > 0 and cfs_period_us > 0:
                # quota is per CPU, i.e. quota>period if multiple CPUs permitted
                cgroupsLimits.append(float(cfs_quota_us) / float(cfs_period_us))

            cpuMax = cgroups.readFile('cpu.max', v1Controller=None).split(' ')
            if len(cpuMax) == 2 and cpuMax[0].lower() != 'max':
                # seems to work the same as the v1 quota and period
                cgroupsLimits.append(float(cpuMax[0]) / float(cpuMax[1]))

            # do NOT use cpu.shares as it's not possible to do reliably (e.g. cf https://bugs.openjdk.org/browse/JDK-8281181)

            # don't ever use more than the total CPUs in the machine so add that to the list of limits
            cgroupsLimits.append(cpus)

            cgroupslog.debug('Read cgroups configuration: v1 cpu.cfs_quota_us/cfs_period_us=%s/%s (ignored: cpu.shares=%s), v2 cpu.max=%s; limiting to min of: %s CPUs',
                cfs_quota_us or '?', cfs_period_us or '?', shares or '?', '/'.join(cpuMax) or '?', cgroupsLimits)

            reducedCPUs = min(cgroupsLimits)  # use whatever limit is lowest
            if reducedCPUs <= 0:
                reducedCPUs = 1  # should not happen, just a failsafe in case of weird cgroup config
            if reducedCPUs < cpus:
                cgroupslog.info('Reduced usable CPU count from %s to %s due to Cgroups configuration', cpus, reducedCPUs)
                cpus = reducedCPUs
        except Exception as ex:
            # Best-effort: a problem reading cgroups must not prevent us returning a CPU count
            cgroupslog.info('Failed to read cgroups configuration to determine available CPUs: %r', ex)

    # %s rather than %d, since a cgroup limit can make this a fractional value
    log.debug('Usable CPU count for process = %s', cpus)
    __USABLE_CPU_COUNT = cpus
    return cpus


class CgroupConfig:
    """
    Helper class for reading cgroups configuration for the current process, supporting both cgroups v1 and v2.

    This is a minimal implementation, and assumes the mount point is "/sys/fs/cgroup" (unless overridden by
    the constructor argument or the ``PYSYS_CGROUPS_ROOT_MOUNT`` environment variable).
    Consider using a dedicated library if you need something more advanced.

    :meta private: This API is not yet stable enough for general use, and may change at any time.
    """

    def __init__(self, mountRoot=None):
        """
        :param str mountRoot: The directory under which cgroups are mounted. If not specified, the value of
            the ``PYSYS_CGROUPS_ROOT_MOUNT`` environment variable is used, or else "/sys/fs/cgroup".
        """
        self.pid = 'self'
        self.controllers = {}
        """
        Maps a controller name (in v1, or "" for v2) to the full path of that controller.
        """

        # Would be more correct to look this up from /proc/self/mountinfo, but probably not necessary as
        # almost everyone has it mounted in the standard location
        self.mountRoot = mountRoot or os.getenv('PYSYS_CGROUPS_ROOT_MOUNT', '/sys/fs/cgroup')
        if IS_WINDOWS:
            self.mountRoot = None
        elif not os.path.exists(self.mountRoot):
            # Log the resolved path that was actually checked, not the (possibly None) argument
            self._debuglog('No cgroup root is mounted at %s', self.mountRoot)
            self.mountRoot = None

    def __getControllerDir(self, cgroupsv1Controller):
        """
        Locate the directory holding the configuration files for a controller.

        :param str cgroupsv1Controller: The cgroups v1 controller name, or an empty string/None for cgroups v2.
        :return: The directory path, or None if there is no cgroup mount root, if a v2 lookup is requested but
            there is no ``cgroup.controllers`` file under the root, or if ``/proc/<pid>/cgroup`` has no
            matching entry.
        """
        import re  # imported locally so this method does not depend on a wildcard import providing it

        if not self.mountRoot:
            return None

        d = mountRoot = self.mountRoot
        if cgroupsv1Controller:
            d = d+'/'+cgroupsv1Controller
        elif not os.path.exists(mountRoot+'/cgroup.controllers'):
            return None  # can't be cgroups v2 if there is no controllers file

        if cgroupsv1Controller:
            # the controller may appear anywhere within the comma-separated CONTROLLER_LIST
            controllerPattern = "([^:]+,)?"+re.escape(cgroupsv1Controller)+"(,[^:]+)?"
        else:
            controllerPattern = ''  # the cgroups v2 entry has an empty controller list

        with open('/proc/'+self.pid+'/cgroup') as f:  # should always exist
            # Get the PATH from a matching HIERARCHY:CONTROLLER_LIST:PATH line, where CONTROLLER_LIST is a comma-separated list
            m = re.search(r'\d+:%s:(.*)' % controllerPattern, f.read())
            if not m:
                return None  # return and log nothing if there is no entry for the relevant cgroup
            cgroup_path = m.groups()[-1].rstrip('/')  # if it's "/" convert to ""

        if os.path.exists(d+cgroup_path):
            d = d+cgroup_path
            self._debuglog('Reading cgroup configuration for %s controller from "%s" as given by /proc/self/cgroup file', cgroupsv1Controller or '<cgroup v2>', d)
        else:
            # seems to often not exist in docker containers, as it's a path in the docker host that the container can't see
            self._debuglog('Reading cgroup configuration for %s controller from root dir "%s" since the path "%s" given by /proc/self/cgroup file was not found under the root dir', cgroupsv1Controller or '<cgroup v2>', d, cgroup_path)

        return d

    def readFile(self, filename, v1Controller=None):
        """
        Return as a string the first line of the specified cgroups filename belonging to the specified
        controller (always "" for cgroups v2).

        :param str filename: The base filename within the controller, e.g. "cpu.max"
        :param str v1Controller: To read cgroups v2 this should be None, for cgroups v1 this is the
            controller name, e.g. "cpu".
        :return: A string containing the first line of the specified cgroups file (with surrounding
            whitespace stripped), or empty string '' if the file or controller is missing.
        """
        v1Controller = v1Controller or ''
        assert ',' not in v1Controller, v1Controller
        if v1Controller not in self.controllers:  # cache this where possible
            self.controllers[v1Controller] = self.__getControllerDir(v1Controller)

        dirname = self.controllers[v1Controller]

        if dirname and os.path.exists(dirname+'/'+filename):
            with open(dirname+'/'+filename) as f:
                return f.readline().strip()
        return ''

    def _debuglog(self, msg, *args):
        """Log a message with %-style arguments at debug level to the 'pysys.cgroups' logger."""
        logging.getLogger('pysys.cgroups').debug(msg, *args)