Source code for eyefeatures.features.feature_maps

from collections.abc import Callable
from typing import Literal

import gudhi as gd
import numpy as np
import pandas as pd
from numpy.typing import NDArray
from PyEMD.EMD2d import EMD2D
from scipy.ndimage import gaussian_filter, zoom as ndimage_zoom
from scipy.signal import convolve2d
from scipy.stats import gaussian_kde

from eyefeatures.utils import _rec2square, _split_dataframe


# =========================== HEATMAPS ===========================
def _check_shape(shape: tuple[int, int]):
    assert isinstance(shape, tuple), f"'shape' must be tuple, hot {type(shape)}."
    assert len(shape) == 2, f"'shape' must be of length 2, got {len(shape)}."
    for k in shape:
        assert isinstance(
            k, int
        ), f"Values in 'shape' must be integers, got '{type(k)}'."
        assert k > 0, f"Integers in 'shape' must be positive, got '{k}'."


def get_heatmap(
    x: NDArray,
    y: NDArray,
    shape: tuple[int, int],
    check: bool = True,
    zoom_to_data: bool = False,
) -> np.ndarray:
    """Get heatmap from scanpath (given coordinates are scaled and
    sorted in time) using Gaussian KDE.

    Args:
        x: X coordinate column name.
        y: Y coordinate column name.
        shape: if tuple with single integer, then square matrix is returned,
            otherwise k must be (height, width) tuple and rectangular matrix
            is returned.
        check: whether to check 'shape' for correct typing.
        zoom_to_data: if True, normalize coordinates to fill the entire heatmap.
            If False, use coordinates as-is (assumes [0,1] range).

    Returns:
        heatmap matrix.
    """
    if check:
        _check_shape(shape)

    x = np.asarray(x)
    y = np.asarray(y)

    # Handle edge cases where KDE cannot be applied
    if len(x) <= 2:
        # in case of small number of samples, KDE cannot be applied and
        # default kernel estimate is returned instead
        x, y = np.array([0.25, 0.50, 0.75]), np.array([0.50, 0.50, 0.50])

    # Normalize coordinates to [0, 1] if zoom_to_data is enabled
    if zoom_to_data:
        x_min, x_max = x.min(), x.max()
        y_min, y_max = y.min(), y.max()
        # Add small margin to avoid edge effects
        x_range = x_max - x_min if x_max > x_min else 1.0
        y_range = y_max - y_min if y_max > y_min else 1.0
        x = (x - x_min) / x_range
        y = (y - y_min) / y_range

    # Check if all points are the same (zero variance)
    if len(np.unique(x)) == 1 and len(np.unique(y)) == 1:
        # All points at same location - return uniform heatmap centered at that point
        heatmap = np.zeros(shape)
        center_x = int(np.clip(x[0] * shape[1], 0, shape[1] - 1))
        center_y = int(np.clip(y[0] * shape[0], 0, shape[0] - 1))
        heatmap[center_y, center_x] = 1.0
        return heatmap

    # Check if points are collinear (all x same or all y same)
    if len(np.unique(x)) == 1 or len(np.unique(y)) == 1:
        # Points lie on a line - use histogram-based approach instead
        return _get_heatmap_histogram(x, y, shape)

    # Try to create KDE, with fallback to histogram if it fails
    try:
        scanpath = np.vstack([x, y])
        kernel = gaussian_kde(scanpath)
        interval_x, interval_y = np.linspace(0, 1, shape[1]), np.linspace(
            0, 1, shape[0]
        )
        x_grid, y_grid = np.meshgrid(interval_x, interval_y)

        # Query KDE with same [x, y] order as training data
        positions = np.vstack([x_grid.ravel(), y_grid.ravel()])
        return np.reshape(kernel(positions), x_grid.shape)
    except (np.linalg.LinAlgError, ValueError):
        # KDE failed due to singular covariance matrix - fallback to histogram
        return _get_heatmap_histogram(x, y, shape)


def _get_heatmap_histogram(
    x: NDArray, y: NDArray, shape: tuple[int, int]
) -> np.ndarray:
    """Fallback method: create heatmap using 2D histogram when KDE fails.

    Args:
        x: X coordinates (normalized 0-1)
        y: Y coordinates (normalized 0-1)
        shape: Output shape (height, width)

    Returns:
        heatmap matrix
    """
    # Convert normalized coordinates to pixel indices
    x_indices = np.clip((x * shape[1]).astype(int), 0, shape[1] - 1)
    y_indices = np.clip((y * shape[0]).astype(int), 0, shape[0] - 1)

    # Create histogram
    heatmap = np.zeros(shape)
    for xi, yi in zip(x_indices, y_indices, strict=False):
        heatmap[yi, xi] += 1.0

    # Normalize
    if heatmap.sum() > 0:
        heatmap = heatmap / heatmap.sum()

    # Apply slight Gaussian smoothing to avoid completely sparse heatmaps
    if heatmap.sum() > 0:
        heatmap = gaussian_filter(heatmap, sigma=0.5)
        heatmap = heatmap / heatmap.sum() if heatmap.sum() > 0 else heatmap

    return heatmap


[docs] def get_heatmaps( data: pd.DataFrame, x: str, y: str, shape: tuple[int, int], pk: list[str] = None, zoom_to_data: bool = False, ) -> np.ndarray: """Get heatmaps from scanpaths (given coordinates are scaled and sorted in time) using Gaussian KDE. Args: data: input Dataframe with fixations. x: X coordinate column name. y: Y coordinate column name. shape: if tuple with single integer, then square matrix is returned, otherwise k must be (height, width) tuple and rectangular matrix is returned. pk: List of columns being primary key. zoom_to_data: if True, normalize coordinates to fill the entire heatmap. If False, use coordinates as-is (assumes [0,1] range). Returns: heatmap matrices. """ _check_shape(shape) if pk is None: x_path, y_path = data[x].values, data[y].values heatmap = get_heatmap(x_path, y_path, shape, zoom_to_data=zoom_to_data) heatmaps = heatmap[np.newaxis, :, :] else: groups: list[str, pd.DataFrame] = _split_dataframe(data, pk) hshape = (len(groups), shape[0], shape[1]) heatmaps = np.zeros(hshape) for i, (_, group_X) in enumerate(groups): x_path, y_path = group_X[x], group_X[y] heatmaps[i, :, :] = get_heatmap( x_path, y_path, shape, check=False, zoom_to_data=zoom_to_data ) return heatmaps
# =========================== PCA =========================== def get_pca(matrix: NDArray, p: int = None, reserve_info: float = None) -> np.ndarray: """Computes PCA compression. Args: matrix: matrix to get principal components from (n x m) p: number of first principal components to leave reserve_info: use such value of p that 0.0 <= reserve_info <= 1.0 fraction of information is conserved. Note: either p or reserve_info must be specified. Returns: matrix of eigenvectors (m x p), projection (n x p), rows means (n x 1) """ assert len(matrix.shape) == 2, "'matrix' should be a matrix" assert (p is None) or ( 0 <= p <= matrix.shape[0] ), "given 'matrix' is n x m, 0 <= p <= n must hold" assert (p is not None) or ( reserve_info is not None ), "either 'p' or 'reserve_info' must be provided" assert (reserve_info is None) or ( 0.0 <= reserve_info <= 1.0 ), "'reserve_info' must be between 0.0 and 1.0" matrix = matrix.astype(np.float64) row_means = np.mean(matrix, axis=1) matrix -= row_means[:, None] c = np.cov(matrix, rowvar=False) n, m = matrix.shape evals, evecs = np.linalg.eigh(c) assert evals.shape == (m,) assert evecs.shape == (m, m) sorted_evals_indexes = np.argsort(evals)[::-1] evals = evals[sorted_evals_indexes] # sorted eigenvalues, (m,) evecs = evecs[:, sorted_evals_indexes] # sorted eigenvectors, (m, m) if p is not None: evecs = evecs[:, :p] # (m, p) else: evals /= evals.sum() cumsum = 0.0 p = 0 while cumsum < reserve_info and p < evals.size: cumsum += evals[p] p += 1 evecs = evecs[:, :p] # (m, p) print(matrix.shape) print(evecs.shape) projection = matrix @ evecs # (n, p) return evecs, projection, row_means # =========================== RQA ===========================
[docs] def get_rqa( data: pd.DataFrame, x: str, y: str, metric: Callable, rho: float ) -> np.ndarray: r"""Calculates recurrence quantification analysis matrix based on given fixations. Args: data: input Dataframe with fixations. x: X coordinate column name. y: Y coordinate column name. metric: callable metric on :math:`\mathbb{R}^2` points. rho: threshold radius. Returns: rqa matrix. """ fixations = data[[x, y]].values n = len(fixations) rqa_matrix = np.zeros((n, n), dtype=np.int32) for i in range(n): for j in range(i + 1, n): dist = metric(fixations[i], fixations[j]) rqa_matrix[i][j] = int(dist <= rho) rqa_matrix[j][i] = int(dist <= rho) return rqa_matrix
# =========================== MTF ===========================
[docs] def get_mtf( data: pd.DataFrame, x: str, y: str, n_bins: int = 10, output_size: int | float = 1.0, shrink_strategy: Literal["max", "mean", "normal"] = "normal", flatten: bool = False, ) -> np.ndarray: """Calculates Markov Transition Field for (x,y) coordinates. Args: data: input Dataframe with fixations. x: X coordinate column name. y: Y coordinate column name. n_bins: number of bins to discretize time series into. output_size: fraction between 0 and 1. Specifies fraction of input series length to shrink output to. shrink_strategy: strategy to use for convolution while shrinking. Ignored if 'output_size' is equal to size of 'data'. flatten: bool, whether to flatten the array. Returns: tensor of shape (2, n_coords, n_coords), where n_coords is the length of input dataframe. """ if isinstance(output_size, float): assert 0.0 < output_size <= 1.0, "Must be 0 < output_size <= 1." output_size = np.ceil(output_size * len(data)).astype(np.int64) elif isinstance(output_size, int): assert ( 0 < output_size <= len(data) ), "'output_size' must be positive integer not exceeding the input size." else: raise ValueError(f"'output_size' is of wrong type '{type(output_size)}'.") assert n_bins > 1, "'n_bins' must be greater than 1." assert len(data) > n_bins, "Input series must contain more than 'n_bins' samples." x_coords, y_coords = data[[x]].values.ravel(), data[[y]].values.ravel() fixations_mtf = _get_mtf(np.array([x_coords, y_coords]), n_bins=n_bins) n_samples, n_timestamps = 2, len(x_coords) if output_size < n_timestamps: shrunk_mtfs = [] for i in range(n_samples): shrunk_mtfs.append( _shrink_matrix( fixations_mtf[i, :, :], height=output_size, width=output_size, strategy=shrink_strategy, ) ) del fixations_mtf fixations_mtf = np.array(shrunk_mtfs) return fixations_mtf.flatten() if flatten else fixations_mtf
def _get_mtf(a: np.ndarray, n_bins: int) -> np.ndarray: assert len(a.shape) == 2, "Array of shape (n_samples, n_timestamps) must be passed." n_samples, n_timestamps = a.shape a_binned = np.zeros(a.shape, dtype=np.int64) quantiles = np.linspace(0, 1, n_bins + 1)[1:-1] # evenly spaced quantiles bins = np.quantile(a, q=quantiles, axis=1).T # bins for each sample for i in range(n_samples): a_binned[i, :] = np.searchsorted( bins[i, :], a[i, :], side="left" ) # squeeze coordinates mtm = np.zeros((n_samples, n_bins, n_bins)) # build Markov Transition Matrix for i in range(n_samples): for j in range(n_timestamps - 1): mtm[i, a_binned[i, j], a_binned[i, j + 1]] += 1 mtm_sum = mtm.sum(axis=2) # normalize rows sums in each sample to 1 np.place(mtm_sum, mtm_sum == 0, 1) mtm /= mtm_sum[:, :, None] mtf = np.zeros( (n_samples, n_timestamps, n_timestamps) ) # build Markov Transition Field for i in range(n_samples): for j in range(n_timestamps): for k in range(n_timestamps): mtf[i, j, k] = mtm[i, a_binned[i, j], a_binned[i, k]] return mtf def _shrink_matrix( mat: np.array, height: int, width: int, strategy: Literal["max", "mean", "normal"] = "normal", ) -> np.ndarray: """Shrinks matrix to be close to output_size x output_size. Args: mat: 2d matrix (image channel). height: height of shrunk matrix. width: width of shrunk matrix. strategy: strategy to use while shrinking. * 'mean' - 2d convolution with uniform kernel. * 'normal' - 2d convolution with Gauss kernel. * 'max' - max pooling, resulting image is the closest possible to provided 'size'. Returns: shrunk matrix. """ ih, iw = mat.shape oh, ow = height, width assert len(mat.shape) == 2 assert oh < ih and ow < iw if strategy in ("mean", "normal"): # convolution (uniform/normal) dh, dw = ih - oh + 1, iw - ow + 1 if strategy == "mean": kernel = np.ones((dh, dw)) / (dh * dw) else: # 'normal' m = max(dh, dw) kernel = _gaussian_kernel(m, sigma=5) kernel = _rec2square(kernel) shrunk_mat = convolve2d(mat, kernel, mode="valid") elif strategy == "max": # max pooling dh, dw = int(np.ceil(ih / oh)), int(np.ceil(iw / ow)) rh, rw = ih // dh, iw // dw if ih % dh == 0 and iw % dw == 0: # filter divides matrix on equal blocks shrunk_mat = mat.reshape(rh, dh, rw, dw).max(axis=(1, 3)) else: Q1 = mat[: rh * dh, : rw * dw].reshape(rh, dh, rw, dw).max(axis=(1, 3)) Q2 = mat[rh * dh :, : rw * dw].reshape(-1, rw, dw).max(axis=2) Q3 = mat[: rh * dh, rw * dw :].reshape(rh, dh, -1).max(axis=1) Q4 = mat[rh * dh :, rw * dw :].max() shrunk_mat = np.vstack(np.c_[Q1, Q3], np.c_[Q2, Q4]) else: raise ValueError(f"'shrink_strategy'={strategy} is not supported.") return shrunk_mat def _gaussian_kernel(size, sigma) -> np.ndarray: if size % 2 == 0: x = (np.arange(-size / 2, size / 2, 1) + 0.5).reshape(1, -1) else: x = np.arange(-size // 2 + 1, size // 2 + 1, 1).reshape(1, -1) xx = np.tile(x, (size, 1)) yy = np.rot90(xx, 1) kernel = (1 / 2 * np.pi * np.square(sigma)) * np.exp( -(np.square(xx) + np.square(yy)) / (2 * np.square(sigma)) ) kernel = kernel / np.sum(kernel) return kernel # =========================== GASF/GADF ===========================
[docs] def get_gaf( data: pd.DataFrame, x: str, y: str, t: str = None, field_type: Literal["difference", "sum"] = "difference", to_polar: Literal["regular", "cosine"] = "cosine", flatten: bool = False, ) -> np.ndarray: """Calculates Gramian Angular Field for (x,y) coordinates. Args: data: input Dataframe with fixations. x: X coordinate column name. y: Y coordinate column name. t: timestamps column name. field_type: which type of field to calculate. If "difference", then GADF is returned, otherwise ("sum") GASF is returned. to_polar: conversion from cartesian to polar coordinates. * 'regular': standard conversion calculating arctan(y/x). * 'cosine': angle is calculated as cosine of series data, radius is taken as timestamps. flatten: bool, whether to flatten the array. Returns: tensor of shape (2, n_coords, n_coords), where n_coords is the length of input dataframe. """ assert field_type in ( "difference", "sum", ), f"'field_type'={field_type} is not supported." assert to_polar in ("regular", "cosine"), f"'to_polar'={to_polar} is not supported." x_coords, y_coords = data[[x]].values.ravel(), data[[y]].values.ravel() timestamps = np.arange(len(x_coords)) if t is None else data[[t]].values.ravel() gaf = _get_gaf( np.array([x_coords, y_coords]), timestamps, field_type=field_type, to_polar=to_polar, ) return gaf.flatten() if flatten else gaf
def _get_gaf( a: np.array, t: np.array, field_type: Literal["difference", "sum"], to_polar: Literal["regular", "cosine"], ) -> np.ndarray: """ Args: a: array of shape (n_samples, n_timestamps) being angular values. t: array of shape (n_timestamps,) being timestamps for all samples in 'a' or (n_samples, n_timestamps) being timestamps of corresponding angular values in 'a', i.e. a[i, :] are expected to be the angles at time t[i, :]. """ def _get_t(t_: np.array, i_: int): return t_[i_, :] if len(t_.shape) > 1 else t_ assert len(a.shape) == 2, "Array of shape (n_samples, n_timestamps) must be passed." if len(t.shape) == 2: assert t.shape == a.shape, "'a' and 't' must be of same shape." else: assert len(t.shape) == 1 and len(t) == a.shape[1] n_samples, n_timestamps = a.shape if field_type == "sum": def f(phi1, phi2): return np.cos(phi1 + phi2) else: # 'difference' def f(phi1, phi2): return np.sin(phi1 - phi2) gaf = np.zeros((n_samples, n_timestamps, n_timestamps)) for i in range(n_samples): if to_polar == "regular": rho, phi = _car2pol( a[i, :], _get_t(t, i) ) # _get_t used to avoid copies of 't' in 1d case else: rho, phi = _encode_car(a[i, :], _get_t(t, i)) for j in range(n_timestamps): for k in range(n_timestamps): gaf[i, j, k] = f(phi[j], phi[k]) return gaf def _rescale(a: np.array) -> np.ndarray: """Linearly map array to [-1, 1].""" amm = _minmax(a) return amm * 2 - 1 def _minmax(a: np.array) -> np.ndarray: min_, max_ = a.min(), a.max() if min_ == max_: return a if min_ == 0.0 else np.ones(a.shape, dtype=a.dtype) return (a - min_) / (max_ - min_) def _car2pol(x: np.array, f_x: np.array) -> tuple[np.ndarray, np.ndarray]: rho = np.sqrt(x**2 + f_x**2) phi = np.arctan2(f_x, x) return rho, phi def _encode_car(x: np.array, t: np.array) -> tuple[np.ndarray, np.ndarray]: rho = t phi = np.arccos(_rescale(x)) return rho, phi def _resize_2d_maps_to_shape(maps: np.ndarray, shape: tuple[int, int]) -> np.ndarray: """Resize (2, h, w) or (c, h, w) array to (c, shape[0], shape[1]) using bilinear interpolation.""" _check_shape(shape) if maps.shape[1] == shape[0] and maps.shape[2] == shape[1]: return maps n, h, w = maps.shape zoom_factors = (1, shape[0] / h, shape[1] / w) return ndimage_zoom(maps, zoom_factors, order=1, mode="nearest") # =========================== GAF/MTF batch (2D DL representations) =========================== def get_gafs( data: pd.DataFrame, x: str, y: str, shape: tuple[int, int], pk: list[str] = None, t: str = None, field_type: Literal["difference", "sum"] = "difference", to_polar: Literal["regular", "cosine"] = "cosine", ) -> np.ndarray: """Batch Gramian Angular Fields for 2D DL: one (2, H, W) image per group, resized to shape. Args: data: input DataFrame with fixations. x: X coordinate column name. y: Y coordinate column name. shape: (height, width) of output images per channel. pk: list of primary key columns for grouping. t: timestamps column name. field_type: "difference" (GADF) or "sum" (GASF). to_polar: "regular" or "cosine". Returns: array of shape (n_groups, 2, shape[0], shape[1]), dtype float. """ _check_shape(shape) if pk is None: groups = [(None, data)] else: groups = list(_split_dataframe(data, pk)) out = np.zeros((len(groups), 2, shape[0], shape[1]), dtype=np.float64) for i, (_, group_X) in enumerate(groups): if len(group_X) < 2: # Pad with duplicate row so GAF is at least 2x2, then resize group_X = pd.concat([group_X, group_X.iloc[-1:]], ignore_index=True) gaf = get_gaf( group_X, x, y, t=t, field_type=field_type, to_polar=to_polar, flatten=False ) gaf = _resize_2d_maps_to_shape(gaf, shape) out[i] = gaf return out def get_mtfs( data: pd.DataFrame, x: str, y: str, shape: tuple[int, int], pk: list[str] = None, n_bins: int = 10, shrink_strategy: Literal["max", "mean", "normal"] = "normal", ) -> np.ndarray: """Batch Markov Transition Fields for 2D DL: one (2, H, W) image per group, resized to shape. Args: data: input DataFrame with fixations. x: X coordinate column name. y: Y coordinate column name. shape: (height, width) of output images per channel. pk: list of primary key columns for grouping. n_bins: number of bins for MTF discretization. shrink_strategy: strategy when shrinking MTF matrix. Returns: array of shape (n_groups, 2, shape[0], shape[1]), dtype float. """ _check_shape(shape) if pk is None: groups = [(None, data)] else: groups = list(_split_dataframe(data, pk)) out = np.zeros((len(groups), 2, shape[0], shape[1]), dtype=np.float64) min_len = max(n_bins + 1, 2) for i, (_, group_X) in enumerate(groups): grp = group_X if len(grp) < min_len: # Repeat rows to reach min_len while len(grp) < min_len: grp = pd.concat([grp, grp.iloc[-1:]], ignore_index=True) try: # get_mtf requires output_size in [2, len(data)] and len(data) > n_bins out_size = max(2, min(len(grp), shape[0], shape[1])) mtf = get_mtf( grp, x, y, n_bins=n_bins, output_size=out_size, shrink_strategy=shrink_strategy, flatten=False, ) mtf = _resize_2d_maps_to_shape(mtf, shape) out[i] = mtf except Exception: # Fallback: zeros or small constant out[i] = np.zeros((2, shape[0], shape[1]), dtype=np.float64) return out # =========================== HILBERT CURVE ===========================
[docs] def get_hilbert_curve_enc( data: pd.DataFrame, x: str, y: str, scale: bool = True, p: int = 4 ) -> np.ndarray: """Map scanpath to values on Hilbert curve and encode to single feature vector. Args: data: input Dataframe with fixations. x: X coordinate column name. y: Y coordinate column name. scale: whether to scale the scanpath to [0, 1] before mapping to Hilbert curve. If false, then p: order of Hilbert curve, unit square is divided into (2^p)x(2^p) smaller squares. Higher value of p indicates better locality preservation. Returns: scanpath encoding in 2^p-dimensional feature space using 1D Hilbert curve. """ n = 2**p mapping = get_hilbert_curve( data=data, x=x, y=y, scale=scale, p=p ) # get Hilbert curve mapping mapping = np.unique(mapping) # leave only unique values vec = np.zeros((n * n,)) for i in range(n * n): vec[i] = i in mapping # activate mapped values return vec
[docs] def get_hilbert_curve( data: pd.DataFrame, x: str, y: str, scale: bool = True, p: int = 4 ) -> np.ndarray: """Map scanpath to points on 1D Hilbert curve. Args: data: input Dataframe with fixations. x: X coordinate column name. y: Y coordinate column name. scale: whether to scale the scanpath to [0, 1] before mapping to Hilbert curve. If false, then p: order of Hilbert curve, unit square is divided into (2^p)x(2^p) smaller squares. Higher value of p indicates better locality preservation. Returns: scanpath mapping to 1D Hilbert curve. """ x, y = data[x].values, data[y].values n_fixations = len(x) if scale: x, y = ( _minmax(x), _minmax(y), ) # map x, y to [0, 1] TODO: better approach than minmax? else: assert ( 0 <= x <= 1 ), "Either scale 'x' to be between 0 and 1 or add 'scale'=True." assert ( 0 <= y <= 1 ), "Either scale 'y' to be between 0 and 1 or add 'scale'=True." x, y = x * (2**p), y * (2**p) # map x, y to [0, 2^p] x, y = ( np.array(np.round(x), dtype=int), np.array(np.round(y), dtype=int), ) # map [0, 2^p] to {0, 1, .., 2^p} h = np.zeros((n_fixations,)) for i in range(n_fixations): h[i] = xy2h(x[i], y[i], p=p) return h
def xy2h(x: int, y: int, p: int) -> int: """Mapping of 2D space to 1D using Hilbert curve. Args: x: x-coordinate of a point, 0 <= x < 2^p. y: y-coordinate of a point, 0 <= y < 2^p. p: order of Hilbert curve, unit square is divided into (2^p)x(2^p) smaller squares. Higher value of p indicates better locality preservation. Returns: corresponding point on 1D Hilbert curve. Notes: Algorithm: https://people.math.sc.edu/Burkardt/py_src/hilbert_curve/hilbert_curve.py. """ n = 2**p s = n // 2 d = 0 while s > 0: rx = (x & s) > 0 ry = (y & s) > 0 d += s * s * ((3 * rx) ^ ry) # adding length on Hilbert curve if ry == 0: # rotation of quadrant if rx == 1: x = n - 1 - x y = n - 1 - y x, y = y, x s = s // 2 return d
[docs] def hilbert_huang_transform(data: np.ndarray, max_imf: int = 1) -> np.ndarray: """Perform Hilbert-Huang transform on a given data sequence. Args: data: input sequence of form (x, y) coordinates max_imf: maximum number of intrinsic mode functions to extract Returns: intrinsic mode functions from an input data vector """ emd = EMD2D() decomposed = emd(data, max_imf=max_imf) return decomposed
# =========================== PERSISTENCE CURVE =========================== def vietoris_rips_filtration( scanpath: np.ndarray, max_dim: int = 2, max_radius: float = 1.0 ): """Compute the Vietoris-Rips filtration for a point cloud. Args: scanpath: scanpath data as a numpy array of shape (n, 2). max_dim: maximum dimension for persistent homology. max_radius: maximum radius for the filtration. Returns: persistence diagram and simplex tree. """ rips_complex = gd.RipsComplex(points=scanpath, max_edge_length=max_radius) simplex_tree = rips_complex.create_simplex_tree(max_dimension=max_dim) persistence = simplex_tree.persistence() return persistence, simplex_tree def lower_star_filtration(time_series: np.ndarray, persistence_dim_max: bool = False): """Compute the Lower Star filtration for a time series. Args: time_series: time series data. persistence_dim_max: If true, the persistent homology for the maximal dimension in the complex is computed. If False, it is ignored. Default is False. """ simplex_tree = gd.SimplexTree() for i, val in enumerate(time_series): simplex_tree.insert([i], filtration=val) simplex_tree.make_filtration_non_decreasing() persistence = simplex_tree.persistence() return persistence, simplex_tree def persistence_curve(persistence_diagram: list[tuple | np.ndarray], t: float): """Compute the persistence curve for a persistence diagram at time t. Args: persistence_diagram: persistence diagram [(birth, death), ...]. t: threshold time for persistence curve. Returns: sum of persistence intervals active at time t. """ total_persistence = sum( (death - birth) for birth, death in persistence_diagram if birth <= t <= death ) return total_persistence def persistence_entropy_curve(persistence_diagram: list[tuple | np.ndarray], t: float): """Compute the persistence entropy curve for a persistence diagram at time t. Args: persistence_diagram: persistence diagram [(birth, death), ...]. t: threshold time for persistence entropy curve. Returns: entropy value at time t. """ intervals = [ (death - birth) for birth, death in persistence_diagram if birth <= t <= death ] total_persistence = sum(intervals) if total_persistence == 0: return 0.0 probabilities = [interval / total_persistence for interval in intervals] entropy = -sum(p * np.log(p) if p > 1e-15 else 0.0 for p in probabilities) return entropy
[docs] def calculate_topological_features( scanpath: np.ndarray, time_series: np.ndarray, max_radius: float = 1.0, max_time: float = 1.0, max_dim: int = 2, time_steps: int = 100, ): """Calculate topological features (persistence curve and persistence entropy) for a scanpath. Args: scanpath: scanpath data array of shape (n, 2). time_series: 1d array of time series data (e.g. x or y coordinates over time). max_radius: maximum radius for Vietoris-Rips filtration. max_time: maximum threshold for the persistence curve. max_dim: maximum dimension for persistent homology. time_steps: number of time steps for evaluating the persistence curve and entropy. Returns: values of persistence curve at each time step, values of persistence entropy at each time step. """ persistence_vr, _ = vietoris_rips_filtration( scanpath, max_dim=max_dim, max_radius=max_radius ) persistence_ls, _ = lower_star_filtration(time_series) time_grid = np.linspace(0, max_time, time_steps) persistence_curve_vals = [] persistence_entropy_vals = [] for t in time_grid: pc_vr = persistence_curve([p[1] for p in persistence_vr], t) pe_vr = persistence_entropy_curve([p[1] for p in persistence_vr], t) pc_ls = persistence_curve([p[1] for p in persistence_ls], t) pe_ls = persistence_entropy_curve([p[1] for p in persistence_ls], t) persistence_curve_vals.append(pc_vr + pc_ls) persistence_entropy_vals.append(pe_vr + pe_ls) return np.array(persistence_curve_vals), np.array(persistence_entropy_vals)