Source code for ezmsg.sigproc.util.profile

import functools
import logging
import os
from pathlib import Path
import time
import typing

import ezmsg.core as ez


HEADER = "Time,Source,Topic,SampleTime,PerfCounter,Elapsed"


def get_logger_path() -> Path:
    """
    Resolve the location of the profiling log file.

    The file name is read from the ``EZMSG_PROFILE`` environment variable. When the
    variable is unset or empty, ``ezprofiler.log`` is used instead. A relative path is
    placed under ``~/.ezmsg/profile``; an absolute path is returned unchanged.

    Returns:
        Path: The path of the profiling log file.
    """
    configured = os.environ.get("EZMSG_PROFILE")
    target = Path(configured) if configured else Path("ezprofiler.log")

    # Absolute paths are honoured as-is; everything else lives in the per-user profile folder.
    if target.is_absolute():
        return target
    return Path.home().joinpath(".ezmsg", "profile", target)
def _setup_logger(append: bool = False) -> logging.Logger:
    """
    Create and configure the "ezprofile" logger that writes rows to the profiling log file.

    The log file location comes from ``get_logger_path()``; its parent directory is created
    if needed. An existing log file is removed unless ``append`` is True and its first line
    equals ``HEADER``. The header line is emitted (at DEBUG level, before the timestamp
    formatter is attached) whenever the file does not already start with it.

    Args:
        append (bool): If True, keep an existing log file whose first line matches ``HEADER``
            and append to it. If False, any existing log file is deleted first.

    Returns:
        logging.Logger: The "ezprofile" logger with a file handler attached. Its level is
        taken from the ``EZMSG_LOGLEVEL`` environment variable, defaulting to INFO.
    """
    logpath = get_logger_path()
    logpath.parent.mkdir(parents=True, exist_ok=True)

    write_header = True
    if logpath.exists() and logpath.is_file():
        if append:
            with open(logpath) as f:
                first_line = f.readline().rstrip()
            if first_line == HEADER:
                # The file already starts with the expected header; just append rows.
                write_header = False
            elif first_line:
                # Remove the file if appending, but headers do not match
                ezmsg_logger = logging.getLogger("ezmsg")
                ezmsg_logger.warning(
                    "Profiling header mismatch: please make sure to use the same version of ezmsg for all processes."
                )
                logpath.unlink()
            # else: the file is empty (logging.FileHandler creates the file as soon as it is
            # constructed, even if nothing is ever logged). That is not a version mismatch,
            # so skip the warning and let the header be written below.
        else:
            # Remove the file if not appending
            logpath.unlink()

    # Create a logger with the name "ezprofile"
    _logger = logging.getLogger("ezprofile")

    # Set the logger's level to EZMSG_LOGLEVEL env var value if it exists, otherwise INFO
    _logger.setLevel(os.environ.get("EZMSG_LOGLEVEL", "INFO").upper())

    # Create a file handler to write log messages to the log file
    fh = logging.FileHandler(logpath)
    fh.setLevel(logging.DEBUG)  # Set the file handler log level to DEBUG

    # Add the file handler to the logger
    _logger.addHandler(fh)

    # Add the header if writing to a new/empty file or if the existing header did not match.
    # This happens before the formatter is set so the header row carries no timestamp prefix.
    if write_header:
        _logger.debug(HEADER)

    # Set the log message format
    formatter = logging.Formatter(
        "%(asctime)s,%(message)s", datefmt="%Y-%m-%dT%H:%M:%S%z"
    )
    fh.setFormatter(formatter)

    return _logger


logger = _setup_logger(append=True)


def _process_obj(obj, trace_oldest: bool = True):
    """
    Extract a sample time from a message object, if it has a "time" or "win" axis.

    The "win" axis is preferred over "time" when both are present. The value is read from
    the axis' ``data`` attribute when it has one, otherwise from ``axis.value(idx)``.
    When the "win" axis is used and a "time" axis also exists, the corresponding value of
    the "time" axis is added (presumably the offset of the sample within the window --
    confirm against the windowing code that produces these messages).

    Args:
        obj: Any object. Objects without an ``axes`` attribute, or without a "time"/"win"
            axis, or with zero length along that axis, yield None.
        trace_oldest (bool): If True, use the first entry along the axis; otherwise the last.

    Returns:
        The sample time, or None if it could not be determined.
    """
    samp_time = None
    if hasattr(obj, "axes") and ("time" in obj.axes or "win" in obj.axes):
        axis = "win" if "win" in obj.axes else "time"
        ax = obj.get_axis(axis)
        n_entries = obj.data.shape[obj.get_axis_idx(axis)]
        if n_entries > 0:
            idx = 0 if trace_oldest else (n_entries - 1)
            if hasattr(ax, "data"):
                samp_time = ax.data[idx]
            else:
                samp_time = ax.value(idx)

            # Compare the axis *name*; comparing the axis object to the string "win"
            # would never match and the time-axis contribution would be silently dropped.
            if axis == "win" and "time" in obj.axes:
                time_ax = obj.axes["time"]
                # The "time" dimension has its own length, so it needs its own index;
                # reusing the "win" index could run past the end of the "time" axis.
                n_time = obj.data.shape[obj.get_axis_idx("time")]
                if n_time > 0:
                    time_idx = 0 if trace_oldest else (n_time - 1)
                    if hasattr(time_ax, "data"):
                        samp_time += time_ax.data[time_idx]
                    else:
                        samp_time += time_ax.value(time_idx)
    return samp_time
def profile_method(trace_oldest: bool = True):
    """
    Build a decorator that records how long a method call takes.

    Each call of the decorated method logs one DEBUG row to the profiling logger with the
    caller's ``module.ClassName``, the caller's ``address``, the sample time extracted from
    the return value, the ``time.perf_counter()`` reading taken after the call, and the
    elapsed time in milliseconds.

    Whether to profile is decided once, when the decorator is applied: if the profiling
    logger's level is not exactly DEBUG, the original function is returned untouched.

    Args:
        trace_oldest (bool): If True, trace the oldest sample time; otherwise, trace the newest.

    Returns:
        Callable: A decorator that yields either the profiled wrapper or the original function.
    """

    def profiling_decorator(func: typing.Callable):
        # Profiling disabled: hand back the undecorated function so there is no overhead.
        if logger.level != logging.DEBUG:
            return func

        @functools.wraps(func)
        def wrapped_func(caller, *args, **kwargs):
            t_begin = time.perf_counter()
            result = func(caller, *args, **kwargs)
            t_end = time.perf_counter()

            owner = caller.__class__
            row = [
                ".".join((owner.__module__, owner.__name__)),
                f"{caller.address}",
                f"{_process_obj(result, trace_oldest=trace_oldest)}",
                f"{t_end}",
                f"{(t_end - t_begin) * 1e3:0.4f}",
            ]
            logger.debug(",".join(row))
            return result

        return wrapped_func

    return profiling_decorator
def profile_subpub(trace_oldest: bool = True):
    """
    Build a decorator that profiles a subscriber-publisher async generator of an ezmsg Unit.

    For every ``(stream, obj)`` pair produced by the decorated task, one DEBUG row is logged
    to the profiling logger with the unit's ``module.ClassName``, the unit's ``address``, the
    sample time extracted from ``obj``, the ``time.perf_counter()`` reading taken when the
    pair was produced, and the elapsed time in milliseconds. For the first pair the elapsed
    time is measured from the start of the task; for later pairs it is measured from the
    moment the previous pair was produced.

    Whether to profile is decided once, when the decorator is applied: if the profiling
    logger's level is not exactly DEBUG, the original function is returned untouched.

    Args:
        trace_oldest (bool): If True, trace the oldest sample time; otherwise, trace the newest.

    Returns:
        Callable: A decorator that yields either the profiled async task or the original function.
    """

    def profiling_decorator(func: typing.Callable):
        # Profiling disabled: hand back the undecorated task so there is no overhead.
        if logger.level != logging.DEBUG:
            return func

        @functools.wraps(func)
        async def wrapped_task(unit: ez.Unit, msg: typing.Any = None):
            owner = unit.__class__
            source = ".".join((owner.__module__, owner.__name__))
            topic = f"{unit.address}"

            mark = time.perf_counter()
            async for stream, obj in func(unit, msg):
                now = time.perf_counter()
                row = [
                    source,
                    topic,
                    f"{_process_obj(obj, trace_oldest=trace_oldest)}",
                    f"{now}",
                    f"{(now - mark) * 1e3:0.4f}",
                ]
                logger.debug(",".join(row))
                # The next interval starts at the moment this item was produced.
                mark = now
                yield stream, obj

        return wrapped_task

    return profiling_decorator