Source code for eyefeatures.preprocessing.fixation_extraction

from collections.abc import Callable

import numpy as np
import pandas as pd
import scipy

from eyefeatures.preprocessing._utils import _get_distance, _get_MEC
from eyefeatures.preprocessing.base import BaseFixationPreprocessor


# ======== FIXATION PREPROCESSORS ========
[docs] class IVT(BaseFixationPreprocessor): """Velocity Threshold Identification. Complexity: O(n). """ def __init__( self, x: str, y: str, t: str, threshold: float, min_duration: float, distance: str = "euc", pk: list[str] = None, eps: float = 1e-10, ): super().__init__(x=x, y=y, t=t, pk=pk) self.threshold = threshold self.min_duration = min_duration self.distance = distance self.eps = eps self.available_distances = ("euc", "manhattan", "chebyshev") # TODO add more def _check_params(self): m = "IVT" assert self.x is not None, self._err_no_field(m, "x") assert self.y is not None, self._err_no_field(m, "y") assert self.t is not None, self._err_no_field(m, "t") assert self.distance in self.available_distances, ( f"'distance' must be one of ({', '.join(self.available_distances)})," f"got {self.distance}." ) # @jit(forceobj=True, looplift=True) def _preprocess(self, X: pd.DataFrame) -> pd.DataFrame: points = X[[self.x, self.y]].values dist = self._get_distances(points, self.distance) x = X[self.x].values y = X[self.y].values t = X[self.t].values dt = np.diff(t) vel = dist / (dt + self.eps) is_fixation = (vel <= self.threshold).astype(np.int32) fixations = self._squash_fixations(is_fixation) fixations_df = pd.DataFrame( data={ "fixation_id": fixations, self.x: x[:-1], self.y: y[:-1], "start_time": t[:-1], "end_time": t[:-1], "distance_min": dist, "distance_max": dist, } ) fixations_df = fixations_df[fixations_df["fixation_id"] != 0] diameters = [] centers = [] for i in set(fixations): if i != 0: points = fixations_df.loc[ fixations_df["fixation_id"] == i, fixations_df.columns.isin([self.x, self.y]), ].values x, y, radius = _get_MEC(np.unique(points, axis=0)) diameters.append(radius * 2) centers.append(np.array([x, y])) fixations_df = fixations_df.groupby(by=["fixation_id"]).agg( { self.x: "mean", self.y: "mean", "start_time": "min", "end_time": "max", "distance_min": "min", # between consecutive gazes "distance_max": "max", } ) fixations_df["diameters"] = diameters fixations_df["centers"] = centers # default features feats = ( "duration", "saccade_duration", "saccade_length", "saccade_angle", "saccade2_angle", ) fixations_df = self._compute_feats(fixations_df, feats) fixations_df = fixations_df[fixations_df["duration"] >= self.min_duration] return fixations_df
[docs] class IDT(BaseFixationPreprocessor): """Dispersion Threshold Identification. Complexity: O(n^2 * log n) worst case, O(n * log n) average. """ def __init__( self, x: str, y: str, t: str, min_duration: float, max_duration: float, max_dispersion: float, distance: str = "euc", # norm in R^2 for distance calculation pk: list[str] = None, eps: float = 1e-20, ): super().__init__(x=x, y=y, t=t, pk=pk) self.min_duration = min_duration self.max_duration = max_duration self.max_dispersion = max_dispersion self.distance = distance self.eps = eps self.available_distances = ("euc", "manhattan", "chebyshev") # TODO add more def _check_params(self): m = "IDT" assert self.x is not None, self._err_no_field(m, "x") assert self.y is not None, self._err_no_field(m, "y") assert self.t is not None, self._err_no_field(m, "t") assert self.min_duration is not None, self._err_no_field(m, "min_duration") assert self.min_duration > 0, "'min_duration' must be non-negative." assert self.max_duration is not None, self._err_no_field(m, "max_duration") assert ( self.max_duration > self.min_duration ), "'max_duration' must be greater than min_duration." assert self.max_dispersion is not None, self._err_no_field(m, "min_duration") assert self.max_dispersion > 0, "'max_dispersion' must be non-negative." assert self.distance in self.available_distances, ( f"'distance' must be one of ({', '.join(self.available_distances)})," f"got {self.distance}." ) @staticmethod def _build_sparse_tables(a): n = a.shape[0] logn = int(np.ceil(np.log2(n))) mint = np.zeros((logn, n)) maxt = np.zeros((logn, n)) mint[0] = a maxt[0] = a for t in range(logn - 1): for i in range(n - (2 << t) + 1): mint[t + 1, i] = min(mint[t, i], mint[t, i + (1 << t)]) maxt[t + 1, i] = max(maxt[t, i], maxt[t, i + (1 << t)]) return mint, maxt @staticmethod def _rmq(table: np.ndarray, left_idx: int, right_idx: int, f: Callable): """RMQ on range [left_idx, right_idx).""" t = int(np.log2(right_idx - left_idx)) return f(table[t, left_idx], table[t, right_idx - (1 << t)]) def _get_disp_window( self, left_idx: int, r_idx: int, mintx, maxtx, minty, maxty ) -> [int, float]: """Binary search to find the widest dispersion window in [left_idx, r_idx].""" right_border = left_idx left_border = left_idx window_disp = -np.inf while r_idx >= left_idx: m = left_idx + int((r_idx - left_idx) / 2) minx = self._rmq(mintx, left_border, m + 1, min) miny = self._rmq(minty, left_border, m + 1, min) maxx = self._rmq(maxtx, left_border, m + 1, max) maxy = self._rmq(maxty, left_border, m + 1, max) disp = _get_distance( np.array([minx, miny]), np.array([maxx, maxy]), self.distance ) if disp <= self.max_dispersion: right_border = m window_disp = disp left_idx = m + 1 else: r_idx = m - 1 return right_border, window_disp def _get_dur_window(self, left_idx: int, r: int, t: np.ndarray) -> [int, int]: """Sliding window to find the widest duration window in [left_idx, r].""" right_border = left_idx end_time = t[left_idx] + self.max_duration while right_border + 1 <= r and t[right_border + 1] < end_time: right_border += 1 start_time = t[left_idx] + self.min_duration return right_border if t[right_border + 1] >= start_time else -1 # @jit(forceobj=True, looplift=True) def _preprocess(self, X: pd.DataFrame) -> pd.DataFrame: points = X[[self.x, self.y]].values dist = self._get_distances(points, self.distance) x = X[self.x].values y = X[self.y].values t = X[self.t].values dt = np.diff(t) vel = dist / (dt + self.eps) n = len(vel) fixations = np.zeros(n) fixation_id = 1 disp = np.full(n, -np.inf) # === IDT Algorithm === left_idx = 0 r = n - 1 # [left_idx, r] mintx, maxtx = self._build_sparse_tables(x) minty, maxty = self._build_sparse_tables(y) # restrictions on duration and dispersion are considered consequently while left_idx < n: # 1. Maintain the widest duration window using sliding window. # 2. Inside, find the widest dispersion window using binary search. # 3. Classify gazes from window in step 2 as fixation and continue. dur_right_border = self._get_dur_window(left_idx, r, t) if dur_right_border == -1: break disp_right_border, window_disp = self._get_disp_window( left_idx, dur_right_border, mintx, maxtx, minty, maxty ) # max_duration and max_dispersion satisfied if t[disp_right_border] - t[left_idx] < self.min_duration: left_idx += 1 continue fixations[left_idx:disp_right_border] = ( fixation_id # [left_idx, dur_right_border] is single fixation ) disp[left_idx:disp_right_border] = window_disp fixation_id += 1 left_idx = disp_right_border + 1 # === IDT Algorithm === fixations_df = pd.DataFrame( data={ "fixation_id": fixations, self.x: x[:-1], self.y: y[:-1], "start_time": t[:-1], "end_time": t[:-1], "distance_min": dist, "distance_max": dist, "dispersion": disp, } ) fixations_df = fixations_df[fixations_df["fixation_id"] != 0] diameters = [] centers = [] for i in set(fixations): if i != 0: points = fixations_df.loc[ fixations_df["fixation_id"] == i, fixations_df.columns.isin([self.x, self.y]), ].values x, y, radius = _get_MEC(np.unique(points, axis=0)) diameters.append(radius * 2) centers.append(np.array([x, y])) fixations_df = fixations_df.groupby(by=["fixation_id"]).agg( { self.x: "mean", self.y: "mean", "start_time": "min", "end_time": "max", "distance_min": "min", "distance_max": "max", "dispersion": "max", # just for API, window has same values } ) if len(fixations_df) <= 1: raise RuntimeError( f"Found only {len(fixations_df)} fixations, you either provided " f"infeasible constraints or have a mismatch of units in data " f"and constraints." ) fixations_df["diameters"] = diameters fixations_df["centers"] = centers # default features feats = ( "duration", "saccade_duration", "saccade_length", "saccade_angle", "saccade2_angle", ) fixations_df = self._compute_feats(fixations_df, feats) return fixations_df
[docs] class IHMM(BaseFixationPreprocessor): """Hidden Markov Model Identification. Based on Viterbi algorithm. Complexity: O(n^2) for single group. Args: fix2sac: probability of transition from fixation to saccade. sac2fix: probability of transition from saccade to fixation. fix_distrib: distribution of fixations. sac_distrib: distribution of saccades. distrib_params: 'auto' for default params or dict with {"fixation": params1, "saccade": params2}, where "params" are arguments for appropriate `scipy.stats` function. """ def __init__( self, x: str, y: str, t: str, fix2sac: float = 0.05, sac2fix: float = 0.05, fix_distrib: str = "norm", # fixation distribution sac_distrib: str = "norm", distrib_params: str | dict[str, float] = "auto", distance: str = "euc", pk: list[str] = None, eps: float = 1e-20, ): super().__init__(x=x, y=y, t=t, pk=pk) self.fix2sac = fix2sac self.sac2fix = sac2fix self.fix_distrib = fix_distrib self.sac_distrib = sac_distrib self.distrib_params = distrib_params self.distance = distance self.eps = eps self.available_distances = ("euc", "manhattan", "chebyshev") self.available_distributions = ("norm",) # TODO: add more def _check_params(self): m = "IHMM" assert self.x is not None, self._err_no_field(m, "x") assert self.y is not None, self._err_no_field(m, "y") assert self.t is not None, self._err_no_field(m, "t") # TODO wrap this error message assert self.distance in self.available_distances, ( f"'distance' must be one of ({', '.join(self.available_distances)})," f"got '{self.distance}'." ) assert self.fix_distrib in self.available_distributions, ( f"'fix_distrib' must be one of ({', '.join(self.available_distributions)})," f"got '{self.sac_distrib}'." ) assert self.sac_distrib in self.available_distributions, ( f"'sac_distrib' must be one of ({', '.join(self.available_distributions)})," f"got '{self.sac_distrib}'." ) assert isinstance(self.fix2sac, float) and ( 0.0 < self.fix2sac < 1.0 ), "'fix2sac' must be float between 0.0 and 1.0." assert isinstance(self.sac2fix, float) and ( 0.0 < self.sac2fix < 1.0 ), "'sac2fix' must be float between 0.0 and 1.0." def _get_distribution(self, ed, ep): if ed == "norm": return scipy.stats.norm(**ep) raise NotImplementedError(f"Distribution '{ed}' is not supported.") def _preprocess(self, X: pd.DataFrame) -> pd.DataFrame: points = X[[self.x, self.y]].values dist = self._get_distances(points, self.distance) x = X[self.x].values y = X[self.y].values t = X[self.t].values dt = np.diff(t) vel = dist / (dt + self.eps) states = ("fixation", "saccade") start_probs = {"fixation": 0.5, "saccade": 0.5} transition_probs = { "fixation": {"fixation": 1 - self.fix2sac, "saccade": self.fix2sac}, "saccade": {"fixation": self.sac2fix, "saccade": 1 - self.sac2fix}, } # fixations have lower velocities than saccades if self.distrib_params == "auto": # TODO commented approach does not work # quantiles = np.quantile(vel, q=[0.15, 0.35, 0.55, 0.85]) # fixation_mean = quantiles[0] # fixation_std = vel[vel <= quantiles[1]].std() # # dp = { # "fixation": { # "loc": fixation_mean, # "scale": fixation_std # }, # "saccade": { # "loc": vel.mean(), # "scale": vel.std() # } # } dp = { "fixation": {"loc": 0.002, "scale": 0.001}, "saccade": {"loc": 0.03, "scale": 0.005}, } else: dp = self.distrib_params fix_vel_distr = self._get_distribution(self.fix_distrib, dp["fixation"]) sac_vel_distr = self._get_distribution(self.sac_distrib, dp["saccade"]) emission_probs = { "fixation": lambda velocity: 1 - fix_vel_distr.cdf(velocity), "saccade": lambda velocity: sac_vel_distr.cdf(velocity), } _, best_path = self._viterbi( observations=vel, states=states, sp=start_probs, tp=transition_probs, ep=emission_probs, ) is_fixation = (best_path == "fixation").astype(np.int32) fixations = self._squash_fixations(is_fixation) fixations_df = pd.DataFrame( data={ "fixation_id": fixations, self.x: x[:-1], self.y: y[:-1], "start_time": t[:-1], "end_time": t[:-1], "distance_min": dist, "distance_max": dist, } ) fixations_df = fixations_df[fixations_df["fixation_id"] != 0] diameters = [] centers = [] for i in set(fixations): if i != 0: points = fixations_df.loc[ fixations_df["fixation_id"] == i, fixations_df.columns.isin([self.x, self.y]), ].values x, y, radius = _get_MEC(np.unique(points, axis=0)) diameters.append(radius * 2) centers.append(np.array([x, y])) fixations_df = fixations_df.groupby(by=["fixation_id"]).agg( { self.x: "mean", self.y: "mean", "start_time": "min", "end_time": "max", "distance_min": "min", "distance_max": "max", } ) fixations_df["diameters"] = diameters fixations_df["centers"] = centers # default features feats = ( "duration", "saccade_duration", "saccade_length", "saccade_angle", "saccade2_angle", ) fixations_df = self._compute_feats(fixations_df, feats) return fixations_df def _viterbi(self, observations, states, sp, tp, ep): """Computes hidden states vector Q s.t. probability of observing vector 'observations' given sp, tp, and ep is maximum.\n st ~ states\n sp ~ start_probs\n tp ~ transition_probs\n ep ~ emission_probs\n observations ~ velocities\n """ prev_probs_nlog = None best_path = [] if len(observations) == 0: return None for i, o_i in enumerate(observations): cur_probs_nlog = {} for j, q in enumerate(states): if i == 0: q_prob_nlog_max = -np.log(max(sp[q], self.eps)) else: q_prob_nlog_max = max( [prev_probs_nlog[q_] - np.log(tp[q_][q]) for q_ in states] ) cur_probs_nlog[q] = -np.log(max(ep[q](o_i), self.eps)) + q_prob_nlog_max best_path.append(max(cur_probs_nlog.keys(), key=cur_probs_nlog.get)) prev_probs_nlog = cur_probs_nlog best_path_prob = max(np.exp(-prev_probs_nlog[q]) for q in states) return best_path_prob, np.array(best_path)