Source code for auto_contractor.compile

#    Qlattice (https://github.com/jinluchang/qlattice)
#
#    Copyright (C) 2021
#
#    Author: Luchang Jin (ljin.luchang@gmail.com)
#    Author: Masaaki Tomii
#
#    This program is free software; you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation; either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License along
#    with this program; if not, write to the Free Software Foundation, Inc.,
#    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

try:
    from .wick import *
    from . import auto_fac_funcs as aff
except:
    from wick import *
    import auto_fac_funcs as aff

from itertools import permutations

import qlat as q

class Var(Op):

    def __init__(self, name : str):
        Op.__init__(self, "Var")
        self.name = name

    def __repr__(self):
        return f"{self.otype}({self.name!r})"

    def list(self):
        return [self.otype, self.name]

    def __eq__(self, other) -> bool:
        return self.list() == other.list()

### ----

def get_var_name_type(x):
    """
    types include: V_S (wilson matrix), V_G (spin matrix), V_U (color matrix), V_a (c-number)
    """
    if x.startswith("V_S_"):
        return "V_S"
    elif x.startswith("V_U_"):
        return "V_U"
    elif x.startswith("V_tr_"):
        return "V_a"
    elif x.startswith("V_prod_GG_"):
        return "V_G"
    elif x.startswith("V_prod_GS_"):
        return "V_S"
    elif x.startswith("V_prod_SG_"):
        return "V_S"
    elif x.startswith("V_prod_SS_"):
        return "V_S"
    elif x.startswith("V_prod_UU_"):
        return "V_U"
    elif x.startswith("V_prod_UG_"):
        return "V_S"
    elif x.startswith("V_prod_GU_"):
        return "V_S"
    elif x.startswith("V_prod_US_"):
        return "V_S"
    elif x.startswith("V_prod_SU_"):
        return "V_S"
    else:
        assert False

def get_op_type(x):
    """
    ``x`` should be an Op
    return a string which indicate the type of the op
    """
    if not isinstance(x, Op):
        return None
    if x.otype == "S":
        return "S"
    elif x.otype == "G":
        return "G"
    elif x.otype == "U":
        return "U"
    elif x.otype == "Tr":
        return "Tr"
    elif x.otype == "Var":
        return get_var_name_type(x.name)
    else:
        assert False
    return None

def add_positions(s, x):
    """
    ``s`` is set of str
    ``x`` is expr/sub-expr
    """
    if isinstance(x, Term):
        add_positions(s, x.coef)
        for op in x.c_ops:
            add_positions(s, op)
        for op in x.a_ops:
            add_positions(s, op)
    elif isinstance(x, Op):
        if x.otype == "S":
            if isinstance(x.p1, str):
                s.add(x.p1)
            if isinstance(x.p2, str):
                s.add(x.p2)
        elif x.otype == "G":
            if isinstance(x.tag, str):
                s.add(x.tag)
        elif x.otype == "U":
            if isinstance(x.p, str):
                s.add(x.p)
            if isinstance(x.mu, str):
                s.add(x.mu)
        elif x.otype == "Tr":
            for op in x.ops:
                add_positions(s, op)
        elif x.otype == "Qfield":
            if isinstance(x.p, str):
                s.add(x.p)
    elif isinstance(x, Expr):
        for t in x.terms:
            add_positions(s, t)
    elif isinstance(x, ea.Expr):
        for t in x.terms:
            add_positions(s, t)
    elif isinstance(x, ea.Term):
        for f in x.factors:
            add_positions(s, f)
    elif isinstance(x, ea.Factor):
        for v in x.variables:
            s.add(v)

def get_positions(term):
    s = set()
    add_positions(s, term)
    return sorted(list(s))

@q.timer
def collect_position_in_cexpr(named_terms, named_exprs):
    s = set()
    for name, term in named_terms:
        add_positions(s, term)
    for name, expr_list in named_exprs:
        for coef, term_name in expr_list:
            add_positions(s, coef)
    positions = sorted(list(s))
    return positions

@q.timer
def find_common_prod_in_factors(variables_factor):
    """
    return None or (var_name_1, var_name_2,)
    """
    subexpr_count = {}
    for _, ea_coef in variables_factor:
        assert isinstance(ea_coef, ea.Expr)
        for t in ea_coef.terms:
            x = t.factors
            if len(x) < 1:
                continue
            for i, f in enumerate(x[:-1]):
                assert f.otype == "Var"
                f1 = x[i+1]
                assert f1.otype == "Var"
                prod = (f.code, f1.code,)
                count = subexpr_count.get(prod, 0)
                subexpr_count[prod] = count + 1
    max_num_repeat = 0
    best_match = None
    for prod, num_repeat in subexpr_count.items():
        if num_repeat > max_num_repeat:
            max_num_repeat = num_repeat
            best_match = prod
    return best_match

@q.timer
def collect_common_prod_in_factors(variables_factor, common_prod, var):
    """
    common_prod = find_common_prod_in_factors(variables_factor)
    var = ea.Factor(name, variables=[], otype="Var")
    """
    for _, ea_coef in variables_factor:
        assert isinstance(ea_coef, ea.Expr)
        for t in ea_coef.terms:
            x = t.factors
            if len(x) < 1:
                continue
            for i, f in enumerate(x[:-1]):
                assert f.otype == "Var"
                f1 = x[i+1]
                assert f1.otype == "Var"
                prod = (f.code, f1.code,)
                if prod == common_prod:
                    x[i] = var
                    x[i+1] = None
    for _, ea_coef in variables_factor:
        assert isinstance(ea_coef, ea.Expr)
        for t in ea_coef.terms:
            x = t.factors
            x_new = []
            for v in x:
                if v is not None:
                    x_new.append(v)
            t.factors = x_new

@q.timer
def find_common_sum_in_factors(variables_factor):
    """
    return None or (var_name_1, var_name_2,)
    """
    subexpr_count = {}
    for _, ea_coef in variables_factor:
        assert isinstance(ea_coef, ea.Expr)
        x = ea_coef.terms
        if len(x) < 1:
            continue
        for i, t in enumerate(x[:-1]):
            t1 = x[i+1]
            assert t.coef == 1
            assert len(t.factors) == 1
            assert t1.coef == 1
            assert len(t1.factors) == 1
            f = t.factors[0]
            f1 = t1.factors[0]
            assert f.otype == "Var"
            assert f1.otype == "Var"
            pair = (f.code, f1.code,)
            count = subexpr_count.get(pair, 0)
            subexpr_count[pair] = count + 1
    max_num_repeat = 0
    best_match = None
    for pair, num_repeat in subexpr_count.items():
        if num_repeat > max_num_repeat:
            max_num_repeat = num_repeat
            best_match = pair
    return best_match

@q.timer
def collect_common_sum_in_factors(variables_factor, common_pair, var):
    """
    common_pair = find_common_sum_in_factors(variables_factor)
    var = ea.Factor(name, variables=[], otype="Var")
    """
    for _, ea_coef in variables_factor:
        assert isinstance(ea_coef, ea.Expr)
        x = ea_coef.terms
        if len(x) < 1:
            continue
        for i, t in enumerate(x[:-1]):
            t1 = x[i+1]
            assert t.coef == 1
            assert len(t.factors) == 1
            assert t1.coef == 1
            assert len(t1.factors) == 1
            f = t.factors[0]
            f1 = t1.factors[0]
            assert f.otype == "Var"
            assert f1.otype == "Var"
            pair = (f.code, f1.code,)
            if pair == common_pair:
                x[i].factors[0] = var
                x[i+1] = None
    for _, ea_coef in variables_factor:
        assert isinstance(ea_coef, ea.Expr)
        x = ea_coef.terms
        x_new = []
        for v in x:
            if v is not None:
                x_new.append(v)
        ea_coef.terms = x_new

@q.timer
def collect_factor_in_cexpr(named_exprs):
    """
    collect the factors in all ea_coef
    collect common sub-expressions in ea_coef
    """
    variables_factor_intermediate = []
    variables_factor = []
    var_nameset = set()
    # Make variables to all coefs of all the terms
    var_counter = 0
    var_dataset = {}
    for _, expr in named_exprs:
        for i, (ea_coef, term_name,) in enumerate(expr):
            key = repr(ea_coef)
            if key in var_dataset:
                var = var_dataset[key]
            else:
                s_ea_coef = ea.mk_expr(ea.simplified(ea_coef))
                key2 = repr(s_ea_coef)
                if key2 in var_dataset:
                    var = var_dataset[key2]
                else:
                    while True:
                        name = f"V_factor_coef_{var_counter}"
                        var_counter += 1
                        if name not in var_nameset:
                            break
                    var_nameset.add(name)
                    variables_factor.append((name, s_ea_coef,))
                    var = ea.Factor(name, variables=[], otype="Var")
                    var_dataset[key] = var
                    var_dataset[key2] = var
            expr[i] = (ea.mk_expr(var), term_name,)
    # Add ea.Factor with otype "Expr" to variables_factor_intermediate
    var_counter = 0
    var_dataset = {} # var_dataset[factor_code] = factor_var
    for _, ea_coef in variables_factor:
        assert isinstance(ea_coef, ea.Expr)
        for t in ea_coef.terms:
            x = t.factors
            for i, f in enumerate(x):
                if f.otype != "Var":
                    assert f.otype == "Expr"
                    if f.code in var_dataset:
                        x[i] = var_dataset[f.code]
                    else:
                        while True:
                            name = f"V_factor_{var_counter}"
                            var_counter += 1
                            if name not in var_nameset:
                                break
                        var_nameset.add(name)
                        variables_factor_intermediate.append((name, ea.mk_expr(f),))
                        var = ea.Factor(name, f.variables)
                        x[i] = var
                        var_dataset[f.code] = var
    # Add numerical coef to variables_factor_intermediate
    var_counter = 0
    var_dataset = {} # var_dataset[factor_code] = factor_var
    for _, ea_coef in variables_factor:
        assert isinstance(ea_coef, ea.Expr)
        for t in ea_coef.terms:
            x = t.coef
            if x == 1:
                continue
            code = ea.compile_py_complex(x)
            if code in var_dataset:
                t.coef = 1
                t.factors.append(var_dataset[code])
            else:
                while True:
                    name = f"V_factor_coef_{var_counter}"
                    var_counter += 1
                    if name not in var_nameset:
                        break
                var_nameset.add(name)
                variables_factor_intermediate.append((name, ea.mk_expr(t.coef),))
                t.coef = 1
                var = ea.Factor(name, variables=[], otype="Var")
                t.factors.append(var)
                var_dataset[code] = var
    # Common product elimination
    var_counter = 0
    var_dataset = {} # var_dataset[(code1, code2,)] = factor_var
    while True:
        prod = find_common_prod_in_factors(variables_factor)
        if prod is None:
            break
        code1, code2 = prod
        var1 = ea.Factor(code1, variables=[], otype="Var")
        var2 = ea.Factor(code2, variables=[], otype="Var")
        prod_expr = ea.mk_expr(var1) * ea.mk_expr(var2)
        while True:
            name = f"V_factor_prod_{var_counter}"
            var_counter += 1
            if name not in var_nameset:
                break
        var_nameset.add(name)
        variables_factor_intermediate.append((name, prod_expr,))
        var = ea.Factor(name, variables=[], otype="Var")
        var_dataset[prod] = var
        collect_common_prod_in_factors(variables_factor, prod, var)
    # Common summation elimination
    var_counter = 0
    var_dataset = {} # var_dataset[(code1, code2,)] = factor_var
    while True:
        pair = find_common_sum_in_factors(variables_factor)
        if pair is None:
            break
        code1, code2 = pair
        var1 = ea.Factor(code1, variables=[], otype="Var")
        var2 = ea.Factor(code2, variables=[], otype="Var")
        pair_expr = ea.mk_expr(var1) + ea.mk_expr(var2)
        while True:
            name = f"V_factor_sum_{var_counter}"
            var_counter += 1
            if name not in var_nameset:
                break
        var_nameset.add(name)
        variables_factor_intermediate.append((name, pair_expr,))
        var = ea.Factor(name, variables=[], otype="Var")
        var_dataset[pair] = var
        collect_common_sum_in_factors(variables_factor, pair, var)
    return variables_factor_intermediate, variables_factor

@q.timer
def collect_prop_in_cexpr(named_terms):
    """
    collect the propagators
    modify the named_terms in-place and return the prop variable definitions as variables
    """
    variables_prop = []
    var_counter = 0
    var_dataset = {} # var_dataset[op_repr] = op_var
    var_nameset = set()
    def add_prop_variables(x):
        nonlocal var_counter
        if isinstance(x, list):
            for i, op in enumerate(x):
                if op.otype in [ "S", ]:
                    op_repr = repr(op)
                    if op_repr in var_dataset:
                        x[i] = var_dataset[op_repr]
                    else:
                        while True:
                            name = f"V_S_{var_counter}"
                            var_counter += 1
                            if name not in var_nameset:
                                break
                        var_nameset.add(name)
                        variables_prop.append((name, op,))
                        var = Var(name)
                        x[i] = var
                        var_dataset[op_repr] = var
                elif op.otype == "Tr":
                    add_prop_variables(op.ops)
        elif isinstance(x, Term):
            add_prop_variables(x.c_ops)
        elif isinstance(x, Expr):
            for t in x.terms:
                add_prop_variables(t)
    for name, term in named_terms:
        add_prop_variables(term)
        term.sort()
    return variables_prop

@q.timer
def collect_color_matrix_in_cexpr(named_terms):
    """
    collect the color matrices
    modify the named_terms in-place and return the color matrix variable definitions as variables
    """
    variables_color_matrix = []
    var_counter = 0
    var_dataset = {} # var_dataset[op_repr] = op_var
    var_nameset = set()
    def add_variables(x):
        nonlocal var_counter
        if isinstance(x, list):
            for i, op in enumerate(x):
                if op.otype in [ "U", ]:
                    op_repr = repr(op)
                    if op_repr in var_dataset:
                        x[i] = var_dataset[op_repr]
                    else:
                        while True:
                            name = f"V_U_{var_counter}"
                            var_counter += 1
                            if name not in var_nameset:
                                break
                        var_nameset.add(name)
                        variables_color_matrix.append((name, op,))
                        var = Var(name)
                        x[i] = var
                        var_dataset[op_repr] = var
                elif op.otype == "Tr":
                    add_variables(op.ops)
        elif isinstance(x, Term):
            add_variables(x.c_ops)
        elif isinstance(x, Expr):
            for t in x.terms:
                add_variables(t)
    for name, term in named_terms:
        add_variables(term)
        term.sort()
    return variables_color_matrix

@q.timer
def collect_tr_in_cexpr(named_terms):
    """
    collect common traces
    modify named_terms in-place and return definitions as variables_tr
    variables_tr = [ (name, value,), ... ]
    possible (name, value,) includes
    ("V_tr_0", op,) where op.otype == "Tr"
    """
    var_nameset = set()
    variables_tr = []
    var_counter = 0
    var_dataset_tr = {} # var_dataset[op_repr] = op_var
    def add_tr_varibles(x):
        nonlocal var_counter
        if isinstance(x, Term):
            add_tr_varibles(x.c_ops)
        elif isinstance(x, Op) and x.otype == "Tr":
            add_tr_varibles(x.ops)
        elif isinstance(x, list):
            for op in x:
                add_tr_varibles(op)
            for i, op in enumerate(x):
                if isinstance(op, Op) and op.otype == "Tr":
                    op_repr = repr(op)
                    if op_repr in var_dataset_tr:
                        x[i] = var_dataset_tr[op_repr]
                    else:
                        while True:
                            name = f"V_tr_{var_counter}"
                            var_counter += 1
                            if name not in var_nameset:
                                break
                        var_nameset.add(name)
                        variables_tr.append((name, op,))
                        var = Var(name)
                        x[i] = var
                        var_dataset_tr[op_repr] = var
    for name, term in named_terms:
        add_tr_varibles(term)
        term.sort()
    return variables_tr

@q.timer
def find_common_subexpr_in_tr(variables_tr):
    """
    return None or (op, op1,)
    """
    subexpr_count = {}
    def add(x, count_added):
        op_repr = repr(x)
        if op_repr in subexpr_count:
            c, op = subexpr_count[op_repr]
            assert x == op
            subexpr_count[op_repr] = (c + count_added, x)
        else:
            subexpr_count[op_repr] = (count_added, x)
    def find_op_pair(x):
        if len(x) < 2:
            return None
        for i, op in enumerate(x):
            factor = 1
            if len(x) > 2:
                factor = 2
            op_type = get_op_type(op)
            if op_type in [ "V_S", "V_G", "V_U", "S", "G", "U", ]:
                op1 = x[(i+1) % len(x)]
                op1_type = get_op_type(op1)
                if op1_type in [ "V_S", "V_G", "V_U", "S", "G", "U", ]:
                    prod = (op, op1,)
                    if op_type in [ "V_G", "G", ] and op1_type in [ "V_G", "G", ]:
                        add(prod, 1.02 * factor)
                    elif op_type in [ "V_U", "U", ] and op1_type in [ "V_U", "U", ]:
                        add(prod, 1.02 * factor)
                    elif op_type in [ "V_U", "U", ] and op1_type in [ "V_G", "G", ]:
                        # do not multiply U with G
                        pass
                    elif op_type in [ "V_G", "G", ] and op1_type in [ "V_U", "U", ]:
                        # do not multiply U with G
                        pass
                    elif op_type in [ "V_G", "G", ] or op1_type in [ "V_G", "G", ]:
                        add(prod, 1.01 * factor)
                    elif op_type in [ "V_U", "U", ] or op1_type in [ "V_U", "U", ]:
                        add(prod, 1.01 * factor)
                    elif op_type in [ "V_S", "S", ] and op1_type in [ "V_S", "S", ]:
                        add(prod, 1 * factor)
                    else:
                        assert False
    def find(x):
        if isinstance(x, list):
            for op in x:
                find(op)
        elif isinstance(x, Op) and x.otype == "Tr" and len(x.ops) >= 2:
            find_op_pair(x.ops)
        elif isinstance(x, Term):
            find(x.c_ops)
        elif isinstance(x, Expr):
            for t in x.terms:
                find(t)
    for name, tr in variables_tr:
        find(tr)
    max_num_repeat = 1.5
    best_match = None
    for num_repeat, op in subexpr_count.values():
        if num_repeat > max_num_repeat:
            max_num_repeat = num_repeat
            best_match = op
    return best_match

@q.timer
def collect_common_subexpr_in_tr(variables_tr, op_common, var):
    op_repr = repr(op_common)
    def replace(x):
        if x is None:
            return None
        elif isinstance(x, list):
            # need to represent the product of the list of operators
            for op in x:
                replace(op)
            if len(x) < 2:
                return None
            for i, op in enumerate(x):
                if isinstance(op, Op) and op.otype in [ "Var", "S", "G", "U", ]:
                    i1 = (i+1) % len(x)
                    op1 = x[i1]
                    if isinstance(op1, Op) and op1.otype in [ "Var", "S", "G", "U", ]:
                        prod = (op, op1,)
                        if repr(prod) == op_repr:
                            x[i1] = None
                            x[i] = var
        elif isinstance(x, Op) and x.otype == "Tr" and len(x.ops) >= 2:
            replace(x.ops)
        elif isinstance(x, Term):
            replace(x.c_ops)
        elif isinstance(x, Expr):
            for t in x.terms:
                replace(t)
    def remove_none(x):
        """
        return a None removed x
        possibly modify in-place
        """
        if isinstance(x, list):
            return [ remove_none(op) for op in x if op is not None ]
        elif isinstance(x, Op):
            if x.otype == "Tr":
                x.ops = remove_none(x.ops)
            return x
        elif isinstance(x, Term):
            x.c_ops = remove_none(x.c_ops)
            x.a_ops = remove_none(x.a_ops)
            return x
        elif isinstance(x, Expr):
            x.terms = [ remove_none(t) for t in x.terms]
            return x
        else:
            assert False
    for name, tr in variables_tr:
        replace(tr)
        remove_none(tr)

@q.timer
def collect_subexpr_in_cexpr(variables_tr):
    """
    collect common sub-expressions
    modify variables_tr in-place and return definitions as variables_prod
    variables_prod = [ (name, value,), ... ]
    possible (name, value,) includes
    ("V_prod_SG_0", [ op, op1, ],) where get_op_type(op) in [ "V_S", "S", ] and get_op_type(op1) in [ "V_G", "G", ]
    """
    var_nameset = set()
    var_counter_dict = {}
    var_counter_dict["V_prod_GG_"] = 0
    var_counter_dict["V_prod_GS_"] = 0
    var_counter_dict["V_prod_SG_"] = 0
    var_counter_dict["V_prod_SS_"] = 0
    var_counter_dict["V_prod_UU_"] = 0
    var_counter_dict["V_prod_UG_"] = 0
    var_counter_dict["V_prod_GU_"] = 0
    var_counter_dict["V_prod_US_"] = 0
    var_counter_dict["V_prod_SU_"] = 0
    variables_prod = []
    while True:
        subexpr = find_common_subexpr_in_tr(variables_tr)
        if subexpr is None:
            break
        op, op1 = subexpr
        op_type = get_op_type(op)
        op1_type = get_op_type(op1)
        assert op_type in [ "V_S", "V_G", "V_U", "S", "G", "U", ]
        assert op1_type in [ "V_S", "V_G", "V_U", "S", "G", "U", ]
        if op_type in [ "V_G", "G", ] and op1_type in [ "V_G", "G", ]:
            name_prefix = "V_prod_GG_"
        elif op_type in [ "V_G", "G", ] and op1_type in [ "V_S", "S", ]:
            name_prefix = "V_prod_GS_"
        elif op_type in [ "V_S", "S", ] and op1_type in [ "V_G", "G", ]:
            name_prefix = "V_prod_SG_"
        elif op_type in [ "V_S", "S", ] and op1_type in [ "V_S", "S", ]:
            name_prefix = "V_prod_SS_"
        elif op_type in [ "V_U", "U", ] and op1_type in [ "V_U", "U", ]:
            name_prefix = "V_prod_UU_"
        elif op_type in [ "V_U", "U", ] and op1_type in [ "V_G", "G", ]:
            name_prefix = "V_prod_UG_"
        elif op_type in [ "V_G", "G", ] and op1_type in [ "V_U", "U", ]:
            name_prefix = "V_prod_GU_"
        elif op_type in [ "V_U", "U", ] and op1_type in [ "V_S", "S", ]:
            name_prefix = "V_prod_US_"
        elif op_type in [ "V_S", "S", ] and op1_type in [ "V_U", "U", ]:
            name_prefix = "V_prod_SU_"
        else:
            assert False
        while True:
            name = f"{name_prefix}{var_counter_dict[name_prefix]}"
            var_counter_dict[name_prefix] += 1
            if name not in var_nameset:
                break
        var_nameset.add(name)
        variables_prod.append((name, subexpr,))
        var = Var(name)
        collect_common_subexpr_in_tr(variables_tr, subexpr, var)
    return variables_prod

class CExpr:

    """
    self.diagram_types
    self.positions
    self.variables_factor_intermediate
    self.variables_factor
    self.variables_prop
    self.variables_color_matrix
    self.variables_prod
    self.variables_tr
    self.named_terms
    self.named_exprs
    #
    self.named_terms[i] = (term_name, Term(c_ops, [], 1),)
    self.named_exprs[i] = (expr_name, [ (ea_coef, term_name,), ... ],)
    self.positions == sorted(list(self.positions))
    """

    def __init__(self):
        self.diagram_types = []
        self.positions = []
        self.variables_factor_intermediate = []
        self.variables_factor = []
        self.variables_prop = []
        self.variables_color_matrix = []
        self.variables_prod = []
        self.variables_tr = []
        self.named_terms = []
        self.named_exprs = []

    @q.timer
    def optimize(self):
        """
        interface function
        """
        self.collect_op()

    @q.timer
    def collect_op(self):
        """
        interface function
        Performing common sub-expression elimination
        Should be called after contract_simplify_compile(*exprs) or mk_cexpr(*exprs)
        The cexpr cannot be evaluated before collect_op!!!
        eval term factor
        """
        for name, term in self.named_terms:
            assert term.coef == 1
            assert term.a_ops == []
        checks = [
                self.variables_factor_intermediate == [],
                self.variables_factor == [],
                self.variables_prop == [],
                self.variables_color_matrix == [],
                self.variables_prod == [],
                self.variables_tr == [],
                ]
        if not all(checks):
            # likely collect_op is already performed
            return
        # collect ea_coef factors into variables
        self.variables_factor_intermediate, self.variables_factor = collect_factor_in_cexpr(self.named_exprs)
        # collect prop expr into variables
        self.variables_prop = collect_prop_in_cexpr(self.named_terms)
        # collect color matrix expr into variables
        self.variables_color_matrix = collect_color_matrix_in_cexpr(self.named_terms)
        # collect trace expr into variables
        self.variables_tr = collect_tr_in_cexpr(self.named_terms)
        # collect common prod into variables
        self.variables_prod = collect_subexpr_in_cexpr(self.variables_tr)

### ----

def increase_type_dict_count(type_dict, key):
    if key in type_dict:
        type_dict[key] += 1
    else:
        type_dict[key] = 1

def drop_tag_last_subscript(tag):
    """
    Coordinates with names that are same after dropping last subscript belong to the same permutation group.
    """
    return tag.rsplit("_", 1)[0]

def mk_permuting_dicts(pos_list):
    for p_pos_list in permutations(pos_list):
        yield dict(zip(pos_list, p_pos_list))

def get_position_permutation_groups(term):
    """
    Allow permutations within the same permutation group.
    """
    pos_list = get_positions(term)
    group_dict = dict()
    for pos in pos_list:
        g_pos = drop_tag_last_subscript(pos)
        if g_pos != pos:
            if g_pos not in group_dict:
                group_dict[g_pos] = [ pos, ]
            else:
                group_dict[g_pos].append(pos)
    return group_dict.values()

def mk_combined_permuting_dicts(term):
    """
    Produce all possible permutations allowed by permuting within the permutation groups.
    Generator of dict, each dict is a map of original coordinate and the permuted coordinate.
    """
    pos_list_list = list(get_position_permutation_groups(term))
    p_dicts_list = [ list(mk_permuting_dicts(pos_list)) for pos_list in pos_list_list ]
    def loop(level):
        if level < 0:
            yield dict()
        else:
            for d1 in loop(level - 1):
                for d2 in p_dicts_list[level]:
                    d = d1.copy()
                    d.update(d2)
                    yield d
    return loop(len(p_dicts_list) - 1)

def loop_term_ops(type_dict, ops):
    for op in ops:
        if op.otype == "S":
            # diagram type elem definition
            increase_type_dict_count(type_dict, (op.p1, op.p2,))
        elif op.otype == "Tr":
            loop_term_ops(type_dict, op.ops)

def get_term_diagram_type_info_no_permutation(term):
    type_dict = dict()
    loop_term_ops(type_dict, term.c_ops)
    return tuple(sorted(type_dict.items()))

def permute_tag(x, p_dict):
    if x in p_dict:
        return p_dict[x]
    else:
        return x

def permute_type_info_entry(x, p_dict):
    ((p1, p2,), n,) = x
    p1 = permute_tag(p1, p_dict)
    p2 = permute_tag(p2, p_dict)
    return ((p1, p2,), n,)

def permute_type_info(type_info, p_dict):
    l = [ permute_type_info_entry(e, p_dict) for e in type_info ]
    return tuple(sorted(l))

def get_term_diagram_type_info(term):
    base_type_info = get_term_diagram_type_info_no_permutation(term)
    min_type_info = base_type_info
    min_type_info_repr = str(min_type_info)
    for p_dict in mk_combined_permuting_dicts(term):
        type_info = permute_type_info(base_type_info, p_dict)
        type_info_repr = repr(type_info)
        if min_type_info is None or type_info_repr < min_type_info_repr:
            min_type_info = type_info
            min_type_info_repr = type_info_repr
    return min_type_info

def filter_diagram_type(expr, diagram_type_dict=None, included_types=None):
    """
    first: drop diagrams with diagram_type_dict[diagram_type] == None
    second:
        if included_types is None:
            return a list of a single expr with all the remaining diagrams summed together
        else:
            assert isinstance(included_types, list)
            # included_types = [ None, "Type1", [ "Type2", "Type3", ], ]
            return a list of exprs, each expr only includes the types specified in the `included_types`.
    `included_types` is a list of specs of included diagram type.
    Each spec in the list of `included_types` can be
    (1) None: means all remaining types included
    (2) a single str, with value be one diagram_type_name: means only include this type
    (3) a list of str: means include only the types listed in the list.
    """
    if diagram_type_dict is None:
        return [ expr, ]
    if included_types is None:
        included_types_list = [ None, ]
    else:
        assert isinstance(included_types, list)
        included_types_list = []
        for its in included_types:
            if its is None:
                included_types_list.append(its)
            elif isinstance(its, str):
                included_types_list.append([ its, ])
            elif isinstance(its, list):
                included_types_list.append(its)
            else:
                assert False
    expr_terms_list = [ [] for its in included_types_list ]
    for term in expr.terms:
        diagram_type = get_term_diagram_type_info(term)
        if diagram_type in diagram_type_dict:
            diagram_type_name = diagram_type_dict.get(diagram_type)
            if diagram_type_name is not None:
                for i, its in enumerate(included_types_list):
                    if (its is None) or (diagram_type_name in its):
                        expr_terms_list[i].append(term)
        else:
            for i, its in enumerate(included_types_list):
                if its is None:
                    expr_terms_list[i].append(term)
    expr_list = []
    for i, its in enumerate(included_types_list):
        if its is None:
            its_tag = ""
        else:
            its_tag = " (" + ','.join(its) + ")"
        expr_list.append(Expr(expr_terms_list[i], expr.description + its_tag))
    return expr_list

def mk_cexpr(*exprs, diagram_type_dict=None):
    """
    exprs already finished wick contraction,
    otherwise use contract_simplify_compile(*exprs, is_isospin_symmetric_limit, diagram_type_dict)
    !!!if diagram_type_dict[diagram_type] == None: this diagram_type should have already be dropped!!!
    interface function
    """
    if diagram_type_dict is None:
        diagram_type_dict = dict()
    descriptions = [ expr.show() for expr in exprs ]
    # build diagram_types and term names
    diagram_type_counter = 0
    diagram_type_term_dict = dict() # diagram_type_term_dict[repr_term] = diagram_type_name
    term_name_dict = dict() # term_name_dict[term_name] = term
    term_dict = dict() # term_dict[repr(term)] = term_name
    for expr in exprs:
        for term_coef in expr.terms:
            term = Term(term_coef.c_ops, term_coef.a_ops, 1)
            repr_term = repr(term)
            if repr_term in diagram_type_term_dict:
                continue
            diagram_type = get_term_diagram_type_info(term)
            if diagram_type not in diagram_type_dict:
                diagram_type_name = f"ADT{diagram_type_counter}" # ADT is short for "auto diagram type"
                diagram_type_counter += 1
                diagram_type_dict[diagram_type] = diagram_type_name
            diagram_type_name = diagram_type_dict[diagram_type]
            diagram_type_term_dict[repr_term] = diagram_type_name
            assert diagram_type_name is not None
            term_name_counter = 0
            while True:
                term_name = f"term_{diagram_type_name}_{term_name_counter}"
                term_name_counter += 1
                if term_name not in term_name_dict:
                    break
            term_name_dict[term_name] = term
            term_dict[repr_term] = term_name
    # name diagram_types
    diagram_types = []
    for diagram_type, diagram_type_name in diagram_type_dict.items():
        diagram_types.append((diagram_type_name, diagram_type,))
    # name terms
    named_terms = []
    for term_name, term in sorted(term_name_dict.items()):
        named_terms.append((term_name, term,))
    # name exprs
    named_exprs = []
    for i, expr in enumerate(exprs):
        expr_list = []
        typed_expr_list_dict = { name : [] for name, diagram_type in diagram_types }
        for j, term_coef in enumerate(expr.terms):
            coef = term_coef.coef
            term = Term(term_coef.c_ops, term_coef.a_ops, 1)
            repr_term = repr(term)
            diagram_type_name = diagram_type_term_dict[repr_term]
            assert diagram_type_name is not None
            term_name = term_dict[repr_term]
            typed_expr_list_dict[diagram_type_name].append((coef, term_name,))
            expr_list.append((coef, term_name,))
        named_exprs.append((f"{descriptions[i]}  exprs[{i}]", expr_list,))
    # positions
    positions = collect_position_in_cexpr(named_terms, named_exprs)
    # cexpr
    cexpr = CExpr()
    cexpr.diagram_types = diagram_types
    cexpr.positions = positions
    cexpr.named_terms = named_terms
    cexpr.named_exprs = named_exprs
    return cexpr

[docs] @q.timer def contract_simplify(*exprs, is_isospin_symmetric_limit=True, diagram_type_dict=None): """ exprs = [ expr, (expr, *included_types,), ... ]\n In case diagram_type_dict is not None, perform the following filter If diagram_type_dict[diagram_type] is None: term is removed. For (expr, *included_types,), only terms (in expr) with diagram_type that is in included_types is kept included_types should be a list/tuple of string """ def func(expr): expr = copy.deepcopy(expr) if isinstance(expr, tuple): expr, *included_types = expr elif isinstance(expr, Expr): included_types = None else: assert False expr = contract_expr(expr) expr.simplify(is_isospin_symmetric_limit=is_isospin_symmetric_limit) expr_list = filter_diagram_type( expr, diagram_type_dict=diagram_type_dict, included_types=included_types) return expr_list expr_list_list = q.parallel_map(func, exprs) expr_list = [] for el in expr_list_list: expr_list += el return expr_list
[docs] @q.timer def compile_expr(*exprs, diagram_type_dict = None): """ interface function """ exprs = copy.deepcopy(exprs) cexpr = mk_cexpr(*exprs, diagram_type_dict=diagram_type_dict) return cexpr
[docs] @q.timer def contract_simplify_compile(*exprs, is_isospin_symmetric_limit = True, diagram_type_dict = None): """ Call ``contract_simplify`` and then ``compile_expr``\n This function can be used to construct the first argument of ``cached_comipled_cexpr``. e.g. exprs = [ Qb("u", "x", s, c) * Qv("u", "x", s, c) + "u_bar*u", Qb("s", "x", s, c) * Qv("s", "x", s, c) + "s_bar*s", Qb("c", "x", s, c) * Qv("c", "x", s, c) + "c_bar*c", ] e.g. exprs = [ mk_pi_p("x2", True) * mk_pi_p("x1") + "(pi * pi)", mk_j5pi_mu("x2", 3) * mk_pi_p("x1") + "(a_pi * pi)", mk_k_p("x2", True) * mk_k_p("x1") + "(k * k )", mk_j5k_mu("x2", 3) * mk_k_p("x1") + "(a_k * k )", ] """ contracted_simplified_exprs = contract_simplify( *exprs, is_isospin_symmetric_limit=is_isospin_symmetric_limit, diagram_type_dict=diagram_type_dict) cexpr = compile_expr(*contracted_simplified_exprs, diagram_type_dict=diagram_type_dict) return cexpr
def show_variable_value(value): if isinstance(value, list): return "*".join(map(show_variable_value, value)) elif isinstance(value, Var): return f"{value.name}" elif isinstance(value, G) and value.tag in [ 0, 1, 2, 3, 5, ]: tag = { 0: "x", 1: "y", 2: "z", 3: "t", 5: "5", }[value.tag] return f"gamma_{tag}" elif isinstance(value, G): return f"gamma({value.tag})" elif isinstance(value, U): return f"U({value.tag},{value.p},{value.mu})" elif isinstance(value, S): return f"S_{value.f}({value.p1},{value.p2})" elif isinstance(value, Tr): expr = "*".join(map(show_variable_value, value.ops)) return f"tr({expr})" elif isinstance(value, Term): if value.coef == 1: return "*".join(map(show_variable_value, value.c_ops + value.a_ops)) else: return "*".join(map(show_variable_value, [ f"({value.coef})", ] + value.c_ops + value.a_ops)) elif isinstance(value, tuple) and len(value) == 2: return f"{show_variable_value(value[0])}*{show_variable_value(value[1])}" else: return f"{value}" def display_cexpr(cexpr : CExpr): """ return a string interface function """ lines = [] lines.append(f"# Begin CExpr") if cexpr.diagram_types: lines.append(f"diagram_type_dict = dict()") for name, diagram_type in cexpr.diagram_types: lines.append(f"diagram_type_dict[{diagram_type}] = {name!r}") if cexpr.positions: position_vars = ", ".join(cexpr.positions) lines.append(f"# Positions:") lines.append(f"{position_vars} = {cexpr.positions}") if cexpr.variables_prop: lines.append(f"# Variables prop:") for name, value in cexpr.variables_prop: lines.append(f"{name:<30} = {show_variable_value(value)}") if cexpr.variables_color_matrix: lines.append(f"# Variables color matrix:") for name, value in cexpr.variables_color_matrix: lines.append(f"{name:<30} = {show_variable_value(value)}") if cexpr.variables_factor_intermediate: lines.append(f"# Variables factor intermediate:") for name, value in cexpr.variables_factor_intermediate: lines.append(f"{name:<30} = {show_variable_value(value)}") if cexpr.variables_factor: lines.append(f"# Variables factor:") for name, value in cexpr.variables_factor: lines.append(f"{name:<30} = {show_variable_value(value)}") if cexpr.variables_prod: lines.append(f"# Variables prod:") for name, value in cexpr.variables_prod: lines.append(f"{name:<30} = {show_variable_value(value)}") if cexpr.variables_tr: lines.append(f"# Variables tr:") for name, value in cexpr.variables_tr: lines.append(f"{name:<30} = {show_variable_value(value)}") if cexpr.diagram_types: lines.append(f"# Diagram type coef:") for name, diagram_type in cexpr.diagram_types: if name is not None: coef_name = f"coef_{name}" lines.append(f"{coef_name:<30} = 1") if cexpr.named_terms: lines.append(f"# Named terms:") for idx, (name, term,) in enumerate(cexpr.named_terms): name_type = "_".join([ "coef", ] + name.split("_")[1:-1]) lines.append(f"{name:<30} = {name_type} * {show_variable_value(term)}") lines.append(f"terms = [ 0 for i in range({len(cexpr.named_terms)}) ]") for idx, (name, term,) in enumerate(cexpr.named_terms): lines.append(f"terms[{idx}] = {name}") if cexpr.named_exprs: lines.append(f"# Named exprs:") lines.append(f"exprs = [ 0 for i in range({len(cexpr.named_exprs)}) ]") for idx, (name, expr,) in enumerate(cexpr.named_exprs): lines.append(f"# {name}") for e in expr: lines.append(f"exprs[{idx}] += {show_variable_value(e)}") lines.append(f"# End CExpr") return "\n".join(lines) @q.timer_verbose def cexpr_code_gen_py(cexpr : CExpr, *, is_cython = True, is_distillation = False): """ return a string interface function """ gen = CExprCodeGenPy(cexpr, is_cython = is_cython, is_distillation = is_distillation) return gen.code_gen() class CExprCodeGenPy: """ self.cexpr self.is_cython self.is_distillation self.var_dict_for_factors self.lines self.indent self.total_sloppy_flops # flops per complex multiplication: 6 flops per matrix multiplication: 6 M N L + 2 M L (N-1) ==> 13536 (sc * sc), 4320 (sc * s), 480 (s * s), 3168 (sc * c), 198 (c * c) flops per trace 2 (M-1) ==> 22 (sc) flops per trace2 6 M N + 2 (M N - 1) ==> 1150 (sc, sc) """ def __init__(self, cexpr, *, is_cython = True, is_distillation = False): self.cexpr = cexpr self.is_cython = is_cython self.is_distillation = is_distillation self.var_dict_for_factors = {} self.lines = [] self.indent = 0 self.total_sloppy_flops = 0 # if self.is_distillation: assert self.is_cython == False # # self.set_var_dict_for_factors(); def code_gen(self): """ main function """ lines = self.lines append = self.append append_cy = self.append_cy append_py = self.append_py if self.is_distillation: append(f"from auto_contractor.runtime_distillation import *") else: append(f"from auto_contractor.runtime import *") append_cy(f"import cython") append_cy(f"cimport qlat_utils.everything as cc") append_cy(f"cimport qlat_utils.all as qu") append_cy(f"cimport libcpp.complex") append_cy(f"cimport numpy") self.sep() self.cexpr_function() self.sep() self.cexpr_function_get_prop() self.sep() self.cexpr_function_eval() self.sep() self.cexpr_function_eval_with_props() self.sep() self.total_flops() return "\n".join(lines) def set_var_dict_for_factors(self): """ if this function is called (currently not), factor will be referred use array and index, instead of its variable name. """ cexpr = self.cexpr self.var_dict_for_factors = {} var_dict_for_factors = self.var_dict_for_factors for idx, (name, value,) in enumerate(cexpr.variables_factor_intermediate): assert name.startswith("V_factor_") var_dict_for_factors[name] = f"factors_intermediate_view[{idx}]" for idx, (name, value,) in enumerate(cexpr.variables_factor): assert name.startswith("V_factor_") var_dict_for_factors[name] = f"factors_view[{idx}]" def gen_expr(self, x): """ return code_str, type_str """ if isinstance(x, (int, float, complex)): return f"{x}", "V_a" elif isinstance(x, (ea.Expr, ea.Factor)): return f"({ea.compile_py(x, self.var_dict_for_factors)})", "V_a" assert isinstance(x, Op) if x.otype == "S": return f"get_prop('{x.f}', {x.p1}, {x.p2})", "V_S" elif x.otype == "U": return f"get_prop('U', '{x.tag}', {x.p}, {x.mu})", "V_U" elif x.otype == "G": assert x.s1 == "auto" and x.s2 == "auto" assert x.tag in [ 0, 1, 2, 3, 5, ] or isinstance(x.tag, str) if self.is_cython: if x.tag in [ 0, 1, 2, 3, 5, ]: return f"qu.gamma_matrix_{x.tag}", "V_G" else: return f"cc.get_gamma_matrix({x.tag})", "V_G" else: return f"get_gamma_matrix({x.tag})", "V_G" elif x.otype == "Tr": if len(x.ops) == 0: assert False elif len(x.ops) == 1: c, t = self.gen_expr(x.ops[0]) assert t == "V_S" self.total_sloppy_flops += 22 if self.is_cython: return f"cc.matrix_trace({c})", "V_a" else: return f"mat_tr_wm({c})", "V_a" else: c1, t1 = self.gen_expr_prod_list(x.ops[:-1]) c2, t2 = self.gen_expr(x.ops[-1]) if t1 == "V_S" and t2 == "V_S": self.total_sloppy_flops += 1150 if self.is_cython: return f"cc.matrix_trace({c1}, {c2})", "V_a" else: return f"mat_tr_wm_wm({c1}, {c2})", "V_a" elif t1 == "V_S" and t2 == "V_G": if self.is_cython: return f"cc.matrix_trace({c1}, {c2})", "V_a" else: return f"mat_tr_wm_sm({c1}, {c2})", "V_a" elif t1 == "V_G" and t2 == "V_S": if self.is_cython: return f"cc.matrix_trace({c1}, {c2})", "V_a" else: return f"mat_tr_sm_wm({c1}, {c2})", "V_a" elif t1 == "V_S" and t2 == "V_U": if self.is_cython: return f"cc.matrix_trace({c1}, {c2})", "V_a" else: return f"mat_tr_wm_cm({c1}, {c2})", "V_a" elif t1 == "V_U" and t2 == "V_S": if self.is_cython: return f"cc.matrix_trace({c1}, {c2})", "V_a" else: return f"mat_tr_cm_wm({c1}, {c2})", "V_a" else: assert False elif x.otype == "Var": if x.name.startswith("V_S_") or x.name.startswith("V_U_"): if self.is_cython: return f"p_{x.name}[0]", get_var_name_type(x.name) else: return f"p_{x.name}", get_var_name_type(x.name) else: return f"{x.name}", get_var_name_type(x.name) def gen_expr_prod(self, ct1, ct2): c1, t1 = ct1 c2, t2 = ct2 if c1 == "(1+0j)": assert t1 == "V_a" return ct2 elif c2 == "(1+0j)": assert t2 == "V_a" return ct1 elif t1 == "V_S" and t2 == "V_S": self.total_sloppy_flops += 13536 if self.is_cython: return f"{c1} * {c2}", "V_S" else: return f"mat_mul_wm_wm({c1}, {c2})", "V_S" # elif t1 == "V_S" and t2 == "V_a": if self.is_cython: return f"{c1} * {c2}", "V_S" else: return f"mat_mul_a_wm({c2}, {c1})", "V_S" elif t1 == "V_a" and t2 == "V_S": if self.is_cython: return f"{c1} * {c2}", "V_S" else: return f"mat_mul_a_wm({c1}, {c2})", "V_S" elif t1 == "V_a" and t2 == "V_a": return f"{c1} * {c2}", "V_a" # elif t1 == "V_S" and t2 == "V_G": self.total_sloppy_flops += 4320 if self.is_cython: return f"{c1} * {c2}", "V_S" else: return f"mat_mul_wm_sm({c1}, {c2})", "V_S" elif t1 == "V_G" and t2 == "V_S": self.total_sloppy_flops += 4320 if self.is_cython: return f"{c1} * {c2}", "V_S" else: return f"mat_mul_sm_wm({c1}, {c2})", "V_S" elif t1 == "V_G" and t2 == "V_G": self.total_sloppy_flops += 480 if self.is_cython: return f"{c1} * {c2}", "V_G" else: return f"mat_mul_sm_sm({c1}, {c2})", "V_G" elif t1 == "V_G" and t2 == "V_a": if self.is_cython: return f"{c1} * {c2}", "V_G" else: return f"mat_mul_a_sm({c2}, {c1})", "V_G" elif t1 == "V_a" and t2 == "V_G": if self.is_cython: return f"{c1} * {c2}", "V_G" else: return f"mat_mul_a_sm({c1}, {c2})", "V_G" # elif t1 == "V_S" and t2 == "V_U": self.total_sloppy_flops += 3168 if self.is_cython: return f"{c1} * {c2}", "V_S" else: return f"mat_mul_wm_cm({c1}, {c2})", "V_S" elif t1 == "V_U" and t2 == "V_S": self.total_sloppy_flops += 3168 if self.is_cython: return f"{c1} * {c2}", "V_S" else: return f"mat_mul_cm_wm({c1}, {c2})", "V_S" elif t1 == "V_U" and t2 == "V_U": self.total_sloppy_flops += 198 if self.is_cython: return f"{c1} * {c2}", "V_U" else: return f"mat_mul_cm_cm({c1}, {c2})", "V_U" elif t1 == "V_U" and t2 == "V_a": if self.is_cython: return f"{c1} * {c2}", "V_U" else: return f"mat_mul_a_cm({c2}, {c1})", "V_U" elif t1 == "V_a" and t2 == "V_U": if self.is_cython: return f"{c1} * {c2}", "V_U" else: return f"mat_mul_a_cm({c1}, {c2})", "V_U" # else: print(ct1, ct2) assert False def gen_expr_prod_list(self, x_list): if len(x_list) == 0: return f"1", "V_a" elif len(x_list) == 1: return self.gen_expr(x_list[0]) else: assert len(x_list) > 1 return self.gen_expr_prod(self.gen_expr_prod_list(x_list[:-1]), self.gen_expr(x_list[-1])) def append(self, line): lines = self.lines if line == "": lines.append("") else: lines.append(self.indent * ' ' + line) def append_py(self, line): if self.is_cython: return lines = self.lines if line == "": lines.append("# Python only") else: lines.append(self.indent * ' ' + line + " # Python only") def append_cy(self, line): if not self.is_cython: return lines = self.lines if line == "": lines.append("# Cython") else: lines.append(self.indent * ' ' + line + " # Cython") def sep(self): append = self.append append(f"") append(f"### ----") append(f"") def cexpr_function(self): append = self.append append_cy = self.append_cy append_py = self.append_py append(f"@timer") append(f"def cexpr_function(*, positions_dict, get_prop, is_ama_and_sloppy=False):") self.indent += 4 append(f"# get_props") append(f"props, cms, factors = cexpr_function_get_prop(positions_dict, get_prop)") append(f"# eval") append(f"ama_val = cexpr_function_eval(positions_dict, props, cms, factors)") append(f"# extract sloppy val") append(f"val_sloppy = ama_extract(ama_val, is_sloppy = True)") append(f"# extract AMA val") append(f"val_ama = ama_extract(ama_val)") append(f"# return") append(f"if is_ama_and_sloppy:") append(f" # return both AMA corrected results and sloppy results") append(f" return val_ama, val_sloppy") append(f"else:") append(f" # return AMA corrected results by default") append(f" return val_ama") self.indent -= 4 def cexpr_function_get_prop(self): append = self.append append_cy = self.append_cy append_py = self.append_py cexpr = self.cexpr append(f"@timer_flops") append_cy(f"@cython.boundscheck(False)") append_cy(f"@cython.wraparound(False)") append(f"def cexpr_function_get_prop(positions_dict, get_prop):") self.indent += 4 append(f"# set positions") for position_var in cexpr.positions: if position_var in aff.auto_fac_funcs_list: append(f"{position_var} = aff.{position_var}") else: append(f"{position_var} = positions_dict['{position_var}']") append(f"# get prop") for name, value in cexpr.variables_prop: assert name.startswith("V_S_") x = value assert isinstance(x, Op) assert x.otype == "S" c, t = self.gen_expr(x) assert t == "V_S" # potentially be AMA prop (cannot use cdef in cython) append(f"{name} = {c}") append(f"# get color matrix") for name, value in cexpr.variables_color_matrix: assert name.startswith("V_U_") x = value assert isinstance(x, Op) assert x.otype == "U" c, t = self.gen_expr(x) assert t == "V_U" append_cy(f"cdef qu.ColorMatrix {name} = {c}") append_py(f"{name} = {c}") append(f"# set props for return") append(f"props = [") self.indent += 4 for name, value in cexpr.variables_prop: append(f"{name},") append(f"]") self.indent -= 4 append(f"# set color matrix for return") append(f"cms = [") self.indent += 4 for name, value in cexpr.variables_color_matrix: append(f"{name},") append(f"]") self.indent -= 4 append(f"# set intermediate factors") for idx, (name, value,) in enumerate(cexpr.variables_factor_intermediate): assert name.startswith("V_factor_") x = value assert isinstance(x, ea.Expr) c, t = self.gen_expr(x) assert t == "V_a" append(f"# {idx} {name}") append_cy(f"cdef cc.PyComplexD {name} = {c}") append_py(f"{name} = {c}") append(f"# declare factors") append_cy(f"cdef numpy.ndarray[numpy.complex128_t] factors") append(f"factors = np.zeros({len(cexpr.variables_factor)}, dtype=np.complex128)") append_cy(f"cdef cc.PyComplexD[:] factors_view = factors") append_py(f"factors_view = factors") append(f"# set factors") for idx, (name, value,) in enumerate(cexpr.variables_factor): assert name.startswith("V_factor_") x = value assert isinstance(x, ea.Expr) c, t = self.gen_expr(x) assert t == "V_a" append(f"# {name}") append_cy(f"cdef cc.PyComplexD {name} = {c}") append_py(f"{name} = {c}") append(f"factors_view[{idx}] = {name}") append(f"# set flops") append(f"total_flops = len(props) * 144 * 2 * 8 + len(cms) * 9 * 2 * 8 + len(factors) * 2 * 8") append(f"# return") append(f"return total_flops, (props, cms, factors,)") self.indent -= 4 def cexpr_function_eval(self): append = self.append append_cy = self.append_cy append_py = self.append_py cexpr = self.cexpr append(f"@timer_flops") append(f"def cexpr_function_eval(positions_dict, props, cms, factors):") self.indent += 4 append(f"# load AMA props with proper format") append(f"props = [ load_prop(p) for p in props ]") append(f"# join the AMA props") append(f"ama_props = ama_list(*props)") append(f"# apply eval to the factors and AMA props") append(f"ama_val = ama_apply1(lambda x_props: cexpr_function_eval_with_props(positions_dict, x_props, cms, factors), ama_props)") append(f"# set flops") append(f"total_flops = ama_counts(ama_val) * total_sloppy_flops") append(f"# return") append(f"return total_flops, ama_val") self.indent -= 4 def cexpr_function_eval_with_props(self): append = self.append append_cy = self.append_cy append_py = self.append_py cexpr = self.cexpr append(f"@timer_flops") append_cy(f"@cython.boundscheck(False)") append_cy(f"@cython.wraparound(False)") append_cy(f"def cexpr_function_eval_with_props(dict positions_dict, list props, list cms, cc.PyComplexD[:] factors_view):") append_py(f"def cexpr_function_eval_with_props(positions_dict, props, cms, factors_view):") self.indent += 4 append(f"# set positions") for position_var in cexpr.positions: if position_var in aff.auto_fac_funcs_list: append(f"{position_var} = aff.{position_var}") else: append(f"{position_var} = positions_dict['{position_var}']") append(f"# set props") for idx, (name, value,) in enumerate(cexpr.variables_prop): append_cy(f"cdef cc.WilsonMatrix* p_{name} = &(<qu.WilsonMatrix>props[{idx}]).xx") append_py(f"p_{name} = props[{idx}]") append(f"# set cms") for idx, (name, value,) in enumerate(cexpr.variables_color_matrix): append_cy(f"cdef cc.ColorMatrix* p_{name} = &(<qu.ColorMatrix>cms[{idx}]).xx") append_py(f"p_{name} = cms[{idx}]") append(f"# set factors") for idx, (name, value,) in enumerate(cexpr.variables_factor): append_cy(f"cdef cc.PyComplexD {name} = factors_view[{idx}]") append_py(f"{name} = factors_view[{idx}]") append(f"# compute products") for name, value in cexpr.variables_prod: assert name.startswith("V_prod_") x = value assert isinstance(x, tuple) assert len(x) == 2 c, t = self.gen_expr_prod_list(x) assert t == get_var_name_type(name) if t == "V_G": append_cy(f"cdef cc.SpinMatrix {name} = {c}") append_py(f"{name} = {c}") elif t == "V_S": append_cy(f"cdef cc.WilsonMatrix {name} = {c}") append_py(f"{name} = {c}") elif t == "V_U": append_cy(f"cdef cc.ColorMatrix {name} = {c}") append_py(f"{name} = {c}") else: assert False append(f"# compute traces") for name, value in cexpr.variables_tr: assert name.startswith("V_tr_") x = value assert isinstance(x, Op) assert x.otype == "Tr" c, t = self.gen_expr(x) assert t == "V_a" append_cy(f"cdef cc.PyComplexD {name} = cc.pycc_d({c})") append_py(f"{name} = {c}") append(f"# set terms") for name, term in cexpr.named_terms: x = term if x.coef == 1: c_ops = x.c_ops else: c_ops = [ x.coef, ] + x.c_ops c, t = self.gen_expr_prod_list(c_ops) append_cy(f"cdef cc.PyComplexD {name} = {c}") append_py(f"{name} = {c}") append(f"# declare exprs") append_cy(f"cdef numpy.ndarray[numpy.complex128_t] exprs") append(f"exprs = np.zeros({len(cexpr.named_exprs)}, dtype = np.complex128)") append_cy(f"cdef cc.PyComplexD[:] exprs_view = exprs") append_py(f"exprs_view = exprs") append(f"# set exprs") def show_coef_term(coef, tname): coef, t = self.gen_expr(coef) assert t == "V_a" if coef == "1": return f"{tname}" else: return f"({coef}) * {tname}" append_cy(f"cdef cc.PyComplexD expr") for idx, (name, expr,) in enumerate(cexpr.named_exprs): name = name.replace("\n", " ") append(f"# {idx} name='{name}' ") append(f"expr = 0") for coef, tname in expr: s = show_coef_term(coef, tname) append(f"expr += {s}") append(f"exprs_view[{idx}] = expr") append(f"# set flops") append(f"total_flops = total_sloppy_flops") append(f"# return") append(f"return total_flops, exprs") self.indent -= 4 def total_flops(self): append = self.append append(f"# Total flops per sloppy call is: {self.total_sloppy_flops}") append(f"total_sloppy_flops = {self.total_sloppy_flops}") #### ---- if __name__ == "__main__": expr = Qb("d", "x1", "s1", "c1") * G(5, "s1", "s2") * Qv("u", "x1", "s2", "c1") * Qb("u", "x2", "s3", "c2") * G(5, "s3", "s4") * Qv("d", "x2", "s4", "c2") print(expr) print() expr = simplified(contract_expr(expr)) print(expr) print() cexpr = copy.deepcopy(mk_cexpr(expr)) print(cexpr) print() cexpr.optimize() print(cexpr) print() print(display_cexpr(cexpr)) print() print(expr) print() cexpr = contract_simplify_compile(expr, is_isospin_symmetric_limit = True) print(display_cexpr(cexpr)) print() cexpr.optimize() print(display_cexpr(cexpr)) print(cexpr_code_gen_py(cexpr))