Source code for qlat_utils.data

from .rng_state import *
from .timer import *

import math
import copy
import functools
import numpy as np

alpha_qed = 1.0 / 137.035999084
fminv_gev = 0.197326979 # hbar * c / (1e-15 m * 1e9 electron charge * 1 volt)

float_types = (float, np.float32, np.float64,)
complex_types = (complex, np.complex64, np.complex128,)
int_types = (int, np.int32, np.int64,)

try:
    float_types = float_types + (np.float128,)
    complex_types = complex_types + (np.complex256,)
except:
    pass

real_types = float_types + int_types
number_types = real_types + complex_types

class use_kwargs:

    """
    self.default_kwargs
    self.keys
    """

    def __init__(self, default_kwargs, keys = None):
        self.default_kwargs = default_kwargs
        self.keys = None

    def __call__(self, func):
        @functools.wraps(func)
        def f(*args, **kwargs):
            if "is_default_kwargs_applied" not in kwargs:
                d = self.default_kwargs.copy()
                d.update(kwargs)
                kwargs = d
            if self.keys is not None:
                kwargs = { k: kwargs[k] for k in self.keys }
            return func(*args, **kwargs)
        return f

###

def interpolate_list(v, i):
    size = len(v)
    i1 = math.floor(i)
    assert i1 >= 0
    i2 = i1 + 1
    if i2 >= size:
        return v[size - 1]
    elif i1 < 0:
        return v[0]
    v1 = v[i1]
    v2 = v[i2]
    a1 = i2 - i
    a2 = i - i1
    return a1 * v1 + a2 * v2

def interpolate(v_arr, i_arr):
    vt = v_arr.transpose()
    if isinstance(i_arr, real_types):
        return interpolate_list(vt, i_arr).transpose()
    else:
        return np.array([ interpolate_list(vt, i) for i in i_arr ], v_arr.dtype).transpose()

def partial_sum_list(x, *, is_half_last = False):
    """Modify in-place, preserve length"""
    s = 0
    for i, v in enumerate(x):
        sp = s
        s += v
        if is_half_last:
            x[i] = (s + sp) / 2
        else:
            x[i] = s

def partial_sum(x, *, is_half_last = False):
    """Modify in-place, preserve length"""
    shape = x.shape
    if len(shape) == 0:
        return
    elif len(shape) == 1:
        partial_sum_list(x, is_half_last = is_half_last)
    elif len(shape) == 2:
        for v in x:
            partial_sum_list(v, is_half_last = is_half_last)
    else:
        assert False

def check_zero(x):
    if isinstance(x, real_types) and 0 == x:
        return True
    return False

def qnorm(x):
    """
    qnorm(2) == 4
    """
    if isinstance(x, np.ndarray):
        return np.abs(np.vdot(x, x))
    elif isinstance(x, real_types):
        return x * x
    elif isinstance(x, complex_types):
        return x.real * x.real + x.imag * x.imag
    elif isinstance(x, (list, tuple,)):
        return sum([ qnorm(x_i) for x_i in x ])
    else:
        return x.qnorm()
    assert False

class Data:

    def __init__(self, val):
        """
        # supported value types:
        # numeric
        # numpy.array
        # q.LatData
        # list
        """
        if isinstance(val, Data):
            self.val = val.val
            assert not isinstance(self.val, Data)
        else:
            self.val = val

    def __str__(self):
        return f"Data({self.val})"

    def get_val(self):
        return self.val

    def __copy__(self):
        return Data(copy.copy(self.val))

    def __deepcopy__(self, memo):
        return Data(copy.deepcopy(self.val, memo))

    def __add__(self, other):
        if isinstance(other, Data):
            if check_zero(self.val):
                return other
            elif check_zero(other.val):
                return self
            elif isinstance(self.val, list) and isinstance(other.val, list):
                assert len(self.val) == len(other.val)
                return Data([ v1 + v2 for v1, v2 in zip(self.val, other.val) ])
            elif isinstance(self.val, list):
                return Data([ v + other.val for v in self.val ])
            elif isinstance(other.val, list):
                return Data([ self.val + v for v in other.val ])
            else:
                return Data(self.val + other.val)
        else:
            return self + Data(other)

    def __radd__(self, other):
        if isinstance(other, Data):
            assert False
            return None
        else:
            return Data(other) + self

    def __mul__(self, other):
        if isinstance(other, Data):
            if check_zero(self.val) or check_zero(other.val):
                return Data(0)
            elif isinstance(self.val, list) and isinstance(other.val, list):
                return Data([ v1 * v2 for v1, v2 in zip(self.val, other.val) ])
            elif isinstance(self.val, list):
                return Data([ v * other.val for v in self.val ])
            elif isinstance(other.val, list):
                return Data([ self.val * v for v in other.val ])
            return Data(self.val * other.val)
        else:
            return self * Data(other)

    def __rmul__(self, other):
        if isinstance(other, Data):
            assert False
            return None
        else:
            return Data(other) * self

    def __neg__(self):
        if check_zero(self.val):
            return Data(0)
        elif isinstance(self.val, list):
            return Data([ -v for v in self.val ])
        else:
            return Data(-self.val)

    def __pos__(self):
        return self

    def __sub__(self, other):
        if isinstance(other, Data):
            if check_zero(self.val):
                return Data(-other.val)
            elif check_zero(other.val):
                return self
            elif isinstance(self.val, list) and isinstance(other.val, list):
                return Data([ v1 - v2 for v1, v2 in zip(self.val, other.val) ])
            elif isinstance(self.val, list):
                return Data([ v - other.val for v in self.val ])
            elif isinstance(other.val, list):
                return Data([ self.val - v for v in other.val ])
            else:
                return Data(self.val - other.val)
        else:
            return self - Data(other)

    def __rsub__(self, other):
        if isinstance(other, Data):
            assert False
            return None
        else:
            return Data(other) - self

    def qnorm(self):
        return qnorm(self.val)

    def glb_sum(self):
        from qlat.mpi import glb_sum
        return Data(glb_sum(self.val))

###

def add_jk_idx(arr):
    """
    arr: no jk index
    return: add trivial jk index in the LAST axis
    """
    return arr.reshape(arr.shape + (1,))

def jk_transpose(arr):
    """
    arr: jk index is the 0th axis
    return: jk index is the last axis
    """
    shape = arr.shape
    ndim = len(shape)
    if ndim <= 1:
        return arr
    axes = list(range(1, ndim)) + [ 0, ]
    return arr.transpose(axes)

def jk_transpose_back(arr):
    """
    jk_transpose_back(jk_transpose(arr)) == arr
    """
    shape = arr.shape
    ndim = len(shape)
    if ndim <= 1:
        return arr
    axes = [ ndim - 1, ] + list(range(0, ndim - 1))
    return arr.transpose(axes)

def average(data_list):
    n = len(data_list)
    v = sum(data_list)
    return 1/n * v

def average_ignore_nan(value_arr_list):
    if len(value_arr_list) == 0:
        return None
    shape = value_arr_list[0].shape
    dtype = value_arr_list[0].dtype
    count_arr = np.zeros(shape, dtype=np.int64)
    sum_arr = np.zeros(shape, dtype=dtype)
    for v_arr in value_arr_list:
        assert v_arr.shape == shape
        assert v_arr.dtype == dtype
        sel = ~np.isnan(v_arr)
        count_arr[sel] += 1
        sum_arr[sel] += v_arr[sel]
    avg_arr = np.zeros(shape, dtype=dtype)
    sel = count_arr > 0
    avg_arr[sel] = sum_arr[sel] / count_arr[sel]
    avg_arr[~sel] = np.nan
    return avg_arr

def block_data(data_list, block_size, is_overlapping=True):
    """
    return the list of block averages
    the blocks may overlap if is_overlapping == True
    """
    if block_size == 1:
        return data_list
    assert block_size >= 1
    size = len(data_list)
    if block_size >= size:
        return [ average(data_list), ]
    blocks = []
    start = 0
    stop = block_size
    while stop <= size:
        b = average(data_list[start:stop])
        blocks.append(b)
        if is_overlapping:
            start += 1
            stop += 1
        else:
            start += block_size
            stop += block_size
    return blocks

def avg_err(data_list, eps=1, *, block_size=1):
    avg = average(data_list)
    blocks = block_data(data_list, block_size)
    diff_sqr = average([ fsqr(d - avg) for d in blocks ])
    fac = abs(eps) * math.sqrt(block_size / (len(data_list) - 1))
    err = fac * fsqrt(diff_sqr)
    return (avg, err,)

def jackknife(data_list, eps=1):
    """
    Return jk[i] = avg - \\frac{eps}{N} (v[i] - avg)
    normal jackknife uses eps=1, scale the fluctuation by eps
    """
    is_np_arr = isinstance(data_list, np.ndarray)
    data_list_real = [ d for d in data_list if d is not None ]
    n = len(data_list_real)
    fac = eps / n
    avg = average(data_list_real)
    jks = [ avg, ]
    for data in data_list:
        if data is None:
            jks.append(avg)
        else:
            jks.append(avg - fac * (data - avg))
    if is_np_arr:
        jks = np.array(jks, dtype=data_list.dtype)
    return jks

def fsqr(data):
    if isinstance(data, real_types):
        return data * data
    elif isinstance(data, complex_types):
        r = data.real
        i = data.imag
        return complex(r * r, i * i)
    elif isinstance(data, Data):
        return Data(fsqr(data.val))
    else:
        # Assuming np.ndarray like object
        if data.dtype in real_types:
            return np.square(data)
        elif data.dtype in complex_types:
            return np.square(data.real) + 1j * np.square(data.imag)
        else:
            raise Exception(f"fsqr data={data} type not supported")

def fsqrt(data):
    if isinstance(data, real_types):
        return math.sqrt(data)
    elif isinstance(data, complex_types):
        r = data.real
        i = data.imag
        return complex(math.sqrt(r), math.sqrt(i))
    elif isinstance(data, Data):
        return Data(fsqrt(data.val))
    else:
        # Assuming np.ndarray like object
        if data.dtype in real_types:
            return np.sqrt(data)
        elif data.dtype in complex_types:
            return np.sqrt(data.real) + 1j * np.sqrt(data.imag)
        else:
            raise Exception(f"fsqr data={data} type not supported")

def jk_avg(jk_list):
    is_np_arr = isinstance(jk_list, np.ndarray)
    val = jk_list[0]
    if is_np_arr and val.size == 1:
        return val.item()
    else:
        return val

def jk_err(jk_list, eps=1, *, block_size=1):
    """
    Return \\frac{1}{eps} \\sqrt{ \\sum_{i=1}^N (jk[i] - jk_avg)^2 } when block_size=1
    Note: len(jk_list) = N + 1
    Same eps as the eps used in the 'jackknife' function
    """
    is_np_arr = isinstance(jk_list, np.ndarray)
    avg = jk_avg(jk_list)
    blocks = block_data(jk_list[1:], block_size)
    diff_sqr = average([ fsqr(jk - avg) for jk in blocks ])
    fac = math.sqrt(block_size * (len(jk_list) - 1)) / abs(eps)
    val = fac * fsqrt(diff_sqr)
    if is_np_arr and val.size == 1:
        return val.item()
    else:
        return val

def jk_avg_err(jk_list, eps=1, *, block_size=1):
    return jk_avg(jk_list), jk_err(jk_list, eps, block_size=block_size)

def merge_jk_idx(*jk_idx_list):
    for jk_idx in jk_idx_list:
        assert jk_idx[0] == "avg"
    return [ "avg", ] + [ idx for jk_idx in jk_idx_list for idx in jk_idx[1:] ]

def rejk_list(jk_list, jk_idx_list, all_jk_idx):
    """
    super jackknife
    """
    assert jk_idx_list[0] == "avg"
    assert all_jk_idx[0] == "avg"
    assert len(jk_idx_list) == len(jk_list)
    assert len(jk_idx_list) <= len(all_jk_idx)
    is_np_arr = isinstance(jk_idx_list, np.ndarray)
    jk_avg = jk_list[0]
    size_new = len(all_jk_idx)
    i_new = 0
    jk_list_new = []
    for i, idx in enumerate(jk_idx_list):
        while all_jk_idx[i_new] != idx:
            jk_list_new.append(jk_avg)
            i_new += 1
            assert i_new < size_new
        jk_list_new.append(jk_list[i])
        i_new += 1
    while i_new < size_new:
        jk_list_new.append(jk_avg)
        i_new += 1
    assert i_new == size_new
    assert size_new == len(jk_list_new)
    if is_np_arr:
        jk_list_new = np.array(jk_list_new, dtype=jk_list.dtype)
    return jk_list_new

# ----------

def mk_jk_blocking_func(block_size=1, block_size_dict=None, all_jk_idx_set=None):
    """
    block_size_for_this_job_tag = block_size_dict.get(job_tag, block_size)
    """
    if block_size_dict is None:
        block_size_dict = dict()
    def jk_blocking_func(jk_idx):
        if all_jk_idx_set is not None:
            all_jk_idx_set.add(jk_idx)
        if isinstance(jk_idx, int_types):
            traj = jk_idx
            return traj // block_size
        elif isinstance(jk_idx, tuple) and len(jk_idx) == 2 and isinstance(jk_idx[1], int_types):
            job_tag, traj = jk_idx
            assert isinstance(job_tag, str)
            assert isinstance(traj, int_types)
            block_size_for_this_job_tag = block_size_dict.get(job_tag, block_size)
            assert isinstance(block_size_for_this_job_tag, int_types)
            return (job_tag, traj // block_size_for_this_job_tag,)
        else:
            return jk_idx
    return jk_blocking_func

@timer
def rjk_jk_list(jk_list, jk_idx_list, n_rand_sample, rng_state, jk_blocking_func=None, is_normalizing_rand_sample=True, is_use_old_rand_alg=False):
    """
    return rjk_list
    len(rjk_list) == 1 + n_rand_sample
    distribution of rjk_list should be similar as the distribution of avg
    r_{i,j} ~ N(0, 1)
    avg = jk_avg(jk_list)
    len(jk_list) = n + 1
    rjk_list[i] = avg + \\sum_{j=1}^{n} r_{i,j} (jk_list[j] - avg)
    #
    jk_blocking_func(jk_idx) => blocked jk_idx
    """
    assert jk_idx_list[0] == "avg"
    assert isinstance(n_rand_sample, int_types)
    assert n_rand_sample >= 0
    assert isinstance(rng_state, RngState)
    is_np_arr = isinstance(jk_idx_list, np.ndarray)
    rs = rng_state
    n = len(jk_list) - 1
    if jk_blocking_func is None:
        blocked_jk_idx_list = jk_idx_list
    else:
        blocked_jk_idx_list = [ jk_blocking_func(idx) for idx in jk_idx_list[:] ]
    assert len(blocked_jk_idx_list[1:]) == n
    r_arr = np.empty((n_rand_sample, n,), dtype=np.float64)
    if is_use_old_rand_alg:
        assert not is_normalizing_rand_sample
        for i in range(n_rand_sample):
            rsi = rs.split(str(i))
            r = [ rsi.split(str(idx)).g_rand_gen() for idx in blocked_jk_idx_list[1:] ]
            for j in range(n):
                r_arr[i, j] = r[j]
    else:
        r_arr_dict = dict()
        for jk_idx in blocked_jk_idx_list[1:]:
            jk_idx_str = str(jk_idx)
            if jk_idx_str in r_arr_dict:
                continue
            rsi = rs.split(str(jk_idx))
            garr = rsi.g_rand_arr(n_rand_sample)
            if is_normalizing_rand_sample:
                garr_qnorm = qnorm(garr) # garr_qnorm \approx n_rand_sample
                garr = garr * np.sqrt(n_rand_sample / garr_qnorm)
                assert abs(qnorm(garr) / n_rand_sample - 1) < 1e-8
            r_arr_dict[jk_idx_str] = garr
        for j, jk_idx in enumerate(blocked_jk_idx_list[1:]):
            jk_idx_str = str(jk_idx)
            r_arr[:, j] = r_arr_dict[jk_idx_str]
    avg = jk_avg(jk_list)
    if is_np_arr:
        jk_arr = jk_list
        jk_diff = jk_arr[1:] - avg
        rjk_arr = np.empty((1 + n_rand_sample, *avg.shape,), dtype=jk_arr.dtype)
        rjk_arr[:] = avg
        for j in range(n):
            for i in range(n_rand_sample):
                rjk_arr[i + 1] += r_arr[i, j] * jk_diff[j]
        return rjk_arr
    else:
        rjk_list = [ avg, ]
        jk_diff = [ jk_list[j] - avg for j in range(1, n + 1) ]
        for i in range(n_rand_sample):
            rjk_list.append(avg + sum([ r_arr[i, j] * jk_diff[j] for j in range(n) ]))
        return rjk_list

@timer
def rjk_mk_jk_val(rs_tag, val, err, n_rand_sample, rng_state):
    """
    return rjk_list
    n = n_rand_sample
    len(rjk_list) == 1 + n
    rjk_list[i] = val + err * r[i] for i in 1..n
    where r[i] ~ N(0, 1)
    """
    assert n_rand_sample >= 0
    assert isinstance(rng_state, RngState)
    assert isinstance(val, real_types)
    assert isinstance(err, real_types)
    rs = rng_state.split(str(rs_tag))
    rjk_arr = np.zeros((n_rand_sample + 1,), dtype=np.float64)
    rjk_arr[0] = val
    r_arr = rs.g_rand_arr((n_rand_sample,))
    rjk_arr[1:] = val + r_arr * err
    return rjk_arr

def rjackknife(data_list, jk_idx_list, n_rand_sample, rng_state, *, eps=1):
    jk_list = jackknife(data_list, eps)
    return rjk_jk_list(jk_list, jk_idx_list, n_rand_sample, rng_state)

def rjk_avg(rjk_list):
    return jk_avg(rjk_list)

def rjk_err(rjk_list, eps=1):
    """Return \\frac{1}{eps} \\sqrt{ \\frac{1}{N} \\sum_{i=1}^N (jk[i] - jk_avg)^2 }
    Note: len(jk_list) = N + 1
    Same eps as the eps used in the 'jackknife' function"""
    n = len(rjk_list) - 1
    return jk_err(rjk_list, abs(eps) * np.sqrt(n))

def rjk_avg_err(rjk_list, eps=1):
    return rjk_avg(rjk_list), rjk_err(rjk_list, eps)

# ----------

default_g_jk_kwargs = {}

default_g_jk_kwargs["jk_type"] = "super"  # choices: "rjk", "super"
default_g_jk_kwargs["eps"] = 1

# for jk_type = "super"
default_g_jk_kwargs["all_jk_idx"] = None
default_g_jk_kwargs["get_all_jk_idx"] = None

# for jk_type = "rjk"
default_g_jk_kwargs["n_rand_sample"] = 1024
default_g_jk_kwargs["rng_state"] = RngState("rejk")
default_g_jk_kwargs["is_normalizing_rand_sample"] = True
default_g_jk_kwargs["is_use_old_rand_alg"] = False # only need to reproduce old results (need is_normalizing_rand_sample == False)

# jk_blocking_func(jk_idx) => blocked jk_idx
# jk_blocking_func = mk_jk_blocking_func(block_size=1, block_size_dict=None, all_jk_idx_set=None)
default_g_jk_kwargs["jk_blocking_func"] = None

[docs] @use_kwargs(default_g_jk_kwargs) @timer def g_jk(data_list, *, eps, **_kwargs): """ Perform initial Jackknife for the original data set.\n """ return jackknife(data_list, eps)
[docs] @use_kwargs(default_g_jk_kwargs) @timer def g_rejk(jk_list, jk_idx_list, *, jk_type, all_jk_idx, get_all_jk_idx, n_rand_sample, rng_state, jk_blocking_func, is_normalizing_rand_sample, is_use_old_rand_alg, **_kwargs,): """ Perform (randomized) Super-Jackknife for the Jackknife data set. :jk_list: usually the Jackknife data set obtained with ``g_jk(data_list)``. :jk_idx_list: should be list of indices that names the ``jk_list``. :jk_type: [ "rjk", "super", ]``\n :returns: (randomized) Super-Jackknife data set. Note that::\n len(jk_list) == len(jk_idx_list) jk_idx_list[0] == "avg" Example """ if jk_type == "super": if jk_blocking_func is not None: displayln_info(f"g_rejk: jk_type={jk_type} does not support jk_blocking_func={jk_blocking_func}") if all_jk_idx is None: assert get_all_jk_idx is not None all_jk_idx = get_all_jk_idx() return rejk_list(jk_list, jk_idx_list, all_jk_idx) elif jk_type == "rjk": return rjk_jk_list(jk_list, jk_idx_list, n_rand_sample, rng_state, jk_blocking_func, is_normalizing_rand_sample, is_use_old_rand_alg) else: assert False return None
[docs] @use_kwargs(default_g_jk_kwargs) @timer def g_mk_jk_val(rs_tag, val, err, *, jk_type, n_rand_sample, rng_state, **_kwargs): """ Create a jackknife sample with random numbers based on central value ``val`` and error ``err``.\n Need::\n default_g_jk_kwargs["jk_type"] = "rjk" default_g_jk_kwargs["n_rand_sample"] = n_rand_sample # e.g. n_rand_sample = 1024 default_g_jk_kwargs["rng_state"] = rng_state # e.g. rng_state = RngState("rejk") """ assert jk_type == "rjk" return rjk_mk_jk_val(rs_tag, val, err, n_rand_sample, rng_state)
[docs] def g_jk_avg(jk_list): """ Return ``avg`` of the ``jk_list``. """ if isinstance(jk_list, number_types): return jk_list return jk_avg(jk_list)
[docs] @use_kwargs(default_g_jk_kwargs) def g_jk_err(jk_list, *, eps, jk_type, **_kwargs): """ Return ``err`` of the ``jk_list``. """ if isinstance(jk_list, number_types): return 0 if jk_type == "super": return jk_err(jk_list, eps) elif jk_type == "rjk": return rjk_err(jk_list, eps) else: assert False return None
[docs] @timer def g_jk_avg_err(jk_list, **kwargs): """ Return ``(avg, err,)`` of the ``jk_list``. """ return g_jk_avg(jk_list), g_jk_err(jk_list, **kwargs)
@timer def g_jk_avg_err_arr(jk_list, **kwargs): avg, err = g_jk_avg_err(jk_list, **kwargs) avg_err_arr = np.stack([ avg, err, ]) avg_err_arr = np.moveaxis(avg_err_arr, 0, -1).copy() return avg_err_arr
[docs] @use_kwargs(default_g_jk_kwargs) def g_jk_size(**kwargs): """ Return number of samples for the (randomized) Super-Jackknife data set. """ jk_type = kwargs["jk_type"] all_jk_idx = kwargs["all_jk_idx"] get_all_jk_idx = kwargs["get_all_jk_idx"] n_rand_sample = kwargs["n_rand_sample"] # jk_type in [ "rjk", "super", ] if jk_type == "super": if all_jk_idx is None: assert get_all_jk_idx is not None all_jk_idx = get_all_jk_idx() return len(all_jk_idx) elif jk_type == "rjk": return 1 + n_rand_sample else: assert False return None
[docs] @use_kwargs(default_g_jk_kwargs) def g_jk_blocking_func(idx, *, jk_blocking_func, **_kwargs): """ Return ``jk_blocking_func(idx)``. """ if jk_blocking_func is None: return idx else: return jk_blocking_func(idx)