# Source code for eyefeatures.features.stats
from abc import abstractmethod
from typing import Any
import numpy as np
import pandas as pd
from numpy.typing import NDArray
from eyefeatures.features.extractor import BaseTransformer
from eyefeatures.utils import (
Types,
_calc_dt,
_get_angle,
_get_angle2,
_select_regressions_by_ranges,
_split_dataframe,
)
# [docs]
class StatsTransformer(BaseTransformer):
    """Base class for statistical features.

    Aggregate function strings must be compatible with `pandas`. A dataframe
    with fixations is expected as input.

    Subclasses provide the feature prefix (`_fp`), the parameter check
    (`_check_params`) and the per-feature computation (`_calc_feats`); this
    class handles AOI bookkeeping, grouping by primary key and aggregation.

    Args:
        features_stats: Dictionary of format
            {'feature_1': ['statistic_1', 'statistic_2'], ...}.
        x: X coordinate column name.
        y: Y coordinate column name.
        t: timestamp column name.
        duration: duration column name (milliseconds expected).
        dispersion: fixation dispersion column name.
        aoi: Area Of Interest column name(-s). If provided, features
            can be calculated inside the specified AOI.
        calc_without_aoi: if True, then, in addition to AOI-wise features,
            calculate regular features ignoring AOI.
        pk: primary key.
        return_df: whether to return output as DataFrame or numpy array.
        warn: whether to enable warnings.
    """

    def __init__(
        self,
        features_stats: dict[str, list[str]],
        x: str | None = None,
        y: str | None = None,
        t: str | None = None,
        duration: str | None = None,  # TODO consider units, i.e. ps, ns, ms.
        dispersion: str | None = None,
        aoi: str | list[str] | None = None,
        calc_without_aoi: bool = False,
        pk: list[str] | None = None,
        return_df: bool = True,
        warn: bool = True,
    ):
        super().__init__(
            x=x,
            y=y,
            t=t,
            duration=duration,
            dispersion=dispersion,
            aoi=aoi,
            pk=pk,
            return_df=return_df,
        )
        if features_stats is None:
            features_stats = {}
        self.features_stats = features_stats
        # feature -- i.e. saccade length/speed
        self.feature_names_in = list(features_stats.keys())
        # Set by subclasses after calling this constructor.
        self.available_feats = None
        # Added to denominators to avoid division by zero.
        self.eps = 1e-20
        self.aoi = aoi
        self.calc_without_aoi = calc_without_aoi
        # {aoi column: [values seen on fit]}; populated by `_preprocess_aoi`.
        self.aoi_mapper = None
        self.warn = warn
        # Output column names; populated by `fit`.
        self.feature_names_in_ = None

    @staticmethod
    def _err_no_col(f, c):
        """Build the message used when feature `f` lacks required column `c`."""
        return f"Requested feature {f} requires {c} for calculation."

    def _check_feature_names(self, X, *, reset):
        """Always raise: feature names are validated by `_check_features_stats`."""
        # Since feature names must always be provided with statistics to
        # calculate and are restricted to certain set of available features,
        # this method is irrelevant
        raise AttributeError("Use '_check_features_stats' instead.")

    def _validate_data(
        self,
        X="no_validation",
        y="no_validation",
        reset=True,
        validate_separately=False,
        cast_to_ndarray=True,
        **check_params,
    ):
        """Always raise: this validation hook is not supported here."""
        # Same reason as for _check_feature_names
        raise AttributeError("This method must not be used.")

    @abstractmethod
    def _check_params(self):
        """
        Method checks that all requested features could be calculated
        with provided data.
        """
        ...

    def _check_features_stats(self):
        """
        Method checks `self.features_stats` for correct feature names (i.e. keys)
        and then runs the subclass-specific `_check_params`.
        """

        def err_msg(f):
            return (
                f"Feature '{f}' is not supported. Must be one of: "
                f"{', '.join(self.available_feats)}."
            )

        for feat in self.feature_names_in:
            assert feat in self.available_feats, err_msg(feat)
        self._check_params()

    # method called on fit
    def _check_aoi_fit(self, X):
        """Validate the AOI columns of `X` and record the AOI values seen.

        A string `aoi` is normalized to a one-element list (stored back on
        `self.aoi`).

        Raises:
            AssertionError: if `aoi` is neither str nor list, or contains "".
            RuntimeError: if an AOI column contains NaNs.
        """
        if self.aoi is not None:  # check if aoi columns contain any NaNs
            assert isinstance(
                self.aoi, (str, list)
            ), f"`aoi` must be str or List[str], got {type(self.aoi)}."
            if isinstance(self.aoi, str):
                self.aoi = [self.aoi]
            # "" is reserved as the internal "no AOI" placeholder.
            assert (
                "" not in self.aoi
            ), 'Empty string "" as value in `aoi` columns is not allowed.'
            for aoi_col in self.aoi:
                if aoi_col != "":
                    aoi_view = X[aoi_col]
                    if aoi_view.isnull().values.any():
                        raise RuntimeError(
                            f"Passed column '{aoi_col}' for AOI contains NaNs."
                        )
        self._preprocess_aoi(X)

    def _preprocess_aoi(self, X: pd.DataFrame):
        """Fill `self.aoi_mapper` with the distinct values of each AOI column.

        Without AOI the mapper holds only the placeholder entry `{"": [""]}`,
        so that callers can iterate over it uniformly.
        """
        if self.aoi is not None:
            self.aoi_mapper = {}
            if self.calc_without_aoi:
                self.aoi_mapper[""] = [""]
            for aoi_col in self.aoi:
                aoi_view = X[aoi_col]
                self.aoi_mapper[aoi_col] = aoi_view.drop_duplicates().values.tolist()
        else:
            self.aoi_mapper = {"": [""]}  # convenience placeholder

    # method called on transform
    def _check_aoi_transform(self, X: pd.DataFrame):
        """Check that AOI columns of `X` hold only values seen during `fit`.

        Raises:
            AssertionError: if `aoi` contains "" or an unseen AOI value occurs.
            RuntimeError: if an AOI column contains NaNs.
        """
        if self.aoi is not None:  # check if aoi column contains any NaNs
            assert (
                "" not in self.aoi
            ), 'Empty string "" as value in `aoi` columns is not allowed.'
            for aoi_col in self.aoi:
                if aoi_col == "":  # lib placeholder
                    continue
                aoi_view = X[aoi_col]
                if aoi_view.isnull().values.any():
                    raise RuntimeError(
                        f"Passed column '{aoi_col}' for AOI contains NaNs."
                    )
                known_vals = self.aoi_mapper[aoi_col]
                # Distinct values are enough; no need to test every row.
                for v in aoi_view.drop_duplicates():
                    assert v in known_vals, (
                        f"Unknown AOI value {v} was not seen during `fit` "
                        f"in '{aoi_col}'."
                    )

    @property
    @abstractmethod
    def _fp(self) -> str:
        """
        Feature prefix to use in feature names.
        """
        ...

    @abstractmethod
    def _calc_feats(
        self, X: pd.DataFrame, features: list[str], transition_mask: NDArray
    ) -> list[tuple[str, pd.Series]]:
        """
        Method calculates features passed to constructor, i.e. keys of
        `self.features_stats`. It returns a list of `(feature_name, values)`
        pairs, one per requested feature, e.g. for `SaccadeFeatures`
        `[('length', pd.Series), ('speed', pd.Series), ...]`.

        `transition_mask` is boolean mask with one entry per row of X, i-th
        value is False if X's i-th value is first fixation in block. Block is
        defined as sequential fixations in same AOI, maximum by inclusion
        (which means that block cannot contain another block). Thus, each
        AOI is split in blocks and first fixation in each block is then removed.
        """
        ...

    def _calc_with_aoi(
        self, feat_nms: list[str], X: pd.DataFrame, aoi_col: str, aoi_val: Any
    ) -> list[tuple[str, pd.Series]]:
        """
        Helper function to calculate features based on `aoi_col` (name for aoi column)
        and `aoi_val` (aoi value in this column).

        With the placeholder `aoi_col == ""` all rows of `X` are used and the
        transition mask is all True.
        """
        if aoi_col == "":  # internal lib placeholder
            transition_mask = np.ones(len(X)).astype(bool)
            return self._calc_feats(X, feat_nms, transition_mask)

        all_aoi = X[aoi_col]
        in_aoi = all_aoi == aoi_val
        # True where the previous fixation lies in the same AOI, i.e. the row
        # is not the first fixation of its block.
        all_transition_mask: pd.Series = all_aoi == all_aoi.shift(1)
        transition_mask = all_transition_mask[in_aoi].values
        return self._calc_feats(X[in_aoi], feat_nms, transition_mask)

    def _feature_name(self, feat_nm: str, aoi_col: str, aoi_val: Any, stat: str) -> str:
        """Compose an output column name; the AOI part is omitted for ""."""
        if aoi_col != "":
            return f"{self._fp}_{feat_nm}_{aoi_col}[{aoi_val}]_{stat}"
        return f"{self._fp}_{feat_nm}_{stat}"

    def get_feature_names_out(self, input_features=None) -> list[str]:
        """Return output column names.

        After `fit` the stored names are returned. Before `fit`, names can
        only be derived when no AOI is used; otherwise an empty list is
        returned because the names depend on AOI values found in the data.
        """
        if self.feature_names_in_ is not None:
            return self.feature_names_in_
        # Fallback for unfitted transformer if no AOI is used
        # (otherwise it's data-dependent)
        if self.aoi is None:
            return [
                f"{self._fp}_{feat_nm}_{stat}"
                for feat_nm, stats in self.features_stats.items()
                for stat in stats
            ]
        # If AOI is used, we need fit() to know the AOI values
        return []

    def fit(self, X: pd.DataFrame, y=None):
        """Validate configuration and AOI columns, and fix output column names.

        Returns:
            self
        """
        self._check_features_stats()
        self._check_aoi_fit(X)
        # all output features that will appear on transform
        self.feature_names_in_ = [
            self._feature_name(feat_nm, aoi_col, aoi_val, stat)
            for aoi_col, aoi_vals in self.aoi_mapper.items()
            for aoi_val in aoi_vals
            for feat_nm, stats in self.features_stats.items()
            for stat in stats
        ]
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame | NDArray:
        """Compute the requested statistics for every primary-key group.

        Returns:
            One row per group and one column per (AOI value, feature,
            statistic); a DataFrame if `return_df` is True, else its values.
            Entries are None where a group has no data for a feature.

        Raises:
            RuntimeError: if called before `fit`, or if an AOI column
                contains NaNs.
            AssertionError: if AOI values unseen during `fit` occur, or the
                produced columns disagree with those fixed on `fit`.
        """
        if self.features_stats is None:
            return X if self.return_df else X.values
        if self.aoi_mapper is None or self.feature_names_in_ is None:
            raise RuntimeError(
                f"{type(self).__name__} is not fitted; call `fit` before `transform`."
            )

        self._check_aoi_transform(X)
        feat_nms = list(self.features_stats.keys())

        if self.pk is None:
            groups: Types.EncodedPartition = [("0", X)]
        else:
            groups: Types.EncodedPartition = _split_dataframe(
                X, self.pk
            )  # split by unique groups

        gathered_stats = []
        column_nms = []
        group_ids = []
        for group_id, group_X in groups:
            group_ids.append(group_id)
            # Column names are collected once, from the first group only.
            add_cols_nms = len(group_ids) == 1
            gathered_stats_group = []
            for aoi_col, aoi_vals in self.aoi_mapper.items():
                for aoi_val in aoi_vals:
                    group_feats: list[tuple[str, pd.Series]] = self._calc_with_aoi(
                        feat_nms, group_X, aoi_col, aoi_val
                    )
                    for feat_nm, feat_arr in group_feats:
                        feat_stats: list[str] = self.features_stats[feat_nm]
                        if not feat_arr.empty:  # group_X with AOI was not empty
                            stats_group = [
                                feat_arr.agg(func=stat) for stat in feat_stats
                            ]
                        else:  # no AOI for given group
                            stats_group = [None for _ in feat_stats]
                        gathered_stats_group.extend(stats_group)
                        if add_cols_nms:
                            column_nms.extend(
                                self._feature_name(feat_nm, aoi_col, aoi_val, stat)
                                for stat in feat_stats
                            )
            gathered_stats.append(gathered_stats_group)

        # Sanity check for ordering of feature names between fit and
        # transform. Skipped when there are no groups, in which case no names
        # were collected and the result is an empty frame.
        if group_ids:
            assert (
                column_nms == self.feature_names_in_
            ), f"Fit: {self.feature_names_in_}\nTransform: {column_nms}."

        stats_df = pd.DataFrame(
            data=gathered_stats, columns=self.feature_names_in_, index=group_ids
        )
        return stats_df if self.return_df else stats_df.values
# [docs]
class SaccadeFeatures(StatsTransformer):
    """Saccade Features Transformer.

    The transformer identifies saccades from fixations and extracts
    their features. A saccade is taken as the movement between two
    consecutive fixation rows.

    Args:
        features_stats: Dictionary of format
            {'feature': ['statistic_1', ...]}. If None, `min`, `max`,
            `mean` and `std` are computed for every available feature.
        **kwargs: forwarded to `StatsTransformer`.
    """

    def __init__(self, features_stats=None, **kwargs):
        available_feats = (
            "length",
            "acceleration",
            "speed",
            "direction_angle",
            "rotation_angle",
        )
        if features_stats is None:
            features_stats = {
                feat: ["min", "max", "mean", "std"] for feat in available_feats
            }
        super().__init__(features_stats=features_stats, **kwargs)
        # Assigned after the base constructor, which resets it to None.
        self.available_feats = available_feats

    @property
    def _fp(self) -> str:
        """Feature prefix used in output column names."""
        return "sac"

    def _check_params(self):
        """Assert that the columns needed by the requested features are set.

        Raises:
            AssertionError: if `x` or `y` is missing, or `t` is missing while
                `speed` or `acceleration` is requested.
        """
        for feat in self.feature_names_in:
            assert self.x is not None, self._err_no_col(feat, "x")
            assert self.y is not None, self._err_no_col(feat, "y")
            if feat in ("speed", "acceleration"):
                assert self.t is not None, self._err_no_col(feat, "t")

    def _calc_feats(
        self, X: pd.DataFrame, features: list[str], transition_mask: NDArray
    ) -> list[tuple[str, pd.Series]]:
        """Compute the requested saccade features for the rows of `X`.

        Args:
            X: fixations.
            features: names of features to compute.
            transition_mask: boolean mask applied to every computed feature.

        Returns:
            List of `(feature_name, values)` pairs in the order of `features`.

        Raises:
            NotImplementedError: for an unknown feature name.
        """
        feats = []
        dx: pd.Series = X[self.x].diff()
        dy: pd.Series = X[self.y].diff()
        dr = np.sqrt(dx**2 + dy**2)
        # Only speed and acceleration use time deltas; the angle features do
        # not, and `_check_params` does not require `t` for them.
        needs_dt = any(f in ("speed", "acceleration") for f in features)
        dt = _calc_dt(X, self.duration, self.t) if needs_dt else None

        for feat_nm in features:
            if feat_nm == "length":
                feat_arr = dr[transition_mask]
            elif feat_nm == "acceleration":
                # Acceleration: dx = v0 * t + 1/2 * a * t^2.
                # Above formula is law of uniformly accelerated motion
                # TODO consider direction
                sac_acc: pd.Series = dr / (dt**2 + self.eps) * 1 / 2
                feat_arr = sac_acc[transition_mask]
            elif feat_nm == "speed":
                sac_spd = dr / (dt + self.eps)
                feat_arr = sac_spd[transition_mask]
            elif feat_nm == "direction_angle":
                angles = pd.Series(
                    [
                        _get_angle(dx_val, dy_val)
                        for dx_val, dy_val in zip(dx, dy, strict=False)
                    ],
                    index=dx.index,
                )
                feat_arr = angles[transition_mask]
            elif feat_nm == "rotation_angle":
                # Angle between the previous displacement and the current one.
                angles = pd.Series(
                    [
                        _get_angle2(dx1, dy1, dx2, dy2)
                        for dx1, dy1, dx2, dy2 in zip(
                            dx.shift(1), dy.shift(1), dx, dy, strict=False
                        )
                    ],
                    index=dx.index,
                )
                feat_arr = angles[transition_mask]
            else:
                raise NotImplementedError(feat_nm)
            feats.append((feat_nm, feat_arr))
        return feats
# [docs]
class RegressionFeatures(StatsTransformer):
    """Regression Features Transformer.

    The transformer identifies saccades, and then selects regressions
    from them using user-defined set of ranges.

    Args:
        ranges: tuple of tuples (l, r), where l and r are angles in degrees
            between 0 and 360 such that l <= r. If one wants a range that
            passes 360 degrees, they could use two ranges like
            ((270, 360), (0, 90)). Default: ((135, 225),) which corresponds to
            left-wards movements.
        features_stats: Dictionary of format
            {'feature': ['statistic_1', ...]}. If None, `min`, `max`,
            `mean` and `std` are computed for every available feature.
        **kwargs: forwarded to `StatsTransformer`.

    Example:
        Quick start with default parameters::

            from eyefeatures.features.stats import RegressionFeatures

            # Detect regressions in [-90, 90] degrees.
            transformer = RegressionFeatures(
                features_stats={"length": ["mean", "std"]},
                x="x", y="y", t="time",
                ranges=((270, 360), (0, 90))
            )
            features = transformer.fit_transform(fixations_df)
    """

    def __init__(
        self,
        ranges: tuple[tuple[float, float], ...] = ((135, 225),),
        features_stats: dict[str, list[str]] = None,
        **kwargs,
    ):
        available_feats = (
            "length",
            "acceleration",
            "speed",
            "direction_angle",
            "rotation_angle",
            "mask",
        )
        if features_stats is None:
            features_stats = {
                feat: ["min", "max", "mean", "std"] for feat in available_feats
            }
        super().__init__(features_stats=features_stats, **kwargs)
        # Assigned after the base constructor, which resets it to None.
        self.available_feats = available_feats
        self.ranges = ranges

    @property
    def _fp(self) -> str:
        """Feature prefix used in output column names."""
        return "reg"

    def _check_params(self):
        """Assert that `ranges` is well-formed and required columns are set.

        Raises:
            AssertionError: if a range is not a pair (l, r) with
                0 <= l <= r <= 360, if `x` or `y` is missing, or if `t` is
                missing while `speed` or `acceleration` is requested.
        """
        for r in self.ranges:
            assert len(r) == 2, f"Range must be tuple of length 2, got {r}."
            assert r[0] <= r[1], (
                f"Range must be (l, r) where l <= r, got {r}. "
                f"If you want to cross 360, split into two ranges."
            )
            assert (
                0 <= r[0] <= 360 and 0 <= r[1] <= 360
            ), f"Angles must be between 0 and 360, got {r}."
        for feat in self.feature_names_in:
            assert self.x is not None, self._err_no_col(feat, "x")
            assert self.y is not None, self._err_no_col(feat, "y")
            if feat in ("speed", "acceleration"):
                # The message is part of the assert; previously it was built
                # on a separate statement and discarded.
                assert self.t is not None, self._err_no_col(feat, "t")

    def _calc_feats(
        self, X: pd.DataFrame, features: list[str], transition_mask: NDArray
    ) -> list[tuple[str, pd.Series]]:
        """Compute the requested features for saccades selected as regressions.

        Args:
            X: fixations.
            features: names of features to compute.
            transition_mask: boolean mask; it is restricted to the selected
                saccades and then applied to every feature except `mask`.

        Returns:
            List of `(feature_name, values)` pairs in the order of `features`.
            For `mask` the values are the selection mask itself.

        Raises:
            NotImplementedError: for an unknown feature name.
        """
        feats = []
        dx: pd.Series = X[self.x].diff()
        dy: pd.Series = X[self.y].diff()
        sm = _select_regressions_by_ranges(dx, dy, self.ranges)  # selection_mask
        dr = np.sqrt(dx**2 + dy**2)
        # Only speed and acceleration use time deltas; the other features do
        # not, and `_check_params` does not require `t` for them.
        needs_dt = any(f in ("speed", "acceleration") for f in features)
        dt = _calc_dt(X, self.duration, self.t) if needs_dt else None
        # Transition mask restricted to the selected saccades.
        tm = transition_mask[sm]

        for feat_nm in features:
            if feat_nm == "length":
                feat_arr = dr[sm][tm]
            elif feat_nm == "acceleration":
                sac_acc: pd.Series = dr / (dt**2 + self.eps) * 1 / 2
                feat_arr = sac_acc[sm][tm]
            elif feat_nm == "speed":
                sac_spd = dr / (dt + self.eps)
                feat_arr = sac_spd[sm][tm]
            elif feat_nm == "direction_angle":
                angles = pd.Series(
                    [
                        _get_angle(dx_val, dy_val)
                        for dx_val, dy_val in zip(dx, dy, strict=False)
                    ],
                    index=dx.index,
                )
                feat_arr = angles[sm][tm]
            elif feat_nm == "rotation_angle":
                # Angle between the previous displacement and the current one.
                angles = pd.Series(
                    [
                        _get_angle2(dx1, dy1, dx2, dy2)
                        for dx1, dy1, dx2, dy2 in zip(
                            dx.shift(1), dy.shift(1), dx, dy, strict=False
                        )
                    ],
                    index=dx.index,
                )
                feat_arr = angles[sm][tm]
            elif feat_nm == "mask":
                # The selection mask itself; not filtered by `transition_mask`.
                feat_arr = pd.Series(sm)
            else:
                raise NotImplementedError(feat_nm)
            feats.append((feat_nm, feat_arr))
        return feats
# [docs]
class MicroSaccadeFeatures(StatsTransformer):
    """Micro Saccade Features.

    The transformer identifies saccades, and then selects micro saccades
    from them using user-defined set of rules.

    Args:
        min_dispersion: minimum dispersion of fixation.
        max_speed: maximum speed between fixations.
        features_stats: Dictionary of format
            {'feature': ['statistic_1', ...]}. If None, `min`, `max`,
            `mean` and `std` are computed for every available feature.
        **kwargs: forwarded to `StatsTransformer`.
    """

    def __init__(
        self,
        min_dispersion: float = 1.0,
        max_speed: float = 10.0,
        features_stats: dict[str, list[str]] = None,
        **kwargs,
    ):
        available_feats = (
            "length",
            "acceleration",
            "speed",
            "direction_angle",
            "rotation_angle",
            "mask",
        )
        if features_stats is None:
            features_stats = {
                feat: ["min", "max", "mean", "std"] for feat in available_feats
            }
        super().__init__(features_stats=features_stats, **kwargs)
        # Assigned after the base constructor, which resets it to None.
        self.available_feats = available_feats
        self.min_dispersion = min_dispersion
        self.max_speed = max_speed

    @property
    def _fp(self) -> str:
        """Feature prefix used in output column names."""
        return "microsac"

    def _check_params(self):
        """Assert that the columns needed by the requested features are set.

        Raises:
            AssertionError: if `x`, `y` or `dispersion` is missing, or `t` is
                missing while `speed` or `acceleration` is requested.
        """
        for feat in self.feature_names_in:
            assert self.x is not None, self._err_no_col(feat, "x")
            assert self.y is not None, self._err_no_col(feat, "y")
            assert self.dispersion is not None, self._err_no_col(feat, "dispersion")
            if feat in ("speed", "acceleration"):
                # The message is part of the assert; previously it was built
                # on a separate statement and discarded.
                assert self.t is not None, self._err_no_col(feat, "t")

    def _calc_feats(
        self, X: pd.DataFrame, features: list[str], transition_mask: NDArray
    ) -> list[tuple[str, pd.Series]]:
        """Compute the requested features for saccades selected as micro saccades.

        Args:
            X: fixations.
            features: names of features to compute.
            transition_mask: boolean mask; it is restricted to the selected
                saccades and then applied to every feature except `mask`.

        Returns:
            List of `(feature_name, values)` pairs in the order of `features`.
            For `mask` the values are the selection mask itself.

        Raises:
            NotImplementedError: for an unknown feature name.
        """
        feats = []
        dx: pd.Series = X[self.x].diff()
        dy: pd.Series = X[self.y].diff()
        dr = np.sqrt(dx**2 + dy**2)
        # selection_mask
        # NOTE(review): `max_speed` is compared against the saccade length
        # `dr`, not against a speed (`dr / dt`) -- confirm this is intended.
        sm = (dr < self.max_speed) & (X[self.dispersion] > self.min_dispersion)
        # Only speed and acceleration use time deltas; the other features do
        # not, and `_check_params` does not require `t` for them.
        needs_dt = any(f in ("speed", "acceleration") for f in features)
        dt = _calc_dt(X, self.duration, self.t) if needs_dt else None
        # Transition mask restricted to the selected saccades.
        tm = transition_mask[sm]

        for feat_nm in features:
            if feat_nm == "length":
                feat_arr = dr[sm][tm]
            elif feat_nm == "acceleration":
                sac_acc: pd.Series = dr / (dt**2 + self.eps) * 1 / 2
                feat_arr = sac_acc[sm][tm]
            elif feat_nm == "speed":
                sac_spd = dr / (dt + self.eps)
                feat_arr = sac_spd[sm][tm]
            elif feat_nm == "direction_angle":
                angles = pd.Series(
                    [
                        _get_angle(dx_val, dy_val)
                        for dx_val, dy_val in zip(dx, dy, strict=False)
                    ],
                    index=dx.index,
                )
                feat_arr = angles[sm][tm]
            elif feat_nm == "rotation_angle":
                # Angle between the previous displacement and the current one.
                angles = pd.Series(
                    [
                        _get_angle2(dx1, dy1, dx2, dy2)
                        for dx1, dy1, dx2, dy2 in zip(
                            dx.shift(1), dy.shift(1), dx, dy, strict=False
                        )
                    ],
                    index=dx.index,
                )
                feat_arr = angles[sm][tm]
            elif feat_nm == "mask":
                # The selection mask itself; not filtered by `transition_mask`.
                feat_arr = pd.Series(sm)
            else:
                raise NotImplementedError(feat_nm)
            feats.append((feat_nm, feat_arr))
        return feats
# [docs]
class FixationFeatures(StatsTransformer):
    """Fixation Features Transformer.

    Extracts features directly from the input fixations: `duration` is read
    from the duration column and `vad` from the dispersion column.
    """

    def __init__(self, features_stats=None, **kwargs):
        supported = ("duration", "vad")
        if features_stats is None:
            # Default: the four basic statistics for every supported feature.
            features_stats = {
                name: ["min", "max", "mean", "std"] for name in supported
            }
        super().__init__(features_stats=features_stats, **kwargs)
        # Assigned after the base constructor, which resets it to None.
        self.available_feats = supported

    @property
    def _fp(self) -> str:
        """Feature prefix used in output column names."""
        return "fix"

    def _check_params(self):
        """Assert that the column behind each requested feature is set."""
        # feature name -> (configured column, label used in the error message)
        requirements = {
            "duration": (self.duration, "duration"),
            "vad": (self.dispersion, "dispersion"),
        }
        for name in self.feature_names_in:
            if name not in requirements:
                continue
            column, label = requirements[name]
            assert column is not None, self._err_no_col(name, label)

    def _calc_feats(
        self, X: pd.DataFrame, features: list[str], transition_mask: NDArray
    ) -> list[tuple[str, pd.Series]]:
        """Return `(feature_name, values)` pairs for the requested features.

        Each feature is the corresponding column of `X` filtered by
        `transition_mask`. Unknown names raise `NotImplementedError`.
        """
        source_columns = {"duration": self.duration, "vad": self.dispersion}
        collected = []
        for name in features:
            if name not in source_columns:
                raise NotImplementedError(name)
            values = X[source_columns[name]][transition_mask]
            collected.append((name, values))
        return collected