Source code for moonstone.parsers.counts.taxonomy.metaphlan
import pandas as pd
from pandas import DataFrame

from moonstone.parsers.counts.taxonomy.base import BaseTaxonomyCountsParser
class BaseMetaphlanParser(BaseTaxonomyCountsParser):
    """
    Common logic for parsers of MetaPhlAn merged tables.

    Taxa are read from the column named by ``self.taxa_column`` and are
    expected to be ``|``-separated strings; the number of ``|``-separated
    parts of a taxa string is called its *rank* below. The table is reduced
    to its deepest rank, and abundance that shallower ranks report on top of
    what their deeper ranks account for is added back as extra rows (see
    :meth:`remove_duplicates`).
    """

    def __init__(self, *args, analysis_type: str = 'rel_ab', **kwargs):
        """
        Args:
            analysis_type: output type of Metaphlan3 (see ``-t`` option of metaphlan3)
        """
        # Set before delegating so the attribute already exists while the
        # parent constructor runs.
        self.analysis_type = analysis_type
        super().__init__(*args, **kwargs)

    def rows_differences(self, dataframe1: DataFrame, dataframe2: DataFrame) -> DataFrame:
        """
        Return what is left of ``dataframe1`` once ``dataframe2`` is subtracted.

        Args:
            dataframe1: reference values.
            dataframe2: values to subtract, aligned on index and columns.

        Returns:
            The element-wise difference, restricted to the rows whose sum is
            not zero after small/negative values have been zeroed.
        """
        rows_diff = dataframe1 - dataframe2
        # Labels missing from one operand yield NaN: fall back on dataframe1.
        rows_diff[rows_diff.isnull()] = dataframe1
        if self.analysis_type == 'rel_ab':
            # NOTE(review): 0.0001 looks like a tolerance for rounding noise in
            # relative abundances -- confirm.
            rows_diff[rows_diff < 0.0001] = 0
        else:
            rows_diff[rows_diff < 0] = 0
        # Keep only the rows that still carry something.
        row_totals = rows_diff.sum(axis=1)
        rows_diff = rows_diff.loc[row_totals[row_totals != 0].index]
        return rows_diff

    def compare_difference_between_two_levels(self, whole_df: DataFrame, df_at_lower_level: DataFrame,
                                              rank: int) -> DataFrame:
        """
        Compare the rows reported at ``rank`` with the deeper rows summed up to ``rank``.

        Args:
            whole_df: table indexed by taxa string, containing every rank.
            df_at_lower_level: table indexed by taxa string at a deeper rank.
            rank: number of ``|``-separated parts to compare at.

        Returns:
            Result of :meth:`rows_differences` between the rows of ``whole_df``
            at ``rank`` and ``df_at_lower_level`` aggregated at ``rank``.
        """
        df_rank = whole_df[whole_df.index.map(lambda x: len(x.split('|'))) == rank]
        # Truncate the deeper taxa strings to `rank` parts and sum the rows that
        # now share the same label.
        df_rank_computed = df_at_lower_level.copy()
        df_rank_computed.index = df_rank_computed.index.map(lambda x: '|'.join(x.split('|')[:rank]))
        df_rank_computed = df_rank_computed.groupby(df_rank_computed.index).sum()
        return self.rows_differences(df_rank, df_rank_computed)

    def remove_duplicates(self, df: DataFrame) -> DataFrame:
        """
        Keep the deepest-rank rows of ``df`` plus what shallower ranks add on top.

        Stores the deepest rank found in ``self.rank_level``.

        Args:
            df: table with the taxa strings in the ``self.taxa_column`` column.

        Returns:
            The reduced table, with the index reset to regular columns.
        """
        df = df.set_index(self.taxa_column)
        # Rank (number of '|'-separated parts) of every row.
        index_levels = df.index.map(lambda x: len(x.split('|')))
        self.rank_level = index_levels.max()  # max rank level
        first_rank = index_levels.min()
        new_df = df[index_levels == self.rank_level]
        # Expected total per sample.
        if self.analysis_type == 'rel_ab':
            total = 99.9999  # addition error margin
        else:
            total = df[index_levels == first_rank].sum()
        # Samples whose deepest-rank rows do not add up to the expected total.
        sample_sums = new_df.sum()
        samples_with_incomp_lowerlevel = sample_sums[sample_sums < total]
        rank = self.rank_level
        while samples_with_incomp_lowerlevel.size != 0 and rank > 1:
            rank -= 1
            rows_diff = self.compare_difference_between_two_levels(df, new_df, rank)
            if rows_diff.size != 0:
                # DataFrame.append was removed in pandas 2.0; concat is the
                # supported way to add the missing rows.
                new_df = pd.concat([new_df, rows_diff])
                # Re-check the totals now that rows were added.
                sample_sums = new_df.sum()
                samples_with_incomp_lowerlevel = sample_sums[sample_sums < total]
        new_df = new_df.reset_index()
        return new_df
class Metaphlan2Parser(BaseMetaphlanParser):
    """
    Parser for merged tables produced by `Metaphlan2 <https://github.com/biobakery/MetaPhlAn/>`_.
    """
    # Name of the column holding the '|'-separated taxa strings.
    taxa_column = 'ID'

    def _load_data(self) -> DataFrame:
        """
        Load the table, remove duplicated ranks and index it by taxonomy.

        Returns:
            The table indexed by the first ``self.rank_level`` entries of
            ``self.taxonomical_names``.
        """
        # remove_duplicates also sets self.rank_level, which is read below.
        deduplicated = self.remove_duplicates(super()._load_data())
        with_taxa_columns = self.split_taxa_fill_none(deduplicated, sep="|")
        index_columns = self.taxonomical_names[:self.rank_level]
        return with_taxa_columns.set_index(index_columns)
class Metaphlan3Parser(BaseMetaphlanParser):
    """
    Parse output from `Metaphlan3 <https://github.com/biobakery/MetaPhlAn/>`_ merged table.
    """
    # Name of the column holding the '|'-separated taxa strings.
    taxa_column = 'clade_name'
    # Column dropped at load time, before duplicated ranks are removed.
    NCBI_tax_column = 'NCBI_tax_id'

    def __init__(self, *args, analysis_type: str = 'rel_ab', **kwargs):
        """
        Args:
            analysis_type: output type of Metaphlan3 (see ``-t`` option of metaphlan3)
            parsing_options: optional dict given as keyword argument; its entries
                are merged over the default ``{'skiprows': 1}``.
        """
        # The first line is skipped by default.
        # NOTE(review): presumably the leading comment line of the merged table -- confirm.
        parsing_options = {'skiprows': 1}
        # Merge caller options instead of passing `parsing_options` twice, which
        # raised TypeError (multiple values for keyword argument).
        parsing_options.update(kwargs.pop('parsing_options', None) or {})
        super().__init__(*args, analysis_type=analysis_type, parsing_options=parsing_options, **kwargs)

    def _load_data(self) -> DataFrame:
        """
        Load the table, drop the NCBI id column, remove duplicated ranks and
        index the result by taxonomy.

        Returns:
            The table indexed by the first ``self.rank_level`` entries of
            ``self.taxonomical_names``.
        """
        df = super()._load_data()
        df = df.drop(self.NCBI_tax_column, axis=1)
        # remove_duplicates also sets self.rank_level, which is read below.
        df = self.remove_duplicates(df)
        df = self.split_taxa_fill_none(df, sep="|")
        df = df.set_index(self.taxonomical_names[:self.rank_level])
        return df