Source code for rna_tools.tools.rna_alignment.rna_alignment

#!/usr/bin/env python2
"""RNAalignment - a module to work with RNA sequence alignments.

To see a full demo what you can do with this util, please take a look at the jupiter notebook (https://github.com/mmagnus/rna-pdb-tools/blob/master/rna_tools/tools/rna_alignment/rna_alignment.ipynb)

Load an alignment in the Stockholm::

    alignment = ra.RNAalignment('test_data/RF00167.stockholm.sto')

 or fasta format::

     import rna_alignment as ra
     alignment = ra.fasta2stokholm(alignment.fasta)
     alignment = ra.RNAalignment

Parameters of the aligmnent::

     print(alignment.describe())

Consensus SS::

    print(alignment.ss_cons_with_pk)

Get sequnce/s from teh aligment::

    >>> seq = a.io[0]

"""

from Bio import AlignIO
from Bio.Phylo.TreeConstruction import DistanceCalculator
from rna_tools import SecondaryStructure
from rna_tools.rna_tools_config import RCHIE_PATH
from collections import OrderedDict

import tempfile
import subprocess
import os
import shutil
import re
import gzip


class RNAalignmentError(Exception):
    pass


class RChieError(Exception):
    pass


class RFAMFetchError(Exception):
    pass


[docs] class RChie: """RChie - plotting arc diagrams of RNA secondary structures. .. image:: ../pngs/rchie.png http://www.e-rna.org/r-chie/ The offline version of R-chie, which requires first installing R4RNA is available here, or clone our git repository here How to install it: - Ensure R is installed already, or download it freely from http://www.r-project.org/ - Download the R4RNA (https://github.com/jujubix/r-chie), open R and install the package:: install.packages("<path_to_file>/R4RNA", repos = NULL, type="source") # Install the optparse and RColorBrewer install.packages('optparse') install.packages('RColorBrewer') - Go to rna_tools/rna_tools_config_local.py and set RCHIE_PATH to the folder with RChie, e.g. ``"/home/magnus/work/opt/r-chie/"``. To test if Rchie works on your machine (from rna_align folder):: <path to your rchie>/rchie.R --msafile test_data/rchie_test_files/fasta.txt test_data/rchie_test_files/helix.txt you should have rchie.png file in the folder. More at http://www.e-rna.org/r-chie/download.cgi Cite: Daniel Lai, Jeff R. Proctor, Jing Yun A. Zhu, and Irmtraud M. Meyer (2012) R-chie: a web server and R package for visualizing RNA secondary structures. Nucleic Acids Research, first published online March 19, 2012. doi:10.1093/nar/gks241 """ def __init__(self): pass
[docs] def plot_cov(self, seqs, ss_cons, plot_fn='rchie.png', verbose=False): """Plot an RChie plot_conv. :param seqs: seqs in the fasta format :param ss_cons: a string of secondary structure consensus, use only ``().``. Works with pseuoknots. """ fasta_alignment = tempfile.NamedTemporaryFile(delete=False) with open(fasta_alignment.name, 'w') as f: f.write(seqs) plot = tempfile.NamedTemporaryFile(delete=False) plot.name += '.png' ss = tempfile.NamedTemporaryFile(delete=False) with open(ss.name, 'w') as f: f.write(ss_cons) if not RCHIE_PATH: raise RChieError('RChie path not set up!') cmd = RCHIE_PATH + \ "rchie.R --msafile='%s' --format1 vienna '%s' --output '%s'" % ( fasta_alignment.name, ss.name, plot.name) if verbose: print(cmd) o = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out = o.stdout.read().strip() err = o.stderr.read().strip() # *****PROCESSING MSA FILE***** # Error in readFasta(opt$msafile, filter = TRUE) : no FASTA sequences found # Error: ERROR: Invalid FASTA file # Execution halted if "error" in str(err).lower(): raise Exception('\n'.join([cmd, err])) if verbose: print('\n'.join([cmd, err])) self.plotfn = plot.name if verbose: print(self.plotfn) if plot_fn: shutil.move(plot.name, plot_fn) print('Rchie: plot saved to %s' % plot_fn) from IPython.display import Image return Image(filename=plot_fn)
[docs] def show(self): from IPython.display import Image return Image(filename=self.plotfn)
[docs] def write(self, outfn): shutil.copyfile(self.plotfn, outfn) print('Write to %s' % outfn)
[docs] class RNASeq(object): """RNASeq. Args: id (str) : id of a sequence seq (str) : seq, it be uppercased. ss (str) : secondary structure, default None Attributes: seq_no_gaps(str) : seq.replace('-', '') ss_no_gaps(str) : ss.replace('-', '') .. warning:: >>> if 'EF' in s.id: print('Y') Y >>> if 'EF' in s: print('Y') # nothing """ def __init__(self, id, seq, ss=None): self.id = id self.seq = seq.upper() # self.ss_raw = ss # this will not be changed after remove_gaps. # so maybe don't use ss_raw at call self.ss = ss self.ss = self.get_ss_std() self.seq_no_gaps = seq.replace('-', '') self.ss_no_gaps = ss.replace('-', '') #@property
[docs] def get_ss_std(self): nss = '' for s in self.ss: nss += get_rfam_ss_notat_to_dot_bracket_notat(s) return nss
def __repr__(self): return self.id def __len__(self): return len(self.seq) def __getitem__(self, i): if self.ss: return RNASeq(self.id + '_slice', self.seq[i], self.ss[i]) else: return RNASeq(self.id + '_slice', self.seq[i])
[docs] def remove_columns(self, to_remove): """indexing from 0""" nseq = '' for i, s in enumerate(self.seq): if i not in to_remove: nseq += s nss = '' if self.ss: for i, s in enumerate(self.ss): if i not in to_remove: nss += s self.seq = nseq self.ss = nss
[docs] def draw_ss(self, title='', verbose=False, resolution=1.5): """Draw secondary structure of RNA with VARNA. VARNA: Visualization Applet for RNA A Java lightweight component and applet for drawing the RNA secondary structure .. image :: ../pngs/varna.png Cite: VARNA: Interactive drawing and editing of the RNA secondary structure Kevin Darty, Alain Denise and Yann Ponty Bioinformatics, pp. 1974-197,, Vol. 25, no. 15, 2009 http://varna.lri.fr/""" drawfn = tempfile.NamedTemporaryFile(delete=False).name + '.png' SecondaryStructure.draw_ss(title, self.seq, self.ss, drawfn, resolution, verbose=verbose) from IPython.display import Image return Image(filename=drawfn)
[docs] def remove_gaps(self, check_bps=True, only_canonical=True, allow_gu=True): """Remove gaps from seq and secondary structure of the seq. Args: check_bps (bool) : fix mistakes as only_canonical (bool) : keep in ss only pairs GC, AU allow_gu (bool) : keep in ss also GU pair .. image :: ../pngs/ss_misgap.png A residue "paired" with a gap. .. image :: ../pngs/ss_misgap_wrong.png .. when check_bps (by default), then when removing gaps, the functions check is the gap is paired with any residues (in the blue circle). If yes, then this residues is unpair (in this case ``)`` -> ``.``). .. image :: ../pngs/ss_misgap_ok.png if ``only_canonical`` (by default) is True then only GC, AU can be paired. .. image :: ../pngs/ss_only_canonical.png If ``allow_gu`` is False (be default is True) then GU pair is also possible. .. image :: ../pngs/ss_no_gu.png If you provide seq and secondary structure such as:: GgCcGGggG.GcggG.cc.u.aAUACAAuACCC.GaAA.GGGGAAUAaggCc.gGCc.gu......CU.......uugugcgGUuUUcaAgCccCCgGcCaCCcuuuu (((((((((....((.((............(((......)))......))))..(((.(.....................)))).......)))))))))........ gaps will be remove as well. """ GAPS = ['-', '.'] nseq = '' nss = '' for (nt, nt_ss) in zip(self.seq, self.ss): if nt in GAPS and nt_ss in GAPS: pass else: nseq += nt nss += nt_ss self.seq = nseq self.ss = nss nss = list() bps = self.ss_to_bps() if check_bps: nss = list(self.ss) for bp in bps: nt_left = self.seq[bp[0]] nt_right = self.seq[bp[1]] if nt_left == '-' or nt_right == '-': nss[bp[0]] = '.' nss[bp[1]] = '.' self.ss = ''.join(nss) if only_canonical: nseq = list(self.seq) nss = list(self.ss) for bp in bps: nt_left = nseq[bp[0]] nt_right = nseq[bp[1]] if (nt_left == 'A' and nt_right == 'U') or (nt_left == 'U' and nt_right == 'A'): pass elif (nt_left == 'G' and nt_right == 'C') or (nt_left == 'C' and nt_right == 'G'): pass elif (nt_left == 'G' and nt_right == 'U') or (nt_left == 'U' and nt_right == 'G'): if allow_gu: pass else: nss[bp[0]] = '.' nss[bp[1]] = '.' else: nss[bp[0]] = '.' nss[bp[1]] = '.' self.ss = ''.join(nss) # two?????? what is "two"? nss = [] nseq = '' for i, (c, s) in enumerate(zip(self.seq, self.ss)): if c != '-': nseq += c nss.append(s) self.seq = nseq self.ss = ''.join(nss)
[docs] def ss_to_bps(self): """Convert secondary structure into a list of basepairs. Returns: bps (list): a list of base pairs, e.g. [[0, 80], [1, 79], [2, 78], [4, 77], [6, 75], [7, 74], ...] """ j = [] bps = [] pair_types = ['()', '[]', '<>', '{}'] for pair_type in pair_types: for i, s in enumerate(self.ss): if s == pair_type[0]: j.append(i) if s == pair_type[1]: bps.append([j.pop(), i]) if len(j): # if something left, this is a problem (!!!) raise Exception('Mis-paired secondary structure') bps.sort() return bps
[docs] def get_conserved(self, consensus, start=0, to_pymol=True, offset=0): """Start UCGGGGUGCCCUUCUGCGUG--------------------------------------------------AAGGC-UGAGAAAUACCCGU-------------------------------------------------AUCACCUG-AUCUGGAU-AAUGC XXXXXXXXXXXXGXGXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX----------------------------XXXXX-XCUGAGAXXXXXXXXXXXXXXXXXXXXXX----------------------------------XXXXXXXX-XXXXXXXX-ACXUG """ c = start + offset index = [] print(self.seq) print(consensus) for nt_seq, nt_consensus in zip(self.seq, consensus): if nt_consensus in ['-', 'X']: pass else: index.append(c) if nt_seq != '-': c += 1 if to_pymol: return("color red, " + str(index).replace(', ', '+').replace('[','').replace(']','')) else: return index
[docs] def get_distance_to(self, nseq): """Get distance of self.seq to nseq.""" try: import Levenshtein except: print('pip install python-Levenshtein') return round(Levenshtein.ratio(self.seq, nseq), 2)
[docs] class RNAalignment(object): """RNA alignment - adapter class around BioPython to do RNA alignment stuff Usage (for more see IPython notebook https://github.com/mmagnus/rna-tools/blob/master/rna_tools/tools/rna_alignment/rna_alignment.ipynb) >>> a = RNAalignment('test_data/RF00167.stockholm.sto') >>> print(a.tail()) >>> print(a.ss_cons) Args: fn (str): Filename io (Bio.AlignIO): AlignIO.read(fn, "stockholm") lines (list): List of all lines of fn seqs (list): List of all sequences as class:`RNASeq` objects rf (str): ReFerence annotation, the consensus RNA sequence Read more: - http://biopython.org/DIST/docs/api/Bio.AlignIO.StockholmIO-module.html and on the format itself - https://en.wikipedia.org/wiki/Stockholm_format - http://sonnhammer.sbc.su.se/Stockholm.html .. warning:: fetch requires urllib3 """ def __init__(self, fn='', fetch=''): if fetch: import urllib3 http = urllib3.PoolManager() response = http.request('GET', 'http://rfam.xfam.org/family/' + fetch + '/alignment/stockholm?gzip=1&download=1') if not response.status == 200: raise RFAMFetchError( "The alignment could not be downloaded. Please check the RFAM id that you requested! (don't put .stk etc in the id)") with open(fetch + '.stk.gz', 'wb') as f: f.write(response.data) with gzip.open(fetch + '.stk.gz', 'rb') as f: file_content = f.read() with open(fetch + '.stk', 'wb') as f: f.write(file_content) fn = fetch + '.stk' self.fn = fn self.lines = open(fn).read().split('\n') self.io = AlignIO.read(fn, "stockholm") self.ss_cons = self.get_ss_cons() self.ss_cons_pk = self.get_ss_cons_pk() self.copy_ss_cons_to_all() self._ss_cons_std = self.ss_cons self.rf = self.get_gc_rf() self.shift = self.get_shift_seq_in_align() # get all lines not # nor // # fix for blocked alignment seq_lines = [l for l in self.lines if ( not l.startswith('#')) and (not l.startswith('//')) and (l)] seqs_dict = OrderedDict() for seq in seq_lines: seq_id, seq_seq = seq.split() if seq_id not in seqs_dict: seqs_dict[seq_id] = seq_seq else: seqs_dict[seq_id] += seq_seq self.seqs = [] for seq in seqs_dict: self.seqs.append(RNASeq(seq, seqs_dict[seq], ss=self.ss_cons_with_pk_std)) # this is sick! # I create a Cols object to be able to slice alignments class Cols: def __init__(self, alignment): self.alignment = alignment def __getitem__(self, i): """Return new alignment""" if type(i) is list: pass else: # collect "new" sequences n_seqs = [] for s in self.alignment: new_seq = RNASeq(s.id, s.seq[i], s.ss[i]) n_seqs.append(new_seq) # this is not very smart :( # save new seqs to a file # and load it as RNAalignment tf = tempfile.NamedTemporaryFile(delete=False) tf.name += '.stk' with open(tf.name, 'w') as f: f.write('# STOCKHOLM 1.0\n') for s in n_seqs: f.write(' '.join([s.id, s.seq, '\n'])) # add ss_cons & // f.write('#=GC SS_cons ' + self.alignment.ss_cons[i] + '\n') if self.alignment.ss_cons_pk: f.write('#=GC SS_cons_pk' + self.alignment.ss_cons_pk[i] + '\n') f.write('#=GC RF ' + self.alignment.rf[i] + '\n') f.write('//\n') return RNAalignment(tf.name) self.cols = Cols(self) # ^^^^ sick ^^^^^^^^^^^
[docs] def reload_alignment(self): tmpfn = tempfile.NamedTemporaryFile(delete=False).name self.write(tmpfn) self.io = AlignIO.read(tmpfn, "stockholm")
def __len__(self): """Return length of all sequenes.""" return len(self.seqs) def __getitem__(self, i): if type(i) is str: for s in self: if s.id == i: return s elif type(i) is list: seqs = [] for j in i: seqs.append(self.seqs[j]) return seqs else: return self.seqs[i] @property def ss_cons_std(self): return get_rfam_ss_notat_to_dot_bracket_notat(self.ss_cons) @ss_cons_std.setter def ss_cons_std(self, ss): self._ss_cons_std = ss print(self._ss_cons_std)
[docs] def subset(self, ids, verbose=False): """Get subset for ids:: # STOCKHOLM 1.0 #=GF WK Tetrahydrofolate_riboswitch .. AAQK01002704.1/947-1059 -U-GC-AAAAUAGGUUUCCAUGC.. #=GC SS_cons .(.((.((----((((((((((... #=GC RF .g.gc.aGAGUAGggugccgugc.. // """ nalign = '' for l in self.lines: if l.startswith('//'): nalign += l + '\n' if l.startswith('#'): nalign += l + '\n' else: for i in ids: if l.startswith(i): nalign += l + '\n' tf = tempfile.NamedTemporaryFile(delete=False) tf.name += '.stk' print('Saved to ', tf.name) if verbose: print(nalign) f = open(tf.name, 'w') f.write(nalign) #f.write(self.rf + '\n') f.close() return RNAalignment(tf.name)
def __add__(self, rna_seq): self.seqs.append(rna_seq) self.reload_alignment()
[docs] def write(self, fn, verbose=False): """Write the alignment to a file""" if verbose: print('Save to ', fn) with open(fn, 'w') as f: f.write('# STOCKHOLM 1.0\n') shift = max([len(x) for x in [s.id for s in self.seqs] + ['#=GC=SS_cons']]) for s in self: f.write(s.id.ljust(shift + 2, ' ') + s.seq + '\n') f.write('#=GC SS_cons'.ljust(shift + 2, ' ') + self.ss_cons + '\n') f.write('//')
[docs] def copy_ss_cons_to_all(self, verbose=False): for s in self.io: if verbose: self.ss_cons self.io[0].seq try: s.letter_annotations['secondary_structure'] = self.ss_cons except TypeError: raise Exception( 'Please check if all your sequences and ss lines are of the same length!') s.ss = self.ss_cons s.ss_clean = self.get_clean_ss(s.ss) s.seq_nogaps = str(s.seq).replace('-', '') s.ss_nogaps = self.get_ss_remove_gaps(s.seq, s.ss_clean)
[docs] def copy_ss_cons_to_all_editing_sequence(self, seq_id, before, after): """Change a sequence's sec structure. :param seq_id: string, sequence id to change, eg: ``AE009948.1/1094322-1094400`` :param before: string, character to change from, eg: ``,`` :param after: string, character to change to, eg: ``.`` .. warning:: before and after has to be one character long """ for s in self.io: if s.id == seq_id: s.letter_annotations['secondary_structure'] = self.ss_cons.replace(before, after) else: s.letter_annotations['secondary_structure'] = self.ss_cons
[docs] def get_ss_remove_gaps(self, seq, ss): """ :param seq: string, sequence :param ss: string, ss UAU-AACAUAUAAUUUUGACAAUAUGG-GUCAUAA-GUUUCUACCGGAAUACC--GUAAAUAUUCU---GACUAUG-UAUA- (((.(.((((,,,(((((((_______.))))))).,,,,,,,,(((((((__.._____))))))...),,)))).)))). """ nss = '' for i, j in zip(seq, ss): if i != '-': nss += j return nss
[docs] def plot(self, plot_fn='rchie.png'): return RChie().plot_cov(self.io.format("fasta"), self.ss_cons_std, plot_fn)
[docs] def get_ss_cons_pk(self): """ :return: SS_cons_pk line or None if there is now SS_cons_pk:""" ss_cons_pk = '' for l in self.lines: if l.startswith('#=GC SS_cons_pk'): ss_cons_pk += l.replace('#=GC SS_cons_pk', '').strip() return ss_cons_pk
[docs] def get_ss_cons(self): """ :return: SS_cons_pk line or None if there is now SS_cons_pk. """ ss_cons = '' for l in self.lines: if '#=GC SS_cons' in l and '#=GC SS_cons_pk' not in l: ss_cons += l.replace('#=GC SS_cons', '').strip() return ss_cons
@property def ss_cons_with_pk(self): """go over ss_cons and overwrite bp is there is pk (ss_cons_pk) ss_cons: (((.(.((((,,,(((((((_______.))))))).,,,,,,,,(((((((__.._____))))))...),,)))).)))). ss_cons_pk: .......................[[...............................]]........................ ss_cons_with_pk: (((.(.((((,,,(((((((___[[__.))))))).,,,,,,,,(((((((__.._]]__))))))...),,)))).)))). "return ss_cons_with_pk: string, e.g. (((.(.((((,,,(((((((___[[__.))))""" if self.ss_cons_pk: ss_cons_with_pk = '' for i, (s, p) in enumerate(zip(self.ss_cons, self.ss_cons_pk)): if p != '.': ss_cons_with_pk += p else: ss_cons_with_pk += s return ss_cons_with_pk else: return self.ss_cons @property def ss_cons_with_pk_std(self): if self.ss_cons_pk: ss_cons_with_pk_std = '' for i, (s, p) in enumerate(zip(get_rfam_ss_notat_to_dot_bracket_notat(self.ss_cons), self.ss_cons_pk)): if p != '.': ss_cons_with_pk_std += p else: ss_cons_with_pk_std += s return ss_cons_with_pk_std else: return self.ss_cons
[docs] def get_gc_rf(self): """Return (str) ``#=GC RF`` or '' if this line is not in the alignment. """ for l in self.lines: if l.startswith('#=GC RF'): return l.replace('#=GC RF', '').replace('_cons', '').strip() else: return ''
# raise RNAalignmentError('There is on #=GC RF in the alignment!')
[docs] def get_shift_seq_in_align(self): """RF_cons vs '#=GC RF' ???""" for l in self.lines: if l.startswith('#=GC RF'): # #=GC RF .g.gc.a l = l.replace('#=GC RF', '') c = 7 # 12 # len of '#=GC RF' # .g.gc.a for i in l: if i == ' ': c += 1 self.shift = c return c
[docs] def map_seq_on_seq(self, seq_id, seq_id_target, resis, v=True): """ :param seq_id: seq_id, 'AAML04000013.1/228868-228953' :param seq_id_target: seq_id of target, 'CP000721.1/2204691-2204778' :param resis: list resis, [5,6] map:: [4, 5, 6] UAU-A UAU-AA UAU-AAC [5, 6, 7] CAC-U CAC-U- CAC-U-U [4, None, 5] """ # get number for first seq print(resis) nresis = [] for s in self.io: if s.id.strip() == seq_id.strip(): for i in resis: print(s.seq[:i + 1]) # UAU-A nresis.append(i + s.seq[:i].count('-')) print(nresis) print(self.map_seq_on_align(seq_id_target, nresis)) return resis_target = [] for s in self.io: if s.id.strip() == seq_id_target.strip(): for i in nresis: if v: print(s.seq[:i]) if s.seq[i - 1] == '-': resis_target.append(None) else: resis_target.append(i - s.seq[:i].count('-')) return resis_target
[docs] def map_seq_on_align(self, seq_id, resis, v=True): """ :param seqid: seq_id, 'CP000721.1/2204691-2204775' :param resis: list resis, [5,6] maps:: [5, 6, 8] CAC-U CAC-U- CAC-U-UA [4, None, 6] """ if v: print(resis) nresis = [] for s in self.io: if s.id.strip() == seq_id.strip(): for i in resis: if v: print(s.seq[:i]) if s.seq[i - 1] == '-': nresis.append(None) else: nresis.append(i - s.seq[:i].count('-')) return nresis
[docs] def head(self): return '\n'.join(self.lines[:5])
[docs] def tail(self): return '\n'.join(self.lines[-5:])
[docs] def describe(self): """Describe the alignment. > print(a.describe()) SingleLetterAlphabet() alignment with 13 rows and 82 columns """ return str(self.io).split('\n')[0]
[docs] def remove_empty_columns(self, verbose=False): """Remove empty columns in place. Example:: >>> a = RNAalignment("test_data/zmp.stk") >>> print(a) SingleLetterAlphabet() alignment with 6 rows and 319 columns ---ACCUUGCGCGACUGGCGAAUCC-------------------...AAU CP001644.1/756294-756165 --GCUCUCGCGCGACUGGCGACUUUG------------------...GAA CU234118.1/352539-352459 UGAGUUUUCUGCGACUGACGGAUUAU------------------...CUG BAAV01000055.1/2897-2982 GCCCGUUCGCGUGACUGGCGCUAGU-------------------...CGA CP000927.1/5164264-5164343 -----GGGUCGUGACUGGCGAACA--------------------...--- zmp UCACCCCUGCGUGACUGGCGAUA---------------------...GUU AP009385.1/718103-718202 >>> a.remove_empty_columns() >>> print(a) SingleLetterAlphabet() alignment with 6 rows and 138 columns ---ACCUUGCGCGACUGGCGAAUCC-UGAAGCUGCUUUG-AGCG...AAU CP001644.1/756294-756165 --GCUCUCGCGCGACUGGCGACUUUG------------------...GAA CU234118.1/352539-352459 UGAGUUUUCUGCGACUGACGGAUUAU------------------...CUG BAAV01000055.1/2897-2982 GCCCGUUCGCGUGACUGGCGCUAGU-------------------...CGA CP000927.1/5164264-5164343 -----GGGUCGUGACUGGCGAACA--------G-----------...--- zmp UCACCCCUGCGUGACUGGCGAUA--------GAACCCUCGGGUU...GUU AP009385.1/718103-718202 go over all seq modifes self.nss_cons""" cols_to_rm = [] # get only seqs for i in range(len(self[0])): gap = True for seq in self: if seq.seq[i] != '-': gap = False if gap: cols_to_rm.append(i) # remove from sequences for s in self: s.remove_columns(cols_to_rm) # update io # hack # tmpfn = tempfile.NamedTemporaryFile(delete=False).name self.write(tmpfn, verbose=False) self.io = AlignIO.read(tmpfn, "stockholm") # nss_cons update nss_cons = '' for i, s in enumerate(self.ss_cons): if i not in cols_to_rm: nss_cons += s self.ss_cons = nss_cons
[docs] def format_annotation(self, t): return self.shift * ' ' + t
[docs] def find_core(self, ids=None): """Find common core for ids. .. image:: ../pngs/find_core.png Fig. By core, we understand columns that have all homologous residues. The core is here marked by `x`. :param id: list, ids of seq in the alignment to use """ if not ids: ids = [] for s in self.io: ids.append(s.id) xx = list(range(0, len(self.io[0]))) for i in range(0, len(self.io[0])): # if . don't use it for s in self.io: # print s.id if s.id in ids: if s.seq[i] == '-': xx[i] = '-' break else: xx[i] = 'x' return ''.join(xx) shift = self.get_shift_seq_in_align() fnlist = open(self.fn).read().strip().split('\n') fnlist.insert(-2, 'x' + ' ' * (shift - 1) + ''.join(xx)) # print fnlist for l in fnlist: print(l)
[docs] def find_seq(self, seq, verbose=False): """Find seq (also subsequences) and reverse in the alignment. Args: seq (str): seq is upper() verbose (bool): be verbose :: seq = "ggaucgcugaacccgaaaggggcgggggacccagaaauggggcgaaucucuuccgaaaggaagaguaggguuacuccuucgacccgagcccgucagcuaaccucgcaagcguccgaaggagaauc" hit = a.find_seq(seq, verbose=False) ggaucgcugaacccgaaaggggcgggggacccagaaauggggcgaaucucuuccgaaaggaagaguaggguuacuccuucgacccgagcccgucagcuaaccucgcaagcguccgaaggagaauc Match: AL939120.1/174742-174619 ID: AL939120.1/174742-174619 Name: AL939120.1 Description: AL939120.1/174742-174619 Number of features: 0 /start=174742 /end=174619 /accession=AL939120.1 Per letter annotation for: secondary_structure Seq('CCAGGUAAGUCGCC-G-C--ACCG---------------GUCA-----------...GGA', SingleLetterAlphabet()) GGAUCGCUGAACCCGAAAGGGGCGGGGGACCCAGAAAUGGGGCGAAUCUCUUCCGAAAGGAAGAGUAGGGUUACUCCUUCGACCCGAGCCCGUCAGCUAACCUCGCAAGCGUCCGAAGGAGAAUC """ seq = seq.replace('-', '').upper() for s in self.io: seq_str = str(s.seq).replace('-', '').upper() if verbose: print(seq_str) if seq_str.find(seq) > -1 or seq.find(seq_str) > -1: print('Match:', s.id) print(s) print(seq) return s print('Not found')
[docs] def find_seq_exact(self, seq, verbose=False): """Find seq (also subsequences) and reverse in the alignment. :param seq: string, seq, seq is upper() :param verbose: boolean, be verbose or not """ seq = seq.replace('-', '').upper() for s in self.io: seq_str = str(s.seq).replace('-', '').upper() if verbose: print(seq_str) if seq_str == seq: print('Match:', s.id) print(s) print(seq) return s print('Not found')
[docs] def get_clean_ss(self, ss): nss = '' for s in ss: nss += get_rfam_ss_notat_to_dot_bracket_notat(s) return nss
[docs] def get_seq_ss(self, seq_id): # seq,ss): s = self.get_seq(seq_id).seq # print seq, ss # new nseq = '' nss = '' for i, j in zip(seq, ss): if i != '-': # gap # print i,j nseq += i nss += get_rfam_ss_notat_to_dot_bracket_notat(j) return nseq.strip(), nss.strip()
[docs] def get_seq(self, seq_id): for s in self.io: if s.id == seq_id: return s raise Exception('Seq not found')
[docs] def get_seq_with_name(self, seq_name): for s in self.io: if s.name == seq_name: return s raise Exception('Seq not found')
[docs] def align_seq(self, seq): """Align seq to the alignment. Using self.rf. Args: seq (str): sequence, e.g. ``-GGAGAGUA-GAUGAUUCGCGUUAAGUGUGUGUGA-AUGGGAUGUC...`` Returns: str: seq that can be inserted into alignemnt, ``.-.GG.AGAGUA-GAUGAUUCGCGUUA`` ! . -> - """ seq = list(seq) seq.reverse() nseq = '' for n in self.rf: # n nuclotide if n != '.': try: j = seq.pop() except: j = '.' nseq += j if n == '.': nseq += '.' # + j return nseq.replace('.', '-')
def __repr__(self): return (str(self.io))
[docs] def trimmed_rf_and_ss(self): """Remove from RF and SS gaps. Returns: (str,str): trf, tss - new RF and SS """ trf = '' tss = '' for r, s in zip(self.rf, self.ss_cons_std): if r not in ['-', '.']: trf += r tss += s return trf, tss
[docs] def get_distances(self): """Get distances (seq identity) all-vs-all. With BioPython. blastn: ``Bad alphabet 'U' in sequence 'AE008922.1/409481-409568' at position '7'`` only for DNA? read more (also about matrix at <http://biopython.org/wiki/Phylo> and HTTP://biopython.org/DIST/docs/api/Bio.Phylo.TreeConstruction.DistanceCalculator-class.html """ calculator = DistanceCalculator('identity') dm = calculator.get_distance(self.io) return dm
[docs] def get_the_closest_seq_to_ref_seq(self, verbose=False): """ Example:: >>> a = RNAalignment("test_data/RF02221.stockholm.sto") >>> a.get_the_closest_seq_to_ref_seq() AF421314.1/431-344 """ self + RNASeq('ConSeq', self.rf, '') dist = self.get_distances() distConSeq = dist['ConSeq'][:-1] # to remove ending 0, bc of distance to itself minimal = min(distConSeq) index = dist['ConSeq'].index(minimal) #id = dist.names[index] if verbose: print('dist:\n', str(dist)) print('distConSeq:', dist['ConSeq']) return self[index]
[docs] class CMAlign(): """CMAalign class around cmalign (of Inferal). cmalign - aligns the RNA sequences in <seqfile> to the covariance model (CM) in <cmfile>. The new alignment is output to stdout in Stockholm format. Example:: cma = ra.CMAlign() cma.run_cmalign("ade_seq.fa", "RF00167.cm") seq = cma.get_seq() print 'cma hit ', seq print 'seq ', a.align_seq(seq) print 'a.rf ', a.rf cmd cmalign -g RF00167.cm ade_seq.fa # STOCKHOLM 1.0 #=GF AU Infernal 1.1.2 ade ----------------CGCUUCAUAUAAUCCUAAUGAUAUGGUUUGGGAGUUUCUACCAAGAG-CCUUAAA-CUCUUGAUUAUGAAGUGA------------ #=GR ade PP ................99*********************************************.*******.***************999............ #=GC SS_cons :::::::::::::::::((((((((,,,<<<<<<<_______>>>>>>>,,,,,,,,<<<<<<<_______>>>>>>>,,)))))))):::::::::::::: #=GC RF aaaaaauaaaaaaaauucccuCgUAUAAucccgggAAUAUGGcccgggaGUUUCUACCaggcagCCGUAAAcugccuGACUAcGagggaaauuuuuuuuuuu // cma hit ----------------CGCUUCAUAUAAUCCUAAUGAUAUGGUUUGGGAGUUUCUACCAAGAG-CCUUAAA-CUCUUGAUUAUGAAGUGA------------ seq ----------------CGCU-U-CAUAUAAUCCUAAUGAUAUGG-UUUGGGA-GUUUCUACCAAGAG-CC--UUAAA-CUCUU---GAUUAUG-AAGUGA------------- a.rf aaaaaauaaaaaaaauuccc.u.CgUAUAAucccgggAAUAUGG.cccggga.GUUUCUACCaggcagCC..GUAAAcugccu...GACUAcG.agggaaauuuuuuuuuuu. Install http://eddylab.org/infernal/ Cite: Nawrocki and S. R. Eddy, Infernal 1.1: 100-fold faster RNA homology searches, Bioinformatics 29:2933-2935 (2013). """ def __init__(self, outputfn=None): """Use run_cmalign or load cmalign output from a file""" if outputfn: self.output = open(outputfn).read().strip().split('\n')
[docs] def run_cmalign(self, seq, cm, verbose=True): """Run cmalign and process the result. :param seq: seq string :param cm: cm fn Run:: $ cmalign RF01831.cm 4lvv.seq # STOCKHOLM 1.0 #=GF AU Infernal 1.1.2 4lvv -GGAGAGUA-GAUGAUUCGCGUUAAGUGUGUGUGA-AUGGGAUGUCG-UCACACAACGAAGC---GAGA---GCGCGGUGAAUCAUU-GCAUCCGCUCCA #=GR 4lvv PP .********.******************9999998.***********.8999999******8...5555...8**************.************ #=GC SS_cons (((((----(((((((((((,,,,,<<-<<<<<<<<___________>>>>>>>>>>,,,<<<<______>>>>,,,)))))))))))-------))))) #=GC RF ggcaGAGUAGggugccgugcGUuAAGUGccggcgggAcGGGgaGUUGcccgccggACGAAgggcaaaauugcccGCGguacggcaccCGCAUcCgCugcc // .. warning :: requires cmalign to be set in your shell """ tf = tempfile.NamedTemporaryFile(delete=False) tf.name += '.seq' with open(tf.name, 'w') as f: f.write('>target\n') f.write(seq + '\n') cmd = 'cmalign -g ' + cm + ' ' + tf.name # global if verbose: print('cmd' + cmd) o = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout = o.stdout.read().strip() stderr = o.stderr.read().strip() if verbose: print(stdout) self.output = stdout.split('\n')
[docs] def get_gc_rf(self): """Get ``#=GC RF``. :var self.output: string """ for l in self.output: if l.startswith('#=GC RF'): return l.replace('#=GC RF', '').strip()
[docs] def get_seq(self): """ :var self.output: output of cmalign, string """ for l in self.output: if l.strip(): if not l.startswith('#'): # 4lvv -GGAGAGUA-GAUGAU return l.split()[1].strip()
def clean_seq_and_ss(seq, ss): nseq = '' nss = '' for i, j in zip(seq, ss): if i != '-': # gap # print i,j nseq += i nss += get_rfam_ss_notat_to_dot_bracket_notat_per_char(j) return nseq.strip(), nss.strip() def get_rfam_ss_notat_to_dot_bracket_notat_per_char(c): """Take (c)haracter and standardize ss (including pks in letter (AAaa) notation). .. warning:: DD DD will be treated as BB BB (<< >>) (it might be wrong!)""" if c in [',', '_', ':', '-']: return '.' if c == '<': return '(' if c == '{': return '(' if c == '}': return ')' if c == ']': return ')' if c == '[': return '(' if c == '>': return ')' if c == '>': return ')' if c == 'A': return '[' if c == 'a': return ']' if c == 'B': return '<' if c == 'b': return '>' if c == 'C': return '{' if c == 'c': return '}' # !!!!!!!!!!!!!!!!!!!!!!!!!! if c == 'D': return '<' if c == 'd': return '>' # !!!!!!!!!!!!!!!!!!!!!!!!!! return c def get_rfam_ss_notat_to_dot_bracket_notat(ss): """Change all <<>> to "standard" dot bracket notation. Works also with pseudknots AA BB CC etc.""" nss = '' for s in ss: ns = get_rfam_ss_notat_to_dot_bracket_notat_per_char(s) nss += ns return nss def fasta2stokholm(fn): """Take a gapped fasta file and return an RNAalignment object. A fasta file should look like this:: >AE009948.1/1094322-1094400 UAC-U-UAUUUAUGCUGAGGAU--UGG--CUUAGC-GUCUCUACAAGACA-CC--GU-AA-UGUCU---AACAAUA-AGUA- ... >CP000721.1/2204691-2204778 CAC-U-UAUAUAAAUCUGAUAAUAAGG-GUCGGAU-GUUUCUACCAAGCUACC--GUAAAUUGCUAUAGGACUAUA-AGUG- >SS_cons (((.(.((((...(((((((........))))))).........(((((((.........))))))...)..)))).)))). >SS_cons_pk .........................[[........................]]............................. ``SS_cons_pk`` in optionally and is an extra line used to define a pseudoknot. You can also define second psedoknot as ``<<<...>>>`` and the third one with ``{{{ }}}``. :param fn: file :return: RNAalignment object """ seqs = [] s = None for l in open(fn): if l.startswith('>'): if s: seqs.append(s) s = RNASeq(l.replace('>', '').strip(), '') else: s.seq += l.strip() txt = '' for l in open(fn): if l.startswith('>'): id = '\n' + l.replace('>', '\n').strip() + ' ' id = re.sub('ss_cons_pk', '#=GC SS_cons_pk', id, flags=re.IGNORECASE) id = re.sub('ss_cons', '#=GC SS_cons', id, flags=re.IGNORECASE) txt += id else: txt += l.strip() txt = txt.strip() + '\n' # clean upfront \n and add tailing \n tf = tempfile.NamedTemporaryFile(delete=False) tf.name += '.stk' with open(tf.name, 'w') as f: f.write('# STOCKHOLM 1.0\n') f.write(txt) f.write('//\n') return RNAalignment(tf.name) def fetch_stokholm(rfam_acc, dpath=None): """Fetch Stokholm file from Rfam. :param rfam_acc: str, Rfam accession number, eg. RF00028 :param dpath: str or None, if None saves to current location, otherwise save to dpath folder .. warning :: urllib3 is required, pip install urllib3 """ import urllib3 http = urllib3.PoolManager() # try: print(dpath) response = http.request('GET', url='http://rfam.xfam.org/family/' + rfam_acc.lower() + '/alignment?acc=' + rfam_acc.lower() + '&format=stockholm&download=1') # except urllib3.HTTPError: # raise Exception('The PDB does not exists: ' + pdb_id) txt = response.data if dpath is None: npath = rfam_acc + '.stk' else: npath = dpath + os.sep + rfam_acc + '.stk' print('downloading...' + npath) with open(npath, 'wb') as f: f.write(txt) print('ok') return rfam_acc + '.stk' # def test_seq_multilines_alignment() def test_alignment_with_pk(): a = RNAalignment('test_data/RF00167.stockholm.sto') print(a.tail()) print(a.ss_cons) print(a.ss_cons_pk) print(a.ss_cons_with_pk) print(a[[1, 2]]) # main if __name__ == '__main__': ## a = RNAalignment('test_data/RF00167.stockholm.sto') # print a.get_shift_seq_in_align() # print a.map_seq_on_align('CP000721.1/2204691-2204778', [5,6,8]) # print a.map_seq_on_seq('AAML04000013.1/228868-228953', 'CP000721.1/2204691-2204778', [4,5,6]) # print(record.letter_annotations['secondary_structure']) ## seq = a.get_seq('AAML04000013.1/228868-228953') # print seq.seq # print seq.ss # print a.ss_cons # print 'x' # for s in a.io: # print s.seq # print s.ss_clean #letter_annotations['secondary_structure'] # for s in a.io: # print s.seq_nogaps # print s.ss_nogaps # a.write('test_output/out.sto') # a = RNAalignment("/home/magnus/work/rna-evo/rp12/seq/RF00379.stockholm.stk")##_rm85.stk') # print a.get_seq_ss("AL939120.1/174742-174619") # subset = a.subset(["AL939120.1/174742-174619", # "rp12bsubtilis", # "AAWL01000001.1/57092-56953", # "CP000612.1/87012-87130", # "BA000028.3/1706087-1706245"]) # % cat /var/folders/yc/ssr9692s5fzf7k165grnhpk80000gp/T/tmpTzPenx # for s in subset: # print s.seq # subset.remove_empty_columns() # subset.write('/home/magnus/out2.stk') # for s in subset: # print s.seq # print 'subset.ss_cons_std:', subset.ss_cons_std # a = RNAalignment("/home/magnus/work/rna-evo/rp12/seq/RF00379.stockholm.stk")##_rm85.stk') # a.ss_cons = "::::::::::{{{{,,,<<<_..." # print a.ss_cons # print a.ss_cons_std # print get_rfam_ss_notat_to_dot_bracket_notat(subset.ss_cons) ## a.ss_cons_std = 'test of setter' # print a._ss_cons_std # pass # slice ## slices = a.cols[0:10] # for s in a: # print s.seq # add pk # for s in slices: # print s, s.seq, s.ss #a = fasta2stokholm('test_output/ade_gapped.fa') # print a #a = RNAalignment('test_data/RF00001.blocked.stk') # print a # print a.ss_cons # a.plot('rchie.png') # a = RNAalignment("test_data/RF02221.stockholm.sto") ## a = RNAalignment('test_data/test_data/RF02221.stockholm.sto') ## a + RNASeq('ConSeq', '-A-GU-AGAGUA-GGUCUUAUACGUAA-----------------AGUG-UCAUCGGA-U-GGGGAGACUUCCGGUGAACGAA-G-G-----------------------------GUUA---------------------------CCGCGUUAUAUGAC-C-GCUUCCG-CUA-C-U-','') ## dist = a.get_distances() # distConSeq = dist['ConSeq'][:-1] # to remove ending 0, bc of distance to itself ## minimal = min(distConSeq) ## index = dist['ConSeq'].index(minimal) # print(dist.names[index]) a = RNAalignment("test_data/dist_test2.stk") rep = a.get_the_closest_seq_to_ref_seq() rep.remove_gaps() print(rep) print(rep.ss) print(rep.seq) # a.write('tmp.stk') # s.remove_gaps() # print(s.seq) # print(s.ss) import doctest doctest.testmod()