Source code for fem.discrete.fit

"""This module implements a class that can fit the Potts model to data and make predictions with the model. `FEM for discrete data`_

.. _FEM for discrete data:
   https://joepatmckenna.github.io/fem/discrete.html
"""

import numpy as np
from scipy.sparse.linalg import svds
from scipy.sparse import csc_matrix
from .. import combinatorics
from .. import fortran_module


class model(object):
    """This class implements a Potts model that can be fit to data and used to make predictions.

    Attributes:
        degs (list): A list of model degrees
        n_x (int): The number of input variables
        n_y (int): The number of output variables
        m_x (ndarray): An array of length `n_x` listing the number of values that each input variable takes
        m_y (ndarray): An array of length `n_y` listing the number of values that each output variable takes
        m_x_cumsum (ndarray): The cumulative sum of `m_x`, prepended with 0
        m_y_cumsum (ndarray): The cumulative sum of `m_y`, prepended with 0
        x_int (ndarray): The input variable data `x` mapped to nonnegative integers
        y_int (ndarray): The output variable data `y` mapped to nonnegative integers
        cat_x (list): A list of per-row dicts that map the input `x` symbol data to the integer data in `x_int`
        cat_y (list): A list of per-row dicts that map the output `y` symbol data to the integer data in `y_int`
        cat_x_inv (list): A list of dicts that are the inverses of the dicts in `cat_x`
        cat_y_inv (list): A list of dicts that are the inverses of the dicts in `cat_y`
        w (dict): A dictionary of length `len(degs)` keyed by `degs`. `w[deg]` are the model parameters for degree `deg`
        disc (list): A list of length `n_x` of the running discrepancies per variable computed during the model fit
        impute (bool): If true, the model maps each combination of hold-one-out input variables to the held-out variable

    Examples:
        >>> import fem
        >>> n, m = 10, 3
        >>> x = fem.discrete.time_series(fem.discrete.model_parameters(n, m), n, m)
        >>> model = fem.discrete.model()
        >>> model.fit(x[:, :-1], x[:, 1:])
        >>> model.predict(x[:, -1])
    """

    def __init__(self, degs=[1]):
        self.degs = degs
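
    # Shape note (inferred from fit() below, not from the published docs):
    # after fitting with the default degs=[1], self.w[1] is an ndarray of
    # shape (m_y.sum(), m_x.sum()), i.e. one row per output one-hot state and
    # one column per input one-hot state.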
    def fit(self, x, y=None, iters=100, overfit=True, impute=None, svd='approx'):
        """Fit the Potts model to the data

        Args:
            x (ndarray): Input variable data, one row per variable
            y (ndarray): Output variable data, one row per variable. If None, `x` is used as the output and imputation mode is enabled
            iters (int): Maximum number of fit iterations per variable
            overfit (bool): Flag passed through to the Fortran fitting routine
            impute (bool): Ignored; set internally to True when `y` is None and to False otherwise
            svd (str): 'approx' computes a single pseudoinverse of the full one-hot data; 'exact' (imputation mode only) recomputes the pseudoinverse with each target variable held out

        The fitted model parameters and the running discrepancies are stored
        in the `w` and `disc` attributes.
        """
        degs = self.degs

        x = np.array(x)
        x_int, cat_x = categorize(x)
        m_x = np.array([len(c) for c in cat_x])
        n_x = x_int.shape[0]

        if y is None:
            impute = True
            y = x
            y_int, cat_y = x_int, cat_x
            m_y = m_x
            n_y = n_x
        else:
            impute = False
            y = np.array(y)
            y_int, cat_y = categorize(y)
            m_y = np.array([len(c) for c in cat_y])
            n_y = y_int.shape[0]

        cat_x_inv = [{v: k for k, v in cat.items()} for cat in cat_x]
        cat_y_inv = [{v: k for k, v in cat.items()} for cat in cat_y]

        m_x_cumsum = np.insert(m_x.cumsum(), 0, 0)
        m_y_cumsum = np.insert(m_y.cumsum(), 0, 0)

        idx_x_by_deg = [combinatorics.multiindices(n_x, deg) for deg in degs]
        mm_x = np.array(
            [np.sum([np.prod(m_x[i]) for i in idx]) for idx in idx_x_by_deg])
        mm_x_cumsum = np.insert(mm_x.cumsum(), 0, 0)

        if (not impute) or (impute and svd == 'approx'):
            x_oh = one_hot(x_int, m_x, degs)
            x_oh_pinv = svd_pinv(x_oh)
            w, disc, it = fortran_module.fortran_module.discrete_fit(
                x_int, y_int, m_x, m_y, m_y.sum(), degs, x_oh_pinv[0],
                x_oh_pinv[1], x_oh_pinv[2], iters, overfit, impute)
            disc = [d[1:it[i]] for i, d in enumerate(disc)]
        elif impute and svd == 'exact':
            w, disc = np.zeros((mm_x_cumsum[-1], mm_x_cumsum[-1])), []
            for i in range(n_x):
                not_i = np.delete(range(n_x), i)
                x_oh = one_hot(x_int[not_i], m_x[not_i], degs)
                x_oh_pinv = svd_pinv(x_oh)
                wi, d, it = fortran_module.fortran_module.discrete_fit(
                    x_int[not_i], y_int[[i]], m_x[not_i], m_y[[i]],
                    m_y[i].sum(), degs, x_oh_pinv[0], x_oh_pinv[1],
                    x_oh_pinv[2], iters, overfit, False)
                w[m_x_cumsum[i]:m_x_cumsum[i + 1], :m_x_cumsum[i]] = \
                    wi[:, :m_x_cumsum[i]]
                w[m_x_cumsum[i]:m_x_cumsum[i + 1], m_x_cumsum[i + 1]:] = \
                    wi[:, m_x_cumsum[i]:]
                disc.append(d[0][1:it[0]])

        w = {
            deg: w[:, mm_x_cumsum[i]:mm_x_cumsum[i + 1]]
            for i, deg in enumerate(degs)
        }

        self.impute = impute
        self.x_int = x_int
        self.cat_x = cat_x
        self.m_x = m_x
        self.n_x = n_x
        self.cat_x_inv = cat_x_inv
        self.cat_y_inv = cat_y_inv
        self.m_x_cumsum = m_x_cumsum
        self.m_y_cumsum = m_y_cumsum
        self.y_int = y_int
        self.cat_y = cat_y
        self.m_y = m_y
        self.n_y = n_y
        self.disc = disc
        self.w = w
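
    # Usage sketch for fit() (hypothetical data and names, not part of the
    # module; assumes integer-coded symbols, which categorize() accepts):
    #
    #     import numpy as np
    #     from fem.discrete.fit import model
    #
    #     rng = np.random.RandomState(0)
    #     x = rng.randint(0, 3, size=(5, 200))   # 5 input variables, 200 samples
    #     y = rng.randint(0, 2, size=(2, 200))   # 2 output variables, 200 samples
    #
    #     potts = model(degs=[1])
    #     potts.fit(x, y, iters=100)             # supervised mode (impute=False)
    #     potts.fit(x)                           # y=None enables imputation mode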
    def predict(self, x):
        """Predict the states of the output variables given input variable data `x`

        Args:
            x (ndarray): Input variable data, one row per variable

        Returns:
            (ndarray, ndarray): The predicted output states and their probabilities
        """
        cat_x = self.cat_x
        w = self.w
        degs = self.degs
        m_x = self.m_x
        n_y = self.n_y
        m_y_cumsum = self.m_y_cumsum
        cat_y_inv = self.cat_y_inv

        x = np.array(x)
        if x.ndim == 1:
            x = x[:, np.newaxis]

        x_int = np.empty(x.shape, dtype=int)
        for i in range(x.shape[0]):
            for j in range(x.shape[1]):
                x_int[i, j] = cat_x[i][x[i, j]]

        x_oh = one_hot(x_int, m_x, degs)
        x_oh = x_oh.toarray()

        w = np.hstack(list(w.values()))

        p = np.exp(w.dot(x_oh))
        p = np.split(p, m_y_cumsum[1:-1], axis=0)
        for i in range(n_y):
            p[i] /= p[i].sum(0)

        y_int = np.array([pi.argmax(axis=0) for pi in p])
        j = np.arange(y_int.shape[1])
        p = np.array([pi[yi, j] for yi, pi in zip(y_int, p)])

        y = np.empty(y_int.shape, dtype=x.dtype)
        for i in range(y.shape[0]):
            for j in range(y.shape[1]):
                y[i, j] = cat_y_inv[i][y_int[i, j]]

        y = y.squeeze()
        p = p.squeeze()

        return y, p
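
    # predict() evaluates, for each output variable i and each sample, a
    # softmax over the one-hot features of the input:
    #
    #     p(y_i = s | x) = exp(w_{i,s} . sigma(x)) / sum_{s'} exp(w_{i,s'} . sigma(x))
    #
    # where sigma(x) is the stacked one-hot encoding built by one_hot() and
    # w_{i,s} is the parameter row for state s of output variable i; the
    # returned prediction is the argmax state and its probability.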
    def energy(self, x):
        """Compute the energy -0.5 * sigma(x)^T W sigma(x) of the configuration
        `x`, where sigma(x) is the degree-1 one-hot encoding of `x` and W = w[1]."""
        cat_x = self.cat_x
        m_x = self.m_x
        w = self.w

        # map the symbolic configuration to integers and one-hot encode it
        x_int = [cat_x[i][j] for i, j in enumerate(x)]
        x_oh = one_hot(x_int, m_x)

        return -0.5 * (x_oh.T * w[1] * x_oh).squeeze()
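
# Usage sketch for predict() and energy() (continues the hypothetical `potts`
# and `x` from the fit() sketch above, with the model fit in imputation mode;
# not part of the module):
#
#     y_pred, p = potts.predict(x[:, -1])   # predicted states and probabilities
#     e = potts.energy(x[:, -1])            # Potts energy of that configuration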
def one_hot(x, m, degs=[1]):
    r"""Compute the one-hot encoding of `x`

    Args:
        x (ndarray): `n` by `l` array of nonnegative integers to be converted to one-hot encoding
        m (ndarray): length `n` array storing the number of possible integers in each row of `x`
        degs (list): list of degrees of one-hot encodings to compute

    Returns:
        csc_matrix: sparse matrix that is the one-hot encoding of `x`. The degrees of one-hot encodings are vertically stacked.

    Examples:
        The one-hot encoding of a discrete variable records the state of the
        variable as a boolean vector with a 1 at the index corresponding to
        the state of the variable. The one-hot encoding of a system of
        discrete variables is the concatenation of the one-hot encodings of
        each variable. For example, if the system of discrete variables
        :math:`(x_1, x_2, x_3)` has state :math:`(x_1,x_2,x_3)=(1,1,2)` and
        the variables may take values from the first :math:`m=(3,3,4)`
        nonnegative integers, i.e. :math:`x_1,x_2\in\{0,1,2\}` and
        :math:`x_3\in\{0,1,2,3\}`, then the one-hot encoding of
        :math:`(x_1, x_2, x_3)^T` is :math:`\sigma=(0,1,0,0,1,0,0,0,1,0)^T`.

        >>> from fem.discrete import one_hot
        >>> x = [1, 1, 2]
        >>> m = [3, 3, 4]
        >>> x_oh = one_hot(x, m)
        >>> x_oh.todense()
        matrix([[0.],
                [1.],
                [0.],
                [0.],
                [1.],
                [0.],
                [0.],
                [0.],
                [1.],
                [0.]])
    """
    x = np.array(x)
    m = np.array(m)
    degs = np.array(degs)

    if x.ndim == 1:
        x = x[:, np.newaxis]
    n, l = x.shape

    max_deg = degs.max()

    # multi-indices that correspond to the one-hot encoding powers
    idx_len = combinatorics.binomial_coefficients(n, max_deg)[degs].sum()
    idx = []
    for deg in degs:
        for i in combinatorics.multiindices(n, deg):
            idx.append(i)

    # heights of the one-hot blocks
    mi = np.array([np.prod(m[i]) for i in idx])
    # total height of the one-hot encoding
    m_sum = mi.sum()

    # convert each one-hot power to a flat index within its block
    s = np.vstack(
        [combinatorics.mixed_radix_to_base_10(x[i], m[i]) for i in idx])

    # row offsets of the blocks
    stratifier = np.insert(mi.cumsum(), 0, 0)[:-1]

    # construct the sparse matrix, one column per sample
    data = np.ones(idx_len * l)
    indices = (s + stratifier[:, np.newaxis]).T.flatten()
    indptr = idx_len * np.arange(l + 1)
    x_oh = csc_matrix((data, indices, indptr), shape=(m_sum, l))

    return x_oh
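
# Sketch of the stacked higher-degree encoding (assuming
# combinatorics.multiindices(n, 2) enumerates the pairs of variables): for
# n = 3 variables with m = (3, 3, 4) states and degs = [1, 2], the degree-1
# blocks have heights 3, 3, 4 and the degree-2 blocks have heights
# 3*3, 3*4, 3*4, so one_hot(x, m, degs=[1, 2]) has 10 + 33 = 43 rows per
# sample.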
def categorize(x):
    """Convert symbolic `x` data to integer data.

    `x` is a list of lists, each containing the samples of one variable. Each
    list of samples is mapped to the nonnegative integers 0, ..., m-1, where
    `m` is the number of unique values detected in that list.

    Args:
        x (list): A list where each element is a list of samples of a different variable

    Returns:
        (list, list): The integer data and a list of dictionaries that map symbol data to integer data for each row of `x`

    Examples:
        >>> from fem.discrete import categorize
        >>> x = [['a', 'b', 'a', 'a'], [10, 11, 12, 10, 11]]
        >>> x_int, cat_x = categorize(x)
        >>> x_int
        [array([0, 1, 0, 0]), array([0, 1, 2, 0, 1])]
        >>> cat_x
        [{'a': 0, 'b': 1}, {10: 0, 11: 1, 12: 2}]
    """
    n = len(x)
    l = [len(xi) for xi in x]

    x_int = [np.empty(shape=l[i], dtype=int) for i in range(n)]
    cat_x = []
    for i in range(n):
        unique_states = np.sort(np.unique(x[i]))
        m = len(unique_states)
        num = dict(zip(unique_states, np.arange(m)))
        for j in range(l[i]):
            x_int[i][j] = num[x[i][j]]
        cat_x.append(num)

    # if every variable has the same number of samples, return a 2D array
    if np.allclose(l, l[0]):
        x_int = np.array(x_int)

    return x_int, cat_x
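
# categorize() returns the forward symbol-to-integer maps; fit() builds the
# inverse maps the same way, e.g.:
#
#     x_int, cat_x = categorize([['a', 'b', 'a', 'a']])
#     cat_x_inv = [{v: k for k, v in cat.items()} for cat in cat_x]
#     # cat_x_inv[0] == {0: 'a', 1: 'b'}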
def svd_pinv(x):
    """Compute the SVD-based pseudoinverse factors of `x`

    Args:
        x (csc_matrix): The matrix for which to compute the pseudoinverse

    Returns:
        (ndarray, ndarray, ndarray): If the SVD of `x` is :math:`USV^T`, this
        function returns :math:`V`, :math:`S^+`, and :math:`U^T`
    """
    # svds requires k < min(x.shape), so cap the number of singular values
    x_rank = np.linalg.matrix_rank(x.todense())
    x_svd = svds(x, k=min(x_rank, min(x.shape) - 1))

    # invert the nonzero singular values
    sv_pinv = x_svd[1]
    zero_sv = np.isclose(sv_pinv, 0)
    sv_pinv[~zero_sv] = 1.0 / sv_pinv[~zero_sv]
    sv_pinv[zero_sv] = 0.0

    x_pinv = [x_svd[2].T, sv_pinv, x_svd[0].T]
    return x_pinv
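
# Sketch of how the returned factors assemble the pseudoinverse (a check with
# hypothetical data, not part of the module). If svd_pinv(x) returns
# [V, S_plus, Ut], then V @ diag(S_plus) @ Ut approximates the pseudoinverse
# of x whenever svds recovers all of the nonzero singular values, i.e. when
# rank(x) < min(x.shape):
#
#     import numpy as np
#     from scipy.sparse import csc_matrix
#
#     rng = np.random.RandomState(0)
#     a = csc_matrix(rng.rand(6, 2) @ rng.rand(2, 4))   # rank-2, 6 x 4
#     V, S_plus, Ut = svd_pinv(a)
#     a_pinv = V @ np.diag(S_plus) @ Ut
#     # a_pinv is approximately np.linalg.pinv(a.toarray())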