Source code for eyefeatures.preprocessing.blinks_extraction

import numpy as np
import pandas as pd
from scipy import stats
from scipy.signal import find_peaks, savgol_filter


# =========================== BLINKS DETECTION ===========================
# Helper function
def _interpolate_nans(
    array: np.ndarray, timestamps: np.ndarray, gap_dur: float = np.inf
) -> np.ndarray:
    """Function finds sequences of NaN values, selects ones with
    duration <= 'gap_dur' and linearly interpolates them.

    Args:
        array: array with NaNs.
        timestamps: timestamps of array.
        gap_dur: threshold gap duration.

    Returns:
        array with interpolated gaps.
    """
    assert len(array.shape) == 1, "Only 1D array is allowed."
    assert (
        array.shape == timestamps.shape
    ), "'array' and 'timestamps' must correspond in shape."
    assert gap_dur > 0, "Gap duration must be positive."

    # Find index for nans where gaps are longer than 'gap_dur' samples
    mask = np.isnan(array)

    # If there are no nans, return
    if not np.any(mask):
        return array

    # Find onsets and offsets of gaps
    d = np.diff(np.concatenate((np.array([0]), mask * 1, np.array([0]))))
    onsets = np.where(d == 1)[0]
    offsets = np.where(d == -1)[0]

    # Decrease offsets come too late by -1
    if np.any(offsets >= len(array)):
        idx = np.where(offsets >= len(array))[0][0]
        offsets[idx] = offsets[idx] - 1

    dur = timestamps[offsets] - timestamps[onsets]

    # If the gaps are longer than 'gaps', replace temporarily with other values
    for i, on in enumerate(onsets):
        if dur[i] > gap_dur:
            array[onsets[i] : offsets[i]] = -1000

    # New is-nan mask after 'gaps' grasp
    mask = np.isnan(array)
    imask = ~mask
    array[mask] = np.interp(mask.nonzero()[0], imask.nonzero()[0], array[imask])

    # Put nans back
    array[array == -1000] = np.nan

    return array


# Helper function
def _nearest_odd_integer(x: float) -> int:
    """Method computes nearest odd integer to 'x'."""
    return int(2 * np.floor(np.abs(x) / 2) + 1) * np.sign(x)


# Helper function
def _mask2bounds(mask: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Method constructs onset and offset arrays of indices based on mask.
    Onset - starting index of segment (segment is continuous block of ones),
    offset - ending index of blink.

    Args:
        mask: boolean array.

    Returns:
        [onsets, offsets].
    """

    # Find segments
    d = np.diff(np.hstack((0, mask, 0)))
    onsets = np.where(d == 1)[0]
    offsets = np.where(d == -1)[0] - 1

    # Match onsets with offsets
    if len(offsets) > len(onsets):
        if onsets[0] > offsets[0]:
            offsets = offsets[1:]
        else:
            offsets = offsets[:-1]
    elif len(offsets) < len(onsets):
        if onsets[0] > offsets[0]:
            onsets = onsets[1:]
        else:
            onsets = onsets[:-1]

    return onsets, offsets


# Helper function
def _merge_blinks(
    blink_onsets: list | np.ndarray,
    blink_offsets: list | np.ndarray,
    min_dur: int,
    min_separation: int,
    blink_properties: list | np.ndarray = None,
) -> list[list]:
    """Method merges blinks given onsets and offsets, also collapses too short ones.

    Args:
        blink_onsets: onsets of blinks, ms.
        blink_offsets: offsets of blinks, ms.
        min_dur: min duration of blink, ms.
        min_separation: min duration between blinks, ms.
        blink_properties: array of lists, properties of blinks.

    Returns:
        array of triples (onset, offset, duration).
    """
    have_properties = blink_properties is not None

    # Merge blink candidate close together, and remove short, isolated ones
    new_onsets = []
    new_offsets = []
    new_properties = []
    change_onset = True
    temp_onset = None

    for i, onset in enumerate(blink_onsets):
        if change_onset:
            temp_onset = blink_onsets[i]

        if i < len(blink_onsets) - 1:
            if (blink_onsets[i + 1] - blink_offsets[i]) < min_separation:
                change_onset = False
            else:
                change_onset = True

                # Remove blink with too short duration
                if (blink_offsets[i] - temp_onset) < min_dur:
                    continue

                new_onsets.append(temp_onset)
                new_offsets.append(blink_offsets[i])
                if have_properties:
                    new_properties.append(blink_properties[i, :])
        else:
            # Remove blink with too short duration
            if (blink_offsets[i] - temp_onset) < min_dur:
                continue

            new_onsets.append(temp_onset)
            new_offsets.append(blink_offsets[i])
            if have_properties:
                new_properties.append(blink_properties[i, :])

    # Compute durations and convert to array
    blinks = []
    for i in range(len(new_onsets)):
        dur = new_offsets[i] - new_onsets[i]
        blink_data = [new_onsets[i], new_offsets[i], dur]
        if have_properties:
            blink_data += list(new_properties[i])
        blinks.append(blink_data)

    return blinks


# Helper function
def _indices_to_values(
    onsets: np.ndarray, offsets: np.ndarray, timestamps: np.ndarray, Fs: int = None
) -> tuple[np.ndarray, np.ndarray]:
    """Method converts index-based onsets/offsets to
    timestamp-based onsets/offsets.

    Args:
        onsets: indexes of starting blink indexes.
        offsets: indexes of ending blink indexes.
        timestamps: data recording timestamps.
        Fs: sample rate of eye tracker, Hz.

    Returns:
        onsets, offsets.
    """

    # Convert onsets/offsets to ms
    blinks = []
    for onset, offset in zip(onsets, offsets, strict=False):
        blinks.append([timestamps[onset], timestamps[offset]])

    if Fs is not None:
        assert Fs > 0
        # Remove blinks with on-, or offsets that happened in a period of missing data
        # (i.e., where samples are completely lost, for some reason)
        idx = np.where(
            np.diff(timestamps) > (2 * 1 / Fs * 1000)
        )  # Missing data where deltaT > 2 * 1/Fs

        for i, blink in enumerate(blinks):
            for idx_k in idx[0]:
                if np.logical_and(
                    blink[0] >= timestamps[idx_k], blink[0] <= timestamps[idx_k + 1]
                ) or np.logical_and(
                    blink[1] >= timestamps[idx_k], blink[1] <= timestamps[idx_k + 1]
                ):
                    blinks.pop(i)
                    break

    onsets = np.array([b[0] for b in blinks])
    offsets = np.array([b[1] for b in blinks])

    return onsets, offsets


# Helper function
def _apply_moving_average(
    pupil_signal: np.ndarray,
    timestamps: np.ndarray,
    is_na: np.ndarray,
    max_window_size: float,
) -> np.ndarray:
    """Method applies moving average for pupillometry data. Nan values are remained.

    Args:
        pupil_signal: sizes of pupil.
        timestamps: data recording timestamps.
        is_na: boolean array representing whether pupil is nan or not.
        max_window_size: maximum size of smoothing window, in milliseconds.

    Returns:
        smoothed pupil sizes.
    """

    if len(pupil_signal) <= 2:
        return pupil_signal
    assert len(pupil_signal) == len(timestamps) == len(is_na)
    assert max_window_size > 0.0

    n = len(pupil_signal)
    tmp = pupil_signal.copy()
    tmp = np.where(is_na, 0, tmp)
    csum_size = np.cumsum(tmp)
    csum_time = timestamps - np.roll(timestamps, shift=1)
    csum_time[0] = 0
    csum_time = np.cumsum(csum_time)

    smoothed_sizes = np.zeros((n,))
    smoothed_sizes[0] = pupil_signal[0]

    window_end = -1
    for i in range(1, n):
        if is_na[i]:
            window_end = -1
            smoothed_sizes[i] = np.nan
            continue

        window_end = max([window_end - 1, i + 1])
        while (
            window_end < n
            and not is_na[window_end]
            and (csum_time[window_end] - csum_time[i - 1]) <= max_window_size
        ):
            window_end += 1
        smoothed_sizes[i] = (csum_size[window_end - 1] - csum_size[i - 1]) / (
            window_end - i
        )

    return smoothed_sizes


# Blinks Detector




# Blinks Detector




# Blinks Detector