Source code for eyefeatures.features.stats

from abc import abstractmethod
from typing import Any

import numpy as np
import pandas as pd
from numpy.typing import NDArray

from eyefeatures.features.extractor import BaseTransformer
from eyefeatures.utils import (
    Types,
    _calc_dt,
    _get_angle,
    _get_angle2,
    _select_regressions_by_ranges,
    _split_dataframe,
)


class StatsTransformer(BaseTransformer):
    """Base class for statistical features.

    Aggregate function strings must be compatible with `pandas`.
    Expected dataframe with fixations.

    Subclasses provide the feature prefix (`_fp`), the parameter check
    (`_check_params`) and the per-feature computation (`_calc_feats`); this
    class handles AOI bookkeeping, grouping by primary key and aggregation.

    Args:
        features_stats: Dictionary of format
            {'feature_1': ['statistic_1', 'statistic_2'], ...}.
        x: X coordinate column name.
        y: Y coordinate column name.
        t: timestamp column name.
        duration: duration column name (milliseconds expected).
        dispersion: fixation dispersion column name.
        aoi: Area Of Interest column name(-s). If provided, features can be
            calculated inside the specified AOI.
        calc_without_aoi: if True, then, in addition to AOI-wise features,
            calculate regular features ignoring AOI.
        pk: primary key.
        return_df: whether to return output as DataFrame or numpy array.
        warn: whether to enable warnings.
    """

    def __init__(
        self,
        features_stats: dict[str, list[str]] | None,
        x: str | None = None,
        y: str | None = None,
        t: str | None = None,
        duration: str | None = None,  # TODO consider units, i.e. ps, ns, ms.
        dispersion: str | None = None,
        aoi: str | list[str] | None = None,
        calc_without_aoi: bool = False,
        pk: list[str] | None = None,
        return_df: bool = True,
        warn: bool = True,
    ):
        super().__init__(
            x=x,
            y=y,
            t=t,
            duration=duration,
            dispersion=dispersion,
            aoi=aoi,
            pk=pk,
            return_df=return_df,
        )
        if features_stats is None:
            features_stats = {}
        self.features_stats = features_stats
        # feature -- i.e. saccade length/speed
        self.feature_names_in = list(features_stats.keys())
        # Filled in by subclasses with the tuple of supported feature names.
        self.available_feats = None
        # Added to denominators in subclasses to avoid division by zero.
        self.eps = 1e-20
        self.aoi = aoi
        self.calc_without_aoi = calc_without_aoi
        # Maps AOI column name -> list of AOI values seen on `fit`.
        self.aoi_mapper = None
        self.warn = warn
        # Output column names; populated by `fit`.
        self.feature_names_in_ = None

    @staticmethod
    def _err_no_col(f, c):
        """Build the error message for feature `f` missing required column `c`."""
        return f"Requested feature {f} requires {c} for calculation."

    def _check_feature_names(self, X, *, reset):
        """"""
        # Since feature names must always be provided with statistics to
        # calculate and are restricted to certain set of available features,
        # this method is irrelevant
        raise AttributeError("Use '_check_features_stats' instead.")

    def _validate_data(
        self,
        X="no_validation",
        y="no_validation",
        reset=True,
        validate_separately=False,
        cast_to_ndarray=True,
        **check_params,
    ):
        """"""
        # Same reason as for _check_feature_names
        raise AttributeError("This method must not be used.")

    @abstractmethod
    def _check_params(self):
        """
        Method checks that all requested features could be calculated with
        provided data.
        """
        ...

    def _check_features_stats(self):
        """
        Method checks `self.features_stats` for correct feature names
        (i.e. keys).
        """

        def err_msg(f):
            return (
                f"Feature '{f}' is not supported. Must be one of: "
                f"{', '.join(self.available_feats)}."
            )

        for feat in self.feature_names_in:
            assert feat in self.available_feats, err_msg(feat)

        self._check_params()

    # method called on fit
    def _check_aoi_fit(self, X):
        """Validate the `aoi` setting against `X` and build `self.aoi_mapper`.

        A string `aoi` is normalised to a one-element list in place.

        Raises:
            AssertionError: if `aoi` is neither str nor list, or contains "".
            RuntimeError: if any AOI column of `X` contains NaNs.
        """
        if self.aoi is not None:
            assert isinstance(
                self.aoi, (str, list)
            ), f"`aoi` must be str or List[str], got {type(self.aoi)}."
            if isinstance(self.aoi, str):
                self.aoi = [self.aoi]
            # "" is reserved internally as the "no AOI" placeholder key.
            assert (
                "" not in self.aoi
            ), 'Empty string "" as value in `aoi` columns is not allowed.'
            # check if aoi columns contain any NaNs
            for aoi_col in self.aoi:
                if aoi_col != "":
                    aoi_view = X[aoi_col]
                    if aoi_view.isnull().values.any():
                        raise RuntimeError(
                            f"Passed column '{aoi_col}' for AOI contains NaNs."
                        )
        self._preprocess_aoi(X)

    def _preprocess_aoi(self, X: pd.DataFrame):
        """Record, per AOI column, the distinct AOI values present in `X`."""
        if self.aoi is not None:
            self.aoi_mapper = {}
            if self.calc_without_aoi:
                # Placeholder entry requests the AOI-agnostic features too.
                self.aoi_mapper[""] = [""]
            for aoi_col in self.aoi:
                aoi_view = X[aoi_col]
                self.aoi_mapper[aoi_col] = aoi_view.drop_duplicates().values.tolist()
        else:
            self.aoi_mapper = {"": [""]}  # convenience placeholder

    # method called on transform
    def _check_aoi_transform(self, X: pd.DataFrame):
        """Check that AOI columns of `X` are NaN-free and hold only values
        seen during `fit`.

        Raises:
            AssertionError: if `aoi` contains "" or an unseen AOI value occurs.
            RuntimeError: if any AOI column of `X` contains NaNs.
        """
        if self.aoi is not None:
            assert (
                "" not in self.aoi
            ), 'Empty string "" as value in `aoi` columns is not allowed.'
            for aoi_col in self.aoi:
                if aoi_col == "":  # lib placeholder
                    continue
                aoi_view = X[aoi_col]
                # check if aoi column contains any NaNs
                if aoi_view.isnull().values.any():
                    raise RuntimeError(
                        f"Passed column '{aoi_col}' for AOI contains NaNs."
                    )
                known_vals = self.aoi_mapper[aoi_col]
                # Only distinct values need checking; first-occurrence order is
                # kept, so the first offending value reported is unchanged.
                for v in aoi_view.drop_duplicates():
                    assert v in known_vals, (
                        f"Unknown AOI value {v} was not seen during `fit` in "
                        f"'{aoi_col}'."
                    )

    @property
    @abstractmethod
    def _fp(self) -> str:
        """
        Feature prefix to use in feature names.
        """
        ...

    @abstractmethod
    def _calc_feats(
        self, X: pd.DataFrame, features: list[str], transition_mask: NDArray
    ) -> list[tuple[str, pd.Series]]:
        """
        Method calculates features passed to constructor, i.e. keys of
        `self.features_stats`. In case of `SaccadeFeatures`, it returns
        list of pairs `[('length', pd.Series), ('speed', pd.Series), ...]`.

        `transition_mask` is boolean mask with one entry per row of X, i-th
        value is False if X's i-th value is first fixation in block. Block is
        defined as sequential fixations in same AOI, maximum by inclusion
        (which means that block cannot contain another block). Thus, each AOI
        is split in blocks and first fixation in each block is then removed.
        """
        ...

    def _calc_with_aoi(
        self, feat_nms: list[str], X: pd.DataFrame, aoi_col: str, aoi_val: Any
    ) -> list[tuple[str, pd.Series]]:
        """
        Helper function to calculate features based on `aoi_col` (name for
        aoi column) and `aoi_val` (aoi value in this column).
        """
        if aoi_col == "":  # internal lib placeholder
            # No AOI restriction: every row is kept.
            transition_mask = np.ones(len(X)).astype(bool)
            feats: list[tuple[str, pd.Series]] = self._calc_feats(
                X, feat_nms, transition_mask
            )
        else:
            X_aoi = X[X[aoi_col] == aoi_val]
            all_aoi = X[aoi_col]
            # True where a row has the same AOI value as the previous row.
            all_transition_mask: pd.Series = all_aoi == all_aoi.shift(1)
            transition_mask = all_transition_mask[all_aoi == aoi_val].values
            feats: list[tuple[str, pd.Series]] = self._calc_feats(
                X_aoi, feat_nms, transition_mask
            )

        return feats

    def get_feature_names_out(self, input_features=None) -> list[str]:
        """Return output column names.

        After `fit` the names recorded there are returned. Before `fit`, names
        can only be derived when no AOI is configured; otherwise an empty list
        is returned because AOI values are data-dependent.
        """
        if self.feature_names_in_ is not None:
            return self.feature_names_in_

        # Fallback for unfitted transformer if no AOI is used
        # (otherwise it's data-dependent)
        if self.aoi is None:
            return [
                f"{self._fp}_{feat_nm}_{stat}"
                for feat_nm in self.features_stats
                for stat in self.features_stats[feat_nm]
            ]

        # If AOI is used, we need fit() to know the AOI values
        return []

    def fit(self, X: pd.DataFrame, y=None):
        """Validate configuration and AOI columns, then record output names.

        Returns:
            self
        """
        self._check_features_stats()
        self._check_aoi_fit(X)

        # all output features that will appear on transform
        self.feature_names_in_ = []
        for aoi_col in self.aoi_mapper:
            for aoi_val in self.aoi_mapper[aoi_col]:
                for feat_nm in self.features_stats:
                    for stat in self.features_stats[feat_nm]:
                        self.feature_names_in_.append(
                            f"{self._fp}_{feat_nm}_{aoi_col}[{aoi_val}]_{stat}"
                            if aoi_col != ""
                            else f"{self._fp}_{feat_nm}_{stat}"
                        )

        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame | NDArray:
        """Compute the requested statistics, one output row per `pk` group.

        Without `pk` the whole of `X` is a single group with id "0". A
        feature that yields no values for a group/AOI contributes `None` for
        each of its statistics.

        Returns:
            DataFrame indexed by group id (or its `.values` when
            `return_df` is False) with the columns recorded on `fit`.
        """
        if self.features_stats is None:
            return X if self.return_df else X.values

        self._check_aoi_transform(X)

        feat_nms = list(self.features_stats.keys())
        gathered_stats = []
        column_nms = []

        groups: Types.EncodedPartition
        if self.pk is None:
            groups = [("0", X)]
        else:
            groups = _split_dataframe(X, self.pk)  # split by unique groups

        group_ids = []
        for group_id, group_X in groups:
            group_ids.append(group_id)
            gathered_stats_group = []
            # Column names are collected once, while processing the first group.
            add_cols_nms = len(group_ids) == 1

            for aoi_col in self.aoi_mapper:
                for aoi_val in self.aoi_mapper[aoi_col]:
                    group_feats: list[tuple[str, pd.Series]] = self._calc_with_aoi(
                        feat_nms, group_X, aoi_col, aoi_val
                    )

                    for feat_nm, feat_arr in group_feats:
                        feat_stats: list[str] = self.features_stats[feat_nm]
                        if not feat_arr.empty:  # group_X with AOI was not empty
                            stats_group = [
                                feat_arr.agg(func=stat) for stat in feat_stats
                            ]
                        else:  # no AOI for given group
                            stats_group = [None for _ in feat_stats]
                        gathered_stats_group.extend(stats_group)

                        # TODO remove, have self.features_names_in_ on fit.
                        # Serves as sanity check for ordering of features names.
                        if add_cols_nms:
                            column_nms.extend(
                                (
                                    f"{self._fp}_{feat_nm}_{aoi_col}[{aoi_val}]_{stat}"
                                    if aoi_col != ""
                                    else f"{self._fp}_{feat_nm}_{stat}"
                                )
                                for stat in feat_stats
                            )

            gathered_stats.append(gathered_stats_group)

        # With zero groups no names were collected, so the ordering check is
        # meaningful only when at least one group was processed.
        if group_ids:
            assert len(self.feature_names_in_) == len(column_nms)
            for fit_nm, transform_nm in zip(self.feature_names_in_, column_nms):
                assert (
                    fit_nm == transform_nm
                ), f"Fit: {self.feature_names_in_}\nTransform: {column_nms}."

        # Identical to `column_nms` whenever the check above ran and passed.
        stats_df = pd.DataFrame(
            data=gathered_stats, columns=self.feature_names_in_, index=group_ids
        )

        return stats_df if self.return_df else stats_df.values
class SaccadeFeatures(StatsTransformer):
    """Saccade Features Transformer.

    The transformer identifies saccades from fixations and extracts their
    features. A saccade is taken to be the movement between two consecutive
    fixation rows.

    Args:
        features_stats: mapping of feature name to the list of statistics to
            aggregate. Defaults to min/max/mean/std for every supported
            feature.
        **kwargs: forwarded to `StatsTransformer`.
    """

    def __init__(self, features_stats=None, **kwargs):
        available_feats = (
            "length",
            "acceleration",
            "speed",
            "direction_angle",
            "rotation_angle",
        )
        if features_stats is None:
            features_stats = {
                feat: ["min", "max", "mean", "std"] for feat in available_feats
            }
        super().__init__(features_stats=features_stats, **kwargs)
        self.available_feats = available_feats

    @property
    def _fp(self) -> str:
        return "sac"

    def _check_params(self):
        """Assert that the columns needed by the requested features are set."""
        for feat in self.feature_names_in:
            assert self.x is not None, self._err_no_col(feat, "x")
            assert self.y is not None, self._err_no_col(feat, "y")
            if feat in ("speed", "acceleration"):
                assert self.t is not None, self._err_no_col(feat, "t")

    def _calc_feats(
        self, X: pd.DataFrame, features: list[str], transition_mask: NDArray
    ) -> list[tuple[str, pd.Series]]:
        """Compute the requested saccade features for `X`.

        Returns:
            `(feature name, values)` pairs in the order of `features`, each
            filtered by `transition_mask`.

        Raises:
            NotImplementedError: for an unsupported feature name.
        """
        feats = []

        # Row-to-row displacement; the first row has no predecessor (NaN).
        dx: pd.Series = X[self.x].diff()
        dy: pd.Series = X[self.y].diff()
        dr = np.sqrt(dx**2 + dy**2)

        # Time deltas are used by speed/acceleration only, and those are the
        # only features for which `_check_params` requires `t`. Computing them
        # for other features would pass an unset `t` to `_calc_dt`.
        needs_dt = any(f in ("speed", "acceleration") for f in features)
        dt = _calc_dt(X, self.duration, self.t) if needs_dt else None

        for feat_nm in features:
            if feat_nm == "length":
                feat_arr = dr[transition_mask]
            elif feat_nm == "acceleration":
                # Acceleration: dx = v0 * t + 1/2 * a * t^2.
                # Above formula is law of uniformly accelerated motion
                # TODO consider direction
                # NOTE(review): solving the formula above with v0 = 0 gives
                # a = 2 * dr / dt^2, while this computes dr / (2 * dt^2) --
                # confirm which is intended before changing feature values.
                sac_acc: pd.Series = dr / (dt**2 + self.eps) / 2
                feat_arr = sac_acc[transition_mask]
            elif feat_nm == "speed":
                sac_spd = dr / (dt + self.eps)
                feat_arr = sac_spd[transition_mask]
            elif feat_nm == "direction_angle":
                angles = pd.Series(
                    [
                        _get_angle(dx_val, dy_val)
                        for dx_val, dy_val in zip(dx, dy, strict=False)
                    ],
                    index=dx.index,
                )
                feat_arr = angles[transition_mask]
            elif feat_nm == "rotation_angle":
                # Pairs each displacement with the previous one.
                angles = pd.Series(
                    [
                        _get_angle2(dx1, dy1, dx2, dy2)
                        for dx1, dy1, dx2, dy2 in zip(
                            dx.shift(1), dy.shift(1), dx, dy, strict=False
                        )
                    ],
                    index=dx.index,
                )
                feat_arr = angles[transition_mask]
            else:
                raise NotImplementedError(feat_nm)

            feats.append((feat_nm, feat_arr))

        return feats
class RegressionFeatures(StatsTransformer):
    """Regression Features Transformer.

    The transformer identifies saccades, and then selects regressions from
    them using user-defined set of ranges.

    Args:
        ranges: tuple of tuples (l, r), where l and r are angles in degrees
            between 0 and 360 such that l <= r. If one wants a range that
            passes 360 degrees, they could use two ranges like
            ((270, 360), (0, 90)). Default: ((135, 225),) which corresponds to
            left-wards movements.
        features_stats: mapping of feature name to the list of statistics to
            aggregate. Defaults to min/max/mean/std for every supported
            feature.
        **kwargs: forwarded to `StatsTransformer`.

    Example:
        Quick start with default parameters::

            from eyefeatures.features.stats import RegressionFeatures

            # Detect regressions in [-90, 90] degrees.
            transformer = RegressionFeatures(
                features_stats={"length": ["mean", "std"]},
                x="x", y="y", t="time",
                ranges=((270, 360), (0, 90))
            )
            features = transformer.fit_transform(fixations_df)
    """

    def __init__(
        self,
        ranges: tuple[tuple[float, float], ...] = ((135, 225),),
        features_stats: dict[str, list[str]] | None = None,
        **kwargs,
    ):
        available_feats = (
            "length",
            "acceleration",
            "speed",
            "direction_angle",
            "rotation_angle",
            "mask",
        )
        if features_stats is None:
            features_stats = {
                feat: ["min", "max", "mean", "std"] for feat in available_feats
            }
        super().__init__(features_stats=features_stats, **kwargs)
        self.available_feats = available_feats
        self.ranges = ranges

    @property
    def _fp(self) -> str:
        return "reg"

    def _check_params(self):
        """Assert that `ranges` are well-formed and required columns are set."""
        for r in self.ranges:
            assert len(r) == 2, f"Range must be tuple of length 2, got {r}."
            assert r[0] <= r[1], (
                f"Range must be (l, r) where l <= r, got {r}. "
                f"If you want to cross 360, split into two ranges."
            )
            assert (
                0 <= r[0] <= 360 and 0 <= r[1] <= 360
            ), f"Angles must be between 0 and 360, got {r}."

        for feat in self.feature_names_in:
            assert self.x is not None, self._err_no_col(feat, "x")
            assert self.y is not None, self._err_no_col(feat, "y")
            if feat in ("speed", "acceleration"):
                # The message must be attached to the assert; as a separate
                # statement it was computed and discarded.
                assert self.t is not None, self._err_no_col(feat, "t")

    def _calc_feats(
        self, X: pd.DataFrame, features: list[str], transition_mask: NDArray
    ) -> list[tuple[str, pd.Series]]:
        """Compute the requested features over saccades selected as regressions.

        Returns:
            `(feature name, values)` pairs in the order of `features`. Every
            feature except "mask" is filtered first by the regression
            selection mask and then by `transition_mask`; "mask" is the
            selection mask itself.

        Raises:
            NotImplementedError: for an unsupported feature name.
        """
        feats = []

        # Row-to-row displacement; the first row has no predecessor (NaN).
        dx: pd.Series = X[self.x].diff()
        dy: pd.Series = X[self.y].diff()
        sm = _select_regressions_by_ranges(dx, dy, self.ranges)  # selection_mask
        dr = np.sqrt(dx**2 + dy**2)

        # Time deltas are used by speed/acceleration only, and those are the
        # only features for which `_check_params` requires `t`. Computing them
        # for other features would pass an unset `t` to `_calc_dt`.
        needs_dt = any(f in ("speed", "acceleration") for f in features)
        dt = _calc_dt(X, self.duration, self.t) if needs_dt else None

        # Transition mask restricted to the selected rows, so it lines up with
        # values already filtered by `sm`.
        tm = transition_mask[sm]
        for feat_nm in features:
            if feat_nm == "length":
                feat_arr = dr[sm][tm]
            elif feat_nm == "acceleration":
                # NOTE(review): for dx = 1/2 * a * t^2 the acceleration is
                # 2 * dr / dt^2, while this computes dr / (2 * dt^2) --
                # confirm which is intended before changing feature values.
                sac_acc: pd.Series = dr / (dt**2 + self.eps) / 2
                feat_arr = sac_acc[sm][tm]
            elif feat_nm == "speed":
                sac_spd = dr / (dt + self.eps)
                feat_arr = sac_spd[sm][tm]
            elif feat_nm == "direction_angle":
                angles = pd.Series(
                    [
                        _get_angle(dx_val, dy_val)
                        for dx_val, dy_val in zip(dx, dy, strict=False)
                    ],
                    index=dx.index,
                )
                feat_arr = angles[sm][tm]
            elif feat_nm == "rotation_angle":
                # Pairs each displacement with the previous one.
                angles = pd.Series(
                    [
                        _get_angle2(dx1, dy1, dx2, dy2)
                        for dx1, dy1, dx2, dy2 in zip(
                            dx.shift(1), dy.shift(1), dx, dy, strict=False
                        )
                    ],
                    index=dx.index,
                )
                feat_arr = angles[sm][tm]
            elif feat_nm == "mask":
                # The selection mask itself; not filtered by `transition_mask`.
                feat_arr = pd.Series(sm)
            else:
                raise NotImplementedError(feat_nm)

            feats.append((feat_nm, feat_arr))

        return feats
class MicroSaccadeFeatures(StatsTransformer):
    """Micro Saccade Features.

    The transformer identifies saccades, and then selects micro saccades from
    them using user-defined set of rules.

    Args:
        min_dispersion: minimum dispersion of fixation.
        max_speed: maximum speed between fixations.
        features_stats: mapping of feature name to the list of statistics to
            aggregate. Defaults to min/max/mean/std for every supported
            feature.
        **kwargs: forwarded to `StatsTransformer`.
    """

    def __init__(
        self,
        min_dispersion: float = 1.0,
        max_speed: float = 10.0,
        features_stats: dict[str, list[str]] | None = None,
        **kwargs,
    ):
        available_feats = (
            "length",
            "acceleration",
            "speed",
            "direction_angle",
            "rotation_angle",
            "mask",
        )
        if features_stats is None:
            features_stats = {
                feat: ["min", "max", "mean", "std"] for feat in available_feats
            }
        super().__init__(features_stats=features_stats, **kwargs)
        self.available_feats = available_feats
        self.min_dispersion = min_dispersion
        self.max_speed = max_speed

    @property
    def _fp(self) -> str:
        return "microsac"

    def _check_params(self):
        """Assert that the columns needed by the requested features are set."""
        for feat in self.feature_names_in:
            assert self.x is not None, self._err_no_col(feat, "x")
            assert self.y is not None, self._err_no_col(feat, "y")
            assert self.dispersion is not None, self._err_no_col(feat, "dispersion")
            if feat in ("speed", "acceleration"):
                # The message must be attached to the assert; as a separate
                # statement it was computed and discarded.
                assert self.t is not None, self._err_no_col(feat, "t")

    def _calc_feats(
        self, X: pd.DataFrame, features: list[str], transition_mask: NDArray
    ) -> list[tuple[str, pd.Series]]:
        """Compute the requested features over saccades selected as micro
        saccades.

        Returns:
            `(feature name, values)` pairs in the order of `features`. Every
            feature except "mask" is filtered first by the micro-saccade
            selection mask and then by `transition_mask`; "mask" is the
            selection mask itself.

        Raises:
            NotImplementedError: for an unsupported feature name.
        """
        feats = []

        # Row-to-row displacement; the first row has no predecessor (NaN).
        dx: pd.Series = X[self.x].diff()
        dy: pd.Series = X[self.y].diff()
        dr = np.sqrt(dx**2 + dy**2)

        # selection_mask
        # NOTE(review): `max_speed` is compared against the displacement `dr`,
        # not against a speed (dr / dt) -- confirm this is intended.
        sm = (dr < self.max_speed) & (X[self.dispersion] > self.min_dispersion)

        # Time deltas are used by speed/acceleration only, and those are the
        # only features for which `_check_params` requires `t`. Computing them
        # for other features would pass an unset `t` to `_calc_dt`.
        needs_dt = any(f in ("speed", "acceleration") for f in features)
        dt = _calc_dt(X, self.duration, self.t) if needs_dt else None

        # Transition mask restricted to the selected rows, so it lines up with
        # values already filtered by `sm`.
        tm = transition_mask[sm]
        for feat_nm in features:
            if feat_nm == "length":
                feat_arr = dr[sm][tm]
            elif feat_nm == "acceleration":
                # NOTE(review): for dx = 1/2 * a * t^2 the acceleration is
                # 2 * dr / dt^2, while this computes dr / (2 * dt^2) --
                # confirm which is intended before changing feature values.
                sac_acc: pd.Series = dr / (dt**2 + self.eps) / 2
                feat_arr = sac_acc[sm][tm]
            elif feat_nm == "speed":
                sac_spd = dr / (dt + self.eps)
                feat_arr = sac_spd[sm][tm]
            elif feat_nm == "direction_angle":
                angles = pd.Series(
                    [
                        _get_angle(dx_val, dy_val)
                        for dx_val, dy_val in zip(dx, dy, strict=False)
                    ],
                    index=dx.index,
                )
                feat_arr = angles[sm][tm]
            elif feat_nm == "rotation_angle":
                # Pairs each displacement with the previous one.
                angles = pd.Series(
                    [
                        _get_angle2(dx1, dy1, dx2, dy2)
                        for dx1, dy1, dx2, dy2 in zip(
                            dx.shift(1), dy.shift(1), dx, dy, strict=False
                        )
                    ],
                    index=dx.index,
                )
                feat_arr = angles[sm][tm]
            elif feat_nm == "mask":
                # The selection mask itself; not filtered by `transition_mask`.
                feat_arr = pd.Series(sm)
            else:
                raise NotImplementedError(feat_nm)

            feats.append((feat_nm, feat_arr))

        return feats
class FixationFeatures(StatsTransformer):
    """Fixation Features Transformer.

    Reads per-fixation values straight from the input columns: "duration"
    comes from the configured duration column and "vad" from the configured
    dispersion column.

    Args:
        features_stats: mapping of feature name to the list of statistics to
            aggregate. Defaults to min/max/mean/std for every supported
            feature.
        **kwargs: forwarded to `StatsTransformer`.
    """

    def __init__(self, features_stats=None, **kwargs):
        supported = ("duration", "vad")
        requested_stats = features_stats
        if requested_stats is None:
            default_stats = ("min", "max", "mean", "std")
            requested_stats = {name: list(default_stats) for name in supported}
        super().__init__(features_stats=requested_stats, **kwargs)
        self.available_feats = supported

    @property
    def _fp(self) -> str:
        return "fix"

    def _check_params(self):
        """Assert that the column backing each requested feature is set."""
        # feature name -> attribute holding the name of its source column
        backing_attr = {"duration": "duration", "vad": "dispersion"}
        for requested in self.feature_names_in:
            attr = backing_attr.get(requested)
            if attr is None:
                # Other names need no column here.
                continue
            assert getattr(self, attr) is not None, self._err_no_col(
                requested, attr
            )

    def _calc_feats(
        self, X: pd.DataFrame, features: list[str], transition_mask: NDArray
    ) -> list[tuple[str, pd.Series]]:
        """Return `(feature name, values)` pairs in the order of `features`,
        each being the source column filtered by `transition_mask`.

        Raises:
            NotImplementedError: for an unsupported feature name.
        """
        # feature name -> attribute holding the name of its source column
        backing_attr = {"duration": "duration", "vad": "dispersion"}
        collected = []
        for name in features:
            if name not in backing_attr:
                raise NotImplementedError(name)
            column = getattr(self, backing_attr[name])
            collected.append((name, X[column][transition_mask]))
        return collected