Source code for eyefeatures.preprocessing.aoi_extraction

import numpy as np
import pandas as pd
from scipy.ndimage import maximum_filter, sobel
from sklearn.base import BaseEstimator, ClusterMixin, TransformerMixin

from eyefeatures.features.measures import ShannonEntropy
from eyefeatures.preprocessing.base import BaseAOIPreprocessor
from eyefeatures.utils import _split_dataframe


# ======== AOI PREPROCESSORS ========
[docs] class ShapeBased(BaseAOIPreprocessor): """Defines AOI using the specified shapes. Args: x: x coordinate of fixation. y: y coordinate of fixation. aoi_name: name of AOI column. pk: list of column names used to split pd.DataFrame. shapes: list of shapes (list of tuple lists). Parameters for shape:\n \n 0: 'r', 'c', 'e': rectangle, circle, ellipse\n For the rectangle:\n 1: coordinates of the lower left corner of the rectangle.\n 2: coordinates of the upper right corner of the rectangle.\n For the circle:\n 1: coordinates of the center of the circle.\n 2: radius of the circle.\n For the ellipse:\n :math:`\\frac{((x - x')\\cos(\\alpha) + (y - y')\\sin(\\alpha))^2}{a^2}` :math:`+ \\frac{(-(x - x')\\sin(\\alpha) + (y - y')\\cos(\\alpha))^2}{b^2} = c` 1: coordinates of the center of the ellipse :math:`(x', y')`.\n 2: "a" in the ellipse equation\n 3: "b" in the ellipse equation\n 4: "c" in the ellipse equation\n 5: angle of inclination of th ellipse in radians (:math:`\\alpha`)\n """ def __init__( self, x: str = None, y: str = None, shapes: list = None, aoi_name: str = "AOI", pk: list[str] = None, ): super().__init__(x=x, y=y, t=None, aoi=aoi_name, pk=None) self.shapes = shapes self.instance = pk def _check_params(self): m = "ShapeBased" assert self.x is not None, self._err_no_field(m, "x") assert self.y is not None, self._err_no_field(m, "y") assert self.shapes is not None, self._err_no_field(m, "shapes") def _is_inside_of_fig(self, X: pd.DataFrame, shape_id): ind = 0 for shape in self.shapes[shape_id]: if shape[0] == "r": # Rectangle X.loc[ (X[self.x] <= shape[2][0]) & (X[self.x] >= shape[1][0]) & (X[self.y] <= shape[2][1]) & (X[self.y] >= shape[1][1]), self.aoi, ] = f"aoi_{ind}" elif shape[0] == "c": # Circle X["length"] = X.apply( lambda z: np.linalg.norm( np.array((z[self.x], z[self.y])) - np.array(shape[1]) ), axis=1, ) X.loc[X["length"] <= shape[2], self.aoi] = f"aoi_{ind}" elif shape[0] == "e": # Ellipse X.loc[ ( (X[self.x] - shape[1][0]) * np.cos(shape[5]) + (X[self.y] - shape[1][1]) * np.sin(shape[5]) ) ** 2 / (shape[2] ** 2) + ( -(X[self.x] - shape[1][0]) * np.sin(shape[5]) + (X[self.y] - shape[1][1]) * np.cos(shape[5]) ) / (shape[3] ** 2) <= shape[4], self.aoi, ] = f"aoi_{ind}" ind += 1 return X[self.aoi] def _preprocess(self, X: pd.DataFrame) -> pd.DataFrame: assert X.shape[0] != 0, "Error: there are no points" # if self.instance is not None: # X.drop(columns=self.instance, inplace=True) # X[self.aoi] = None to_concat = [] flag = False if len(self.shapes) != 1: flag = True shape_id = 0 if self.instance is None: X[self.aoi] = self._is_inside_of_fig(X, shape_id) fixations = X else: instances: list[str, pd.DataFrame] = _split_dataframe( X, self.instance, encode=False ) assert (not flag) or len(instances) == len(self.shapes), "Not enough shapes" for instance_ids, instance_X in instances: instance_X[self.aoi] = self._is_inside_of_fig(instance_X, shape_id) to_concat.append(instance_X) if flag: shape_id += 1 fixations = pd.concat(to_concat, axis=0) return fixations
[docs] class ThresholdBased(BaseAOIPreprocessor): """Defines the AOI for each fixation using density maximum and Kmeans. Finds local maximum, pre-threshold it, and uses it as a center of aoi. Args: x: x coordinate of fixation. y: y coordinate of fixation. window_size: size of search window. threshold: threshold density. pk: list of column names used to split pd.DataFrame. aoi_name: name of AOI column. algorithm_type: type of clustering algorithm to use. threshold_dist: maximum allowed distance between fixations in single AOI. """ def __init__( self, x: str = None, y: str = None, window_size: int = None, threshold: float = None, pk: list[str] = None, aoi_name: str = None, algorithm_type: str = "kmeans", threshold_dist: float = None, ): super().__init__(x=x, y=y, t=None, aoi=aoi_name, pk=pk) self.window_size = window_size self.threshold = threshold self.algorithm_type = algorithm_type self.threshold_dist = threshold_dist def _check_params(self): m = "ThresholdBased" assert self.x is not None, self._err_no_field(m, "x") assert self.y is not None, self._err_no_field(m, "y") assert self.window_size is not None, self._err_no_field(m, "window_size") assert self.threshold is not None, self._err_no_field(m, "threshold") assert self.window_size > 0, "Error: window size must be greater than zero" assert ( self.threshold >= 0 ), "Error: threshold must be greater than zero or equal to zero" assert self.algorithm_type in [ "kmeans", "basic", ], "Error: only 'kmeans' or 'basic' are supported" if self.algorithm_type == "basic": assert self.threshold_dist is not None, self._err_no_field( m, "threshold_dist" ) def _preprocess(self, X: pd.DataFrame) -> pd.DataFrame: assert X.shape[0] != 0, "Error: there are no points" if self.pk is not None: X.drop(columns=self.pk, inplace=True) aoi_list = [] density, X_grid, Y_grid = self._get_fixation_density(X) # For each sliding window (window_size x window_size) finds maximum density mx = maximum_filter(density, size=(self.window_size, self.window_size)) # Filter local maxima (0 if maximus less than threshold) loc_max_matrix = np.where((mx == density) & (mx >= self.threshold), density, 0) loc_max_coord = super()._find_local_max_coordinates(loc_max_matrix) assert ( loc_max_coord.shape[0] != 0 ), "Error: Can't find the maximum with such parameters" aoi_counts: dict[str, int] = {} # Dict[aoi name] = count of points in aoi aoi_points: dict[str, list[tuple[float, float]]] = ( {} ) # Dict[aoi name] = list of points axis_x = X_grid.T[0] axis_y = Y_grid[0] centers: dict[str, tuple[float, float]] = {} # Dict with the centers of aoi for i in range(loc_max_coord.shape[0]): # Initial centers for each AOI centers[f"aoi_{i}"] = ( X_grid[loc_max_coord[i][0]][0], Y_grid[loc_max_coord[i][1]][0], ) aoi_points[f"aoi_{i}"] = [centers[f"aoi_{i}"]] for index, row in X.iterrows(): min_dist = np.inf min_dist_aoi = None x_coord = min(np.searchsorted(axis_x, row[self.x]), 99) y_coord = min(np.searchsorted(axis_y, row[self.y]), 99) # Find the nearst aoi for key in centers.keys(): if self.algorithm_type == "kmeans": dist = np.linalg.norm( np.array([row[self.x], row[self.y]]) - np.array(centers[key]) ) if dist < min_dist: min_dist = dist min_dist_aoi = key if self.algorithm_type == "basic": length = np.inf for point in aoi_points[key]: # Find minimal distance between fixation and points in aoi dist = np.linalg.norm( np.array([row[self.x], row[self.y]]) - np.array(point) ) length = min(length, dist) if dist >= self.threshold_dist: length = np.inf break if min_dist > length: min_dist = length min_dist_aoi = key # Add fixation to the aoi (basic algorithm) if ( self.algorithm_type == "basic" and min_dist_aoi is not None and density[x_coord][y_coord] > self.threshold ): aoi_points[min_dist_aoi].append((row[self.x], row[self.y])) aoi_counts[min_dist_aoi] = len(aoi_points[min_dist_aoi]) - 1 # Add fixation to the aoi (kmeans algorithm) and compute the new center if self.algorithm_type == "kmeans" and min_dist_aoi is not None: if ( min_dist_aoi in aoi_counts.keys() and density[x_coord][y_coord] > self.threshold ): aoi_counts[min_dist_aoi] += 1 new_center_x = ( centers[min_dist_aoi][0] * (aoi_counts[min_dist_aoi] - 1) / aoi_counts[min_dist_aoi] + row[self.x] / aoi_counts[min_dist_aoi] ) new_center_y = ( centers[min_dist_aoi][1] * (aoi_counts[min_dist_aoi] - 1) / aoi_counts[min_dist_aoi] + row[self.y] / aoi_counts[min_dist_aoi] ) centers[min_dist_aoi] = (new_center_x, new_center_y) elif ( min_dist_aoi not in aoi_counts.keys() and density[x_coord][y_coord] > self.threshold ): aoi_counts[min_dist_aoi] = 1 new_center_x = (centers[min_dist_aoi][0] + row[self.x]) / 2 new_center_y = (centers[min_dist_aoi][1] + row[self.y]) / 2 centers[min_dist_aoi] = (new_center_x, new_center_y) elif density[x_coord][y_coord] <= self.threshold: min_dist_aoi = None aoi_list.append(min_dist_aoi) X[self.aoi] = aoi_list return X
[docs] class GradientBased(BaseAOIPreprocessor): """Defines the AOI for each fixation using a gradient-based algorithm. Finds the local maximum, pre-threshold it, and uses it as a center of aoi. After that, uses the Sobel operator to compute the gradient magnitude for each point. Next, defines the queue of areas of interest. Algorithm of aoi defining:\n * Gets the point from the queue. It is a center\n * Looks at the points near the center\n * Tries to find the point with defined aoi and maximum gradient magnitude\n * Adds center to this aoi\n * Repeats for all points in the matrix\n Args: x: X coordinate of fixation. y: Y coordinate of fixation. window_size: size of search window. threshold: threshold density. pk: list of column names used to split pd.DataFrame. aoi_name: name of AOI column. """ def __init__( self, x: str = None, y: str = None, window_size: int = None, threshold: float = None, pk: list[str] = None, aoi_name: str = None, ): super().__init__(x=x, y=y, t=None, aoi=aoi_name, pk=pk) self.window_size = window_size self.threshold = threshold def _check_params(self): m = "GradientBased" assert self.x is not None, self._err_no_field(m, "x") assert self.y is not None, self._err_no_field(m, "y") assert self.window_size is not None, self._err_no_field(m, "window_size") assert self.threshold is not None, self._err_no_field(m, "threshold") assert self.window_size > 0, "Error: window size must be greater than zero" assert ( self.threshold >= 0 ), "Error: threshold must be greater than zero or equal to zero" def _preprocess(self, X: pd.DataFrame) -> pd.DataFrame: assert X.shape[0] != 0, "Error: there are no points" if self.pk is not None: X.drop(columns=self.pk, inplace=True) aoi_list = [] density, X_grid, Y_grid = super()._get_fixation_density(X) # For each sliding window (window_size x window_size) finds maximum density mx = maximum_filter(density, size=(self.window_size, self.window_size)) # Filter local maxima (0 if maximus less than threshold) loc_max_matrix = np.where((mx == density) & (mx >= self.threshold), density, 0) loc_max_coord = super()._find_local_max_coordinates(loc_max_matrix) assert ( loc_max_coord.shape[0] != 0 ), "Error: Can't find the maximum with such parameters" axis_x = X_grid.T[0] axis_y = Y_grid[0] centers: dict[str, tuple[float, float]] = {} # Dict with the centers of aoi aoi_matrix = np.zeros((density.shape[0], density.shape[1]), dtype=int) # Compute the gradient magnitude horizontal_sobel = sobel(density, axis=0) vertical_sobel = sobel(density, axis=1) magnitude_sobel = np.sqrt(horizontal_sobel**2 + vertical_sobel**2) magnitude_sobel = np.pad( magnitude_sobel, 2, mode="constant", constant_values=-1 ) queue_of_centers: list[list[tuple[int, int]]] = ( [] ) # List of points to add for each aoi for i in range(loc_max_coord.shape[0]): # Initial centers for each AOI centers[f"aoi_{i}"] = ( X_grid[loc_max_coord[i][0]][0], Y_grid[loc_max_coord[i][1]][0], ) queue_of_centers.append([]) for j in range(-1, 2): for k in range(-1, 2): if ( 0 <= loc_max_coord[i][0] + j < density.shape[0] and 0 <= loc_max_coord[i][1] + k < density.shape[0] and not (j == 0 and k == 0) and aoi_matrix[loc_max_coord[i][0] + j][loc_max_coord[i][1] + k] == 0 ): queue_of_centers[-1].append( (loc_max_coord[i][0] + j, loc_max_coord[i][1] + k) ) aoi_matrix[loc_max_coord[i][0]][loc_max_coord[i][1]] = i + 1 ind = 0 while any(len(x) > 0 for x in queue_of_centers): # If the list of points to add for particular aoi is not empty, # then try to add them to aoi, else this aoi is built if len(queue_of_centers[ind]) > 0: x_coord, y_coord = queue_of_centers[ind].pop( 0 ) # Get the point without aoi. It is a center of window if aoi_matrix[x_coord][y_coord] != 0: # are all the fixation covered? continue # Add 2 due to padding window_magnitude = magnitude_sobel[ x_coord + 1 : x_coord + 4, y_coord + 1 : y_coord + 4 ] max_magnitude = -1 aoi_to_add = None # Find point in window with max gradient magnitude and its aoi for j in range(-1, 2): for k in range(-1, 2): if ( 0 <= x_coord + j < density.shape[0] and 0 <= y_coord + k < density.shape[0] and not (j == 0 and k == 0) ): # If non-center point has greater magnitude and aoi, # then we take its aoi if ( aoi_matrix[x_coord + j][y_coord + k] != 0 and max_magnitude <= window_magnitude[1 + j][1 + k] ): aoi_to_add = aoi_matrix[x_coord + j][y_coord + k] max_magnitude = window_magnitude[1 + j][1 + k] # If non-center point has no aoi, add it to queue elif ( aoi_matrix[x_coord + j][y_coord + k] == 0 and (x_coord + j, y_coord + k) not in queue_of_centers[ind] ): # and magnitude[1+j][1+k] >= gradient_eps: queue_of_centers[ind].append((x_coord + j, y_coord + k)) aoi_matrix[x_coord][y_coord] = aoi_to_add # Match points from queue with best AOI in window on density ind = (ind + 1) % len(queue_of_centers) # Match fixations and aoi in aoi matrix for index, row in X.iterrows(): x_coord = min(np.searchsorted(axis_x, row[self.x]), 99) y_coord = min(np.searchsorted(axis_y, row[self.y]), 99) if aoi_matrix[x_coord][y_coord] == 0: aoi_list.append(None) else: aoi_list.append(f"aoi_{aoi_matrix[x_coord][y_coord]}") X[self.aoi] = aoi_list return X
[docs] class OverlapClustering(BaseAOIPreprocessor): """Defines the AOI for each fixation using the overlapping clustering algorithm. Args: x: X coordinate of fixation. y: Y coordinate of fixation. diameters: diameters of fixation. centers: centers of fixation. pk: list of column names used to split pd.DataFrame. aoi_name: name of AOI column. eps: additional length to sum of radius """ def __init__( self, x: str = None, y: str = None, diameters: str = None, centers: str = None, pk: list[str] = None, aoi_name: str = None, eps: float = 0.0, ): super().__init__(x=x, y=y, t=None, aoi=aoi_name, pk=pk) self.diameters = diameters self.centers = centers self.eps = eps def _check_params(self): m = "OverlapClustering" assert self.x is not None, self._err_no_field(m, "x") assert self.y is not None, self._err_no_field(m, "y") assert self.diameters is not None, self._err_no_field(m, "diameters") assert self.centers is not None, self._err_no_field(m, "centers") def _build_clusters(self, X: pd.DataFrame) -> pd.DataFrame: """First step of the overlapping clustering algorithm. Builds the clusters. If the fixation locates inside another one, then these fixations are in one aoi. """ X[self.aoi] = 0 cluster_id = 1 for index, row in X.iterrows(): if X.loc[index, X.columns == self.aoi].values[0] == 0: X.loc[index, X.columns == self.aoi] = cluster_id center = row[self.centers] diameter = row[self.diameters] # Calculate the distance between centers X["diff_center"] = X[self.centers].apply( lambda p: (p[0] - center[0]) ** 2 + (p[1] - center[1]) ** 2 ) X["diff_diam"] = abs(diameter - X[self.diameters]) / 2 # Add all fixation, which are fixation_in_cluster = X[X["diff_center"] <= X["diff_diam"]].index X.loc[fixation_in_cluster, self.aoi] = cluster_id cluster_id += 1 return X def _merge_clusters(self, X: pd.DataFrame) -> pd.DataFrame: """Second step of the overlapping clustering algorithm. Merges the clusters. Selects aoi with the most amount of fixation in itself. Creates the queue of fixation and starts the cycle of merging. """ used = [] while len(X[~X[self.aoi].isin(used)]) > 0: # Find the largest cluster(aoi) max_cluster = ( X[~X[self.aoi].isin(used)].groupby(self.aoi).count().idxmax().iloc[0] ) points = X[X[self.aoi] == max_cluster].index.values.tolist() ind = 0 end_ = len(points) used.append(max_cluster) while ind < end_: row = X.iloc[points[ind]] # Calculate the distance between centers X["length"] = X[self.centers].apply( lambda p: np.linalg.norm(p - row[self.centers]) ) # Merge areas of interest that intersect the max cluster to_merge = X[ ( X["length"] <= abs((X[self.diameters] + row[self.diameters]) / 2 + self.eps) ) & (~X[self.aoi].isin(used)) ][self.aoi].unique() add_fixations = X[ (X[self.aoi].isin(to_merge)) & (~X[self.aoi].isin(used)) ].index.values X.loc[X[self.aoi].isin(to_merge), (X.columns == self.aoi)] = max_cluster # Add fixation from those areas of interest to queue points.extend(add_fixations) ind += 1 end_ += len(add_fixations) return X def _preprocess(self, X: pd.DataFrame) -> pd.DataFrame: X.drop(columns=self.pk, inplace=True) X.reset_index(drop=True, inplace=True) copy_X = X.copy() copy_X = self._build_clusters(copy_X) copy_X = self._merge_clusters(copy_X) X[self.aoi] = copy_X[self.aoi] return X
# ======== EXTRACTOR FOR AOI CLASSES ======== class AOIExtractor(BaseEstimator, TransformerMixin): """Extractor of areas of interest. Selects the partition into zones of interest with the lowest entropy. Args: methods: list of aoi algorithms. x: X coordinate of fixation. y: Y coordinate of fixation. window_size: size of search window. threshold: threshold density. pk: list of column names used to split pd.DataFrame for scaling. instance_columns: names of columns used to split DataFrame for aoi. aoi_name: name of AOI column. show_best: if true, then return the best method for each instance """ def __init__( self, methods: list[BaseAOIPreprocessor], x: str, y: str, window_size: int = None, threshold: float = None, pk: list[str] = None, instance_columns: list[str] = None, aoi_name: str = None, show_best: bool = False, ): self.x = x self.y = y self.methods = methods self.window_size = window_size self.threshold = threshold self.pk = pk self.instance_columns = instance_columns self.aoi = aoi_name self.show_best = show_best def fit(self, X: pd.DataFrame, y=None): for method in self.methods: method.x = self.x method.y = self.y if self.window_size is not None: method.window_size = self.window_size if self.threshold is not None: method.threshold = self.threshold # method.pk = self.pk method.aoi = self.aoi if not isinstance(method, ClusterMixin): method.fit(X) return self def transform(self, X: pd.DataFrame) -> pd.DataFrame: if self.methods is None: return X data_df: pd.DataFrame = X[[self.x, self.y]] if self.pk is not None: data_df = pd.concat([data_df, X[self.pk]], axis=1) if self.instance_columns is not None: to_add = [x for x in self.instance_columns if x not in self.pk] data_df = pd.concat([data_df, X[to_add]], axis=1) else: self.instance_columns = self.pk fixations = None instances: list[str, pd.DataFrame] = _split_dataframe( X, self.instance_columns, encode=False ) shapes_id = 0 # For ShapeBased # Entropy for selecting the best method entropy_transformer = ShannonEntropy(aoi=self.aoi, pk=self.instance_columns) # Extract areas of interest for each instance for instance_ids, instance_X in instances: min_entropy = np.inf fixations_with_aoi = None # Select best aoi extraction for method in self.methods: copy_x = None copy_y = None to_transform = None groups: list[str, pd.DataFrame] = _split_dataframe( instance_X, self.pk, encode=False ) # Map points into (100, 100) matrix and build kde for groups for group_ids, group_X in groups: if copy_x is None: copy_x = group_X[self.x] copy_y = group_X[self.y] else: copy_x = pd.concat( [copy_x, group_X[self.x]], ignore_index=True, axis=0 ) copy_y = pd.concat( [copy_y, group_X[self.y]], ignore_index=True, axis=0 ) group_X[self.x] -= group_X[self.x].mean() group_X[self.y] -= group_X[self.y].mean() if to_transform is None: to_transform = group_X else: to_transform = pd.concat( [to_transform, group_X], ignore_index=True, axis=0 ) if isinstance(method, ClusterMixin): cur_aoi = method.fit_predict(to_transform[[self.x, self.y]]) cur_fixations = pd.concat( [to_transform, pd.Series(cur_aoi, name=self.aoi)], axis=1 ) elif isinstance(method, ShapeBased): save_shapes = method.shapes assert (len(save_shapes) == 1) or ( len(save_shapes) != len(instances) ), "Not enough shapes" method.shapes = [ save_shapes[shapes_id], ] method.instance = None cur_fixations = method.transform(to_transform) method.shapes = save_shapes if len(save_shapes) != 1: shapes_id += 1 else: cur_fixations = method.transform(to_transform) all_areas = np.unique( [el for el in cur_fixations[self.aoi].values if el is not None] ) areas_names = [f"aoi_{i}" for i in range(len(all_areas))] map_areas = dict(zip(all_areas, areas_names, strict=False)) cur_fixations[self.aoi] = cur_fixations[self.aoi].map(map_areas) entropy = entropy_transformer.transform(cur_fixations)[ "entropy" ].values[0] if min_entropy > entropy: min_entropy = entropy fixations_with_aoi = cur_fixations fixations_with_aoi[self.x] = copy_x fixations_with_aoi[self.y] = copy_y if self.show_best: fixations_with_aoi["best_method"] = method.__class__.__name__ if fixations is None: fixations = fixations_with_aoi else: fixations = pd.concat( [fixations, fixations_with_aoi], ignore_index=True, axis=0 ) return fixations
[docs] class AOIMatcher(BaseEstimator, TransformerMixin): """Matches AOI in the dataset. Args: x: X coordinate column name. y: Y coordinate column name. pk: list of column names used to split pd.DataFrame for scaling. instance_columns: list of column names used to split pd.DataFrame into the similar instances for aoi extraction. aoi: name of AOI column. n_aoi: count of aoi in the group.\n 0: any number the areas of interest.\n integer > 0: count of the areas of interest. """ def __init__( self, x: str, y: str, pk: list[str] = None, instance_columns: list[str] = None, aoi: str = None, n_aoi: int = 0, ): self.x = x self.y = y self.pk = pk self.instance_columns = instance_columns self.aoi = aoi self.n_aoi = n_aoi def fit(self, X: pd.DataFrame, y=None): return self def transform(self, X: pd.DataFrame) -> pd.DataFrame: data_df: pd.DataFrame = X.copy() fixations = None instances: list[str, pd.DataFrame] = _split_dataframe( data_df, self.instance_columns, encode=False ) prev_pattern = {} for instance_ids, instance_X in instances: copy_x = None copy_y = None groups: list[str, pd.DataFrame] = _split_dataframe( instance_X, self.pk, encode=False ) cur_fixations = None for group_ids, group_X in groups: if copy_x is None: copy_x = group_X[self.x] copy_y = group_X[self.y] else: copy_x = pd.concat( [copy_x, group_X[self.x]], ignore_index=True, axis=0 ) copy_y = pd.concat( [copy_y, group_X[self.y]], ignore_index=True, axis=0 ) group_X[self.x] -= group_X[self.x].mean() group_X[self.y] -= group_X[self.y].mean() if cur_fixations is None: cur_fixations = group_X else: cur_fixations = pd.concat( [cur_fixations, group_X], ignore_index=True, axis=0 ) # === Correction of the AOI labels === # Make the new aoi labels all_areas = np.unique(cur_fixations[self.aoi].astype(str).values) if (self.n_aoi > 0) and (len(all_areas) > self.n_aoi): centers = [] for i in range(len(all_areas)): x = cur_fixations[(cur_fixations[self.aoi] == all_areas[i])][ self.x ].mean() y = cur_fixations[(cur_fixations[self.aoi] == all_areas[i])][ self.y ].mean() count = ( cur_fixations[(cur_fixations[self.aoi] == all_areas[i])] .count() .values[0] ) centers.append([all_areas[i], count, x, y, True]) count_of_aoi = len(all_areas) while count_of_aoi > self.n_aoi: min_dist = np.inf points_to_merge = [] for i in range(len(centers)): for j in range(i + 1, len(centers)): dist = np.linalg.norm( np.array(centers[i][2:]) - np.array(centers[j][2:]) ) if (dist < min_dist) and centers[j][-1]: min_dist = dist points_to_merge = [centers[i], centers[j]] cur_fixations.loc[ cur_fixations[self.aoi] == points_to_merge[1][0], cur_fixations.columns == self.aoi, ] = points_to_merge[0][0] for i in range(len(centers)): if centers[i][0] == points_to_merge[0][0]: centers[i][2] = ( (points_to_merge[0][2] * points_to_merge[0][1]) + (points_to_merge[1][2] * points_to_merge[1][1]) ) / (points_to_merge[0][1] + points_to_merge[1][1]) centers[i][3] = ( (points_to_merge[0][3] * points_to_merge[0][1]) + (points_to_merge[1][3] * points_to_merge[1][1]) ) / (points_to_merge[0][1] + points_to_merge[1][1]) centers[i][1] = ( points_to_merge[0][1] + points_to_merge[1][1] ) if centers[i][0] == points_to_merge[1][0]: centers[i][-1] = False count_of_aoi -= 1 all_areas = np.unique(cur_fixations[self.aoi].astype(str).values) areas_names = [f"aoi_{i}" for i in range(len(all_areas))] map_areas = dict(zip(all_areas, areas_names, strict=False)) cur_fixations[self.aoi] = cur_fixations[self.aoi].map(map_areas) used = [] to_zip = [] new_pattern = {} # Match labels with previous patterns for i in range(len(all_areas), 0, -1): if prev_pattern.get(i, 0) != 0: pattern = prev_pattern[i] # Compare areas # From Python 3.6, dict.items() order corresponds to # insertion order for key, value in pattern.items(): # if key not in used: x_max, y_max, x_min, y_min = ( value[0], value[1], value[2], value[3], ) intersection = -1 new_name = None for cur_area in areas_names: cur_x_max, cur_y_max, cur_x_min, cur_y_min = ( cur_fixations[cur_fixations[self.aoi] == cur_area][ self.x ].max(), cur_fixations[cur_fixations[self.aoi] == cur_area][ self.y ].max(), cur_fixations[cur_fixations[self.aoi] == cur_area][ self.x ].min(), cur_fixations[cur_fixations[self.aoi] == cur_area][ self.y ].min(), ) width = min(x_max, cur_x_max) - max(x_min, cur_x_min) height = min(y_max, cur_y_max) - max(y_min, cur_y_min) # Find aoi with the largest intersection if ( height > 0 and width > 0 and (cur_area not in used) and intersection <= width * height ): intersection = width * height new_name = cur_area used.append(new_name) to_zip.append(key) len_of_used = len(used) for j in range(len(areas_names) - len(used)): used.append(None) to_zip.append(areas_names[len_of_used + j]) for j in range(len(used)): if used[j] is None: for area in areas_names: if area not in used: used[j] = area # Match the remaining areas of interest for j in range(len(used)): if used[j] is None: for k in range(len(areas_names)): if areas_names[k] not in used: used[j] = areas_names[k] break cur_fixations[self.aoi] = cur_fixations[self.aoi].map( dict(zip(used, to_zip, strict=False)) ) break # Add new sample for area in areas_names: cur_x_max, cur_y_max, cur_x_min, cur_y_min = ( cur_fixations[cur_fixations[self.aoi] == area][self.x].max(), cur_fixations[cur_fixations[self.aoi] == area][self.y].max(), cur_fixations[cur_fixations[self.aoi] == area][self.x].min(), cur_fixations[cur_fixations[self.aoi] == area][self.y].min(), ) new_pattern[area] = [cur_x_max, cur_y_max, cur_x_min, cur_y_min] # === End of the correction === prev_pattern[len(np.unique(cur_fixations[self.aoi].astype(str).values))] = ( new_pattern ) cur_fixations[self.x] = copy_x cur_fixations[self.y] = copy_y if fixations is None: fixations = cur_fixations else: fixations = pd.concat( [fixations, cur_fixations], ignore_index=True, axis=0 ) return fixations