# Source code for qlat_utils.data

import math
import copy
import functools
import numpy as np


class q:
    # Namespace shim: bundles a few qlat_utils helpers under the single name
    # ``q`` so the rest of this module can write ``q.timer``, ``q.RngState``,
    # ``q.get_fname`` etc. without importing each symbol at module scope.
    from qlat_utils.utils import (
        get_fname,
    )
    from qlat_utils.timer import (
        timer,
        displayln_info,
    )
    from qlat_utils.rng_state import (
        RngState,
    )


alpha_qed = 1.0 / 137.035999084
fminv_gev = 0.197326979  # hbar * c / (1e-15 m * 1e9 electron charge * 1 volt)

# Scalar type tuples used for ``isinstance`` checks throughout this module.
float_types = (float, np.float32, np.float64,)
complex_types = (complex, np.complex64, np.complex128,)
int_types = (int, np.int32, np.int64,)

# Extended-precision types only exist on some platforms/builds of numpy.
# Fix: catch only AttributeError (missing attribute on ``np``) instead of a
# bare ``except:`` which would also swallow KeyboardInterrupt/SystemExit.
try:
    float_types = float_types + (np.float128,)
    complex_types = complex_types + (np.complex256,)
except AttributeError:
    pass

real_types = float_types + int_types
number_types = real_types + complex_types


class use_kwargs:

    """
    Decorator that merges a dict of default keyword arguments into every call.

    self.default_kwargs -- dict of defaults merged into ``kwargs`` unless the
        caller passes the marker key ``is_default_kwargs_applied``.
    self.keys -- optional collection of keys; when set, only these keys are
        passed to the underlying function.
    """

    def __init__(self, default_kwargs, keys=None):
        """
        If ``keys`` is specified, then only the specified keys will be passed to the underlying function.
        """
        self.default_kwargs = default_kwargs
        # Fix: was ``self.keys = None``, which silently discarded the ``keys``
        # argument and disabled key filtering entirely.
        self.keys = keys

    def __call__(self, func):
        @functools.wraps(func)
        def f(*args, **kwargs):
            # The caller can opt out of default merging by passing the marker
            # key ``is_default_kwargs_applied`` (it is then forwarded as-is).
            if "is_default_kwargs_applied" not in kwargs:
                d = self.default_kwargs.copy()
                d.update(kwargs)
                kwargs = d
            if self.keys is not None:
                kwargs = {k: kwargs[k] for k in self.keys}
            return func(*args, **kwargs)
        return f

###


def interp_i_arr(data_x_arr, x_arr):
    r"""
    Return ``i_arr`` such that ``q.interp(data_x_arr, i_arr) \approx x_arr``.

    ``x_arr`` can be either a 1-D array-like object or a single number.

    e.g.::

        data(x)
        data_arr[:] = data(data_x_arr)
        q.interp(data_arr, i_arr) \approx data(x_arr)
    """
    index_arr = np.arange(len(data_x_arr))
    return np.interp(x_arr, data_x_arr, index_arr)
def interp(data_arr, i_arr, axis=-1):
    """
    Return approximately ``data_arr[..., i_arr]`` if ``axis=-1``.

    ``i_arr`` can be non-integer (linear interpolation between neighbors) and
    can be either a 1-D array-like object or a single number.
    """
    arr = np.moveaxis(np.asarray(data_arr), axis, 0)
    idx = np.asarray(i_arr)
    if idx.shape == ():
        # Scalar index: interpolate linearly between the two bracketing entries.
        pos = idx.item()
        n = len(arr)
        lo = math.floor(pos)
        assert lo >= 0
        hi = lo + 1
        if hi >= n:
            return arr[n - 1]
        if lo < 0:
            return arr[0]
        w_hi = pos - lo
        w_lo = hi - pos
        return w_lo * arr[lo] + w_hi * arr[hi]
    if idx.shape == (len(idx),):
        # 1-D index array: interpolate each entry, then restore the axis order.
        out = np.array([interp(arr, p, 0) for p in idx], arr.dtype)
        return np.moveaxis(out, 0, axis)
    fname = q.get_fname()
    raise Exception(f"{fname}: i_arr={i_arr}")
def interp_x(data_arr, data_x_arr, x_arr, axis=-1):
    """
    Return ``interpolated_data_arr``: ``data_arr`` re-sampled at ``x_arr``.

    ``data_x_arr`` gives the x values of ``data_arr`` along ``axis`` and must
    satisfy ``data_x_arr.shape == (data_arr.shape[axis],)``.
    ``x_arr`` can be either a 1-D array-like object or a single number;
    ``interpolated_data_arr.shape[axis] == len(x_arr)`` and the number of
    dimensions is unchanged.
    """
    assert data_x_arr.shape == (data_arr.shape[axis],)
    return interp(data_arr, interp_i_arr(data_x_arr, x_arr), axis)
def get_threshold_idx(arr, threshold):
    r"""
    Return a (possibly fractional) index ``x`` such that
    ``q.interp(arr, [ x, ]) = np.array([ threshold, ])``.

    ``arr`` is a 1-D monotonic sequence (``arr.shape == (len(arr),)``);
    it may be ascending or descending.  When ``threshold`` lies outside the
    range of ``arr``, the index of the closest end is returned.
    """
    i1 = 0
    i2 = len(arr) - 1
    v1 = arr[i1]
    v2 = arr[i2]
    if v1 >= v2:
        # Descending data: swap ends so that v1 <= v2; indices keep their sign.
        i1, i2 = i2, i1
        v1, v2 = v2, v1
    while True:
        assert v2 >= v1
        if v1 <= threshold and threshold <= v2:
            # Fix: was ``i2 - i1 == 1``, which never holds after the swap for
            # descending data (there ``i2 - i1 == -1``) and caused an infinite
            # loop; ``abs`` handles both orientations.
            if abs(i2 - i1) == 1:
                d_v = v2 - v1
                d_i = i2 - i1
                i3 = i1 + (threshold - v1) / d_v * d_i
                return i3
            i3 = (i1 + i2) // 2
            v3 = arr[i3]
            if threshold <= v3:
                i2 = i3
                v2 = v3
                continue
            elif v3 <= threshold:
                i1 = i3
                v1 = v3
                continue
            else:
                assert False
        elif threshold <= v1:
            return i1
        elif v2 <= threshold:
            return i2
        else:
            assert False
def get_threshold_i_arr(data_arr, threshold_arr, axis=-1):
    r"""
    Return ``i_arr`` of fractional indices, one per profile along ``axis``.

    Let ``shape = np.moveaxis(data_arr, axis, -1)[..., 0].shape``;
    ``threshold_arr`` is broadcast to ``shape`` and for every ``index`` in
    ``np.ndindex(shape)``::

        q.interp(data_arr[index], i_arr[index]) \approx threshold_arr[index]
    """
    profiles = np.moveaxis(np.asarray(data_arr), axis, -1)
    shape = profiles[..., 0].shape
    thresholds = np.broadcast_to(np.asarray(threshold_arr), shape)
    result = np.zeros(shape, dtype=np.float64)
    for index in np.ndindex(shape):
        result[index] = get_threshold_idx(profiles[index], thresholds[index])
    return result
def get_threshold_x_arr(data_arr, data_x_arr, threshold_arr, axis=-1):
    r"""
    Return ``x_arr`` of threshold crossings in x units.

    Requires ``data_x_arr.shape == (data_arr.shape[axis],)``.  With
    ``shape = np.moveaxis(data_arr, axis, -1)[..., 0].shape`` and
    ``threshold_arr`` broadcast to ``shape``, for every ``index``::

        q.interp_x(data_arr[index], data_x_arr, x_arr[index]) \approx threshold_arr[index]
    """
    assert data_x_arr.shape == (data_arr.shape[axis],)
    idx_arr = get_threshold_i_arr(data_arr, threshold_arr, axis)
    result = np.zeros(idx_arr.shape, dtype=np.float64)
    # Map fractional indices back to x values via 1-D interpolation.
    result.ravel()[:] = interp(data_x_arr, idx_arr.ravel())
    return result
def partial_sum_list(x, *, is_half_last=False):
    """Replace each entry of ``x`` by the running partial sum, in place.

    With ``is_half_last=True`` each entry becomes the average of the partial
    sums before and after adding it (midpoint rule). Length is preserved.
    """
    acc = 0
    for k, item in enumerate(x):
        prev = acc
        acc = acc + item
        x[k] = (acc + prev) / 2 if is_half_last else acc

def partial_sum(x, *, is_half_last=False):
    """In-place partial sum of a 0-D, 1-D or 2-D array ``x`` (rows summed independently)."""
    ndim = len(x.shape)
    if ndim == 0:
        return
    if ndim == 1:
        partial_sum_list(x, is_half_last=is_half_last)
    elif ndim == 2:
        for row in x:
            partial_sum_list(row, is_half_last=is_half_last)
    else:
        assert False
def check_zero(x):
    """Return True when ``x`` is a real/integer scalar equal to zero, else False."""
    return isinstance(x, real_types) and 0 == x
def qnorm(x):
    """
    Squared norm, e.g. ``qnorm(2) == 4``.

    For arrays this is ``|vdot(x, x)|``; complex scalars give
    ``re^2 + im^2``; lists/tuples sum element norms recursively; any other
    object must provide its own ``.qnorm()`` method.
    """
    if isinstance(x, np.ndarray):
        return np.abs(np.vdot(x, x))
    if isinstance(x, real_types):
        return x * x
    if isinstance(x, complex_types):
        return x.real * x.real + x.imag * x.imag
    if isinstance(x, (list, tuple,)):
        return sum([qnorm(elem) for elem in x])
    return x.qnorm()
class Data:

    """
    Generic container supporting linear arithmetic on its value.

    Supported value types:
    - numeric
    - numpy.array
    - q.LatData
    - list (arithmetic is applied element-wise / broadcast against scalars)
    """

    def __init__(self, val):
        # Unwrap nested Data so ``self.val`` is never itself a Data.
        if isinstance(val, Data):
            self.val = val.val
            assert not isinstance(self.val, Data)
        else:
            self.val = val

    def __str__(self):
        return f"Data({self.val})"

    def get_val(self):
        return self.val

    def __copy__(self):
        return Data(copy.copy(self.val))

    def __deepcopy__(self, memo):
        return Data(copy.deepcopy(self.val, memo))

    def __add__(self, other):
        if isinstance(other, Data):
            if check_zero(self.val):
                return other
            elif check_zero(other.val):
                return self
            elif isinstance(self.val, list) and isinstance(other.val, list):
                assert len(self.val) == len(other.val)
                return Data([v1 + v2 for v1, v2 in zip(self.val, other.val)])
            elif isinstance(self.val, list):
                return Data([v + other.val for v in self.val])
            elif isinstance(other.val, list):
                return Data([self.val + v for v in other.val])
            else:
                return Data(self.val + other.val)
        else:
            return self + Data(other)

    def __radd__(self, other):
        if isinstance(other, Data):
            assert False
            return None
        else:
            return Data(other) + self

    def __mul__(self, other):
        if isinstance(other, Data):
            if check_zero(self.val) or check_zero(other.val):
                return Data(0)
            elif isinstance(self.val, list) and isinstance(other.val, list):
                return Data([v1 * v2 for v1, v2 in zip(self.val, other.val)])
            elif isinstance(self.val, list):
                return Data([v * other.val for v in self.val])
            elif isinstance(other.val, list):
                return Data([self.val * v for v in other.val])
            return Data(self.val * other.val)
        else:
            return self * Data(other)

    def __rmul__(self, other):
        if isinstance(other, Data):
            assert False
            return None
        else:
            return Data(other) * self

    def __neg__(self):
        if check_zero(self.val):
            return Data(0)
        elif isinstance(self.val, list):
            return Data([-v for v in self.val])
        else:
            return Data(-self.val)

    def __pos__(self):
        return self

    def __sub__(self, other):
        if isinstance(other, Data):
            if check_zero(self.val):
                # Fix: was ``Data(-other.val)``, which raises TypeError when
                # ``other.val`` is a list; ``-other`` goes through __neg__
                # which negates lists element-wise.
                return -other
            elif check_zero(other.val):
                return self
            elif isinstance(self.val, list) and isinstance(other.val, list):
                return Data([v1 - v2 for v1, v2 in zip(self.val, other.val)])
            elif isinstance(self.val, list):
                return Data([v - other.val for v in self.val])
            elif isinstance(other.val, list):
                return Data([self.val - v for v in other.val])
            else:
                return Data(self.val - other.val)
        else:
            return self - Data(other)

    def __rsub__(self, other):
        if isinstance(other, Data):
            assert False
            return None
        else:
            return Data(other) - self

    def qnorm(self):
        return qnorm(self.val)

    def glb_sum(self):
        from qlat.mpi import glb_sum
        return Data(glb_sum(self.val))

###

def filter_np_results(val):
    """Unwrap a size-1 numpy result into a plain Python scalar; return everything else unchanged."""
    if not hasattr(val, "size"):
        return val
    if val.size != 1:
        return val
    if not hasattr(val, "item"):
        return val
    return val.item()
def average(data_list):
    """Return the mean of ``data_list`` (size-1 numpy results unwrapped to scalars)."""
    count = len(data_list)
    total = sum(data_list)
    return filter_np_results((1 / count) * total)
def average_ignore_nan(value_arr_list):
    """
    Element-wise mean over a list of equal-shape/equal-dtype arrays, skipping
    NaN entries.  Positions that are NaN in every array come out as NaN.
    Returns None for an empty input list.
    """
    if len(value_arr_list) == 0:
        return None
    first = value_arr_list[0]
    shape = first.shape
    dtype = first.dtype
    counts = np.zeros(shape, dtype=np.int64)
    totals = np.zeros(shape, dtype=dtype)
    for arr in value_arr_list:
        assert arr.shape == shape
        assert arr.dtype == dtype
        mask = ~np.isnan(arr)
        counts[mask] += 1
        totals[mask] += arr[mask]
    result = np.zeros(shape, dtype=dtype)
    have_data = counts > 0
    result[have_data] = totals[have_data] / counts[have_data]
    result[~have_data] = np.nan
    return result
def block_data(data_list, block_size, is_overlapping=True):
    """
    Return the list of block averages.

    Blocks may overlap (stride 1) when ``is_overlapping == True``; otherwise
    they are disjoint with stride ``block_size``.
    """
    if block_size == 1:
        return data_list
    assert block_size >= 1
    n = len(data_list)
    if block_size >= n:
        return [average(data_list), ]
    stride = 1 if is_overlapping else block_size
    blocks = []
    lo = 0
    while lo + block_size <= n:
        blocks.append(average(data_list[lo:lo + block_size]))
        lo += stride
    return blocks
def avg_err(data_list, *, eps=1, block_size=1):
    """
    Compute ``(avg, err)`` of ``data_list``.
    --
    :param data_list: list of data
    :param eps: additional scaling factor for the error
    :param block_size: blocking the list of data
    :return: (avg, err,) where avg and err have the same type as data
    :rtype: (avg, err,)
    """
    assert block_size >= 1
    avg = average(data_list)
    n = len(data_list)
    if n <= 1:
        # Single sample: no fluctuation information; report scaled average.
        err = filter_np_results(abs(eps) * avg)
        return (avg, err,)
    if n < 2 * block_size:
        block_size = 1
    assert n > block_size
    blocks = block_data(data_list, block_size)
    mean_sqr_dev = average([fsqr(b - avg) for b in blocks])
    scale = abs(eps) * math.sqrt(block_size / (n - block_size))
    err = filter_np_results(scale * fsqrt(mean_sqr_dev))
    return (avg, err,)
def jackknife(data_list, *, eps=1):
    r"""
    Return ``jk`` with ``jk[i] = avg - \frac{eps}{N} (v[i] - avg)``.

    Normal jackknife uses ``eps=1``; the fluctuation is scaled by ``eps``.
    ``None`` entries are skipped for the average and mapped to ``avg``.
    """
    is_np_arr = isinstance(data_list, np.ndarray)
    present = [d for d in data_list if d is not None]
    n = len(present)
    fac = eps / n
    avg = average(present)
    jks = [avg, ]
    for d in data_list:
        jks.append(avg if d is None else avg - fac * (d - avg))
    if is_np_arr:
        jks = np.array(jks, dtype=data_list.dtype)
    return jks
def fsqr(data):
    """
    Separately square real and imag part in case of complex types.
    --
    :param data: real, complex, np.ndarray like objects.
    :return: squared `data`.
    :rtype: same type as `data`.
    """
    if isinstance(data, real_types):
        return data * data
    if isinstance(data, complex_types):
        re = data.real
        im = data.imag
        return complex(re * re, im * im)
    if isinstance(data, Data):
        return Data(fsqr(data.val))
    # Assuming np.ndarray like object
    if data.dtype in real_types:
        return np.square(data)
    if data.dtype in complex_types:
        return np.square(data.real) + 1j * np.square(data.imag)
    raise Exception(f"fsqr data={data} type not supported")
def fsqrt(data):
    """
    Separately calculate the square root of the real and imag part in case of complex types.
    --
    :param data: real, complex, np.ndarray like objects.
    :return: square root of `data`.  (Fix: docstring previously said "squared".)
    :rtype: same type as `data`.
    """
    if isinstance(data, real_types):
        return math.sqrt(data)
    elif isinstance(data, complex_types):
        r = data.real
        i = data.imag
        return complex(math.sqrt(r), math.sqrt(i))
    elif isinstance(data, Data):
        return Data(fsqrt(data.val))
    else:
        # Assuming np.ndarray like object
        if data.dtype in real_types:
            return np.sqrt(data)
        elif data.dtype in complex_types:
            return np.sqrt(data.real) + 1j * np.sqrt(data.imag)
        else:
            # Fix: the message previously named "fsqr" instead of "fsqrt".
            raise Exception(f"fsqrt data={data} type not supported")
def err_sum(*vs):
    """
    Combine independent errors in quadrature.

    e.g.: `q.err_sum(1.4, 2.1, 1.0)` ==> `2.7147743920996454`
    """
    total_sqr = sum([fsqr(v) for v in vs])
    return fsqrt(total_sqr)
def jk_avg(jk_arr):
    """The jackknife average is stored as the first entry of ``jk_arr``."""
    return filter_np_results(jk_arr[0])
def jk_err(jk_arr, *, eps=1, block_size=1):
    r"""
    Return $$ \frac{1}{eps} \sqrt{ N/(N-block_size) \sum_{i=1}^N (jk[i] - jk_avg)^2 }. $$
    when ``block_size=1``.  Note: ``len(jk_arr) = N + 1``.

    Same ``eps`` as the ``eps`` used in the ``jackknife`` function.
    Does not properly honor the $(N-1)$ formula in error calculation if there
    were missing data in the original ``data_list`` in the ``jackknife``
    function.
    """
    assert block_size >= 1
    center = jk_avg(jk_arr)
    n = len(jk_arr) - 1
    if n <= 1:
        # Too few samples for an error estimate; return the scaled average.
        return filter_np_results((1 / abs(eps)) * center)
    if n < 2 * block_size:
        block_size = 1
    assert n > block_size
    blocks = block_data(jk_arr[1:], block_size)
    mean_sqr_dev = average([fsqr(s - center) for s in blocks])
    scale = math.sqrt(block_size / (n - block_size)) * n / abs(eps)
    return filter_np_results(scale * fsqrt(mean_sqr_dev))
def jk_avg_err(jk_arr, *, eps=1, block_size=1):
    """Return ``(avg, err,)`` of the jackknife array ``jk_arr``."""
    avg = jk_avg(jk_arr)
    err = jk_err(jk_arr, eps=eps, block_size=block_size)
    return avg, err
@q.timer
def sjackknife(
        data_list, jk_idx_list,
        *,
        avg=None,
        is_hash_jk_idx=True,
        jk_idx_hash_size=None,
        rng_state=None,
        all_jk_idx=None,
        get_all_jk_idx=None,
        jk_blocking_func=None,
        eps=1,
):
    """
    Super jackknife. Return ``jk_arr``.

    ``len(jk_idx_list) == len(data_list)``
    ``len(jk_arr) == len(all_jk_idx)``
    ``jk_idx_list`` (after processed by ``jk_blocking_func``) should be
    contained in ``all_jk_idx``; otherwise, if ``is_hash_jk_idx`` is true,
    then a hash of ``jk_idx`` will be used instead.
    Ideally, ``all_jk_idx`` should only contain distinct indices.  However,
    if there are repetitions, indices appearing later take precedence.
    If ``all_jk_idx`` and ``get_all_jk_idx`` are both ``None``, then a
    trivial ``all_jk_idx`` will be created based on ``jk_idx_hash_size``.
    """
    if jk_idx_hash_size is None:
        jk_idx_hash_size = 1024
    if rng_state is None:
        rng_state = q.RngState("rejk")
    rs = rng_state
    assert len(jk_idx_list) == len(data_list)
    if isinstance(data_list, np.ndarray):
        dtype = data_list.dtype
    else:
        dtype = None
    # ``None`` entries are treated as missing samples and dropped.
    data_list_real = [d for d in data_list if d is not None]
    data_arr = np.array(data_list_real, dtype=dtype)
    if avg is None:
        avg = average(data_arr)
    dtype = data_arr.dtype
    jk_idx_list = [
        jk_idx for jk_idx, d in zip(jk_idx_list, data_list)
        if d is not None
    ]
    if jk_blocking_func is not None:
        # ``0`` selects the blocking for the central (un-shifted) sample.
        jk_idx_list = [
            jk_blocking_func(0, jk_idx)
            for jk_idx in jk_idx_list
        ]
    n = len(data_arr)
    assert n == len(jk_idx_list)
    if all_jk_idx is None:
        if get_all_jk_idx is None:
            assert is_hash_jk_idx
            all_jk_idx = ["avg", ] + list(range(jk_idx_hash_size))
        else:
            all_jk_idx = get_all_jk_idx()
    assert all_jk_idx[0] == "avg"
    n_super_sample = len(all_jk_idx) - 1
    # Map jk index (as string) to its position in all_jk_idx; duplicates:
    # later entries overwrite earlier ones.
    i_dict = dict()
    for i, jk_idx in enumerate(all_jk_idx):
        jk_idx_str = str(jk_idx)
        i_dict[jk_idx_str] = i
    i_arr = np.zeros(n, dtype=np.int32)
    for j in range(n):
        jk_idx = jk_idx_list[j]
        jk_idx_str = str(jk_idx)
        if jk_idx_str in i_dict:
            i = i_dict[jk_idx_str]
        else:
            # Unknown index: hash it into one of the super-sample slots
            # deterministically via the rng state split.
            assert is_hash_jk_idx
            rsi = rs.split(jk_idx_str)
            i = 1 + int(rsi.rand_gen() % n_super_sample)
        assert i > 0
        i_arr[j] = i
    # Count how many samples landed in each super-sample slot.
    count_dict = dict()
    for j in range(n):
        i = i_arr[j]
        if i in count_dict:
            count_dict[i] += 1
        else:
            count_dict[i] = 1
    jk_arr = np.empty((1 + n_super_sample, *data_arr[0].shape,), dtype=dtype)
    jk_arr[:] = avg
    data_diff = data_arr - avg
    for j in range(n):
        i = i_arr[j]
        assert i > 0
        assert i in count_dict
        if n > count_dict[i]:
            n_b = n - count_dict[i]
            fac = -eps * np.sqrt(1 / (n * n_b))
            jk_arr[i] += fac * data_diff[j]
    return jk_arr
@q.timer
def sjk_mk_jk_val(
        rs_tag, val, err,
        *,
        is_hash_jk_idx=True,
        jk_idx_hash_size=None,
        rng_state=None,
        all_jk_idx=None,
        get_all_jk_idx=None,
        eps=1,
):
    """
    Return ``jk_arr``.

    ``n = n_rand_sample``; ``len(jk_arr) == 1 + n``;
    ``jk_arr[i] = val + err * r[i]`` for ``i`` in ``1..n``
    where ``r[i] ~ N(0, 1)``.
    """
    if jk_idx_hash_size is None:
        jk_idx_hash_size = 1024
    assert jk_idx_hash_size >= 0
    if rng_state is None:
        rng_state = q.RngState("rejk")
    rs = rng_state
    if all_jk_idx is None:
        if get_all_jk_idx is None:
            assert is_hash_jk_idx
            all_jk_idx = ["avg", ] + list(range(jk_idx_hash_size))
        else:
            all_jk_idx = get_all_jk_idx()
    assert all_jk_idx[0] == "avg"
    n_super_sample = len(all_jk_idx) - 1
    assert n_super_sample >= 0
    assert isinstance(rng_state, q.RngState)
    assert isinstance(val, real_types)
    assert isinstance(err, real_types)
    # ``rs`` rebound to a tag-specific stream (earlier assignment is shadowed).
    rs = rng_state.split(str(rs_tag))
    jk_arr = np.zeros((n_super_sample + 1,), dtype=np.float64)
    jk_arr[0] = val
    r_arr = rs.g_rand_arr((n_super_sample,))
    # NOTE(review): the samples are rescaled so that qnorm(r_arr) == 1 (not
    # n_super_sample as in ``rjk_mk_jk_val``); this matches ``sjk_err`` which
    # carries no 1/N factor — confirm this pairing is intended.
    r_arr_qnorm = qnorm(r_arr)
    r_arr = r_arr * np.sqrt(1 / r_arr_qnorm)
    assert abs(qnorm(r_arr) - 1) < 1e-8
    jk_arr[1:] = val + eps * r_arr * err
    return jk_arr
def sjk_avg(jk_arr):
    """Average of a super-jackknife array (its first entry)."""
    return jk_avg(jk_arr)
def sjk_err(jk_arr, *, eps=1):
    r"""
    Return $$ \frac{1}{eps} \sqrt{ \sum_{i=1}^N (jk[i] - jk_avg)^2 }. $$
    Note: ``len(jk_arr) = N + 1``.

    Same ``eps`` as the ``eps`` used in the ``jackknife`` function.
    """
    center = jk_avg(jk_arr)
    n = len(jk_arr) - 1
    if n <= 1:
        # Too few samples for an error estimate; return the scaled average.
        return filter_np_results((1 / abs(eps)) * center)
    mean_sqr_dev = average([fsqr(s - center) for s in jk_arr[1:]])
    scale = math.sqrt(n) / abs(eps)
    return filter_np_results(scale * fsqrt(mean_sqr_dev))
def sjk_avg_err(jk_arr, *, eps=1):
    """Return ``(avg, err,)`` of the super-jackknife array ``jk_arr``."""
    avg = sjk_avg(jk_arr)
    err = sjk_err(jk_arr, eps=eps)
    return avg, err
# ----------

@q.timer
def mk_r_i_j_mat(
        n_rand_sample, jk_idx_list, rng_state,
        *,
        jk_blocking_func,
        is_normalizing_rand_sample,
        is_apply_rand_sample_jk_idx_blocking_shift,
        is_use_old_rand_alg,
):
    """
    Build the random matrix ``r_arr`` (shape (n_rand_sample, n)) and block
    count matrix ``b_arr`` for the randomized jackknife (see ``rjackknife``).
    ``r_arr[i, j]`` is the Gaussian weight of sample ``j`` in random sample
    ``i``; ``b_arr[i, j]`` is the size of the block that sample ``j`` belongs
    to in random sample ``i``.
    """
    assert n_rand_sample >= 0
    rs = rng_state
    n = len(jk_idx_list)
    r_arr = np.empty((n_rand_sample, n,), dtype=np.float64)
    b_arr = np.empty((n_rand_sample, n,), dtype=np.int32)
    jk_idx_str_arr = np.empty((n_rand_sample, n,), dtype=object)
    jk_idx_str_set = set()
    if jk_blocking_func is None:
        # No blocking => the per-sample blocking shift is meaningless.
        is_apply_rand_sample_jk_idx_blocking_shift = False
    @q.timer
    def set_jk_idx():
        # Fill jk_idx_str_arr with the (blocked) index strings and b_arr with
        # the multiplicity of each blocked index.
        if is_apply_rand_sample_jk_idx_blocking_shift:
            # Blocking can differ per random sample ``i`` (shifted blocks).
            for i in range(n_rand_sample):
                count_dict = dict()
                for j in range(n):
                    jk_idx = jk_blocking_func(i + 1, jk_idx_list[j])
                    jk_idx_str = str(jk_idx)
                    jk_idx_str_arr[i, j] = jk_idx_str
                    jk_idx_str_set.add(jk_idx_str)
                    if jk_idx_str in count_dict:
                        count_dict[jk_idx_str] += 1
                    else:
                        count_dict[jk_idx_str] = 1
                for j in range(n):
                    jk_idx_str = jk_idx_str_arr[i, j]
                    b_arr[i, j] = count_dict[jk_idx_str]
        else:
            # Same blocking for all random samples.
            count_dict = dict()
            for j in range(n):
                jk_idx = jk_idx_list[j]
                if jk_blocking_func is not None:
                    jk_idx = jk_blocking_func(0, jk_idx)
                jk_idx_str = str(jk_idx)
                jk_idx_str_arr[:, j] = jk_idx_str
                jk_idx_str_set.add(jk_idx_str)
                if jk_idx_str in count_dict:
                    count_dict[jk_idx_str] += 1
                else:
                    count_dict[jk_idx_str] = 1
            for j in range(n):
                jk_idx_str = jk_idx_str_arr[0, j]
                b_arr[:, j] = count_dict[jk_idx_str]
    set_jk_idx()
    if is_use_old_rand_alg == "v1":
        # Legacy random number generation, kept to reproduce old results.
        assert not is_normalizing_rand_sample
        for i in range(n_rand_sample):
            rsi = rs.split(str(i))
            r = [rsi.split(jk_idx_str).g_rand_gen() for jk_idx_str in jk_idx_str_arr[i]]
            for j in range(n):
                r_arr[i, j] = r[j]
        return r_arr, b_arr
    assert is_use_old_rand_alg == False
    r_arr_dict = dict()
    @q.timer
    def set_r():
        # One Gaussian stream per distinct blocked index string.
        for jk_idx_str in jk_idx_str_set:
            rsi = rs.split(jk_idx_str)
            garr = rsi.g_rand_arr(n_rand_sample)
            if is_normalizing_rand_sample:
                # garr_qnorm \approx n_rand_sample
                garr_qnorm = qnorm(garr)
                garr = garr * np.sqrt(n_rand_sample / garr_qnorm)
                assert abs(qnorm(garr) / n_rand_sample - 1) < 1e-8
            r_arr_dict[jk_idx_str] = garr
    set_r()
    @q.timer
    def set_r_arr():
        # Scatter the per-index streams into the (i, j) matrix layout.
        if is_apply_rand_sample_jk_idx_blocking_shift:
            for i in range(n_rand_sample):
                for j in range(n):
                    jk_idx_str = jk_idx_str_arr[i, j]
                    garr = r_arr_dict[jk_idx_str]
                    r_arr[i, j] = garr[i]
        else:
            for j in range(n):
                jk_idx_str = jk_idx_str_arr[0, j]
                garr = r_arr_dict[jk_idx_str]
                r_arr[:, j] = garr
    set_r_arr()
    return r_arr, b_arr
@q.timer
def rjackknife(
        data_list, jk_idx_list,
        *,
        avg=None,
        rng_state=None,
        n_rand_sample=None,
        jk_blocking_func=None,
        is_normalizing_rand_sample=False,
        is_apply_rand_sample_jk_idx_blocking_shift=True,
        is_use_old_rand_alg=False,
        eps=1,
):
    r"""
    Jackknife-bootstrap hybrid resampling. Return ``jk_arr``.

    ``len(jk_arr) == 1 + n_rand_sample``
    Distribution of ``jk_arr`` should be similar to the distribution of ``avg``.
    ``r_{i,j} ~ N(0, 1)``::

        if is_normalizing_rand_sample:
            n_j = \sum_i r_{i,j}^2
            r_{i,j} <- \sqrt{n_rand_sample / n_j} r_{i,j}
        data_list_real = [d for d in data_list if d is not None]
        data_arr = np.array(data_list_real, dtype=dtype)
        avg = average(data_arr)
        len(data_list_real) = n
        jk_arr[0] = avg
        jk_arr[i] = avg + \sum_{j=1}^{n} (-eps/\sqrt{n (n - b(i,j))}) r_{i,j} (data_list_real[j] - avg)

    where ``b(i,j)`` represents the ``block_size``.
    If ``jk_blocking_func`` is provided:
    ``jk_blocking_func(i, jk_idx) => blocked jk_idx``.
    """
    if n_rand_sample is None:
        n_rand_sample = 1024
    if rng_state is None:
        rng_state = q.RngState("rejk")
    assert len(data_list) == len(jk_idx_list)
    assert isinstance(n_rand_sample, int_types)
    assert n_rand_sample >= 0
    assert isinstance(rng_state, q.RngState)
    if isinstance(data_list, np.ndarray):
        dtype = data_list.dtype
    else:
        dtype = None
    # ``None`` entries are treated as missing samples and dropped.
    data_list_real = [d for d in data_list if d is not None]
    data_arr = np.array(data_list_real, dtype=dtype)
    if avg is None:
        avg = average(data_arr)
    dtype = data_arr.dtype
    jk_idx_list = [
        jk_idx for jk_idx, d in zip(jk_idx_list, data_list)
        if d is not None
    ]
    n = len(data_arr)
    r_arr, b_arr = mk_r_i_j_mat(
        n_rand_sample, jk_idx_list, rng_state,
        jk_blocking_func=jk_blocking_func,
        is_normalizing_rand_sample=is_normalizing_rand_sample,
        is_apply_rand_sample_jk_idx_blocking_shift=is_apply_rand_sample_jk_idx_blocking_shift,
        is_use_old_rand_alg=is_use_old_rand_alg,
    )
    n_b_arr = n - b_arr
    # Guard against division by zero when a block covers all samples; the
    # corresponding factors are explicitly zeroed right below.
    n_b_arr[n <= b_arr] = 1
    fac_arr = -eps / np.sqrt(n * n_b_arr)
    fac_arr[n <= b_arr] = 0
    fac_r_arr = fac_arr * r_arr
    # Pad with singleton axes so the (i, j) factors broadcast over the data shape.
    pad_shape = (1,) * len(data_arr[0].shape)
    fac_r_arr = fac_r_arr.reshape(fac_r_arr.shape + pad_shape)
    data_diff = data_arr - avg
    jk_arr = np.empty((1 + n_rand_sample, *data_arr[0].shape,), dtype=dtype)
    jk_arr[0] = avg
    jk_arr[1:] = avg + np.sum(fac_r_arr * data_diff, axis=1)
    return jk_arr
@q.timer
def rjk_mk_jk_val(
        rs_tag, val, err,
        *,
        n_rand_sample=None,
        rng_state=None,
        eps=1,
):
    """
    Return ``jk_arr``.

    ``n = n_rand_sample``; ``len(jk_arr) == 1 + n``;
    ``jk_arr[i] = val + err * r[i]`` for ``i`` in ``1..n``
    where ``r[i] ~ N(0, 1)``.
    """
    if n_rand_sample is None:
        n_rand_sample = 1024
    if rng_state is None:
        rng_state = q.RngState("rejk")
    assert n_rand_sample >= 0
    assert isinstance(rng_state, q.RngState)
    assert isinstance(val, real_types)
    assert isinstance(err, real_types)
    rs = rng_state.split(str(rs_tag))
    jk_arr = np.zeros((n_rand_sample + 1,), dtype=np.float64)
    jk_arr[0] = val
    r_arr = rs.g_rand_arr((n_rand_sample,))
    # Rescale so that qnorm(r_arr) == n_rand_sample exactly (each r[i] has
    # unit mean square), matching the 1/sqrt(N) factor in ``rjk_err``.
    r_arr_qnorm = qnorm(r_arr)
    r_arr = r_arr * np.sqrt(n_rand_sample / r_arr_qnorm)
    assert abs(qnorm(r_arr) / n_rand_sample - 1) < 1e-8
    jk_arr[1:] = val + eps * r_arr * err
    return jk_arr
def rjk_avg(jk_arr):
    """Average of a randomized-jackknife array (its first entry)."""
    return jk_avg(jk_arr)
def rjk_err(jk_arr, eps=1):
    r"""
    Return $$ \frac{1}{eps} \sqrt{ 1/N \sum_{i=1}^N (jk[i] - jk_avg)^2 }. $$
    Note: ``len(jk_arr) = N + 1``; ``jk_avg = jk_arr[0]``.

    Same ``eps`` as the ``eps`` used in the ``jackknife`` function.
    """
    center = jk_avg(jk_arr)
    n = len(jk_arr) - 1
    if n <= 0:
        # No samples at all; return the scaled average.
        return filter_np_results((1 / abs(eps)) * center)
    mean_sqr_dev = average([fsqr(s - center) for s in jk_arr[1:]])
    return filter_np_results((1 / abs(eps)) * fsqrt(mean_sqr_dev))
def rjk_avg_err(rjk_list, eps=1):
    """Return ``(avg, err,)`` of the randomized-jackknife array ``rjk_list``."""
    avg = rjk_avg(rjk_list)
    err = rjk_err(rjk_list, eps)
    return avg, err
# ---------- default_g_jk_kwargs = dict() def mk_g_jk_kwargs(): """ Return the predefined `default_g_jk_kwargs`. """ g_jk_kwargs = dict() # g_jk_kwargs["jk_type"] = "rjk" # choices: "rjk", "super" g_jk_kwargs["eps"] = 1 # # for jk_type = "rjk" g_jk_kwargs["n_rand_sample"] = 1024 g_jk_kwargs["is_normalizing_rand_sample"] = False g_jk_kwargs["is_apply_rand_sample_jk_idx_blocking_shift"] = True # # for jk_type = "super" g_jk_kwargs["is_hash_jk_idx"] = True g_jk_kwargs["jk_idx_hash_size"] = 1024 # # Is only needed to reproduce old results # Possible choice: "v1" (also need default_g_jk_kwargs["is_normalizing_rand_sample"] == False) g_jk_kwargs["is_use_old_rand_alg"] = False # # these parameters are used in jk_blocking_func_default g_jk_kwargs["block_size"] = 1 g_jk_kwargs["block_size_dict"] = { "job_tag": 1, } # # Below are items which are not touched in # ``get_jk_state`` or ``set_jk_state`` # g_jk_kwargs["rng_state"] = q.RngState("rejk") # g_jk_kwargs["all_jk_idx"] = None g_jk_kwargs["get_all_jk_idx"] = None # g_jk_kwargs["all_jk_idx_set"] = set() # # jk_blocking_func(i, jk_idx) => blocked_jk_idx g_jk_kwargs["jk_blocking_func"] = jk_blocking_func_default # return g_jk_kwargs def reset_default_g_jk_kwargs(): default_g_jk_kwargs.clear() default_g_jk_kwargs.update(mk_g_jk_kwargs())
@use_kwargs(default_g_jk_kwargs)
def get_jk_state(
        *,
        jk_type,
        eps,
        n_rand_sample,
        is_normalizing_rand_sample,
        is_apply_rand_sample_jk_idx_blocking_shift,
        is_hash_jk_idx,
        jk_idx_hash_size,
        is_use_old_rand_alg,
        block_size,
        block_size_dict,
        **_kwargs,
):
    """
    Snapshot the global jackknife configuration as a plain tuple.

    Currently only useful if we set
    ``q.default_g_jk_kwargs["jk_type"] = "rjk"``  # this is the default now
    and
    ``q.default_g_jk_kwargs["jk_blocking_func"] = jk_blocking_func_default``

    Used for `q.cache_call`. Example::

        q.cache_call(get_state=q.get_jk_state)
        def func(...):
            ...
    """
    assert jk_type == "rjk"
    # Order must match the unpacking in ``set_jk_state``.
    return (
        jk_type,
        eps,
        n_rand_sample,
        is_normalizing_rand_sample,
        is_apply_rand_sample_jk_idx_blocking_shift,
        is_hash_jk_idx,
        jk_idx_hash_size,
        is_use_old_rand_alg,
        block_size,
        block_size_dict,
    )
def set_jk_state(state):
    """Restore ``default_g_jk_kwargs`` fields from a tuple made by ``get_jk_state``."""
    (jk_type,
     eps,
     n_rand_sample,
     is_normalizing_rand_sample,
     is_apply_rand_sample_jk_idx_blocking_shift,
     is_hash_jk_idx,
     jk_idx_hash_size,
     is_use_old_rand_alg,
     block_size,
     block_size_dict,
     ) = state
    default_g_jk_kwargs.update(
        jk_type=jk_type,
        eps=eps,
        n_rand_sample=n_rand_sample,
        is_normalizing_rand_sample=is_normalizing_rand_sample,
        is_apply_rand_sample_jk_idx_blocking_shift=is_apply_rand_sample_jk_idx_blocking_shift,
        is_hash_jk_idx=is_hash_jk_idx,
        jk_idx_hash_size=jk_idx_hash_size,
        is_use_old_rand_alg=is_use_old_rand_alg,
        block_size=block_size,
        block_size_dict=block_size_dict,
    )
# Pre-generated deterministic shifts, one per random sample index ``i``,
# used to shift block boundaries per sample.
jk_blocking_traj_shift_arr = (
    q.RngState("jk_blocking_traj_shift_arr").rand_arr(16 * 1024)
    % (1024 * 1024 * 1024 * 1024)
)

@use_kwargs(default_g_jk_kwargs)
def jk_blocking_func_default(
        i, jk_idx,
        *,
        block_size,
        block_size_dict,
        all_jk_idx_set,
        **_kwargs,
):
    """
    Return ``blocked_jk_idx``.

    ``blocked_jk_idx`` should uniquely identify the block that the
    configuration identified by ``jk_idx`` belongs to.
    The block scheme can be different for different J-B hybrid samples.
    The J-B hybrid sample is indexed by ``i`` (``1 <= i <= n_rand_sample``)::

        block_size_for_this_job_tag = block_size_dict.get(job_tag, block_size)

    Uses default_g_jk_kwargs for block_size, block_size_dict, all_jk_idx_set.
    """
    if i == 0:
        # Central sample: no shift of the block boundaries.
        shift = 0
    else:
        assert i >= 1
        shift = int(jk_blocking_traj_shift_arr[
            (i - 1) % len(jk_blocking_traj_shift_arr)
        ])
    if block_size_dict is None:
        block_size_dict = dict()
    if all_jk_idx_set is not None:
        # Side effect: record every index ever seen (for later inspection).
        all_jk_idx_set.add(jk_idx)
    if isinstance(jk_idx, int_types):
        traj = jk_idx
        b_shift = shift % block_size
        return (traj + b_shift) // block_size
    elif isinstance(jk_idx, tuple) and len(jk_idx) == 2 and isinstance(jk_idx[1], int_types):
        job_tag, traj = jk_idx
        assert isinstance(job_tag, str)
        assert isinstance(traj, int_types)
        block_size_for_this_job_tag = block_size_dict.get(job_tag, block_size)
        assert isinstance(block_size_for_this_job_tag, int_types)
        b_shift = shift % block_size_for_this_job_tag
        return (job_tag, (traj + b_shift) // block_size_for_this_job_tag,)
    else:
        # Unrecognized index type: leave it unblocked.
        return jk_idx
    assert False
@use_kwargs(default_g_jk_kwargs)
@q.timer
def g_mk_jk(
        data_list, jk_idx_list,
        *,
        avg=None,
        jk_type,
        all_jk_idx,
        get_all_jk_idx,
        n_rand_sample,
        rng_state,
        jk_blocking_func,
        is_normalizing_rand_sample,
        is_apply_rand_sample_jk_idx_blocking_shift,
        is_use_old_rand_alg,
        is_hash_jk_idx,
        jk_idx_hash_size,
        eps,
        **_kwargs,
):
    """
    Perform (randomized) Super-Jackknife for the Jackknife data set.
    --
    :param data_list: initial un-jackknifed data.
    :param jk_idx_list: should be list of indices that names the ``jk_arr``.
    :param jk_type: ``[ "rjk", "super", ]``
    :param eps: Error scaling factor.
    :return: (randomized) Super-Jackknife data set.

    Note that::

        len(data_list) == len(jk_idx_list)
        jk_idx_list = [(job_tag, traj,) for traj in traj_list]

    We can set ``eps`` to be factor ``len(data_list)`` larger.
    """
    if jk_type == "super":
        return sjackknife(
            data_list, jk_idx_list,
            avg=avg,
            is_hash_jk_idx=is_hash_jk_idx,
            jk_idx_hash_size=jk_idx_hash_size,
            rng_state=rng_state,
            all_jk_idx=all_jk_idx,
            get_all_jk_idx=get_all_jk_idx,
            jk_blocking_func=jk_blocking_func,
            eps=eps,
        )
    if jk_type == "rjk":
        return rjackknife(
            data_list, jk_idx_list,
            avg=avg,
            n_rand_sample=n_rand_sample,
            rng_state=rng_state,
            jk_blocking_func=jk_blocking_func,
            is_normalizing_rand_sample=is_normalizing_rand_sample,
            is_apply_rand_sample_jk_idx_blocking_shift=is_apply_rand_sample_jk_idx_blocking_shift,
            is_use_old_rand_alg=is_use_old_rand_alg,
            eps=eps,
        )
    assert False
@use_kwargs(default_g_jk_kwargs)
@q.timer
def g_mk_jk_val(
        rs_tag, val, err,
        *,
        jk_type,
        all_jk_idx,
        get_all_jk_idx,
        n_rand_sample,
        rng_state,
        is_hash_jk_idx,
        jk_idx_hash_size,
        eps,
        **_kwargs,
):
    """
    Create a jackknife sample with random numbers based on central value
    ``val`` and error ``err``.
    --
    Need:
    default_g_jk_kwargs["jk_type"] = "rjk"
    default_g_jk_kwargs["n_rand_sample"] = n_rand_sample # e.g. n_rand_sample = 1024
    default_g_jk_kwargs["rng_state"] = rng_state # e.g. rng_state = q.RngState("rejk")
    """
    if jk_type == "super":
        return sjk_mk_jk_val(
            rs_tag, val, err,
            is_hash_jk_idx=is_hash_jk_idx,
            jk_idx_hash_size=jk_idx_hash_size,
            rng_state=rng_state,
            all_jk_idx=all_jk_idx,
            get_all_jk_idx=get_all_jk_idx,
            eps=eps,
        )
    if jk_type == "rjk":
        return rjk_mk_jk_val(
            rs_tag, val, err,
            n_rand_sample=n_rand_sample,
            rng_state=rng_state,
            eps=eps,
        )
    assert False
def g_jk_avg(jk_arr, **_kwargs):
    """
    Return ``avg`` of the ``jk_arr``.

    Plain numbers are treated as their own average.
    """
    return jk_arr if isinstance(jk_arr, number_types) else jk_avg(jk_arr)
@use_kwargs(default_g_jk_kwargs)
def g_jk_err(jk_arr, *, eps, jk_type, **_kwargs):
    """
    Return ``err`` of the ``jk_arr``.

    Plain numbers carry no fluctuation and get error 0.
    """
    if isinstance(jk_arr, number_types):
        return 0
    if jk_type == "super":
        return sjk_err(jk_arr, eps=eps)
    if jk_type == "rjk":
        return rjk_err(jk_arr, eps=eps)
    assert False
@q.timer
def g_jk_avg_err(jk_arr, **kwargs):
    """
    Return ``(avg, err,)`` of the ``jk_arr``.
    """
    avg = g_jk_avg(jk_arr)
    err = g_jk_err(jk_arr, **kwargs)
    return avg, err
@q.timer
def g_jk_avg_err_arr(jk_arr, **kwargs):
    """
    Return ``avg_err_arr`` of the ``jk_arr`` with
    ``avg_err_arr.shape = jk_arr[0].shape + (2,)``
    (last axis holds (avg, err)).
    """
    avg, err = g_jk_avg_err(jk_arr, **kwargs)
    stacked = np.stack([avg, err, ])
    return np.moveaxis(stacked, 0, -1).copy()
@use_kwargs(default_g_jk_kwargs)
def g_jk_size(
        *,
        jk_type,
        all_jk_idx,
        get_all_jk_idx,
        n_rand_sample,
        is_hash_jk_idx,
        jk_idx_hash_size,
        **_kwargs,
):
    """
    Return number of samples for the (randomized) Super-Jackknife data set.
    """
    if jk_type == "rjk":
        return 1 + n_rand_sample
    if jk_type == "super":
        if all_jk_idx is None:
            if get_all_jk_idx is None:
                assert is_hash_jk_idx
                all_jk_idx = ["avg", ] + list(range(jk_idx_hash_size))
            else:
                all_jk_idx = get_all_jk_idx()
        assert all_jk_idx[0] == "avg"
        n_super_sample = len(all_jk_idx) - 1
        assert n_super_sample >= 0
        return 1 + n_super_sample
    assert False
@use_kwargs(default_g_jk_kwargs)
def g_jk_blocking_func(
        i, jk_idx,
        *,
        jk_blocking_func,
        **_kwargs,
):
    """
    Return ``jk_blocking_func(i, jk_idx)``; identity when no blocking
    function is configured.
    """
    if jk_blocking_func is None:
        return jk_idx
    return jk_blocking_func(i, jk_idx)
@use_kwargs(default_g_jk_kwargs)
def g_jk_sample_size(
        job_tag, traj_list,
        **_kwargs,
):
    """
    Return the number of distinct (blocked) jackknife indices for ``job_tag``
    over the trajectories in ``traj_list``.
    """
    jk_idx_list = [(job_tag, traj,) for traj in traj_list]
    # BUG FIX: the original forwarded ``**kwargs``, but the catch-all
    # parameter is named ``_kwargs`` -- every call raised NameError.
    b_jk_idx_set = set(g_jk_blocking_func(0, jk_idx, **_kwargs) for jk_idx in jk_idx_list)
    return len(b_jk_idx_set)

reset_default_g_jk_kwargs()

# ----

default_show_val_kwargs = dict()

def mk_show_val_kwargs():
    """
    Build the default option dict consumed by ``show_val`` / ``show_val_err``.
    """
    d = dict()
    d["is_latex"] = True
    d["num_float_digit"] = None
    d["num_exp_digit"] = None
    d["exponent"] = None
    return d

default_show_val_kwargs.update(mk_show_val_kwargs())

def get_val_exp(val, exp=0):
    """
    return val, exp
    where `val * 10**exp` is the same as input
    Normalizes so that ``1.0 <= abs(val) < 10.0`` (or returns ``0.0, 0``).
    """
    assert isinstance(val, (int, float))
    assert isinstance(exp, int)
    if val == 0.0:
        return 0.0, 0
    while abs(val) >= 10.0:
        val /= 10
        exp += 1
    while abs(val) < 1.0:
        val *= 10
        exp -= 1
    return val, exp

@use_kwargs(default_show_val_kwargs)
def show_val(
        val,
        *,
        is_latex,
        num_float_digit,
        num_exp_digit,
        exponent,
):
    """
    Format a single number either as a fixed-point string or in scientific
    notation (LaTeX ``\\times 10^{e}`` or plain ``E{e}``).
    `is_latex` can be in [ None, False, True, ]
    `num_float_digit` or `num_exp_digit` can be in [ None, False, True, int, ]
    `exponent` can be in [ None, int, ]
    """
    assert isinstance(val, (int, float))
    if is_latex is None:
        is_latex = True
    if exponent is not None:
        # A forced exponent implies scientific notation.
        assert isinstance(exponent, int)
        num_float_digit = False
        assert num_exp_digit is not False
        e = exponent
        v = val / 10**e
    else:
        v, e = get_val_exp(val)
    # When neither format is requested, pick one based on the magnitude.
    if (num_float_digit is None) and (num_exp_digit is None):
        if -2 <= e <= 4:
            num_float_digit = True
            num_exp_digit = False
        else:
            num_exp_digit = True
            num_float_digit = False
    if num_float_digit is None:
        if num_exp_digit is False:
            num_float_digit = True
        else:
            num_float_digit = False
    if num_exp_digit is None:
        if num_float_digit is False:
            num_exp_digit = True
        else:
            num_exp_digit = False
    # ``True`` means "choose a sensible digit count automatically".
    if num_float_digit is True:
        num_float_digit = max(1, 5 - e)
    else:
        assert (num_float_digit is False) or isinstance(num_float_digit, int)
    if num_exp_digit is True:
        num_exp_digit = 5
    else:
        assert (num_exp_digit is False) or isinstance(num_exp_digit, int)
    assert not ((num_float_digit is False) and (num_exp_digit is False))
    if num_exp_digit is False:
        assert isinstance(num_float_digit, int)
        assert num_float_digit >= 0
        return (f"{{:.{num_float_digit}f}}").format(val)
    else:
        assert isinstance(num_exp_digit, int)
        assert num_exp_digit >= 0
        v_str = (f"{{:.{num_exp_digit}f}}").format(v)
        if is_latex:
            return f"{v_str} \\times 10^{{{e}}}"
        else:
            return f"{v_str}E{e}"

@use_kwargs(default_show_val_kwargs)
def show_val_err(
        val_err,
        *,
        is_latex,
        num_float_digit,
        num_exp_digit,
        exponent,
):
    """
    Format a ``(val, err,)`` pair as ``val(err)`` with the error shown in
    parentheses, choosing digits from the error's magnitude.
    A bare number (or zero error) falls back to ``show_val``.
    `is_latex` can be in [ None, False, True, ]
    `num_float_digit` or `num_exp_digit` can be in [ None, False, True, int, ]
    `exponent` can be in [ None, int, ]
    # Examples:
    print(show_val_err((1.12e16, 12), num_float_digit=1))
    print(show_val_err((1.12e16, 12e6), num_exp_digit=True))
    print(show_val_err((1.12e16, 12e6)))
    print(show_val_err((1.12e16, 12e7), exponent=10))
    print(show_val_err((1.12e16, 12e7), exponent=10, is_latex=False))
    """
    if isinstance(val_err, (int, float)):
        val = val_err
        return show_val(
            val,
            is_latex=is_latex,
            num_float_digit=num_float_digit,
            num_exp_digit=num_exp_digit,
            exponent=exponent,
        )
    val, err = val_err
    if err == 0:
        return show_val(
            val,
            is_latex=is_latex,
            num_float_digit=num_float_digit,
            num_exp_digit=num_exp_digit,
            exponent=exponent,
        )
    assert isinstance(val, (int, float))
    assert isinstance(err, (int, float))
    if is_latex is None:
        is_latex = True
    # Normalize the error so that two significant digits of it are shown.
    e_v, e_e = get_val_exp(err)
    if abs(e_v) <= 2.5:
        e_v *= 100
        e_e -= 2
    else:
        e_v *= 10
        e_e -= 1
    if exponent is not None:
        assert isinstance(exponent, int)
        num_float_digit = False
        assert num_exp_digit is not False
        e = exponent
        v = val / 10**e
    else:
        v, e = get_val_exp(val)
        # If the error dominates the value, scale by the error's exponent.
        if (e_e > e) or (v == 0.0):
            e = e_e
            v = val / 10**e
    if (num_float_digit is None) and (num_exp_digit is None):
        if -2 <= e <= 4:
            num_float_digit = True
            num_exp_digit = False
        else:
            num_exp_digit = True
            num_float_digit = False
    if num_float_digit is None:
        if num_exp_digit is False:
            num_float_digit = True
        else:
            num_float_digit = False
    if num_exp_digit is None:
        if num_float_digit is False:
            num_exp_digit = True
        else:
            num_exp_digit = False
    if num_float_digit is True:
        num_float_digit = max(0, -e_e)
    else:
        assert (num_float_digit is False) or isinstance(num_float_digit, int)
    if num_exp_digit is True:
        num_exp_digit = max(0, e - e_e)
    else:
        assert (num_exp_digit is False) or isinstance(num_exp_digit, int)
    assert not ((num_float_digit is False) and (num_exp_digit is False))
    if num_exp_digit is False:
        assert isinstance(num_float_digit, int)
        assert num_float_digit >= 0
        if abs(err) >= 1.0:
            return (f"{{0:.{num_float_digit}f}}({{1:.{num_float_digit}f}})").format(val, err)
        else:
            # Small error: show it as an integer in the last digits.
            e_e = -num_float_digit
            e_v = err / 10**e_e
            return (f"{{0:.{num_float_digit}f}}({{1}})").format(val, round(e_v))
    else:
        assert isinstance(num_exp_digit, int)
        assert num_exp_digit >= 0
        e_e = e
        e_v = err / 10**e_e
        if abs(e_v) >= 1.0:
            v_str = (f"{{0:.{num_exp_digit}f}}({{1:.{num_exp_digit}f}})").format(
                v, e_v)
        else:
            e_e = e - num_exp_digit
            e_v = err / 10**e_e
            v_str = (f"{{0:.{num_exp_digit}f}}({{1}})").format(v, round(e_v))
        if is_latex:
            return f"{v_str} \\times 10^{{{e}}}"
        else:
            return f"{v_str}E{e}"

# ----

class NewDictValues:

    """
    Context manager that temporarily overrides entries of ``dictionary``
    and restores the originals on exit.
    Example:
    #
    with q.NewDictValues(dictionary, k1=v1, k2=v2, ...):
        ...
    #
    """

    def __init__(self, dictionary, **kwargs):
        self.dictionary = dictionary
        self.new_kwargs = kwargs
        self.original = dict()

    def __enter__(self):
        # Every overridden key must already exist in the dictionary.
        for key in self.new_kwargs.keys():
            self.original[key] = self.dictionary[key]
            self.dictionary[key] = self.new_kwargs[key]

    def __exit__(self, exc_type, exc_value, traceback):
        # NOTE(review): these asserts mean an exception inside the ``with``
        # body surfaces as AssertionError and the dictionary is NOT restored
        # -- confirm this is intentional before relying on it.
        assert exc_type is None
        assert exc_value is None
        assert traceback is None
        for key in self.new_kwargs.keys():
            self.dictionary[key] = self.original[key]
        self.new_kwargs = None
        self.original = None

# ----

class JkKwargs(NewDictValues):

    """
    Temporarily override entries of ``default_g_jk_kwargs``.
    Example:
    #
    with q.JkKwargs(n_rand_sample=1024, block_size=10, block_size_dict={ "48I": 20, }):
        ...
    #
    """

    def __init__(self, **kwargs):
        super().__init__(default_g_jk_kwargs, **kwargs)

# ----

class ShowKwargs(NewDictValues):

    """
    Temporarily override entries of ``default_show_val_kwargs``.
    Example:
    #
    with q.ShowKwargs(is_latex=True, exponent=-10):
        ...
    #
    """

    def __init__(self, **kwargs):
        super().__init__(default_show_val_kwargs, **kwargs)

# ----

# ---- old funcs

def merge_jk_idx(*jk_idx_list_list):
    """
    Concatenate several jackknife index lists, keeping a single leading "avg".
    """
    for jk_idx_list in jk_idx_list_list:
        assert jk_idx_list[0] == "avg"
    return ["avg", ] + [jk_idx for jk_idx_list in jk_idx_list_list for jk_idx in jk_idx_list[1:]]

@q.timer
def rejk_list(jk_list, jk_idx_list, all_jk_idx):
    """
    Super jackknife
    ``jk_idx_list`` should be contained in ``all_jk_idx`` and have the same order.
    Does not properly honor the (N-1) formula in error calculation.
    Entries of ``all_jk_idx`` not present in ``jk_idx_list`` are filled with
    the average sample.
    """
    assert jk_idx_list[0] == "avg"
    assert all_jk_idx[0] == "avg"
    assert len(jk_idx_list) == len(jk_list)
    assert len(jk_idx_list) <= len(all_jk_idx)
    is_np_arr = isinstance(jk_list, np.ndarray)
    jk_avg = jk_list[0]
    size_new = len(all_jk_idx)
    i_new = 0
    jk_list_new = []
    for i, idx in enumerate(jk_idx_list):
        # Pad with the average until we reach this index in ``all_jk_idx``.
        while all_jk_idx[i_new] != idx:
            jk_list_new.append(jk_avg)
            i_new += 1
            assert i_new < size_new
        jk_list_new.append(jk_list[i])
        i_new += 1
    while i_new < size_new:
        jk_list_new.append(jk_avg)
        i_new += 1
    assert i_new == size_new
    assert size_new == len(jk_list_new)
    if is_np_arr:
        jk_list_new = np.array(jk_list_new, dtype=jk_list.dtype)
    return jk_list_new

@q.timer
def rjk_jk_list(
        jk_list, jk_idx_list, n_rand_sample, rng_state,
        jk_blocking_func=None,
        is_normalizing_rand_sample=False,
        is_apply_rand_sample_jk_idx_blocking_shift=True,
        is_use_old_rand_alg=False,
):
    r"""
    return jk_list
    len(jk_list) == 1 + n_rand_sample
    distribution of jk_list should be similar as the distribution of avg
    r_{i,j} ~ N(0, 1)
    if is_normalizing_rand_sample:
        n_j = \sum_i r_{i,j}^2
        r_{i,j} <- \sqrt{n_rand_sample / n_j} r_{i,j}
    avg = jk_list[0]
    len(jk_list) = n + 1
    jk_list[i] = avg + \sum_{j=1}^{n} r_{i,j} (jk_list[j] - avg)
    #
    if `jk_blocking_func` is provided:
    ``
    jk_blocking_func(i, jk_idx) => blocked jk_idx
    ``
    Note that: ``1 <= i <= n_rand_sample``
    ``
    jk_list[i] = avg + \sum_{j=1}^{n} r_{i,jk_block_func(i, j)} (jk_list[j] - avg)
    ``
    """
    assert jk_idx_list[0] == "avg"
    assert isinstance(n_rand_sample, int_types)
    assert n_rand_sample >= 0
    assert isinstance(rng_state, q.RngState)
    is_np_arr = isinstance(jk_list, np.ndarray)
    n = len(jk_list) - 1
    r_arr, b_arr = mk_r_i_j_mat(
        n_rand_sample, jk_idx_list[1:], rng_state,
        jk_blocking_func=jk_blocking_func,
        is_normalizing_rand_sample=is_normalizing_rand_sample,
        is_apply_rand_sample_jk_idx_blocking_shift=is_apply_rand_sample_jk_idx_blocking_shift,
        is_use_old_rand_alg=is_use_old_rand_alg,
    )
    avg = jk_list[0]
    if is_np_arr:
        jk_arr = jk_list
        jk_diff = jk_arr[1:] - avg
        rjk_arr = np.empty((1 + n_rand_sample, *avg.shape,), dtype=jk_arr.dtype)
        rjk_arr[:] = avg
        for j in range(n):
            for i in range(n_rand_sample):
                rjk_arr[i + 1] += r_arr[i, j] * jk_diff[j]
        return rjk_arr
    else:
        rjk_list = [avg, ]
        jk_diff = [jk_list[j] - avg for j in range(1, n + 1)]
        for i in range(n_rand_sample):
            rjk_list.append(
                avg + sum([r_arr[i, j] * jk_diff[j] for j in range(n)]))
        return rjk_list

@use_kwargs(default_g_jk_kwargs)
@q.timer
def g_jk(data_list, *, eps, **_kwargs):
    """
    Obsolete, call ``g_mk_jk`` instead.
    --
    Perform initial Jackknife for the original data set.\n
    """
    return jackknife(data_list, eps=eps)

@use_kwargs(default_g_jk_kwargs)
@q.timer
def g_rejk(
        jk_list, jk_idx_list,
        *,
        jk_type,
        all_jk_idx,
        get_all_jk_idx,
        n_rand_sample,
        rng_state,
        jk_blocking_func,
        is_normalizing_rand_sample,
        is_apply_rand_sample_jk_idx_blocking_shift,
        is_use_old_rand_alg,
        **_kwargs,
):
    """
    Obsolete, call ``g_mk_jk`` instead.
    --
    Perform (randomized) Super-Jackknife for the Jackknife data set.
    --
    :jk_list: usually the Jackknife data set obtained with ``g_jk(data_list)``.
    :jk_idx_list: should be list of indices that names the ``jk_list``.
    :jk_type: ``[ "rjk", "super", ]``
    :returns: (randomized) Super-Jackknife data set.
    Note that::
        len(jk_list) == len(jk_idx_list)
        jk_idx_list[0] == "avg"
    """
    if jk_type == "super":
        if jk_blocking_func is not None:
            # "super" mode ignores blocking; warn instead of failing.
            q.displayln_info(
                f"g_rejk: jk_type={jk_type} does not support jk_blocking_func={jk_blocking_func}")
        if all_jk_idx is None:
            assert get_all_jk_idx is not None
            all_jk_idx = get_all_jk_idx()
        return rejk_list(
            jk_list,
            jk_idx_list,
            all_jk_idx,
        )
    elif jk_type == "rjk":
        return rjk_jk_list(
            jk_list,
            jk_idx_list,
            n_rand_sample,
            rng_state,
            jk_blocking_func,
            is_normalizing_rand_sample,
            is_apply_rand_sample_jk_idx_blocking_shift,
            is_use_old_rand_alg,
        )
    else:
        assert False
    return None

def interpolate_list(data_arr, i):
    """
    Old function.
    return approximately data_arr[i]
    Use `q.interp(data_arr, i, 0)` instead
    """
    return interp(data_arr, i, 0)

def mk_jk_blocking_func(block_size=1, block_size_dict=None, all_jk_idx_set=None):
    """
    Recommend to use `jk_blocking_func_default` instead.
    Build a function mapping a jackknife index to its block index.
    # block_size_for_this_job_tag = block_size_dict.get(job_tag, block_size)
    """
    if block_size_dict is None:
        block_size_dict = dict()
    def jk_blocking_func(jk_idx):
        if all_jk_idx_set is not None:
            # Record every index seen, as a side channel for the caller.
            all_jk_idx_set.add(jk_idx)
        if isinstance(jk_idx, int_types):
            traj = jk_idx
            return traj // block_size
        elif isinstance(jk_idx, tuple) and len(jk_idx) == 2 and isinstance(jk_idx[1], int_types):
            job_tag, traj = jk_idx
            assert isinstance(job_tag, str)
            assert isinstance(traj, int_types)
            block_size_for_this_job_tag = block_size_dict.get(
                job_tag, block_size)
            assert isinstance(block_size_for_this_job_tag, int_types)
            return (job_tag, traj // block_size_for_this_job_tag,)
        else:
            # Unrecognized index shapes pass through unchanged.
            return jk_idx
    return jk_blocking_func

def interpolate(data_arr, i_arr):
    """
    Old function.
    Use `q.interp(data_arr, i_arr, -1)` instead.
    # return approximately data_arr[..., i_arr]
    """
    vt = data_arr.transpose()
    if isinstance(i_arr, real_types):
        return interpolate_list(vt, i_arr).transpose()
    else:
        return np.array([interpolate_list(vt, i) for i in i_arr], data_arr.dtype).transpose()

def add_jk_idx(arr):
    """
    arr: no jk index
    return: add trivial jk index in the LAST axis
    """
    return arr.reshape(arr.shape + (1,))

def jk_transpose(arr):
    """
    arr: jk index is the 0th axis
    return: jk index is the last axis
    """
    shape = arr.shape
    ndim = len(shape)
    if ndim <= 1:
        return arr
    axes = list(range(1, ndim)) + [0, ]
    return arr.transpose(axes)

def jk_transpose_back(arr):
    """
    jk_transpose_back(jk_transpose(arr)) == arr
    """
    shape = arr.shape
    ndim = len(shape)
    if ndim <= 1:
        return arr
    axes = [ndim - 1, ] + list(range(0, ndim - 1))
    return arr.transpose(axes)