"""Collection of approximation functions."""
import logging
import numpy
import chaospy
[docs]def approximate_inverse(
distribution,
idx,
qloc,
bounds=None,
cache=None,
parameters=None,
xloc0=None,
iterations=300,
tolerance=1e-12,
):
"""
Calculate the approximation of the inverse Rosenblatt transformation.
Uses a hybrid Newton-Raphson and binary search method to converge to the
inverse values. Includes forward Rosenblatt transformations, probability
density function (its derivative), and if not provided, boundary function.
Args:
distribution (Distribution):
Distribution to estimate inverse Rosenblatt on.
idx (int):
The dimension to take approximation along.
qloc (numpy.ndarray):
Input values. All values must be on unit interval ``(0, 1)`` and
``qloc.shape == (dim,size)`` where dim is the number of dimensions
in distribution and size is the number of values to calculate
simultaneously.
bounds (Optional[Tuple[numpy.ndarray, numpy.ndarray]]):
Assuming lower and upper bounds is not available, this provides
outer bounds for lower and upper to use instead.
cache (Optional[Dict[Distribution, numpy.ndarray]]):
Memory cache for the location in the evaluation so far.
parameters (Optional[Dict[str, Any]]):
The parameters to use. If omitted, get the parameters from
distribution.
iterations (int):
The maximum number of iterations allowed.
tolerance (float):
Tolerance criterion determining convergence.
Returns:
(numpy.ndarray):
Approximation of inverse Rosenblatt transformation.
Example:
>>> distribution = chaospy.Normal(1000, 10)
>>> qloc = numpy.array([0.1, 0.2, 0.9])
>>> inverse = distribution.inv(qloc)
>>> inverse.round(4)
array([ 987.1845, 991.5838, 1012.8155])
>>> distribution._ppf = None
>>> numpy.allclose(
... approximate_inverse(distribution, 0, qloc), inverse)
True
"""
logger = logging.getLogger(__name__)
logger.debug("init approximate_inverse: %s", distribution)
logger.debug("cache: %s", cache)
# lots of initial values:
if cache is None:
cache = {}
if bounds is None:
xlower = distribution._get_lower(idx, cache.copy())
xupper = distribution._get_upper(idx, cache.copy())
else:
xlower = numpy.broadcast_to(bounds[0], qloc.shape)
xupper = numpy.broadcast_to(bounds[1], qloc.shape)
_lower, distribution._lower = distribution._lower, lambda **kws: xlower
_upper, distribution._upper = distribution._upper, lambda **kws: xupper
xloc = xlower + qloc * (xlower + xupper) if xloc0 is None else xloc0
uloc = numpy.zeros(qloc.shape)
ulower = -qloc
uupper = 1 - qloc
indices = numpy.ones(qloc.shape[-1], dtype=bool)
cache_copy = cache.copy()
parameters_copy = distribution._parameters.copy()
if parameters is not None:
assert not any(
[isinstance(value, chaospy.Distribution) for value in parameters.values()]
)
distribution._parameters.update(parameters)
for idx_ in range(2 * iterations):
cache.clear()
cache.update(cache_copy)
# evaluate function:
uloc = numpy.where(
indices, distribution._get_fwd(xloc, idx, cache) - qloc, uloc
)
# convergence criteria:
indices &= numpy.abs(uloc) > tolerance
if not numpy.any(indices):
break
# narrow down boundaries:
ulower = numpy.where(indices & (uloc < 0), uloc, ulower)
xlower = numpy.where(indices & (uloc < 0), xloc, xlower)
uupper = numpy.where(indices & (uloc > 0), uloc, uupper)
xupper = numpy.where(indices & (uloc > 0), xloc, xupper)
# Newton increment every second iteration:
xloc_ = numpy.inf
if idx_ % 2 == 0:
cache.clear()
cache.update(cache_copy)
try:
derivative = distribution._get_pdf(xloc, idx, cache)
except chaospy.UnsupportedFeature:
cache.clear()
cache.update(cache_copy)
derivative = approximate_density(distribution, idx, xloc, cache)
derivative = numpy.where(derivative, derivative, 1)
xloc_ = xloc - uloc / derivative
# use binary search if Newton increment is outside bounds:
weight = numpy.random.random()
xloc_ = numpy.where(
(xloc_ < xupper) & (xloc_ > xlower),
xloc_,
weight * xupper + (1 - weight) * xlower,
)
xloc = numpy.where(indices, xloc_, xloc)
else:
logger.warning("Too many iterations required to estimate inverse.")
logger.info("%d out of %d did not converge.", numpy.sum(indices), len(indices))
# print("Too many iterations required to estimate inverse.")
# print("%d out of %d did not converge." % (numpy.sum(indices), len(indices)))
cache.clear()
cache.update(cache_copy)
distribution._parameters.clear()
distribution._parameters.update(parameters_copy)
if bounds is not None:
distribution._lower = _lower
distribution._upper = _upper
logger.debug("%s: ppf approx used %d steps", distribution, idx_ / 2)
# print("%s: ppf approx used %d steps" % (distribution, idx_/2))
return xloc
MOMENTS_QUADS = {}
[docs]def approximate_moment(
distribution, k_loc, order=100000, rule="clenshaw_curtis", **kwargs
):
"""
Approximation method for estimation of raw statistical moments.
Uses quadrature integration to estimate the values.
Args:
distribution (Distribution):
Distribution domain with dim=len(distribution)
k_loc (Sequence[int, ...]):
The exponents of the moments of interest with ``shape == (dim,)``.
order (int):
The quadrature order used in approximation.
rule (str):
Quadrature rule for integrating moments.
kwargs:
Extra args passed to :func:`chaospy.generate_quadrature`.
Examples:
>>> distribution = chaospy.Uniform(1, 4)
>>> round(chaospy.approximate_moment(distribution, (1,)), 4)
2.5
>>> round(chaospy.approximate_moment(distribution, (2,)), 4)
7.0
"""
assert isinstance(distribution, chaospy.Distribution)
k_loc = numpy.asarray(k_loc)
if len(distribution) > 1:
assert (
not distribution.stochastic_dependent
), "Dependent distributions does not support moment approximation."
assert len(k_loc) == len(distribution), "incorrect size of exponents"
return numpy.prod(
[
approximate_moment(
distribution[idx], (k_loc[idx],), order=order, rule=rule, **kwargs
)
for idx in range(len(distribution))
],
axis=0,
)
k_loc = tuple(k_loc.tolist())
assert all(
[isinstance(k, int) for k in k_loc]
), "exponents must be integers: %s found" % type(k_loc[0])
order = int(1e5 if order is None else order)
if (distribution, order) not in MOMENTS_QUADS:
MOMENTS_QUADS[distribution, order] = chaospy.generate_quadrature(
order, distribution, rule=rule, **kwargs
)
X, W = MOMENTS_QUADS[distribution, order]
if k_loc in distribution._mom_cache:
return distribution._mom_cache[k_loc]
out = float(numpy.sum(numpy.prod(X.T**k_loc, 1) * W))
distribution._mom_cache[k_loc] = out
return out
[docs]def approximate_density(distribution, idx, xloc, cache=None, step_size=1e-7):
"""
Approximate the probability density function.
Args:
distribution (Distribution):
Distribution in question. May not be an advanced variable.
idx (int):
The dimension to take approximation along.
xloc (numpy.ndarray):
Location coordinates. Requires that xloc.shape=(len(distribution), K).
cache (Optional[Dict[Distribution, Tuple[numpy.ndarray, numpy.ndarray]]]):
Current state in the evaluation graph. If omitted, assume that
evaluations should be done from scratch.
step_size (float):
The relative step size between two points used to calculate the
derivative.
Returns:
numpy.ndarray: Local probability density function with
``out.shape == xloc.shape``. To calculate actual density function,
evaluate ``numpy.prod(out, 0)``.
Example:
>>> distribution = chaospy.Normal(1000, 10)
>>> xloc = numpy.array([990, 1000, 1010])
>>> approximate_density(distribution, 0, xloc).round(4)
array([0.0242, 0.0399, 0.0242])
>>> distribution.pdf(xloc).round(4)
array([0.0242, 0.0399, 0.0242])
"""
cache = {} if cache is None else cache
assert (idx, distribution) not in cache
xloc = numpy.asfarray(xloc)
assert xloc.ndim == 1
lower, upper = numpy.min(xloc), numpy.max(xloc)
middle = 0.5 * (lower + upper)
step_size = numpy.where(xloc < middle, step_size, -step_size) * numpy.clip(
numpy.abs(xloc), 1, None
)
cache1 = cache.copy()
floc1 = distribution._get_fwd(xloc, idx, cache=cache1)
cache2 = cache.copy()
floc2 = distribution._get_fwd(xloc + step_size, idx, cache=cache2)
floc = numpy.abs((floc2 - floc1) / step_size)
# weave a history of pdf from two cdf streams
for key in set(cache1).difference(cache):
cache[key] = cache1[key][0], (
(cache2[key][1] - cache1[key][1]) / (cache2[key][0] - cache1[key][0])
)
assert floc.shape == xloc.shape
return floc