""" provides sort-of generic parallel framework For debugging: import multistar.parallel multistar.parallel.PARALLELMODE = 'serial' IMPORTANT: This needs to be done before any modules that depend on the parallel features are imported as it us used to derive classes. You may also set the environment variable export MULTISTAR_PARALLELMODE=serial to ensure it is done first. """ from time import time from os import nice, getenv from itertools import product from re import compile, Pattern from pickle import load, dump from io import IOBase from uuid import uuid1 from multiprocessing import Process, cpu_count, set_start_method, \ Manager, ProcessError from pathlib import Path from types import FunctionType from copy import copy from queue import Empty from numpy import iterable from utils import cpickle, cload from human import time2human STARTMETHOD = 'spawn' PARALLELMODE = 'parallel' # 'parallel', 'pool', 'serial' def save_model(data, task, filename, path): # data = abu_format(data) if isinstance(filename, FunctionType): filename = filename(**data) elif isinstance(filename, str): filename = filename.format(**data) elif filename is Ellipsis: filename = '_'.join(f'{k}={v:<5}'.replace(' ','') for k,v in data.items()) +'.pickle.xz' if hasattr(task, 'save'): task.save(filename, path) else: if filename is None: filename = uuid1().hex + '.pickle.xz' if path is not None: filename = Path(path) / filename filename = Path(filename).expanduser() cpickle(task, filename) class ParallelProcess(Process): def __init__(self, qi, qo, nice=19, task=None, return_data=True): super().__init__() self.qi = qi self.qo = qo self.nice = nice self.task = task self.return_data = return_data def run(self): nice(self.nice) while True: data = self.qi.get() if data is None: self.qi.task_done() try: self.qo.close() except AttributeError: pass break path = data.pop('path', None) filename = data.pop('filename', None) return_data = data.pop('return_data', self.return_data) result = data.pop('task', self.task)(**data) if filename is not None or path is not None: save_model(data, result, filename, path) elif self.qo is not None: if return_data: self.qo.put((data, result)) else: self.qo.put(result) self.qi.task_done() class PoolFunction(object): def __init__(self, task=None, return_data=True): self.task = task self.return_data = return_data def __call__(self, data): path = data.pop('path', None) filename = data.pop('filename', None) return_data = data.pop('return_data', self.return_data) result = data.pop('task', self.task)(**data) if filename is not None or path is not None: save_model(data, result, filename, path) if return_data: return data, result else: return result class Result(object): def __init__(self, data, result): self.result = result self.data = data def __getattr__(self, attr): if attr != 'data' and hasattr(self, 'data'): if attr in self.data: return self.data[attr] raise AttributeError() def __iter__(self): yield self.data yield self.result def __repr__(self): return f'{self.__class__.__name__}({repr(self.data).replace(" ","")} : {repr(self.result)})' def __lt__(self, other): for k,v in self.data.items(): vo = other.data[k] try: if v < vo: return True elif v > vo: return False except: v = str(v) vo = str(vo) if v < vo: return True elif v > vo: return False return False class Base(object): _pickle_exclude = ( compile(r'_plot_.*_(?:ax|fig)\d*'), ) _pickle_exclude_class = ( 'NucPlot', 'AxesSubplot', 'Figure', ) def __getstate__(self): store = dict() for k,v in self.__dict__.items(): exclude = False for r in 
class Base(object):
    _pickle_exclude = (
        compile(r'_plot_.*_(?:ax|fig)\d*'),
        )
    _pickle_exclude_class = (
        'NucPlot',
        'AxesSubplot',
        'Figure',
        )

    def __getstate__(self):
        store = dict()
        for k, v in self.__dict__.items():
            exclude = False
            for r in self._pickle_exclude:
                if isinstance(r, str):
                    if k == r:
                        exclude = True
                        break
                elif r.fullmatch(k) is not None:
                    exclude = True
                    break
            for r in self._pickle_exclude_class:
                n = type(v).__name__
                if isinstance(r, str):
                    if n == r:
                        exclude = True
                        break
                elif isinstance(r, Pattern):
                    if r.fullmatch(n) is not None:
                        exclude = True
                        break
                elif isinstance(r, type):
                    if isinstance(v, r):
                        exclude = True
                        break
                else:
                    raise AttributeError(f'unknown exclude {r}')
            if not exclude:
                store[k] = v
        return store

    # these routines seem somewhat generic
    def save(self, filename=None, path=None):
        if isinstance(filename, IOBase):
            dump(self, filename)
        else:
            if filename is None:
                if not hasattr(self, 'uuid'):
                    self.uuid = uuid1()
                filename = f'{self.__class__.__name__}_{self.uuid.hex}.pickle.xz'
                # filename = eval(f"f'{filename}'")
            if path is not None:
                filename = Path(path) / filename
            cpickle(self, filename)

    def __setstate__(self, state):
        self.__dict__.update(state)
        if not hasattr(self, 'version'):
            self.version = -1
        else:
            print(f' [{self.__class__.__name__}] Version {self.version}')

    @classmethod
    def load(cls, filename):
        if isinstance(filename, IOBase):
            self = load(filename)
        else:
            self = cload(filename)
        if not isinstance(self, cls):
            print(f' [{self.__class__.__name__}] WARNING: "{filename}" may be wrong type.')
            # raise AttributeError(f'"{filename}" is wrong type.')
        return self


class Results(Base):
    VERSION = 10000

    def __init__(self, results=None):
        if results is None:
            results = list()
        if len(results) > 0 and not isinstance(results[0], Result):
            results = [Result(*r) for r in results]
        self.results = sorted(results)
        self.version = self.VERSION

    def add(self, result):
        self.results.append(result)
        self.results.sort()

    def __add__(self, other):
        # TODO - eliminate duplicates
        assert other.__class__.__name__ == self.__class__.__name__
        results = self.results + other.results
        return self.__class__(results)

    def __call__(self, **kwargs):
        results = list()
        for r in self.results:
            ok = True
            for k, v in kwargs.items():
                vr = r.data[k]
                try:
                    if vr != v:
                        ok = False
                        break
                except Exception:
                    # incomparable types: fall back to string comparison
                    vr = str(vr)
                    v = str(v)
                    if vr != v:
                        ok = False
                        break
            if ok:
                results.append(r)
        return Results(results)

    def __iter__(self):
        for r in self.results:
            yield r

    def __getitem__(self, key):
        if isinstance(key, slice):
            return self.__class__(self.results[key])
        return self.results[key]

    def to_list(self):
        return self.results.copy()

    def data(self):
        return [r.data for r in self.results]

    def result(self, rslice=None):
        if rslice is None:
            rslice = slice(None)
        return [r.result for r in self.results[rslice]]

    def __len__(self):
        return len(self.results)

    def __repr__(self):
        return (
            f'{self.__class__.__name__}(\n'
            + ',\n'.join(repr(r) for r in self.results)
            + ')'
            )

    def save_results(self, filename=None, path=None):
        """Bulk-save all completed results/tasks.

        If you provide a 'path', then output files will be written into
        that directory.

        'filename' should be a string with format instructions so that
        variables in it can be replaced from the keywords, e.g.,

            'an={an:g}_en={en:g}_i={i:g}_q={q:g}.xz'

        If no filename is provided, the UUID will be used.
        """
        for p, r in self.results:
            save_model(p, r, filename, path)
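
# Illustrative sketch (not used by the library itself) of the Results
# container: calling an instance filters by keyword, slicing returns a
# new Results, and '+' concatenates (duplicates are currently kept, see
# the TODO in __add__).  The values below are made up.
def _example_results_usage():
    rs = Results([
        (dict(Q=1, en=0.1), 'a'),
        (dict(Q=2, en=0.1), 'b'),
        ])
    assert len(rs(Q=1)) == 1            # keyword filter
    assert isinstance(rs[:1], Results)  # slices preserve the container
    assert len(rs + rs(Q=2)) == 3       # concatenation, no deduplication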
class Processor(Base):
    """
    Interface for parallel execution of runs.
    """

    VERSION = 10000

    def __init__(self, nparallel=None, task=None, startmethod=STARTMETHOD,
                 **kwargs):
        """Provide the parameters you want to loop over as iterable types
        (list, tuple, numpy array) in the corresponding keywords.

        Wrap iterable items you do not want to iterate over in, e.g., a
        1-list, 1-tuple, or 1-set.  By default, string and dictionary
        types are already dealt with automatically.

        If you provide a 'path', then output files will be written into
        that directory.

        'filename' should be a string with format instructions so that
        variables in it can be replaced from the keywords, e.g.,

            filename='Q{Q:g}_Mdot{mdot:12.5e}_{abu}.xz'

        If no filename is provided but only a 'path', a UUID will be
        used.  If neither filename nor path is provided, the result
        objects will be returned and stored in the 'results' attribute.
        The Results object can then be iterated over.

        'task' is an object (class or function) that executes the task
        when called and returns an object (as classes would naturally do)
        that has a save function.  If there is no save function, a UUID
        will be picked as filename.

        Examples
        --------
        from multistar.parallel import ParallelProcessor as P
        p = P(ymax=[1e6, 1e7], Q=[1, 2, 3], abu=(dict(he3=1), dict(h1=1)))
        p.save(filename='Q{Q:g}_Mdot={ymax:05g}_{abu}.xz')
        """
        self.tstart = time()
        try:
            set_start_method(startmethod)
        except RuntimeError:
            pass
        result = kwargs.pop('result', None)
        if result is None:
            result = Result
        results = kwargs.pop('results', None)
        if results is None:
            results = Results
        data = kwargs.pop('data', None)
        self.timeout = kwargs.pop('timeout', None)
        if data is None:
            # wrap inherently iterable types in kwargs so the star-map
            # (cartesian product) operation below treats them as scalars
            for k, v in kwargs.items():
                if isinstance(v, (str, dict, Path, FunctionType, type)):
                    kwargs[k] = (v,)
            # build the task dictionaries that will be put on the queue
            base = dict()
            data = list()
            values = list()
            keys = list()
            for k, v in kwargs.items():
                if iterable(v):
                    values.append(v)
                    keys.append(k)
                else:
                    base[k] = v
            for vec in product(*values):
                data.append(base | dict(zip(keys, vec)))
        self.data = data
        if nparallel is None:
            nparallel = cpu_count()
        print(f' [{self.__class__.__name__}] Running {nparallel:g} processes.')
        self.nparallel = nparallel
        self.task = task
        self.version = self.VERSION
        self.result = result
        self.results = results
        print(f' [{self.__class__.__name__}] Running {len(data):g} jobs.')

    def __iter__(self):
        for r in self.results:
            yield r

    def __add__(self, other):
        # TODO - eliminate duplicates
        assert other.__class__.__name__ == self.__class__.__name__
        results = self.results + other.results
        new = copy(self)
        new.results = results
        return new


class PoolProcessor(Processor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        manager = Manager()
        try:
            manager.start()
        except ProcessError:
            pass
        function = PoolFunction(task=self.task)
        results = list()
        with manager.Pool(self.nparallel) as pool:
            for r in pool.imap_unordered(function, self.data):
                results.append(self.result(*r))
        manager.shutdown()
        self.results = self.results(results)
        print(f' [{self.__class__.__name__}] generated in {time2human(time() - self.tstart)}.')
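
# Illustrative sketch of the job-list construction done in
# Processor.__init__ above: iterable keywords expand to their cartesian
# product while scalars are shared by every job.  Names and values below
# are made up.
def _example_keyword_expansion():
    base = dict(seed=42)                # non-iterable: copied into each job
    keys = ['Q', 'en']
    values = [[1, 2], [0.1, 0.2]]       # iterables: swept over
    jobs = [base | dict(zip(keys, vec)) for vec in product(*values)]
    assert len(jobs) == 4               # 2 x 2 combinations
    assert jobs[0] == dict(seed=42, Q=1, en=0.1)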
class ParallelProcessor(Processor):
    @staticmethod
    def _make_process(qi, qo, task):
        p = ParallelProcess(qi, qo, task=task)
        p.daemon = False
        p.start()
        return p

    def __init__(self, *args, save=None, **kwargs):
        super().__init__(*args, **kwargs)
        manager = Manager()
        try:
            manager.start()
        except ProcessError:
            pass
        qi = manager.JoinableQueue()
        qo = manager.JoinableQueue()
        self.nparallel = min(self.nparallel, len(self.data))
        processes = list()
        for i in range(self.nparallel):
            processes.append(self._make_process(qi, qo, self.task))
        for d in self.data:
            qi.put(d)
        for _ in range(len(processes)):
            # one sentinel per worker
            qi.put(None)
        try:
            qi.close()
        except AttributeError:
            pass

        # we collect up results
        timeout = self.timeout
        if timeout is None:
            timeout = 2**30
        starttime = time()
        ndata = len(self.data)
        results = list()
        fails = 0
        for _ in range(len(self.data)):
            try:
                results.append(self.result(*qo.get(True, timeout)))
            except Empty:
                fails += 1
                # presumably the task died; we start a new worker in
                # case all runs stopped.
                processes.append(self._make_process(qi, qo, self.task))
            qo.task_done()
            now = time()
            runtime = now - starttime
            ndone = len(results) + fails
            frac = ndone / ndata
            eta = runtime / ndone * (ndata - ndone)
            print(f' [{self.__class__.__name__}] done {ndone:_d} / {ndata:_d} ({frac * 100:0.2f} %, {time2human(runtime)}) ETA: {time2human(eta)}.')
        qi.join()
        qo.join()
        manager.shutdown()
        self.results = self.results(results)
        if fails > 0:
            print(f' [{self.__class__.__name__}] there were {fails} failures.')
        print(f' [{self.__class__.__name__}] generated in {time2human(time() - self.tstart)}.')
        if save is not None:
            if save is True:
                save = f'{self.__class__.__name__}-{uuid1().hex}.xz'
            if isinstance(save, (str, Path)):
                self.save(save)


class SerialProcessor(Processor):
    """
    Debug plugin class.
    """

    def __init__(self, *args, save=None, **kwargs):
        nparallel = kwargs.pop('nparallel', None)
        if nparallel is not None and nparallel != 1:
            banner = f' [{self.__class__.__name__}] ' + '#' * 40
            for _ in range(3):
                print(banner)
            print(f' [{self.__class__.__name__}] Using serial mode, NOT {nparallel=}.')
            for _ in range(3):
                print(banner)
        kwargs['nparallel'] = 1
        super().__init__(*args, **kwargs)

        # we collect up results
        results = list()
        for d in self.data:
            r = self.task(**d)
            results.append(self.result(d, r))
        print(f' [{self.__class__.__name__}] done {len(results)} / {len(self.data)}.')
        self.results = self.results(results)
        print(f' [{self.__class__.__name__}] generated in {time2human(time() - self.tstart)}.')
        if save is not None:
            if save is True:
                save = f'{self.__class__.__name__}-{uuid1().hex}.xz'
            if isinstance(save, (str, Path)):
                self.save(save)


_processors = dict(
    parallel = ParallelProcessor,
    pool = PoolProcessor,
    serial = SerialProcessor,
    )

_parallel = getenv('MULTISTAR_PARALLELMODE')
if _parallel in _processors:
    PARALLELMODE = _parallel
    print(f' [multistar.parallel] Using {PARALLELMODE=}')
del _parallel


def __getattr__(attr):
    # module-level __getattr__ (PEP 562): resolve TheProcessor lazily so
    # PARALLELMODE can still be changed after this module is imported
    if attr == 'TheProcessor':
        return _processors[PARALLELMODE]
    raise AttributeError(f'[{__name__}] "{attr}" not found.')
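
# Example usage (illustrative; 'model' is a hypothetical task function).
# The serial backend is chosen here so the demo does not depend on
# pickling 'model' across processes under the 'spawn' start method; use
# ParallelProcessor with a module-level task for real runs.
if __name__ == '__main__':
    def model(Q, en):
        return Q * en
    p = SerialProcessor(task=model, Q=[1, 2], en=[0.1, 0.2])
    for data, result in p:
        print(data, result)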