# Source code for moonstone.filtering.mean_filtering

import logging
import pandas as pd
import plotly.graph_objects as go
import plotly.io
from plotly.subplots import make_subplots
from typing import Union

from moonstone.analysis.stats import FilteringStats
from moonstone.filtering.base import CountsFiltering

logger = logging.getLogger(__name__)


class MeanFiltering(CountsFiltering):
    """
    Remove items with a mean read count below a given threshold.

    You can either give a mean read count threshold or the percentage of data
    that you wish to keep (the threshold will then be computed for you).
    """

    def __init__(
        self,
        dataframe,
        threshold: Union[float, None] = None,
        percentage_to_keep: Union[int, float] = 90,
    ):
        """
        :param dataframe: counts table, forwarded as-is to the parent class; the mean
                          read count is computed per row (``axis=1``)
        :param threshold: mean read count threshold, when not specified the threshold is
                          computed based on percentage_to_keep
        :param percentage_to_keep: percentage of reads you wish to keep, between 0 and 100,
                                   ignored if threshold is set
        """
        super().__init__(dataframe)
        self.threshold = threshold
        # percentage_to_keep is only stored when it is actually used, so that
        # generate_report_data() can tell whether the threshold was computed.
        if threshold is None:
            self.percentage_to_keep = percentage_to_keep

    @staticmethod
    def _percentage(part, whole) -> float:
        """Return ``part / whole`` as a percentage, or 0.0 when ``whole`` is zero."""
        if not whole:
            return 0.0
        return part / whole * 100

    def _compute_filtering_stats(self) -> None:
        """Compute and cache the per-threshold item/read statistics of ``self.df``."""
        stats = FilteringStats(self.df)
        self._items_dict, self._reads_dict = FilteringStats.by_mean(stats)

    def compute_threshold_best_n_percent(self) -> float:
        """
        Compute a threshold based on the percentage of reads to keep.

        The highest candidate threshold that still retains strictly more than
        ``percentage_to_keep`` percent of the raw reads is selected. When no candidate
        qualifies (e.g. ``percentage_to_keep=100``), the lowest candidate threshold is
        used instead and a warning is logged.

        This method is called by :meth:`filter` when no threshold is given.

        :return: the selected threshold (also stored in ``self.threshold``)
        :raises ValueError: if no candidate threshold is available
        """
        logger.info("Computing filtering mean read count threshold...")
        self._compute_filtering_stats()
        if not self._reads_dict:
            raise ValueError(
                "Cannot compute a mean read count threshold: no candidate threshold available."
            )

        # Ascending thresholds: the last candidate that passes the test below is
        # the highest threshold that still keeps enough reads.
        candidates = sorted(self._reads_dict.items(), key=lambda item: item[0])

        selected = None
        for candidate, remaining_reads in candidates:
            if (remaining_reads / self.raw_reads_number) * 100 > self.percentage_to_keep:
                selected = (candidate, remaining_reads)

        if selected is None:
            # Nothing retains strictly more than the requested percentage: fall back to
            # the candidate listed first (lowest threshold) rather than crashing.
            selected = candidates[0]
            logger.warning(
                "No threshold retains more than %.1f%% of the reads, using lowest threshold.",
                self.percentage_to_keep,
            )

        threshold = float("%.1f" % selected[0])
        reads = int(selected[1])

        logger.info("Filtering mean read count threshold set to %.2f.", threshold)
        logger.info(
            "Retaining %.1f%% of the data results in %i retained reads.",
            self.percentage_to_keep,
            reads,
        )
        self.threshold = threshold
        return threshold

    def filter(self) -> pd.DataFrame:
        """
        Remove the rows whose mean read count is below the threshold.

        The threshold is computed first when none was given. ``self.df`` is left
        untouched: a new dataframe is returned.

        :return: dataframe restricted to the rows that are not below the threshold
        """
        if self.threshold is None:
            self.compute_threshold_best_n_percent()
        logger.info("Filtering with threshold set to %.2f...", self.threshold)

        # Select with a mask instead of dropping in place so that the original table
        # (used for the statistics and the visualization) is not modified.
        # Rows with a NaN mean are kept, as `NaN < threshold` is False.
        below_threshold = self.df.mean(axis=1) < self.threshold
        filtered_df = self.df.loc[~below_threshold]

        self.remaining_items_number = filtered_df.shape[0]
        self.remaining_reads_number = filtered_df.sum().sum()
        logger.info(
            "Started with %i items and a total of %i reads",
            self.raw_items_number,
            self.raw_reads_number,
        )
        logger.info(
            "Saving new matrix with %i items and %i reads",
            self.remaining_items_number,
            self.remaining_reads_number,
        )
        logger.info(
            "Retained %.2f%% of items and %.2f%% of reads",
            self._percentage(self.remaining_items_number, self.raw_items_number),
            self._percentage(self.remaining_reads_number, self.raw_reads_number),
        )
        return filtered_df

    def _plot_threshold_vs_remaining_data(self, html_output_file, items_name="items"):
        """
        Plot the number of retained items and reads as a function of the threshold.

        The figure is always shown; it is also written as HTML when
        ``html_output_file`` is not empty.

        :param html_output_file: name of the html output file (nothing written if empty)
        :param items_name: name used for the items in the title and the left axis label
        """
        # The x and y values below are plain lists of ints or floats; be aware that some
        # operations would require a np.array instead.
        x_items = list(self._items_dict.keys())
        x_reads = list(self._reads_dict.keys())
        y_items = list(self._items_dict.values())
        y_reads = list(self._reads_dict.values())

        # The figure visualizing the data and the filtering.
        fig = make_subplots(specs=[[{"secondary_y": True}]])  # To set a second y-axis.
        fig.add_trace(
            go.Scatter(x=x_items, y=y_items, name="Retained Items: left axis"),
            secondary_y=False,
        )
        fig.add_trace(
            go.Scatter(x=x_reads, y=y_reads, name="Retained Reads: right axis"),
            secondary_y=True,
        )

        # Vertical line marking the threshold; only drawn once a threshold is known.
        if self.threshold is not None:
            percentage = getattr(self, "percentage_to_keep", None)
            if percentage is None:
                # Threshold given by the user: no percentage to advertise.
                threshold_label = "Threshold"
            else:
                threshold_label = "%g%% Reads Threshold" % percentage
            fig.add_trace(
                go.Scatter(
                    x=[self.threshold, self.threshold],
                    y=[0, self.raw_reads_number],
                    mode="lines",
                    name=threshold_label,
                ),
                secondary_y=True,
            )

        # Add figure title
        fig.update_layout(
            title_text="Number of %s and reads in function of the threshold value"
            % items_name,
            title_x=0.5,
        )
        # Set x-axis title
        fig.update_xaxes(title_text="threshold value")
        # Set y-axes titles
        fig.update_yaxes(title_text="number of %s" % items_name, secondary_y=False)
        fig.update_yaxes(title_text="number of reads", secondary_y=True)

        fig.show()
        if html_output_file:
            plotly.io.write_html(fig, html_output_file)

    def visualize(self, html_output_file: str = ""):
        """
        Visualize the filtering on the data.

        :param html_output_file: name of the html output file
        """
        # Statistics are reused when the threshold computation already produced them.
        if getattr(self, "_items_dict", None) is None:
            self._compute_filtering_stats()
        self._plot_threshold_vs_remaining_data(html_output_file)

    def generate_report_data(self) -> dict:
        """
        Generate a report summarizing the filtering on the data (parameters, results).

        Reads the remaining item/read counts that :meth:`filter` stores on the instance.

        :return: dictionary with the report ``title`` and its ``data``
        """
        data_dic = {
            "threshold": self.threshold,
            "n_items_removed": self.raw_items_number - self.remaining_items_number,
            "n_reads_removed": self.raw_reads_number - self.remaining_reads_number,
        }
        # Only present when the threshold was derived from a percentage.
        if getattr(self, "percentage_to_keep", None) is not None:
            data_dic["percentage_to_keep"] = self.percentage_to_keep
        return {"title": "Filtering by mean", "data": data_dic}