# for data processing and matrix computations
import numpy as np
import pandas as pd
# for computing p-values for hypothesis testing
from scipy import stats
def CVAR(data, p, ordering=None, JT=None):
    """
    Fits a Causal VAR(p) model and reports the related matrices packaged as a
    dict. Valid keys for the resulting dictionary include:
    'block covariance', 'partial correlations', 'K', 'K-original' (only exists
    when JT is supplied, i.e. the model is restricted), 'A', 'B', and 'Delta'.
    For details, please refer to the paper.
    Parameters
    ----------
    data: pd.DataFrame
        A multi-dimensional time series.
    p: int
        A positive integer indicating the order of the desired VAR model.
    ordering: list, default None
        An ordered list specifying the desired variable ordering for the data.
    JT: dict, default None
        A pseudo-graph object that describes the junction tree structure based on
        a graph generated with contemporaneous variables in the data.
        Setting JT to a non-None value will enable covariance selection, resulting
        in a restricted model (zeros will be set to the corresponding positions in
        the base of the given junction tree).
        Example:
        >> JT_Fin_VAR1={'cliques':
                        [['BVSP', 'EM', 'NIK'],
                         ['BVSP', 'DAX', 'EM', 'FTSE', 'ISE', 'SP'],
                         ['BVSP', 'DAX', 'EU', 'FTSE', 'ISE']],
                        'separators':
                        [['BVSP', 'EM'],
                         ['BVSP', 'DAX', 'FTSE', 'ISE']]}
    Returns
    -------
    dict
        A collection of important matrices for the causal VAR model stored as
        {'matrix_name' : matrix} pairs.
    """
    if ordering is not None:
        data = data.reindex(columns=ordering)
    else:
        ordering = data.columns
    d = data.shape[1]
    # For VAR(p), we need the block Toeplitz covariance matrix C_{p+1}
    CC = block_covariance(data, p+1, keep_label=True)
    K = pd.DataFrame(np.linalg.inv(CC), columns=CC.columns, index=CC.columns)
    cond_C_inv = K.iloc[0:d, 0:d]
    restricted = JT is not None
    if restricted:
        # conduct covariance selection and build a restricted CVAR model
        K_model = cov_selection(data, p, JT, keep_label=True)
    else:
        # otherwise, build an unrestricted CVAR model
        K_model = K
    # the block LDL decomposition yields the model-equation matrices A, B, Delta
    L, D = block_LDL(K_model, p, d)
    result = {
        'block covariance': CC,
        'partial correlations': partial_cor(cond_C_inv, keep_label=True),
        'K': K_model,
        'A': pd.DataFrame(L[:d, :d].T, columns=ordering, index=ordering),
        'B': pd.DataFrame(L[d:, :d].T, columns=CC.columns[d:], index=ordering),
        'Delta': np.linalg.inv(D[:d, :d]),
    }
    if restricted:
        # keep the unrestricted estimate around for comparison
        result['K-original'] = K
    return result
# Section 1: functions for analyzing multi-variate time series data;
## primarily used to construct graphs for our causal VAR model along with the
## NetworkX package.
def covariance_func(data, h, keep_label=False):
    """
    Computes the ergodic estimate for C(h) (the covariance function for the
    VAR model, see p.4 & 16 of paper ver.0727_2021)
    Parameters
    ----------
    data: array_like
        A multi-dimensional time series.
    h: int
        An integer lag, can be negative.
    keep_label: bool, default False
        If (keep_label == True), return result as a pd.DataFrame labelled with
        data's variable names.
    Returns
    -------
    C: np.array (default) or pd.DataFrame
        A d x d matrix (with the same variable labelling as data) that serves
        as the ergodic estimate for C(h) (pg. 4 & 16 of paper ver.0727_2021).
    """
    if keep_label and (type(data) is not pd.DataFrame):
        raise TypeError("data (first argument) has to be a pd.DataFrame"
                        " in order to carry variable labels.")
    X = np.array(data)
    n, d = X.shape
    h_abs = abs(h)  # in case when h is negative
    # center each variable at its sample mean
    Z = X - X.mean(axis=0)
    if h_abs >= n:
        # no overlapping observations at this lag; the sum of outer products
        # is empty (the old loop produced all zeros here as well)
        C = np.zeros((d, d))
    else:
        # C(h) = (1/n) * sum_i outer(Z_i, Z_{i+|h|}), written as one matrix
        # product instead of the per-row Python loop (page 16, ver.0727_2021)
        C = Z[:n - h_abs].T @ Z[h_abs:] / n
    # return C^T(h) if h is negative (see page 4, ver.0727_2021, above Eq(4))
    if h < 0:
        C = C.T
    if keep_label:
        # bug fix: the labelled DataFrame promised by the docstring was
        # previously never returned
        return pd.DataFrame(C, index=data.columns, columns=data.columns)
    return C
def block_covariance(data, k, keep_label=False):
    """
    Computes the block Toeplitz covariance matrix C_k (intended for VAR(k-1))
    using the values of the covariance function C(h).
    Parameters
    ----------
    data: array_like
        A multi-dimensional time series
    k: an integer
        A positive integer.
    keep_label: bool, default False
        If (keep_label == True), return result as a pd.DataFrame with readable
        labelling (extra columns are labelled as 'X_{i}-p' indicating the
        respective lag-p variables in the context of autoregression).
    Returns
    -------
    CC, np.array (default) or pd.DataFrame
        CC is a symmetric, positive definite block Toeplitz matrix with
        k x k blocks which are d x d covariance matrices C(h)'s.
    Remarks
    -------
    The inverse of CC is the concentration matrix K for the VAR(k-1) model.
    To avoid off-by-one error use the concentration_mat() function below.
    """
    if keep_label and (type(data) is not pd.DataFrame):
        raise TypeError("data (first argument) has to be a pd.DataFrame"
                        " in order to carry variable labels.")
    # block (i, j) of CC is C(i - j), so CC is Toeplitz by construction
    CC = np.block([[covariance_func(data, i - j) for j in range(k)]
                   for i in range(k)])
    if keep_label:
        # extend the labels to fit the matrices for VAR(k-1)
        labels = extend_labels(data, k-1)
        return pd.DataFrame(CC, index=labels, columns=labels)
    # bug fix: previously returned an unlabelled pd.DataFrame even though the
    # documented default return type is np.array
    return CC
def concentration_mat(data, p, keep_label=False):
    """
    Computes the concentration matrix K, i.e. the inverse of the block
    Toeplitz covariance matrix, for the VAR(p) model.
    Parameters
    ----------
    data: array_like
        A multi-dimensional time series
    p: an integer
        A positive integer indicating the order of the desired VAR model.
    keep_label: bool, default False
        If (keep_label == True), return result as a pd.DataFrame with readable
        labelling (extra columns are labelled as 'X_{i}-p' indicating the
        respective lag-p variables in the context of autoregression).
    Returns
    -------
    K: np.array (default) or pd.DataFrame
        K is the inverse of the symmetric, positive definite block Toeplitz
        matrix with p x p blocks which are d x d covariance matrices C(h)'s.
        The upper left d x d block of K is the inverse of the conditional
        covariance matrix C(p|1,..., p-1).
    """
    # a VAR(p) model needs the block covariance matrix C_{p+1}
    CC = block_covariance(data, p + 1, keep_label)
    K = np.linalg.inv(CC)
    if not keep_label:
        return K
    return pd.DataFrame(K, index=CC.columns, columns=CC.columns)
def inv_cond_cov(data, p, keep_label=False):
    """
    Computes the inverse of the conditional covariance matrix C(p|1,...,p-1)
    using the K matrix (i.e., inverse of the block covariance matrix)
    (maybe inefficient).
    Parameters
    ----------
    data: array_like
        A multi-dimensional time series
    p: an integer
        A positive integer indicating the order of the desired VAR model.
    keep_label: bool, default False
        If (keep_label == True), return result as a pd.DataFrame with readable
        labelling (extra columns are labelled as 'X_{i}-p' indicating the
        respective lag-p variables in the context of autoregression).
    Returns
    -------
    matrix: np.array (default) or pd.DataFrame
        The inverse of the conditional covariance matrix C(p|1,..., p-1) for the
        time series data.
    """
    n, d = data.shape
    # bug fix: removed a stray debug `print(d)` left in the original
    K = concentration_mat(data, p, keep_label)
    # the desired inverse is the upper-left d x d block of K
    if keep_label:
        return K.iloc[0:d, 0:d]
    return K[0:d, 0:d]
def partial_cor(C_inv, keep_label=False):
    """
    Computes the partial correlation matrix based on the concentration matrix K
    using the formula on page 18 (bottom) of paper (ver.0727_2021).
    Parameters
    ----------
    C_inv: array_like
        A concentration matrix (inverse of a covariance matrix)
    keep_label: bool, default False
        If (keep_label == True), return result as a pd.DataFrame with readable
        labelling (extra columns are labelled as 'X_{i}-p' indicating the
        respective lag-p variables in the context of autoregression).
    Returns
    -------
    rr: np.array (default) or pd.DataFrame
        rr is a d x d matrix (same shape as K for the VAR model).
        rr[i][j] is the partial correlation coefficient between X_i and X_j (as
        specified in K) given all other variables (ceteris paribus). As a result,
        rr keeps the same variable ordering as the corresponding covariance matrix
        (i.e., inverse of K). Diagonal entries are np.nan (undefined).
    """
    if keep_label and (type(C_inv) is not pd.DataFrame):
        raise TypeError("C_inv (first argument) has to be a pd.DataFrame"
                        " in order to carry variable labels.")
    K = np.array(C_inv, dtype=float)
    # rr[i, j] = -K[i, j] / sqrt(K[i, i] * K[j, j]); vectorized over all pairs
    # instead of the original O(d^2) Python double loop
    diag = np.diag(K)
    rr = -K / np.sqrt(np.outer(diag, diag))
    np.fill_diagonal(rr, np.nan)  # diagonal entries: undefined
    if keep_label:
        return pd.DataFrame(rr, index=C_inv.columns, columns=C_inv.columns)
    return rr
def check_RZP(adj_matrix):
    """
    Checks if an adjacency matrix has a reducible zero pattern.
    Returns False as soon as some zero entry M[j, i] (j < i) has a common
    earlier neighbour h < j with M[h, j] == 1 and M[h, i] == 1.
    """
    A = np.array(adj_matrix)
    n_rows, n_cols = A.shape
    for j in range(n_rows):
        for i in range(j + 1, n_cols):
            if A[j, i] != 0:
                continue
            if any(A[h, j] == 1 and A[h, i] == 1 for h in range(j)):
                return False
    return True
def get_tt(rr, n=None):
    """
    Computes the corresponding t-statistics based on partial correlations.
    (not used in final version)
    Parameters
    ----------
    rr: array_like
        A d x d matrix of partial correlations (diagonal may be np.nan).
    n: int
        The sample size behind rr. Bug fix: the original body read `n` and `d`
        from undefined globals (NameError); `d` is now taken from rr's shape
        and `n` must be passed explicitly (kept optional for signature
        compatibility).
    Returns
    -------
    tt: np.array
        A d x d matrix with tt[i, j] = sqrt(n - d) * rr[i, j] /
        sqrt(1 - rr[i, j]**2) and np.nan on the diagonal.
    """
    if n is None:
        raise ValueError("n (sample size) must be provided")
    rr = np.asarray(rr)
    d = rr.shape[0]
    tt = np.zeros((d, d))
    for i in range(d):
        for j in range(d):
            if i == j:
                tt[i, j] = np.nan
            else:
                tt[i, j] = np.sqrt(n - d) * np.true_divide(
                    rr[i, j], np.sqrt(1 - (rr[i, j] ** 2)))
    return tt
def get_pp(tt, df):
    """
    Computes the corresponding p-values based on the t-statistics with a given
    degree of freedom (df). (not used in final version)
    """
    # two-sided p-value for each t-statistic; df is fixed at this point.
    # stats.t.cdf broadcasts over arrays, so no np.vectorize wrapper is needed.
    tail = 1 - stats.t.cdf(np.abs(np.asarray(tt)), df)
    return 2 * tail
def extend_labels(data, p):
    """
    Returns an extended pd.Index of data's column labels followed by the
    lag-1 ... lag-p variable labels ('name-k' for lag k).
    """
    assert type(data) is pd.DataFrame, "input data have to be a pd.DataFrame"
    base = list(data.columns)
    lagged = [name + '-' + str(lag)
              for lag in range(1, p + 1)
              for name in base]
    return pd.Index(base + lagged)
def max_card_order(G, start_node):
    """
    Returns an ordering (list of variable names) of graph G's vertices based on
    the max cardinality search algorithm according to the pseudocode in p.312 of
    [11] Koller, D., Friedman, N., Probabilistic Graphical Models. Principles
    and Techniques. MIT Press (2009).
    (with slight modification).
    """
    assert start_node in G, "start_node should be in G"
    N = len(G)
    order = [0] * (N - 1) + [start_node]
    weight = dict.fromkeys(G.nodes, 0)
    # marking a used vertex with weight -N keeps it below every unused vertex
    # (whose weights are always >= 0) in the max() selection below.
    weight[start_node] = -N
    for nbr in G.neighbors(start_node):
        weight[nbr] += 1
    # fill the ordering from the back towards the front
    for pos in reversed(range(N - 1)):
        chosen = max(weight, key=weight.get)
        order[pos] = chosen
        weight[chosen] = -N
        for nbr in G.neighbors(chosen):
            weight[nbr] += 1
    return order
def max_card_order_v1(G, start_node):
    """
    Same max cardinality search as max_card_order, but prompts the user
    (via input()) to break ties between equally-weighted vertices manually.
    """
    assert start_node in G, "start_node should be in V(G)"
    N = len(G)
    order = ['?'] * (N - 1) + [start_node]
    weight = dict.fromkeys(G.nodes, 0)
    # marking a used vertex with weight -N keeps it below every unused vertex
    weight[start_node] = -N
    for nbr in G.neighbors(start_node):
        weight[nbr] += 1
    for pos in reversed(range(N - 1)):
        chosen = max(weight, key=weight.get)
        tied = [name for name, w in weight.items() if w == weight[chosen]]
        if len(tied) != 1:
            # more than one vertex attains the max weight: ask the user
            print('Current ordering:', order)
            chosen = input("Break the tie between " + str(tied) + '\n Enter: ')
        order[pos] = chosen
        weight[chosen] = -N
        for nbr in G.neighbors(chosen):
            weight[nbr] += 1
    return order
# Section 2: Covariance Selection
## to build a restricted model in one-click given a junction tree
def lagged_sample(data, p, keep_label=False):
    """
    Generates a (n-p)-element (p+1)d-dimensional sample of all contemporary
    and lagged variables based on the original d-dimensional sample. Intended
    for a vector autoregression (VAR) model of order p.
    Parameters
    ----------
    data: array_like
        A multi-dimensional time series.
    p: int
        A positive integer indicating the order of the desired VAR model.
    keep_label: bool, default False
        If (keep_label == True), return result as a pd.DataFrame with readable
        labelling (extra columns are labelled as 'X_{i}-p' indicating the
        respective lag-p variables in the context of autoregression).
    Returns
    -------
    lagged_data: np.array (default) or pd.DataFrame
        An (n-p)-element (p+1)d-dimensional sample of all contemporary and
        lagged variables based on the original d-dimensional sample.
    """
    if keep_label and (type(data) is not pd.DataFrame):
        raise TypeError("data (first argument) has to be a pd.DataFrame"
                        " in order to carry variable labels.")
    X = np.array(data)
    n = data.shape[0]
    # column block for lag k is rows (p-k) .. (n-k): the contemporary block
    # first, followed by the lag-1 ... lag-p blocks.
    blocks = [X[(p - lag):(n - lag), :] for lag in range(p + 1)]
    X_lagged = np.hstack(blocks)
    if not keep_label:
        return X_lagged  # default, return type: np.array
    return pd.DataFrame(X_lagged, columns=extend_labels(data, p))
def blowed_S_inv(lagged_data, indices, keep_label=False):
    """
    blowed_S_inv will take a low-dimensional sample covariance matrix S and
    insert the inverse of S into the proper rows/columns of a larger (sparse)
    matrix with the same shape as K.
    Parameters
    ----------
    lagged_data: array_like
        An (n-p)-element (p+1)d-dimensional sample of all contemporary and lagged
        variables based on the original d-dimensional sample.
    indices: list of int
        A list of indices corresponding to the set of "valid" variables included
        in a clique/separator.
    keep_label: bool, default False
        If (keep_label == True), lagged_data must be a pd.DataFrame (e.g. from
        lagged_sample(..., keep_label=True)) and the result is returned as a
        pd.DataFrame carrying lagged_data's (already lag-extended) labels.
    Returns
    -------
    S_inv, np.array (default) or pd.DataFrame
        A (p+1)d x (p+1)d matrix with the entries of S^{-1} in the positions
        given by `indices` and zeros everywhere else.
    """
    # bug fix: this check previously referenced an undefined name `data`
    if keep_label and (type(lagged_data) is not pd.DataFrame):
        raise TypeError("lagged_data (first argument) has to be a pd.DataFrame"
                        " in order to carry variable labels.")
    X_lagged = np.array(lagged_data)
    n, m = X_lagged.shape  # m = (p+1) * d
    # center the selected columns once (the original recomputed the centered
    # columns for every (i, j) pair); S0 is then the matrix of product moments
    sub = X_lagged[:, indices]
    Z = sub - sub.mean(axis=0)
    S0_inv = np.linalg.inv(Z.T @ Z)
    # blow S0^{-1} up to an m x m matrix: zeros outside `indices`
    S_inv = np.zeros((m, m))
    S_inv[np.ix_(indices, indices)] = S0_inv
    if keep_label:
        # bug fix: previously called extend_labels(lagged_data.columns) with the
        # wrong arity; lagged_data's columns already carry the extended labels
        labels = lagged_data.columns
        return pd.DataFrame(S_inv, index=labels, columns=labels)
    return S_inv
def cov_selection(data, p, JT, keep_label=False):
    """
    Computes K-hat, a re-estimation of the concentration matrix for a restricted
    VAR(p) model via covariance selection, implementing contemporaneous
    causality with respect to a junction tree (JT) specified by the user.
    Parameters
    ----------
    data: array_like
        A multi-dimensional time series.
    p: int
        A positive integer specifying the order of the VAR model.
    JT: dict
        A pseudo-graph object that describes the junction tree structure based
        on a graph generated with contemporaneous variables in the data.
        Example:
        JT_Fin_VAR1={'cliques':
                     [['BVSP', 'EM', 'NIK'],
                      ['BVSP', 'DAX', 'EM', 'FTSE', 'ISE', 'SP'],
                      ['BVSP', 'DAX', 'EU', 'FTSE', 'ISE']],
                     'separators':
                     [['BVSP', 'EM'],
                      ['BVSP', 'DAX', 'FTSE', 'ISE']]}
    keep_label: bool, default False
        If (keep_label == True), return result as a pd.DataFrame with readable
        labelling (extra columns are labelled as 'X_{i}-p' indicating the
        respective lag-p variables in the context of autoregression).
    Returns
    -------
    K_hat: np.array (default) or pd.DataFrame
        The (p+1)d x (p+1)d restricted concentration matrix.
    """
    lagged = lagged_sample(data, p, keep_label)
    X_lagged = np.array(lagged)
    d = data.shape[1]
    n_adj, d_adj = X_lagged.shape  # n_adj = n-p; d_adj = (p+1) * d
    # map each contemporaneous variable name to its column index
    var_index = {name: pos for pos, name in enumerate(data.columns)}
    # every lagged variable (columns >= d) participates in each clique and
    # separator, since the model restricts contemporaneous causality only
    lag_cols = list(range(d, d_adj))
    K_hat = np.zeros((d_adj, d_adj))
    for clique in JT['cliques']:
        cols = [var_index[name] for name in clique]
        K_hat += blowed_S_inv(X_lagged, cols + lag_cols)
    for sep in JT['separators']:
        cols = [var_index[name] for name in sep]
        K_hat -= blowed_S_inv(X_lagged, cols + lag_cols)
    # rescale the sums/differences of product moments by the sample size (n-p)
    K_hat *= n_adj
    if not keep_label:
        return K_hat
    labels = extend_labels(data, p)
    return pd.DataFrame(K_hat, index=labels, columns=labels)
# Section 3: Block LDL Decomposition
## To obtain the model equations from the concentration matrix
def block_LDL(conc_mat, p, d):
    """
    Performs the proposed block LDL decomposition on the positive definite
    concentration matrix for the novel causal VAR model. For more details, see
    the paper.
    Parameters
    ----------
    conc_mat: array_like
        A positive-definite concentration matrix for the causal VAR(p) model.
    p: int
        A positive integer indicating the order of the VAR model.
    d: int
        A positive integer indicating the dimension of the VAR process.
    Returns
    -------
    L: np.array
        A block-triangular matrix. Only the first d columns are computed
        (those are the blocks needed for the A and B matrices).
    D: np.array
        A block-diagonal matrix. Only the upper-left d x d block is computed
        (the block needed for Delta).
    """
    K = np.array(conc_mat)
    L = np.zeros(((p+1)*d, (p+1)*d))
    D = np.zeros(((p+1)*d, (p+1)*d))
    for j in range(d):
        # diagonal entry D[j, j]: scalar LDL recurrence restricted to column j
        if j == 0:
            D[0, 0] = K[0, 0]
        else:
            acc = 0  # renamed from `sum`, which shadowed the builtin
            for h in range(j):
                acc += L[j, h] * D[h, h] * L[j, h]
            D[j, j] = K[j, j] - acc
        # sub-diagonal entries of L within the first d x d block
        for i in range(j+1, d):
            acc = 0
            for h in range(j):
                acc += L[i, h] * D[h, h] * L[j, h]
            L[i, j] = (K[i, j] - acc) / D[j, j]
        # the remaining p*d rows of column j, updated block-wise
        L[d:, j] = K[d:, j]
        for h in range(j):
            L[d:, j] -= D[h, h] * L[j, h] * L[d:, h]
        L[d:, j] = L[d:, j] / D[j, j]
    # unit diagonal on the first 2*d rows, matching the original algorithm.
    # NOTE(review): rows >= 2*d keep a zero diagonal — presumably fine since
    # only L[:, :d] is consumed downstream (get_A/get_B); confirm intended.
    for i in range(2*d):
        L[i, i] = 1
    return L, D
def get_A(L, d):
    """
    Retrieves the A matrix (part of the model equation) from the result of the
    block LDL decomposition: the transpose of L's upper-left d x d block.
    """
    upper_left = L[:d, :d]
    return upper_left.T
def get_B(L, d):
    """
    Retrieves the B matrix (part of the model equation) from the result of the
    block LDL decomposition: the transpose of L's lower-left block.
    Notice that B = (B_1, B_2, ..., B_p).
    """
    lower_left = L[d:, :d]
    return lower_left.T
def get_Delta(D, d):
    """
    Retrieves the Delta matrix from the result of the block LDL decomposition:
    the inverse of D's upper-left d x d block.
    """
    upper_left = D[:d, :d]
    return np.linalg.inv(upper_left)