Source code for SALib.util.problem

import warnings
import importlib
from types import MethodType

from multiprocess import Pool, cpu_count
from pathos.pp import ParallelPythonPool as pp_Pool
from functools import partial, wraps

import numpy as np

import SALib.sample as samplers
import SALib.analyze as analyzers

from SALib.util import avail_approaches
from SALib.util.results import ResultDict


ptqdm_available = True
try:
    from p_tqdm import p_imap
except ImportError:
    ptqdm_available = False

__all__ = ['ProblemSpec']


class ProblemSpec(dict):
    """Dictionary-like object representing an SALib Problem specification."""
    def __init__(self, *args, **kwargs):
        super(ProblemSpec, self).__init__(*args, **kwargs)

        _check_spec_attributes(self)

        self._samples = None
        self._results = None
        self._analysis = None

        self['num_vars'] = len(self['names'])

        self._add_samplers()
        self._add_analyzers()

    @property
    def samples(self):
        return self._samples

    @samples.setter
    def samples(self, vals):
        cols = vals.shape[1]
        if cols != self['num_vars']:
            msg = "Mismatched sample size: Expected "
            msg += "{} cols, got {}".format(self['num_vars'], cols)
            raise ValueError(msg)

        self._samples = vals

        # Clear results to avoid confusion
        self._results = None

    @property
    def results(self):
        return self._results

    @results.setter
    def results(self, vals):
        val_shape = vals.shape
        if len(val_shape) == 1:
            cols = 1
        else:
            cols = vals.shape[1]

        out_cols = self.get('outputs', None)
        if out_cols is None:
            if cols == 1:
                self['outputs'] = ['Y']
            else:
                self['outputs'] = [f'Y{i}' for i in range(1, cols+1)]
        else:
            if cols != len(self['outputs']):
                msg = "Mismatched results size: Expected "
                msg += "{} cols, got {}".format(len(self['outputs']), cols)
                raise ValueError(msg)

        self._results = vals

    @property
    def analysis(self):
        return self._analysis
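    # Example (illustrative sketch, not part of the original source): a
    # minimal spec only needs 'names' and 'bounds'; the `samples` setter
    # then validates the column count against the derived `num_vars`:
    #
    #     sp = ProblemSpec({'names': ['x1', 'x2'],
    #                       'bounds': [[0.0, 1.0], [0.0, 1.0]]})
    #     sp.samples = np.random.rand(100, 2)   # OK: 2 cols == num_vars
    #     sp.samples = np.random.rand(100, 3)   # raises ValueError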
    def sample(self, func, *args, **kwargs):
        """Create sample using given function.

        Parameters
        ----------
        func : function,
            Sampling method to use. The given function must accept the
            SALib problem specification as the first parameter and return
            a numpy array.

        *args : list,
            Additional arguments to be passed to `func`

        **kwargs : dict,
            Additional keyword arguments passed to `func`

        Returns
        -------
        self : ProblemSpec object
        """
        # Clear model output and analysis results to avoid confusion,
        # especially if samples are forcibly changed...
        self._analysis = None
        self._results = None

        self._samples = func(self, *args, **kwargs)

        return self
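    # Example (sketch, assuming SALib's Saltelli sampler): any function with
    # the contract described above works, e.g.
    #
    #     from SALib.sample import saltelli
    #     sp.sample(saltelli.sample, 512)   # sp.samples now holds the array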
    def set_samples(self, samples):
        """Set previously generated samples."""
        self.samples = samples

        return self
    def set_results(self, results):
        """Set previously available model results."""
        self.results = results

        # if self.samples is not None:
        #     warnings.warn('Existing samples found - make sure these results are for those samples!')

        return self
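    # Example (sketch): pre-computed inputs/outputs can be attached directly,
    # e.g. when model runs happened outside SALib (`X_prior` and `Y_prior`
    # are hypothetical pre-existing arrays):
    #
    #     sp.set_samples(X_prior).set_results(Y_prior)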
    def evaluate(self, func, *args, **kwargs):
        """Evaluate a given model.

        Parameters
        ----------
        func : function,
            Model, or function that wraps a model, to be run/evaluated.
            The provided function is required to accept a numpy array of
            inputs as its first parameter and must return a numpy array of
            results.

        *args : list,
            Additional arguments to be passed to `func`

        **kwargs : dict,
            Additional keyword arguments passed to `func`

        Returns
        -------
        self : ProblemSpec object
        """
        self._results = func(self._samples, *args, **kwargs)

        return self
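    # Example (sketch; `my_model` is a hypothetical function): any callable
    # mapping an (N, D) input array to an (N,) or (N, M) output array fits
    # the contract above:
    #
    #     def my_model(X):
    #         return X[:, 0] + 2.0 * X[:, 1]
    #
    #     sp.evaluate(my_model)   # sp.results now holds the outputs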
    def evaluate_parallel(self, func, *args, nprocs=None, **kwargs):
        """Evaluate model locally in parallel.

        All detected processors will be used if `nprocs` is not specified.

        Parameters
        ----------
        func : function,
            Model, or function that wraps a model, to be run in parallel.
            The provided function needs to accept a numpy array of inputs
            as its first parameter and must return a numpy array of results.

        nprocs : int,
            Number of processors to use. Uses all available if not specified.

        *args : list,
            Additional arguments to be passed to `func`

        **kwargs : dict,
            Additional keyword arguments passed to `func`

        Returns
        -------
        self : ProblemSpec object
        """
        warnings.warn("This is an experimental feature and may not work.")

        if self._samples is None:
            raise RuntimeError("Sampling not yet conducted")

        if nprocs is None:
            nprocs = cpu_count()

        # Create wrapped partial function to allow passing of additional args
        tmp_f = self._wrap_func(func, *args, **kwargs)

        # Split into even chunks
        chunks = np.array_split(self._samples, int(nprocs), axis=0)

        if ptqdm_available:
            # Display progress bar if available
            res = p_imap(tmp_f, chunks, num_cpus=nprocs)
        else:
            with Pool(nprocs) as pool:
                res = list(pool.imap(tmp_f, chunks))

        self._results = self._collect_results(res)

        return self
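    # Example (experimental, as warned above; sketch): the wrapped function
    # is dispatched to worker processes, so `my_model` should be picklable:
    #
    #     sp.evaluate_parallel(my_model, nprocs=4)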
    def evaluate_distributed(self, func, *args, nprocs=1, servers=None,
                             verbose=False, **kwargs):
        """Distribute model evaluation across a cluster.

        Usage Conditions:

        * The provided function needs to accept a numpy array of inputs as
          its first parameter
        * The provided function must return a numpy array of results

        Parameters
        ----------
        func : function,
            Model, or function that wraps a model, to be run in parallel

        nprocs : int,
            Number of processors to use for each node. Defaults to 1.

        servers : list[str] or None,
            IP addresses or alias for each server/node to use.

        verbose : bool,
            Display job execution statistics. Defaults to False.

        *args : list,
            Additional arguments to be passed to `func`

        **kwargs : dict,
            Additional keyword arguments passed to `func`

        Returns
        -------
        self : ProblemSpec object
        """
        if verbose:
            from pathos.parallel import stats

        warnings.warn("This is an untested experimental feature and may not work.")

        workers = pp_Pool(nprocs, servers=servers)

        # Split into even chunks
        chunks = np.array_split(self._samples, int(nprocs)*len(servers), axis=0)

        tmp_f = self._wrap_func(func)

        res = list(workers.map(tmp_f, chunks))

        self._results = self._collect_results(res)

        if verbose:
            print(stats(), '\n')

        workers.clear()

        return self
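    # Example (untested sketch, mirroring the warning above; the server
    # addresses are hypothetical placeholders):
    #
    #     sp.evaluate_distributed(my_model, nprocs=2,
    #                             servers=['192.168.0.10', '192.168.0.11'])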
    def analyze(self, func, *args, **kwargs):
        """Analyze sampled results using given function.

        Parameters
        ----------
        func : function,
            Analysis method to use. The provided function must accept the
            problem specification as the first parameter, X values if
            needed, Y values, and return a numpy array.

        *args : list,
            Additional arguments to be passed to `func`

        **kwargs : dict,
            Additional keyword arguments passed to `func`

        Returns
        -------
        self : ProblemSpec object
        """
        if self._results is None:
            raise RuntimeError("Model not yet evaluated")

        if 'X' in func.__code__.co_varnames:
            # enforce passing of X if expected
            func = partial(func, *args, X=self._samples, **kwargs)

        out_cols = self.get('outputs', None)
        if out_cols is None:
            if len(self._results.shape) == 1:
                self['outputs'] = ['Y']
            else:
                num_cols = self._results.shape[1]
                self['outputs'] = [f'Y{i}' for i in range(1, num_cols+1)]

        if len(self['outputs']) > 1:
            self._analysis = {}
            for i, out in enumerate(self['outputs']):
                self._analysis[out] = func(self, *args,
                                           Y=self._results[:, i], **kwargs)
        else:
            self._analysis = func(self, *args, Y=self._results, **kwargs)

        return self
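    # Example (sketch, assuming Saltelli samples as above): SALib's Sobol
    # analyzer matches the contract described in the docstring; with multiple
    # outputs, `sp.analysis` becomes a dict keyed by output name:
    #
    #     from SALib.analyze import sobol
    #     sp.analyze(sobol.analyze)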
    def to_df(self):
        """Convert results to Pandas DataFrame."""
        an_res = self._analysis
        if isinstance(an_res, ResultDict):
            return an_res.to_df()
        elif isinstance(an_res, dict):
            # case where analysis result is a dict of ResultDicts
            return [an.to_df() for an in list(an_res.values())]

        raise RuntimeError("Analysis not yet conducted")
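    # Example (sketch): some analyzers (e.g. Sobol) report several tables,
    # so the return value may be a DataFrame, a list of DataFrames, or one
    # such entry per output:
    #
    #     dfs = sp.to_df()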
    def plot(self):
        """Plot results.

        Returns
        -------
        axes : matplotlib axes object
        """
        if self._analysis is None:
            raise RuntimeError("Analysis not yet conducted")

        num_rows = len(self['outputs'])
        if num_rows == 1:
            return self._analysis.plot()

        try:
            plt
        except NameError:
            import matplotlib.pyplot as plt

        num_cols = 1
        fk = list(self._analysis.keys())[0]
        if isinstance(self._analysis[fk].to_df(), (list, tuple)):
            # have to divide by 2 to account for CI columns
            num_cols = len(self._analysis[fk]) // 2

        p_width = max(num_cols*3, 5)
        p_height = max(num_rows*3, 6)
        _, axes = plt.subplots(num_rows, num_cols, sharey=True,
                               figsize=(p_width, p_height))
        for res, ax in zip(self._analysis, axes):
            self._analysis[res].plot(ax=ax)

            try:
                ax[0].set_title(res)
            except TypeError:
                ax.set_title(res)

        plt.tight_layout()

        return axes
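    # Example (sketch): with a single output this defers to the underlying
    # ResultDict.plot(); otherwise a grid of subplots is created:
    #
    #     axes = sp.plot()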
    def _wrap_func(self, func, *args, **kwargs):
        # Create wrapped partial function to allow passing of additional args
        tmp_f = func
        if (len(args) > 0) or (len(kwargs) > 0):
            tmp_f = partial(func, *args, **kwargs)

        return tmp_f

    def _setup_result_array(self):
        if len(self['outputs']) > 1:
            res_shape = (len(self._samples), len(self['outputs']))
        else:
            res_shape = len(self._samples)

        return np.empty(res_shape)

    def _collect_results(self, res):
        final_res = self._setup_result_array()

        # Collect results
        # Cannot enumerate over this as the length
        # of individual results may vary
        i = 0
        for r in res:
            r_len = len(r)
            final_res[i:i+r_len] = r
            i += r_len

        return final_res

    def _method_creator(self, func, method):
        @wraps(func)
        def modfunc(self, *args, **kwargs):
            return getattr(self, method)(func, *args, **kwargs)

        return modfunc

    def _add_samplers(self):
        """Dynamically add available SALib samplers as ProblemSpec methods."""
        for sampler in avail_approaches(samplers):
            func = getattr(importlib.import_module('SALib.sample.{}'.format(sampler)), 'sample')
            method_name = "sample_{}".format(sampler.replace('_sampler', ''))

            self.__setattr__(method_name, MethodType(self._method_creator(func, 'sample'), self))

    def _add_analyzers(self):
        """Dynamically add available SALib analyzers as ProblemSpec methods."""
        for analyzer in avail_approaches(analyzers):
            func = getattr(importlib.import_module('SALib.analyze.{}'.format(analyzer)), 'analyze')
            method_name = "analyze_{}".format(analyzer.replace('_analyzer', ''))

            self.__setattr__(method_name, MethodType(self._method_creator(func, 'analyze'), self))

    def __repr__(self):
        if self._samples is not None:
            print('Samples:', self._samples.shape, "\n")
        if self._results is not None:
            print('Outputs:', self._results.shape, "\n")
        if self._analysis is not None:
            print('Analysis:\n')
            an_res = self._analysis

            allowed_types = (list, tuple)
            if isinstance(an_res, ResultDict):
                an_res = an_res.to_df()
                if not isinstance(an_res, allowed_types):
                    print(an_res, "\n")
                else:
                    for df in an_res:
                        print(df, "\n")
            elif isinstance(an_res, dict):
                for res_name in an_res:
                    print("{}:".format(res_name))

                    dfs = an_res[res_name].to_df()
                    if isinstance(dfs, allowed_types):
                        for df in dfs:
                            print(df, "\n")
                    else:
                        print(dfs, "\n")
        return ''
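    # Example (sketch): the dynamically attached methods enable chaining;
    # names are derived from the module names as above (e.g. `sample_saltelli`
    # from SALib.sample.saltelli, `analyze_sobol` from SALib.analyze.sobol):
    #
    #     (sp.sample_saltelli(512)
    #        .evaluate(my_model)
    #        .analyze_sobol())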
def _check_spec_attributes(spec: ProblemSpec):
    assert 'names' in spec, "Names not defined"
    assert 'bounds' in spec, "Bounds not defined"
    assert len(spec['bounds']) == len(spec['names']), \
        f"""Number of bounds does not match number of names
        Number of names: {len(spec['names'])} | {spec['names']}
        ----------------
        Number of bounds: {len(spec['bounds'])}
        """
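# Example (sketch): a spec with mismatched 'names'/'bounds' lengths fails
# the assertions above at construction time:
#
#     ProblemSpec({'names': ['x1', 'x2'],
#                  'bounds': [[0.0, 1.0]]})   # AssertionError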