Source code for moonstone.filtering.basics_filtering

import logging
from typing import List, Union

import pandas as pd

from moonstone.filtering.base import (
    BothAxisFiltering, CountsFiltering
)

logger = logging.getLogger(__name__)


[docs]class NoCountsFiltering(BothAxisFiltering, CountsFiltering): """ Remove rows (default) or columns with no counts. """
[docs] def filter(self) -> pd.DataFrame: indices = self.df.sum(axis=self.axis) != 0.0 if self.axis == 1: return self.df.loc[indices] return self.df.loc[:, indices]
[docs]class NamesFiltering(BothAxisFiltering): """ Filtering based on row (default) or column names. """
[docs] def __init__(self, dataframe: pd.DataFrame, names: List[str], axis: int = 0, keep: bool = True): """ :param names: list of row or column names :param axis: axis to apply filtering (index (0) or columns(1)) :param keep: keep column (discard them if set to False) """ self.names = list(names) self.keep = keep self._log_action() super().__init__(dataframe, axis=axis)
def _log_action(self): if self.keep: logger.debug(f"Selecting {self.names} from dataframe...") else: logger.debug(f"Removing {self.names} from dataframe...") def _validate_parameters(self): if isinstance(self.df.index, pd.MultiIndex): error_message = f"{self.__class__.__name__} does not support filtering on MultiIndex dataframes." + \ " You might want to use moonstone.filtering.TaxonomyNamesFiltering instead." raise TypeError(error_message) def _select_names(self): if self.axis == 0: return self.df.loc[self.names, :] return self.df.loc[:, self.names] def _exclude_names(self): return self.df.drop(self.names, axis=self.axis)
[docs] def filter(self) -> pd.DataFrame: # intersection of names to remove or keep and index's/columns' names old_names = self.names if self.axis == 0: self.names = list(self.df.index.intersection(self.names)) else: self.names = list(self.df.columns.intersection(self.names)) if len(old_names) - len(self.names) > 0: list_names_not_found = list(set(old_names).difference(set(self.names))) list_names_not_found.sort() logger.info(f"{list_names_not_found}: {len(old_names) - len(self.names)} \ name(s) not found in the dataframe.") if self.keep: return self._select_names() else: return self._exclude_names()
[docs]class NaNPercentageFiltering(BothAxisFiltering): """ Remove rows (default) or columns with a percentage of NaN values above a given percentage. """
[docs] def __init__(self, dataframe: pd.DataFrame, percentage: Union[int, float] = 80, axis: int = 0): """ :param percentage: maximum percentage of NaN values allowed (between 0 and 100) :param axis: axis to apply filtering (index (0) or columns(1)) """ self.percentage_of_nan_allowed = percentage super().__init__(dataframe, axis=axis)
[docs] def filter(self) -> pd.DataFrame: thresh = self.df.shape[1-self.axis] - self.df.shape[1-self.axis] * (self.percentage_of_nan_allowed/100) return self.df.dropna(axis=self.axis, thresh=thresh)
[docs]class NumberOfDifferentValuesFiltering(BothAxisFiltering): """ Filtering of rows (default) or columns based on the number of different (unique) values they hold. """
[docs] def __init__(self, dataframe: pd.DataFrame, min: int = None, max: int = None, na: bool = False, axis: int = 0): """ :param min: minimum number of different values accepted :param max: maximum number of different values accepted :param na: NaN values counted as a different value or not :param axis: axis to apply filtering (index (0) or columns(1)) """ if min is None and max is None: logger.warning("No min or max specified.") if min is None: self.min_number_values = 0 else: self.min_number_values = min if max is None: self.max_number_values = float('inf') else: self.max_number_values = max self.na = na super().__init__(dataframe, axis=axis)
[docs] def filter(self) -> pd.DataFrame: new_df = self.df if self.axis == 0: for row in self.df.index: x = pd.Series(new_df.loc[row].unique()) if not self.na: x = x.dropna() if len(x) < self.min_number_values or len(x) > self.max_number_values: new_df.drop(row, inplace=True, axis=0) logger.info("%s/%s rows dropped", new_df.shape[0], self.df.shape[0]) return new_df for col in self.df.columns: x = pd.Series(new_df[col].unique()) if not self.na: x = x.dropna() if len(x) < self.min_number_values or len(x) > self.max_number_values: new_df.drop(col, inplace=True, axis=1) logger.info("%s/%s columns dropped", new_df.shape[1], self.df.shape[1]) return new_df