Source code for spicelib.sim.sim_runner

#!/usr/bin/env python

# -------------------------------------------------------------------------------
#
#  ███████╗██████╗ ██╗ ██████╗███████╗██╗     ██╗██████╗
#  ██╔════╝██╔══██╗██║██╔════╝██╔════╝██║     ██║██╔══██╗
#  ███████╗██████╔╝██║██║     █████╗  ██║     ██║██████╔╝
#  ╚════██║██╔═══╝ ██║██║     ██╔══╝  ██║     ██║██╔══██╗
#  ███████║██║     ██║╚██████╗███████╗███████╗██║██████╔╝
#  ╚══════╝╚═╝     ╚═╝ ╚═════╝╚══════╝╚══════╝╚═╝╚═════╝
#
# Name:        sim_runner.py
# Purpose:     Tool used to launch LTSpice simulation in batch mode.
#
# Author:      Nuno Brum (nuno.brum@gmail.com)
#
# Created:     23-12-2016
# License:     refer to the LICENSE file
# -------------------------------------------------------------------------------
"""
Allows launching LTSpice simulations from a Python Script, thus allowing to overcome the 3 dimensions STEP limitation on
LTSpice, update resistor values, or component models.

The code snippet below will simulate a circuit with two different diode models, set the simulation
temperature to 80 degrees, and update the values of R1 and R2 to 3.3k. ::

    from spicelib.sim.sim_runner import SimRunner
    from spicelib.sim.sweep import sweep
    from spicelib.editor.spice_editor import SpiceEditor
    from spicelib.sim.ltspice_simulator import LTspice

    runner = SimRunner(simulator=LTspice, parallel_sims=4)
    editor = SpiceEditor("my_circuit.net")
    editor.set_parameters(temp=80)  # Sets the simulation temperature to be 80 degrees
    editor.set_component_value('R2', '3.3k')  #  Updates the resistor R2 value to be 3.3k
    for dmodel in ("BAT54", "BAT46WJ"):
        editor.set_element_model("D1", dmodel)  # Sets the Diode D1 model
        for res_value in sweep(2.2, 2.4, 0.2):  # Steps from 2.2 to 2.4 with 0.2 increments
            editor.set_component_value('R1', res_value)  #  Updates the resistor R1 value to res_value
            runner.run(editor)

    runner.wait_completion()  # Waits for the LTSpice simulations to complete

    print(f"Total Simulations: {runner.runno}")
    print(f"Successful Simulations: {runner.okSim}")
    print(f"Failed Simulations: {runner.failSim}")

The first line will create a python class instance that represents the LTSpice file or netlist that is to be
simulated. This object implements methods that are used to manipulate the spice netlist. For example, the method
set_parameters() will set or update existing parameters defined in the netlist. The method set_component_value() is
used to update existing component values or models.

---------------
Multiprocessing
---------------

For making better use of today's computer capabilities, the SimRunner spawns several simulation processes
each executing in parallel a simulation.

By default, the number of parallel simulations is 4, however the user can override this in two ways. Either
using the class constructor argument ``parallel_sims`` or by forcing the allocation of more processes in the
run() call by setting ``wait_resource=False``. ::

    `runner.run(wait_resource=False)`

The recommended way is to set the parameter ``parallel_sims`` in the class constructor. ::

    `runner = SimRunner(simulator=LTspice, parallel_sims=8)`

The user then can launch a simulation with the updates done to the netlist by calling the run() method. Since the
processes are not executed right away, but rather just scheduled for simulation, the wait_completion() function is
needed if the user wants to execute code only after the completion of all scheduled simulations.

The usage of wait_completion() is optional. Just note that the script will only end when all the scheduled tasks are
executed.

---------
Callbacks
---------

As seen above, the `wait_completion()` can be used to wait for all the simulations to be finished. However, this is
not efficient from a multiprocessor point of view. Ideally, the post-processing should be also handled while other
simulations are still running. For this purpose, the user can use a function call back.

The callback function is called, as soon as the simulation has finished, directly by the thread that is handling the
simulation. A function callback receives two arguments.
The RAW file and the LOG file names. Below is an example of a callback function::

    def processing_data(raw_filename, log_filename):
        '''This is a call back function that just prints the filenames'''
        print("Simulation Raw file is %s. The log is %s" % (raw_filename, log_filename))
        # Other code below either using ltsteps.py or raw_read.py
        log_info = LTSpiceLogReader(log_filename)
        rise, measures = log_info.dataset["rise_time"]

The callback function is optional. If  no callback function is given, the thread is terminated just after the
simulation is finished.
"""
# Module authorship metadata.
__author__ = "Nuno Canto Brum <nuno.brum@gmail.com>"
__copyright__ = "Copyright 2020, Fribourg Switzerland"

# Public names of this module (the names exported by a star-import).
__all__ = ['SimRunner', 'SimRunnerTimeoutError', 'AnyRunner', 'ProcessCallback', 'RunTask', 'CallbackType', 'CallbackArgsType']

import inspect  # Library used to get the arguments of the callback function
import logging
import shutil
import time
from pathlib import Path
from time import sleep, thread_time as clock
from typing import Protocol, Any, TypeAlias

from collections.abc import Callable, Iterator

from .process_callback import ProcessCallback, CallbackType, CallbackArgsType
from ..sim.run_task import RunTask
from ..sim.simulator import Simulator
from ..editor.base_editor import BaseEditor
from ..raw.raw_read import RawRead
from ..raw.raw_write import RawWrite
from ..editor.updates import Update


# Module-level logger; every message of this module is emitted under the "spicelib.SimRunner" logger name.
_logger = logging.getLogger("spicelib.SimRunner")
# Line terminator constant. NOTE(review): not referenced in the visible part of this module.
END_LINE_TERM = '\n'


class SimRunnerTimeoutError(TimeoutError):
    """Raised when waiting for simulation tasks takes longer than the allowed time.

    It derives from the builtin :class:`TimeoutError`, so handlers written for ``TimeoutError`` also catch it.
    """


# A task filter is one of: a predicate taking a RunTask and returning a bool, a dict, or None (no filtering).
IteratorFilterType: TypeAlias = Callable[[RunTask], bool] | dict | None
"""This is the type used for filtering RunTasks. See the TaskIterator.conditions parameter documentation."""


class TaskIterator:
    """Iterator over the tasks of a SimRunner that finished with a zero return code.

    Each call to ``next()`` refreshes the runner's task lists and walks through ``runner.completed_tasks`` in
    order. Tasks rejected by the filter are skipped; tasks whose ``retcode`` is not 0 are logged as errors and
    skipped as well. For every remaining task the result of ``return_function(task)`` is handed to the caller.
    Once the completed tasks are exhausted, the iterator either stops (no active task left, or ``wait`` is
    False) or polls every 0.2 seconds until another task completes.

    :param runner: SimRunner class to process
    :type runner: SimRunner
    :param return_function: a callable that receives a RunTask and produces the value given to the user.
        When None is passed, a function that always returns True is used instead.
    :type return_function: function(task: RunTask) -> Any
    :param wait: If True, the iterator waits for the still active tasks to complete. If False, only the tasks
        that are already completed are considered.
    :param conditions: Filter applied to the tasks. If not given, all tasks are accepted. Tasks without edits
        never match a filter. The filter can take two forms:

            * a function that receives a RunTask and returns a True equivalent when the task is to be
              included and a False equivalent when it is to be rejected.

            * a dictionary where the keys are the names of the updated components/parameters and the values
              are the values they were updated to. Each value can be a single value, or a list, a set or a
              tuple of accepted values. All entries of the dictionary must be matched (AND logic). This
              means all these possibilities are valid:
              ``{'R1': '1k', 'R2': ('1k', '2k'), 'R3': {'1k', '2k'}, 'R4': ['1k', '2k']}``

    :type conditions: None or dict or function(task: RunTask) -> bool
    :raises SimRunnerTimeoutError: while waiting, when the current time is past the runner's maximum stop time.
    """

    def __init__(self, runner: "SimRunner", return_function: Callable[[RunTask], Any], wait: bool,
                 conditions: IteratorFilterType = None):
        def _always_true(_):
            # Fallback used when the caller supplies no return function
            return True

        self.runner = runner
        self.return_function = _always_true if return_function is None else return_function
        self.conditions = conditions
        self.wait = wait
        self._iterator_counter = 0  # index of the next entry of runner.completed_tasks to be examined

    def match_conditions(self, runtask: RunTask) -> bool:
        """Tells whether `runtask` passes the filter stored in ``self.conditions``.

        :param runtask: the task to be checked
        :return: True when no filter is set. False when the task has no edits. Otherwise the outcome of the
            dictionary match, or whatever the filter function returns.
        """
        wanted = self.conditions
        if wanted is None:
            return True  # Nothing to filter on
        if not runtask.edits:
            return False  # Without edits, nothing can be matched
        if not isinstance(wanted, dict):
            # Not a dictionary, so it is the user supplied filter function
            return wanted(runtask)

        def is_satisfied(name, value) -> bool:
            # True when at least one Update of the task carries this name with an accepted value
            for update in runtask.edits:
                if not isinstance(update, Update):
                    continue  # only Update entries take part in the comparison
                if name == update.name and (
                    value == update.value or (isinstance(value, (list, tuple, set)) and update.value in value)
                ):
                    return True
            return False

        # AND comparison: the first unmatched dictionary entry rejects the task
        return all(is_satisfied(name, value) for name, value in wanted.items())

    def __iter__(self):
        # Restart from the first completed task
        self._iterator_counter = 0
        return self

    def __next__(self):
        sim_runner = self.runner
        while True:
            sim_runner.update_completed()  # Refreshes the active_tasks and completed_tasks lists
            finished = sim_runner.completed_tasks

            if self._iterator_counter < len(finished):
                # There are completed tasks that were not examined yet
                task: RunTask = finished[self._iterator_counter]
                self._iterator_counter += 1
                if not self.match_conditions(task):
                    continue
                if task.retcode == 0:
                    return self.return_function(task)
                _logger.error(f"Skipping {task.runno} because simulation failed.")
                continue

            # All completed tasks were consumed. Decide whether it is worth waiting for more.
            if len(sim_runner.active_tasks) == 0 or self.wait is False:
                raise StopIteration

            # Latest moment at which any of the active tasks is allowed to still be running
            stop_time = sim_runner._maximum_stop_time()
            if stop_time is not None and time.time() > stop_time:  # All tasks are on timeout condition
                raise SimRunnerTimeoutError(f"Exceeded {self.runner.timeout} seconds waiting for tasks to finish")

            sleep(0.2)  # Pause before polling again


class AnyRunner(Protocol):
    """Structural interface (typing.Protocol) of a simulation runner.

    It declares the methods and properties a runner is expected to offer: scheduling a simulation, waiting for
    the scheduled simulations, and reading the run counters. All bodies are placeholders.
    """

    def run(self, netlist: str | Path | BaseEditor, *,
            wait_resource: bool = True,
            callback: CallbackType = None,
            callback_args: CallbackArgsType = None,
            switches: list | None = None,
            timeout: float | None = None,
            run_filename: str | None = None,
            callback_on_error: bool = False,
            exe_log: bool = False) -> RunTask | None:
        """Schedules a simulation of `netlist`. Returns a RunTask, or None."""
        ...

    def wait_completion(self, timeout: float | None = None, abort_all_on_timeout: bool = False) -> bool:
        """Waits for the scheduled simulations to complete and returns a success flag."""
        ...

    @property
    def runno(self) -> int:
        """number of total runs"""
        ...

    @property
    def failSim(self) -> int:
        """number of failed simulations"""
        ...

    @property
    def okSim(self) -> int:
        """number of successful completed simulations"""
        ...


class SimRunner(AnyRunner):
    """
    The SimRunner class implements all the methods required for launching batches of Spice simulations.

    It is iterable, but with a catch: the iteration only returns the results of the completed tasks that
    finished with return code 0, in the order they were completed. Failed tasks are logged and skipped.
    If all completed tasks have been returned, and there are still running tasks, it will wait for the
    completion of the next task.

    If you used no callbacks, the result is a tuple with the raw and log file names.
    If you used callbacks, it will return the return code of the callback function, or None if there was an
    error. Also see `sim_info()` for more details on the completed tasks.

    :param simulator: Forcing a given simulator executable. Must be a subclass of Simulator.
    :type simulator: Simulator, optional
    :param parallel_sims: Defines the number of parallel simulations that can be executed at the same time.
        Ideally this number should be aligned to the number of CPUs (processor cores) available on the machine.
    :type parallel_sims: int, optional
    :param timeout: Timeout parameter as specified on the OS subprocess.run() function. Default is 600 seconds,
        i.e. 10 minutes. For no timeout, set to None.
    :type timeout: float, optional
    :param verbose: If True, it enables a richer printout of the program execution.
    :type verbose: bool, optional
    :param output_folder: specifying which directory shall be used for simulation files (raw and log files).
        The directory is created if it does not exist.
    :type output_folder: str, optional
    :param cwd: The current working directory to run the command in. If None, no change will be done of the
        working directory. The directory is created if it does not exist.
    :type cwd: str or pathlib.Path, optional
    :raises ValueError: When no simulator is given.
    :raises TypeError: When the simulator is not a subclass of Simulator.
    """

    def __init__(self, *, simulator=None, parallel_sims: int = 4, timeout: float | None = 600.0, verbose=False,
                 output_folder: str | Path | None = None, cwd: str | Path | None = None):
        # The '*' in the parameter list forces the user to use named parameters for the rest of the parameters.
        # This is a good practice to avoid confusion.

        # Task bookkeeping is created first, so that __del__ works even if the rest of the constructor raises.
        self.active_tasks: list[RunTask] = []
        self.completed_tasks: list[RunTask] = []
        self._runno = 0  # number of total runs
        self._failSim = 0  # number of failed simulations
        self._okSim = 0  # number of successful completed simulations

        self.verbose = verbose
        self.timeout = timeout
        self.cmdline_switches = []
        self.parallel_sims = parallel_sims

        if output_folder is not None:
            self.output_folder = Path(output_folder)  # accepts both str and Path
            if not self.output_folder.exists():
                self.output_folder.mkdir()
        else:
            self.output_folder = None

        if cwd is not None:
            self.cwd = Path(cwd)  # accepts both str and Path
            if not self.cwd.exists():
                self.cwd.mkdir()
        else:
            self.cwd = None

        # Gets a simulator.
        if simulator is None:
            raise ValueError("No default simulator defined, please specify a simulator")
        # isinstance(..., type) avoids the unrelated "issubclass() arg 1 must be a class" error on instances
        if isinstance(simulator, type) and issubclass(simulator, Simulator):
            self.simulator = simulator
        else:
            raise TypeError("Invalid simulator type.")
        _logger.info("SimRunner initialized")

    def __del__(self):
        """Class Destructor : Waits for the pending simulations, killing them when their timeout expires."""
        if not hasattr(self, 'active_tasks'):
            return  # The constructor did not get far enough to schedule anything
        self.wait_completion(abort_all_on_timeout=True)  # Kill all pending simulations

    @property
    def runno(self) -> int:
        """number of total runs"""
        return self._runno

    @property
    def failSim(self) -> int:
        """number of failed simulations"""
        return self._failSim

    @property
    def okSim(self) -> int:
        """number of successful completed simulations"""
        return self._okSim

    def sim_info(self) -> dict:
        """
        Returns a dictionary with detailed information of all completed tasks.
        It is best to be called after the completion of all tasks.

        The dictionary keys are the run numbers. The values are dictionaries with the keys:

            * netlist_file: Path to the netlist file
            * raw_file: Path to the raw file
            * log_file: Path to the log file
            * retcode: Return code of the simulator.
              -2 means an exception was raised, -1 means the simulation is undefined.
            * exception_text: Exception information in case of an exception during simulation.
              None if no exception was raised.
            * callback_return: Return value of the callback function. None if no callback was used.
            * start_time: Start time of the simulation
            * stop_time: Stop time of the simulation
            * edits: The netlist updates of the task. Only present when the task has edits.

        Example::

            {1: {'netlist_file': 'circuit1.net', 'raw_file': 'circuit1.raw', 'log_file': 'circuit1.log', ...}, ...}

        :return: Dictionary with detailed information of all completed tasks.
        """
        rv = {}
        for task in self.completed_tasks:
            info = {
                'netlist_file': task.netlist_file,
                'raw_file': task.raw_file,
                'log_file': task.log_file,
                'retcode': task.retcode,
                'exception_text': task.exception_text,
                'callback_return': task.callback_return,
                'start_time': task.start_time,
                'stop_time': task.stop_time,
            }
            if task.edits:
                info['edits'] = task.edits.netlist_updates
            rv[task.runno] = info
        return rv

    def set_simulator(self, spice_tool: type[Simulator]) -> None:
        """
        Manually overriding the simulator to be used.

        :param spice_tool: The Simulator subclass to be used.
        :return: Nothing
        :raises TypeError: When `spice_tool` is not a subclass of Simulator.
        """
        if isinstance(spice_tool, type) and issubclass(spice_tool, Simulator):
            self.simulator = spice_tool
        else:
            raise TypeError("Expecting a Simulator subclass")

    def clear_command_line_switches(self):
        """Clear all the command line switches added previously"""
        self.cmdline_switches.clear()

    def add_command_line_switch(self, switch, path=''):
        """
        Used to add an extra command line argument such as '-I<path>' to add symbol search path or '-FastAccess'
        to convert the raw file into Fast Access.
        The argument is a string as is defined in the command line documentation of the used simulator.
        It is preferred that you use the Simulator's class `valid_switch()` method for validation of the switch.

        :param switch: switch to be added. See Command Line Switches documentation of the used simulator.
        :type switch: str
        :param path: path to the file related to the switch being given. Ignored when empty or None.
        :type path: str, optional
        :returns: Nothing
        """
        self.cmdline_switches.append(switch)
        if path:
            # Only a real path is appended. The default '' must not become an empty command line argument.
            self.cmdline_switches.append(path)

    def _on_output_folder(self, afile):
        """Returns the path `afile` would have inside the output folder, or Path(afile) if no folder is set."""
        if self.output_folder:
            return self.output_folder / Path(afile).name
        else:
            return Path(afile)

    def _to_output_folder(self, afile: Path, *, copy: bool, new_name: str = ''):
        """
        Copies or moves `afile` to the output folder, optionally under a new name.

        Without an output folder, the file is copied next to the original when `new_name` is given, otherwise
        it is left untouched.

        :param afile: file to be transferred
        :param copy: True to copy the file, False to move it. Only used when an output folder is set.
        :param new_name: file name to be used at the destination
        :return: Path of the resulting file
        """
        if self.output_folder:
            if new_name:
                ddst = self.output_folder / new_name
            else:
                ddst = self.output_folder
            if copy:
                dest = shutil.copy(afile, ddst)
            else:
                dest = shutil.move(afile, ddst)
            return Path(dest)
        else:
            if new_name:
                dest = shutil.copy(afile, afile.parent / new_name)
                return Path(dest)
            else:
                return afile

    def _run_file_name(self, netlist):
        """Builds the per-run file name: '<stem>_<run number><suffix>'."""
        if not isinstance(netlist, Path):
            netlist = Path(netlist)
        if netlist.suffix == '.qsch':
            # The Qsch files can't be simulated, so, they have to be converted to netlist first.
            netlist = netlist.with_suffix('.net')
        return "%s_%i%s" % (netlist.stem, self._runno, netlist.suffix)

    def _prepare_sim(self, netlist: str | Path | BaseEditor, run_filename: str | None):
        """
        Internal function. Increments the run number and places the netlist to be simulated in the output folder.

        :param netlist: editor object, or path to the netlist file
        :param run_filename: name for the run netlist. When None, it is derived from the netlist name.
        :return: Path of the netlist file that is going to be simulated
        :raises TypeError: When `netlist` is neither a BaseEditor, a Path nor a str.
        """
        # update number of simulation
        self._runno += 1  # Incrementing internal simulation number

        # Harmonize the netlist into a Path object pointing to a netlist file on the right output folder
        if isinstance(netlist, BaseEditor):
            if run_filename is None:
                run_filename = self._run_file_name(netlist.circuit_file)

            # Calculates the path where to store the new netlist.
            run_netlist_file = self._on_output_folder(run_filename)
            netlist.save_netlist(run_netlist_file)

        elif isinstance(netlist, (Path, str)):
            if run_filename is None:
                run_filename = self._run_file_name(netlist)
            if isinstance(netlist, str):
                netlist = Path(netlist)
            run_netlist_file = self._to_output_folder(netlist, copy=True, new_name=run_filename)
        else:
            raise TypeError("'netlist' parameter shall be a SpiceEditor, Path or a plain str")

        return run_netlist_file

    @staticmethod
    def validate_callback_args(callback: CallbackType, callback_args: CallbackArgsType) -> dict | None:
        """
        It validates that the callback_args are matching the callback function.
        Note that the first two parameters of the callback functions need to be the raw and log files.

        :param callback: callback function, or class derived from ProcessCallback
        :param callback_args: extra arguments for the callback, as a dictionary or a tuple
        :return: None when there is no callback. A tuple is converted into a dictionary keyed by the names of
            the callback parameters after the first two. Anything else is returned unchanged.
        :raises ValueError: When the callback takes less than two parameters, or when the callback_args do not
            match the parameters of the callback.
        """
        if callback is None:
            return None  # No callback function, hence callback_args have no effect

        if inspect.isclass(callback) and issubclass(callback, ProcessCallback):
            args = inspect.signature(callback.callback).parameters
        else:
            args = inspect.signature(callback).parameters
        if len(args) < 2:
            raise ValueError("Callback function must have at least two arguments")

        if len(args) > 2:
            if callback_args is None:
                raise ValueError("Callback function has more than two arguments, but no callback_args are given")
            if isinstance(callback_args, dict):
                for pos, param in enumerate(args):
                    if pos > 1:
                        if param not in callback_args:
                            raise ValueError("Callback argument '%s' not found in callback_args" % param)
            if len(args) - 2 != len(callback_args):
                raise ValueError("Callback function has %d arguments, but %d callback_args are given" %
                                 (len(args), len(callback_args))
                                 )
        if isinstance(callback_args, tuple):
            # Convert into a dictionary
            return {param: callback_args[pos - 2] for pos, param in enumerate(args) if pos > 1}
        else:
            return callback_args

    def run(self, netlist: str | Path | BaseEditor, *,
            wait_resource: bool = True,
            callback: CallbackType = None,
            callback_args: CallbackArgsType = None,
            switches: list | None = None,
            timeout: float | None = None,
            run_filename: str | None = None,
            callback_on_error: bool = False,
            exe_log: bool = False) -> RunTask | None:
        """
        Executes a simulation run with the conditions set by the user.
        Conditions are set by the set_parameter, set_component_value or add_instruction functions.

        :param netlist:
            The name of the netlist can be optionally overridden if the user wants to have a better control of
            how the simulations files are generated.
        :param wait_resource:
            Setting this parameter to False will force the simulation to start immediately, irrespective of the
            number of simulations already active.
            By default, the SimRunner class uses only four processors. This number can be overridden by setting
            the parameter ``parallel_sims`` to a different number.
            If there are more than ``parallel_sims`` simulations being done, the new one will be placed on hold
            till one of the other simulations are finished.
        :param callback:
            The user can optionally give a callback function for when the simulation finishes so that processing
            can be done immediately. The callback can either be a function or a class derived from
            ProcessCallback.
            A callback function must receive at least two input parameters that correspond to the raw and log
            files created by the simulation. These need to be the first two parameters of the callback function.
            The other parameters are passed as a dictionary or a tuple in the callback_args parameter.
            If the callback is a class derived from ProcessCallback, then the callback is executed in a separate
            process. The callback function must be defined in the callback() method of the class. As for the
            callback function, the first two parameters are the raw and log files. The other parameters are
            passed as dictionary in the callback_args parameter.
        :param callback_args:
            The callback function arguments. This parameter is passed as keyword arguments to the callback
            function.
        :param switches: Command line switches override
        :param timeout:
            Timeout of the simulation, also used (plus one second of slack) as the maximum wall-clock time to
            wait for a free simulation slot. Default time is value defined in this class constructor.
            If both are None, the wait for a free slot has no time limit.
        :param run_filename: Name to be used for the log and raw file.
        :param callback_on_error:
            If False (default), the callback function is not called if the simulation fails. If True, the
            callback function is called even if the simulation fails. Know that in that case it is not
            guaranteed that the raw and log files will be available.
        :param exe_log: If True, the simulator's execution console messages will be written to a log file
            (named ...exe.log) instead of console. This is especially useful when running under wine or when
            running simultaneous tasks.
        :returns: The task object of type RunTask, or None if no simulation slot became free in time.
            For internal use only.
        """
        callback_kwargs = self.validate_callback_args(callback, callback_args)
        if switches is None:
            switches = []
        run_netlist_file = self._prepare_sim(netlist, run_filename)

        if timeout is None:
            timeout = self.timeout

        # If switches are passed, they override the ones inside the class.
        cmdline_switches = switches or self.cmdline_switches

        # The wait is measured on a wall clock: a CPU-time clock stands still while this thread sleeps.
        # One second slack is given in relation to the task timeout. No timeout means no deadline.
        deadline = None if timeout is None else time.monotonic() + timeout + 1

        while deadline is None or time.monotonic() < deadline:
            if (wait_resource is False) or (self.active_threads() < self.parallel_sims):
                t = RunTask(
                    simulator=self.simulator,
                    runno=self._runno,
                    netlist_file=run_netlist_file,
                    callback=callback,
                    callback_args=callback_kwargs,
                    switches=cmdline_switches,
                    timeout=timeout,
                    verbose=self.verbose,
                    cwd=self.cwd,
                    callback_on_error=callback_on_error,
                    exe_log=exe_log
                )
                if isinstance(netlist, BaseEditor) and len(netlist.netlist_updates) > 0:
                    t.edits = netlist.netlist_updates  # Copy is made in this assignment
                self.active_tasks.append(t)
                t.start()
                sleep(0.01)  # Give slack for the thread to start
                return t  # Returns the task object
            sleep(0.1)  # Give Time for other simulations to end

        _logger.error("Timeout waiting for resources for simulation %d" % self._runno)
        if self.verbose:
            _logger.warning("Timeout on launching simulation %d." % self._runno)
        return None

    def run_now(self, netlist: str | Path | BaseEditor, *,
                switches=None,
                run_filename: str | None = None,
                timeout: float | None = None,
                exe_log: bool = False) -> tuple[Path | None, Path | None]:
        """
        Executes a simulation run with the conditions set by the user, and waits for it to finish.
        Conditions are set by the `set_parameter`, `set_component_value` or `add_instruction functions`.

        :param netlist:
            The name of the netlist can be optionally overridden if the user wants to have a better control of
            how the simulations files are generated.
        :param switches: Command line switches override
        :type switches: list
        :param run_filename: Name to be used for the log and raw file.
        :param timeout: Timeout of the simulation. The call waits at most one second longer than this for the
            task to end. Default time is value defined in this class constructor. If both are None, the wait
            has no time limit.
        :param exe_log: If True, the simulator's execution console messages will be written to a log file
            (named ...exe.log) instead of console. This is especially useful when running under wine or when
            running simultaneous tasks.
        :returns: the raw and log filenames
        """
        if switches is None:
            switches = []
        run_netlist_file = self._prepare_sim(netlist, run_filename)

        # If switches are passed, they override the ones inside the class.
        cmdline_switches = switches or self.cmdline_switches

        if timeout is None:
            timeout = self.timeout

        def dummy_callback(raw, log):
            """Dummy call back that does nothing"""
            return None

        t = RunTask(
            simulator=self.simulator,
            runno=self._runno,
            netlist_file=run_netlist_file,
            callback=dummy_callback,
            callback_args=None,
            switches=cmdline_switches,
            timeout=timeout,
            verbose=self.verbose,
            cwd=self.cwd,
            callback_on_error=False,
            exe_log=exe_log
        )
        if isinstance(netlist, BaseEditor) and len(netlist.netlist_updates) > 0:
            t.edits = netlist.netlist_updates  # Copy is made in this assignment
        t.start()
        sleep(0.01)  # Give slack for the thread to start
        # Give one second slack in relation to the task timeout. Without timeout, wait until the task ends.
        t.join(None if timeout is None else timeout + 1)
        self.completed_tasks.append(t)
        if t.retcode == 0:
            self._okSim += 1
        else:
            # simulation failed
            self._failSim += 1
        return t.raw_file, t.log_file  # Returns the raw and log file

    def active_threads(self):
        """Returns the number of active simulation runs"""
        self.update_completed()
        return len(self.active_tasks)

    def update_completed(self):
        """
        This function updates the `active_tasks` and `completed_tasks` lists. It moves the finished task from
        the `active_tasks` list to the `completed_tasks` list, and updates the success and failure counters.
        It should be called periodically to update the status of the simulations.

        :returns: Nothing
        :meta private:
        """
        i = 0
        while i < len(self.active_tasks):
            task = self.active_tasks[i]
            if task.is_alive() or task.start_time is None:
                # running or not yet started
                i += 1
                continue
            if task.retcode == 0:
                self._okSim += 1
            else:
                # simulation failed
                self._failSim += 1
            # The index is not advanced: after the pop, the next task sits at position i
            self.completed_tasks.append(self.active_tasks.pop(i))

    def kill_all_ltspice(self):
        """
        .. deprecated:: 1.0 Use `kill_all_spice()` instead.

        This is only here for compatibility with previous code.

        Function to terminate LTSpice"""
        self.kill_all_spice()

    def kill_all_spice(self):
        """Function to terminate all the processes whose name is the process name of the simulator in use."""
        import psutil

        # The process name is taken from the configured simulator class, not from the generic base class.
        process_name = self.simulator.process_name
        if not process_name:
            _logger.warning("The simulator has no process name defined. No process was terminated.")
            return

        for proc in psutil.process_iter():
            try:
                # check whether the process name matches
                if proc.name() == process_name:
                    _logger.info("killing Spice process %s", proc.pid)
                    proc.kill()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The process ended in the meantime, or it belongs to someone else. Move on to the next one.
                continue

    def _maximum_stop_time(self):
        """
        This function will return the maximum timeout time of all active tasks.
        Tasks that were not started yet are not considered.

        :return: Maximum timeout time or None, if there is no timeout defined.
        :rtype: float or None
        """
        alarm = None
        for task in self.active_tasks:
            tout = task.timeout if task.timeout is not None else self.timeout
            if tout is not None and task.start_time is not None:
                stop = task.start_time + tout
                if alarm is None or stop > alarm:
                    alarm = stop
        return alarm

    def wait_completion(self, timeout: float | None = None, abort_all_on_timeout: bool = False) -> bool:
        """
        This function will wait for the execution of all scheduled simulations to complete.

        :param timeout: Cancels the wait after the number of seconds specified by the timeout, counted from the
            moment this function is called. If None, the wait is cancelled when the latest timeout of the
            active tasks has passed.
            The difference between this timeout and the one defined in the SimRunner instance, is that the
            latter is implemented by the subprocess class, and this one just cancels the wait.
        :param abort_all_on_timeout: attempts to stop all simulator processes if timeout is expired.
        :returns: True if all simulations were executed successfully. False if any simulation failed, or if the
            wait was cancelled by a timeout.
        """
        self.update_completed()
        if timeout is not None:
            stop_time = time.time() + timeout
        else:
            stop_time = None

        while len(self.active_tasks) > 0:
            sleep(1)
            self.update_completed()
            if timeout is None:
                # Recalculated on every round, as the set of active tasks changes
                stop_time = self._maximum_stop_time()

            # stop_time can be None if timeout was set as None everywhere
            if stop_time is not None and time.time() > stop_time:
                if abort_all_on_timeout:
                    self.kill_all_spice()
                return False

        return self._failSim == 0

    @staticmethod
    def _del_file_if_exists(workfile: Path | None):
        """
        Deletes a file if it exists.

        :param workfile: File to be deleted
        :return: Nothing
        """
        if workfile is not None and workfile.exists():
            _logger.info("Deleting..." + workfile.name)
            workfile.unlink()

    @staticmethod
    def _del_file_ext_if_exists(workfile: Path, ext: str):
        """
        Deletes the file that has the same name as `workfile` but with the extension `ext`, if it exists.

        :param workfile: Reference file
        :param ext: Extension of the file to be deleted
        :return: Nothing
        """
        sim_file = workfile.with_suffix(ext)
        SimRunner._del_file_if_exists(sim_file)

    def cleanup_files(self):
        """
        Will delete all log and raw files that were created by the script. This should only be executed at the
        end of data processing.
        """
        self.update_completed()  # Updates the active_tasks and completed_tasks lists
        for task in self.completed_tasks:
            netlistfile = task.netlist_file
            self._del_file_if_exists(netlistfile)  # Delete the netlist file if still exists
            self._del_file_if_exists(task.log_file)  # Delete the log file if was created
            self._del_file_if_exists(netlistfile.with_suffix('.exe.log'))  # Delete the execution log if created
            self._del_file_if_exists(task.raw_file)  # Delete the raw file if was created

            if netlistfile.suffix == '.net' or netlistfile.suffix == '.asc':
                # Delete the files that have been potentially created by LTSpice
                for ext in ('.log.raw', '.op.raw', '.db'):
                    self._del_file_ext_if_exists(netlistfile, ext)

            if netlistfile.suffix == '.asc':
                # If simulated from an asc file, delete the .net file
                self._del_file_ext_if_exists(netlistfile, '.net')

    def file_cleanup(self):
        """
        .. deprecated:: 1.0 Use `cleanup_files()` instead.
        """
        self.cleanup_files()  # Alias for backward compatibility, this will be deleted in the future

    # ############ Iterator methods

    def __iter__(self):
        """Legacy Iterator, returns the get_results() from the task"""
        return TaskIterator(self, lambda x: x.get_results(), True, None)

    def tasks(self, conditions: IteratorFilterType = None) -> Iterator[RunTask]:
        """
        Returns an iterator which iterates all completed tasks, waiting for the active ones to complete.

        :param conditions: Filter to be used in the iterator. See TaskIterator conditions parameter
            documentation.
        :return: Iterator[RunTask]
        """
        return TaskIterator(self, lambda x: x, True, conditions)

    def create_raw_file_with(self, raw_filename: Path | str, save: list[str],
                             conditions: IteratorFilterType) -> bool:
        """
        Creates a new raw_file, with traces belonging to different runs. The new raw file takes its title from
        the first raw file that is matching the conditions.

        :param raw_filename: The new RAW filename
        :param save: A list with traces that are going to be saved in the new raw file
        :param conditions: A filter as specified on the TaskIterator class
        :return: True if the raw file was created successfully and includes ALL runs, False otherwise
        """
        retval = True
        # Obtain a first task. The default prevents a StopIteration from escaping when nothing matches.
        first_task: RunTask | None = next(self.tasks(conditions), None)
        if first_task is None:
            _logger.error("No completed task matches the given conditions. Cannot create new raw file.")
            return False

        # Initialize a raw file based on the contents of the first raw
        if not first_task.raw_file:
            _logger.error(f"Raw file {first_task.raw_file} does not exist. Cannot create new raw file.")
            return False

        template = RawRead(first_task.raw_file)
        new_raw = RawWrite(
            template.raw_params['Title'],
            fastacces=True,
            numtype='auto',
            encoding='utf_16_le'
        )

        # Go through the tasks that match the conditions given
        for run_task in self.tasks(conditions):
            # Open the raw file
            if not run_task.raw_file:
                _logger.error(f"Raw file {run_task.raw_file} does not exist. Skipping this task.")
                retval = False
                continue
            source_raw = RawRead(run_task.raw_file, traces_to_read=save)
            new_raw.add_traces_from_raw(source_raw, save, force_axis_alignment=True, add_tag=run_task.runno)

        new_raw.save(raw_filename)
        return retval

    def export_sim_log(self, logfile: Path | str):
        """Exports the simulation log, i.e. the pretty-printed output of `sim_info()`, to a file.

        :param logfile: The path to the log file. When an output folder is set, the file is written there.
        """
        import pprint
        logfile = self._on_output_folder(logfile)
        with open(logfile, 'w') as log_file:
            pprint.pprint(self.sim_info(), log_file)