#!/usr/bin/env python3
import re
from collections import deque

import dendropy
def get_object_converter(object_conversion_schema):
    """Return the converter object for the requested conversion schema.

    :param object_conversion_schema: name of the target object library; it is
        compared case-insensitively and only ``'dendropy'`` is accepted.
    :returns: a new ``DendropyConvert`` instance.
    :raises ValueError: if any other schema name is given.
    """
    requested = object_conversion_schema.lower()
    if requested != 'dendropy':
        raise ValueError('Currently only conversion to DendroPy objects is supported.')
    return DendropyConvert()
# Matches labels of the form "<name> ott<digits>" or "<name>_ott<digits>".
# Group 1 is the name part, group 2 is the numeric id that follows "ott".
_name_gap_ott_num = re.compile(r'^(.+)[ _]ott(\d+)$')


def _decorate_taxa_in_taxon_namespace_by_parsing_labels(tree):
    """Attach ``ott_taxon_name`` / ``ott_id`` attributes to taxa by parsing labels.

    First pass: every node that has no taxon but has a label matching
    ``_name_gap_ott_num`` gets a new taxon (created in the tree's taxon
    namespace) carrying that label.

    Second pass: every taxon in the namespace whose label matches the pattern
    gets ``ott_taxon_name`` (the stripped name part, if non-empty) and
    ``ott_id`` (the digits, as ``int``). Attributes that already exist on a
    taxon are left untouched.

    :param tree: object exposing ``taxon_namespace`` and
        ``preorder_node_iter()`` (a DendroPy tree in this module's callers).
    :returns: ``None``; the tree and its taxon namespace are modified in place.
    """
    taxon_namespace = tree.taxon_namespace
    for nd in tree.preorder_node_iter():
        if (nd.taxon is not None) or (not nd.label):
            continue
        if _name_gap_ott_num.match(nd.label):
            nd.taxon = taxon_namespace.new_taxon(nd.label)
    for taxon in taxon_namespace:
        label = taxon.label
        # An unlabeled taxon cannot be parsed; passing None to the regex
        # would raise TypeError, so skip it just as unlabeled nodes are skipped.
        if not label:
            continue
        m = _name_gap_ott_num.match(label)
        if m is None:
            continue
        name = m.group(1).strip()
        if name and not hasattr(taxon, 'ott_taxon_name'):
            taxon.ott_taxon_name = name
        if not hasattr(taxon, 'ott_id'):
            taxon.ott_id = int(m.group(2))
# noinspection PyMethodMayBeStatic
class DendropyConvert(object):
    """
    Class to convert newicks and NexSON trees to dendropy objects
    """

    # Accepted (lower-cased) ``label_format`` values mapped to the taxon
    # attribute whose value is copied into ``taxon.label``.
    _LABEL_FORMAT_TO_TAXON_ATTR = {
        "ot:originallabel": "original_label",
        "ot:ottid": "ott_id",
        "ot:otttaxonname": "ott_taxon_name",
        "id": "ott_id",
        "name": "ott_taxon_name",
    }

    # Tree-level "^ot:" keys that are not copied into the tree annotations.
    _NON_ANNOTATION_KEYS = frozenset(["^ot:inGroupClade", "^ot:rootNodeId", "^ot:specifiedRoot"])

    def tree_from_newick(self, newick, suppress_internal_node_taxa=False, **kwargs):
        """Parse one newick string into a ``dendropy.Tree``.

        After parsing, taxa whose labels end in ``ott<digits>`` are decorated
        by ``_decorate_taxa_in_taxon_namespace_by_parsing_labels``.

        :param newick: newick string handed to ``dendropy.Tree.get``.
        :param suppress_internal_node_taxa: forwarded to ``dendropy.Tree.get``.
        :param kwargs: any further keyword arguments for ``dendropy.Tree.get``.
        :returns: the parsed and decorated tree.
        """
        tree = dendropy.Tree.get(data=newick, schema="newick",
                                 suppress_internal_node_taxa=suppress_internal_node_taxa,
                                 **kwargs)
        _decorate_taxa_in_taxon_namespace_by_parsing_labels(tree)
        return tree

    def tree_list_from_newicks(self, newick_list, suppress_internal_node_taxa=False, **kwargs):
        """Parse several newick strings into one ``dendropy.TreeList``.

        The strings are joined with newlines and parsed in a single call;
        every resulting tree is then decorated like in ``tree_from_newick``.

        :param newick_list: iterable of newick strings.
        :param suppress_internal_node_taxa: forwarded to ``dendropy.TreeList.get``.
        :param kwargs: any further keyword arguments for ``dendropy.TreeList.get``.
        :returns: the parsed and decorated tree list.
        """
        concat = '\n'.join(newick_list)
        tree_list = dendropy.TreeList.get(data=concat,
                                          schema="newick",
                                          suppress_internal_node_taxa=suppress_internal_node_taxa,
                                          **kwargs)
        for tree in tree_list:
            _decorate_taxa_in_taxon_namespace_by_parsing_labels(tree)
        return tree_list

    def taxon_namespace_and_id_dict_from_nexson_otus_obj(self, otus_obj, otus_id):
        """Create a taxon namespace holding one taxon per OTU.

        Each taxon is created with the OTU id as its label and receives the
        attributes ``otu`` (the OTU id), ``ott_taxon_name``, ``ott_id`` and
        ``original_label``; the last three are ``None`` when the matching
        ``^ot:`` key is absent from the OTU object.

        :param otus_obj: mapping of OTU id -> OTU property mapping.
        :param otus_id: used as the label of the new ``dendropy.TaxonNamespace``.
        :returns: ``(taxon_namespace, id_to_taxon)`` where ``id_to_taxon`` maps
            each OTU id to its taxon.
        :raises ValueError: if an OTU id is encountered twice.
        """
        tn = dendropy.TaxonNamespace(label=otus_id)
        id_to_taxon = {}
        for oid, otu_obj in otus_obj.items():
            # Check before creating the taxon so a repeated id leaves no
            # stray taxon behind in the namespace.
            if oid in id_to_taxon:
                raise ValueError('otu id "{}" repeated'.format(oid))
            dt = tn.new_taxon(oid)
            id_to_taxon[oid] = dt
            dt.otu = oid
            dt.ott_taxon_name = otu_obj.get('^ot:ottTaxonName')
            dt.original_label = otu_obj.get('^ot:originalLabel')
            dt.ott_id = otu_obj.get('^ot:ottId')
        return tn, id_to_taxon

    def tree_from_nexson(self, nexson, tree_id, label_format="ot:originallabel"):
        """Build a ``dendropy.Tree`` for the tree ``tree_id`` of a NexSON blob.

        :param nexson: mapping with a ``'nexml'`` entry that holds
            ``'treesById'`` and ``'otusById'``.
        :param tree_id: id of the tree to extract.
        :param label_format: selects which taxon attribute becomes the taxon
            label; one of ``ot:originallabel``, ``ot:ottid``,
            ``ot:otttaxonname``, ``id`` or ``name`` (case-insensitive).
            A taxon keeps its OTU id as label when the chosen attribute is ``None``.
        :returns: the tree; every node has ``id`` and ``label`` set to its
            NexSON node id and every non-root edge has ``id`` set to its NexSON edge id.
        :raises KeyError: for an unknown ``label_format``, when no trees are
            present, when the tree is not found, or when a required NexSON
            property is missing.
        :raises ValueError: if the specified root differs from the root node
            id, or if the edges do not describe a tree.
        """
        try:
            taxon_attr = self._LABEL_FORMAT_TO_TAXON_ATTR[label_format.lower()]
        except KeyError:
            raise KeyError('Unrecognized label_format "{}"'.format(label_format))
        nexml = nexson['nexml']
        otu_set_id, tree_obj = self._locate_tree(nexml, tree_id)
        otu_set = nexml['otusById'][otu_set_id]
        # The OTUs object may carry other keys (e.g. '@label',
        # '^skos:historyNote') next to 'otuById'; only 'otuById' is used here.
        obi = otu_set["otuById"]
        tn, id2taxon = self.taxon_namespace_and_id_dict_from_nexson_otus_obj(obi, otu_set_id)
        for taxon in tn:
            tl = getattr(taxon, taxon_attr, None)
            if tl is not None:
                taxon.label = tl
        tree = dendropy.Tree(label=tree_obj.get('@label', tree_id), taxon_namespace=tn)
        if tree_obj.get("@xsi:type") == 'nex:FloatTree':
            tree.length_type = float
        root_node_id = tree_obj["^ot:rootNodeId"]
        # The specified root is optional; when given it must agree with the root node.
        spec_root_id = tree_obj.get("^ot:specifiedRoot")
        if spec_root_id and spec_root_id != root_node_id:
            raise ValueError('specified root "{}" differs from root node id "{}"'.format(
                spec_root_id, root_node_id))
        for key, value in tree_obj.items():
            if key in self._NON_ANNOTATION_KEYS:
                continue
            if key.startswith('^ot:'):
                # Strip the leading "^ot:" from the annotation name.
                tree.annotations.add_new(name=key[4:], value=value)
        self._attach_nodes(tree, tree_obj, root_node_id, id2taxon)
        return tree

    @staticmethod
    def _locate_tree(nexml, tree_id):
        """Return ``(otus_id, tree_obj)`` for ``tree_id`` by scanning all tree sets.

        :raises KeyError: if there are no tree sets, the tree is not found, or
            the tree set that holds the tree has no ``'@otus'`` property.
        """
        trees_sets_by_id = nexml.get('treesById', {})
        if not trees_sets_by_id:
            raise KeyError('No trees found in NexSON')
        for tree_set in trees_sets_by_id.values():
            tree_obj = tree_set.get('treeById', {}).get(tree_id)
            if tree_obj is None:
                continue
            otu_set_id = tree_set.get('@otus')
            if otu_set_id is None:
                raise KeyError('Tree set missing "@otus" property')
            return otu_set_id, tree_obj
        raise KeyError('Tree with id "{}" not found in NexSON'.format(tree_id))

    @staticmethod
    def _attach_nodes(tree, tree_obj, root_node_id, id2taxon):
        """Populate ``tree`` breadth-first from ``nodeById`` / ``edgeBySourceId``."""
        nex_nds = tree_obj['nodeById']
        edges_by_src = tree_obj['edgeBySourceId']
        nd_id2dend = {}
        _proc_nd(tree.seed_node, nex_nds[root_node_id], id2taxon, root_node_id, nd_id2dend)
        to_proc = deque([tree.seed_node])
        while to_proc:
            parent = to_proc.popleft()
            for edge_id, e_dict in edges_by_src.get(parent.id, {}).items():
                if e_dict['@source'] != parent.id:
                    raise ValueError('edge "{}" is filed under source "{}" but has @source "{}"'.format(
                        edge_id, parent.id, e_dict['@source']))
                dest_id = e_dict['@target']
                # A node reached twice means a cycle or a second parent;
                # without this check a cycle would never terminate.
                if dest_id in nd_id2dend:
                    raise ValueError('node "{}" is the target of more than one edge'.format(dest_id))
                child = tree.node_factory()
                _proc_nd(child, nex_nds[dest_id], id2taxon, dest_id, nd_id2dend)
                parent.add_child(child)
                child.edge.id = edge_id
                edge_len = e_dict.get('@length')
                if edge_len is not None:
                    child.edge.length = edge_len
                to_proc.append(child)
def _proc_nd(node, nexson_nd, id2taxon, node_id, nd_id2dend):
    """Register a freshly created node under its NexSON id and decorate it.

    :param node: node object to initialise; its ``id`` is set to ``node_id``.
    :param nexson_nd: the NexSON node mapping, passed on to ``_decorate_nd``.
    :param id2taxon: mapping of OTU id -> taxon, passed on to ``_decorate_nd``.
    :param node_id: the NexSON id of the node.
    :param nd_id2dend: mapping of NexSON node id -> node, updated in place.
    """
    # The id must be assigned before decorating: _decorate_nd reads node.id.
    node.id = node_id
    nd_id2dend.update({node_id: node})
    _decorate_nd(node, nexson_nd, id2taxon)
def _decorate_nd(node, nexson_nd, id2taxon):
    """Attach the taxon (if any) and a label to a node.

    When the NexSON node mapping has a non-empty ``'@otu'`` entry, the node's
    taxon is looked up in ``id2taxon``. The node's label is then set to the
    node's ``id``.

    :param node: node object with an ``id`` attribute already set.
    :param nexson_nd: the NexSON node mapping.
    :param id2taxon: mapping of OTU id -> taxon.
    :raises KeyError: if the referenced OTU id is not in ``id2taxon``.
    """
    linked_otu = nexson_nd.get('@otu')
    if linked_otu:
        node.taxon = id2taxon[linked_otu]
    node.label = node.id