Source code for eyefeatures.features.extractor

import warnings
from abc import abstractmethod
from collections.abc import Callable
from typing import Any

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from tqdm import tqdm

from eyefeatures.utils import _get_id, _get_objs, _split_dataframe


class BaseTransformer(BaseEstimator, TransformerMixin):
    def __init__(
        self,
        x: str = None,
        y: str = None,
        t: str = None,
        duration: str = None,
        dispersion: str = None,
        aoi: str = None,
        path_pk: list[str] = None,
        pk: list[str] = None,
        expected_paths: dict[str, pd.DataFrame] = None,
        fill_path: pd.DataFrame = None,
        expected_paths_method: str = "mean",
        warn: bool = True,
        return_df: bool = True,
    ):
        self.x = x
        self.y = y
        self.t = t
        self.duration = duration
        self.dispersion = dispersion
        self.path_pk = path_pk
        self.pk = pk
        self.aoi = aoi
        self.warn = warn
        self.return_df = return_df
        self.expected_paths = expected_paths
        self.fill_path = fill_path
        self.expected_paths_method = expected_paths_method

    def _check_init(self, items: list[tuple[Any, str]]):
        for value, nm in items:
            if value is None:
                raise RuntimeError(f"{nm} is not initialized")

    @abstractmethod
    def get_feature_names_out(self, input_features=None) -> list[str]:
        """Returns the names of the features generated by this transformer."""
        pass

    def set_data(
        self,
        x: str = None,
        y: str = None,
        t: str = None,
        duration: str = None,
        dispersion: str = None,
        aoi: str = None,
        path_pk: list[str] = None,
        pk: list[str] = None,
        expected_paths: dict[str, pd.DataFrame] = None,
        fill_path: pd.DataFrame = None,
        expected_paths_method: str = "mean",
        warn: bool = True,
        return_df: bool = True,
    ):
        self.x = x
        self.y = y
        self.t = t
        self.duration = duration
        self.dispersion = dispersion
        self.path_pk = path_pk
        self.pk = pk
        self.aoi = aoi
        self.warn = warn
        self.return_df = return_df
        self.expected_paths = expected_paths
        self.fill_path = fill_path
        self.expected_paths_method = expected_paths_method

    def fit(self, X: pd.DataFrame, y=None):
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame | np.ndarray:
        return X if self.return_df else X.values


[docs] class Extractor(BaseEstimator, TransformerMixin): """Meta Transformer that encapsulates the logic of feature extraction, providing ``fit``/``transform`` API. Args: features: List of feature transformers to use. x: X coordinate column name. y: Y coordinate column name. t: timeseries coordinate column name. duration: fixation duration column name. dispersion: fixation dispersion column name. aoi: AOI column name. path_pk: list of columns by which to calculate expected path. pk: list of columns to use as primary key. expected_paths_method: the method to calculate expected path. extra: used in combination with ``aggr_extra``. List of columns of input dataframe (on ``transform``) to aggregate alongside with other feature transformers, using aggregation function ``aggr_extra``. aggr_extra: aggregation function (pandas str or custom callable) to apply with ``extra`` argument. warn: whether to enable warnings. leave_pk: if True, then input ``pk`` columns are present in output dataframe (after ``transform``). return_df: if True, then pandas DataFrame is returned, else np.ndarray. """ def __init__( self, features: list[BaseTransformer] = None, x: str = None, y: str = None, t: str = None, duration: str = None, dispersion: str = None, aoi: str = None, path_pk: list[str] = None, pk: list[str] = None, expected_paths_method: str = "mean", extra: list[str] = None, aggr_extra: Callable | str = None, warn: bool = True, leave_pk: bool = False, return_df: bool = True, ): self.features = features self.x = x self.y = y self.t = t self.duration = duration self.dispersion = dispersion self.aoi = aoi self.path_pk = path_pk self.pk = pk self.expected_paths_method = expected_paths_method self.extra = extra self.aggr_extra = aggr_extra self.warn = warn self.leave_pk = leave_pk self.return_df = return_df self.is_fitted = False self.feature_names_in_ = None def get_feature_names_out(self, input_features=None) -> list[str]: if self.features is None: return [] feature_names = [] for feature in self.features: feature_names.extend(feature.get_feature_names_out()) return feature_names def _process_input(self, X: pd.DataFrame, y=None): if self.pk is not None and X[self.pk].isnull().values.any(): raise ValueError("Found missing values in pk.") elif X.isnull().values.any(): groups: list[str, pd.DataFrame] = _split_dataframe( X, self.pk ) # split by pk for group_id, group_X in groups: if group_X.isnull().values.any() and self.warn: warnings.warn( f"Group {group_id} has missing values. Dropping them.", stacklevel=5, ) X = X.dropna() return X, y def fit(self, X: pd.DataFrame, y=None): X, y = self._process_input(X, y) self.is_fitted = True if self.features is not None: for feature in self.features: feature.set_data( x=self.x, y=self.y, t=self.t, duration=self.duration, dispersion=self.dispersion, aoi=self.aoi, path_pk=self.path_pk, pk=self.pk, expected_paths_method=self.expected_paths_method, warn=self.warn, return_df=self.return_df, ) feature.fit(X) # This cannot be done in __init__. If transformer uses AOI, # it needs to be fitted first to know AOI values and therefore # determine feature names. # TODO: rename AOI features? self.feature_names_in_ = self.get_feature_names_out() return self def transform(self, X: pd.DataFrame) -> pd.DataFrame | np.ndarray: if not self.is_fitted: raise RuntimeError("Class is not fitted") if self.features is None: return X if self.return_df else X.values gathered_features = [] data_df: pd.DataFrame = X for feature in tqdm(self.features): gathered_features.append(feature.transform(data_df)) if self.extra is not None: columns = self.pk + [col for col in self.extra if col not in self.pk] extra_df = data_df[columns].groupby(self.pk).apply(self.aggr_extra) extra_df.index = [_get_id(index) for index in extra_df.index] gathered_features.append(extra_df) features_df = pd.concat(gathered_features, axis=1) if self.leave_pk: index = features_df.index.values index_as_cols = [_get_objs(id_) for id_ in index] for index_i in range(len(self.pk)): features_df[self.pk[index_i]] = [ objs[index_i] for objs in index_as_cols ] return features_df if self.return_df else features_df.values