-
Notifications
You must be signed in to change notification settings - Fork 0
/
remove_gappy_columns.py
executable file
·163 lines (141 loc) · 5.62 KB
/
remove_gappy_columns.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#!/usr/bin/python
import argparse
class Sequence(object):
def __init__(self, sequence, name = '', info = '', **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
self.sequence = sequence
self.name = name
def __len__(self):
return len(self.sequence)
def __str__(self):
out = "%s: %s" % (self.name, self.sequence)
return out
def __contains__(self, item):
return item in self.sequence
def __getitem__(self, ndx):
return self.sequence[ndx]
def __eq__(self, other):
if isinstance(other, Sequence):
return self.sequence == other.sequence
return False
class Alignment(object):
def __init__(self, sequences):
self.seqs = [s for s in sequences]
self.seqs_dict = dict([(s.name, s) for s in self.seqs])
self.alignlen = len(sequences[0])
def __len__(self):
return len(self.seqs)
def __getitem__(self, ndx):
return self.seqs[ndx]
def __contains__(self, item):
return item.sequence in [s.sequence for s in self.seqs]
def __str__(self):
output = ""
for seq in self.seqs:
output = "%s\t%s\n" % (seq.name, seq.sequence)
return output
def get_sequence(self, seq_name):
try:
return self.seqs_dict[seq_name]
except KeyError:
raise KeyError("Sequence %s was not found in the alignment" % seq_name)
def write_clustal_file(self, filename):
"""
Save a Alignment in CLUSTAL format.
"""
symbolsPerLine = 60
max_name_length = max(len(seq.name) for seq in self.seqs)
namelen = 0
string = ''
for seq in self.seqs:
namelen = max(len(seq.name), namelen)
wholeRows = self.alignlen / symbolsPerLine
for i in range(wholeRows):
for j in range(len(self.seqs)):
string = self.seqs[j].name.ljust(max_name_length) ' '
string = self.seqs[j][i * symbolsPerLine:(i 1) * symbolsPerLine] '\n'
string = '\n'
# Possible last row
last_row_length = self.alignlen - wholeRows * symbolsPerLine
if last_row_length > 0:
for j in range(len(self.seqs)):
if max_name_length > 0:
string = self.seqs[j].name.ljust(max_name_length) ' '
string = self.seqs[j][-last_row_length:] '\n'
if filename:
fh = open(filename, 'w')
# fake header so that clustal believes it
fh.write('CLUSTAL O(1.2.0) multiple sequence alignment\n\n\n')
fh.write(string)
fh.close()
return
return string
def get_ungapped(self):
"""
Return new alignment with gappy columns removed
"""
gappy_columns = set()
for seq in self:
for i in range(self.alignlen):
if i not in gappy_columns and seq[i] == "-":
gappy_columns.add(i)
new_seqs = []
for seq in self:
content = "".join([seq[i] for i in range(self.alignlen) if i not in gappy_columns])
new_seqs.append(Sequence(sequence=content, name=seq.name))
return Alignment(new_seqs)
def get_ungapped_using_reference(self, seq_name):
"""
Return a new alignment where gappy columns have been removed using in respect to
a user specified reference sequence
:param name: Name of template sequence
:return:
"""
template_seq = self.get_sequence(seq_name)
#Find gapped column indices
gappy_columns = [i for i in xrange(len(template_seq)) if template_seq[i] == "-"]
new_seqs = []
for seq in self:
content = "".join([seq[i] for i in range(self.alignlen) if i not in gappy_columns])
new_seqs.append(Sequence(sequence=content, name=seq.name))
return Alignment(new_seqs)
def read_clustal_file(filename):
"""
Read a CLUSTAL Alignment file and return a dictionary of sequence data
"""
fh = open(filename, 'r')
names = []
seqdata = dict()
data = [line.strip('\n') for line in fh.readlines() if line is not None]
for line in data:
if line.startswith('CLUSTAL') or line.startswith('#'):
continue
if len(line) == 0:
continue
if line[0] == ' ' or '*' in line or ':' in line:
continue
sections = line.split()
name, seqstr = sections[0], "".join(sections[1:])
names.append(name)
if seqdata.has_key(name):
seqdata[name] = seqstr
else:
seqdata[name] = seqstr
sequences = [Sequence(seqstr, name=seqname) for seqname, seqstr in sorted(seqdata.items(),
key=lambda x: names.index(x[0]))]
return Alignment(sequences)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Removes gaps from a clustal \
formmated sequence alignment file')
parser.add_argument('-i', '--input', help='Input alignment file', required=True)
parser.add_argument('-o', '--output', help='Output file', required=False, default="./")
parser.add_argument('-r', '--reference', help='Reference sequence name', required=False)
args = parser.parse_args()
the_aln = read_clustal_file(args.input)
if args.reference:
ref_name = args.reference
new_aln = the_aln.get_ungapped_using_reference(ref_name)
else:
new_aln = the_aln.get_ungapped()
new_aln.write_clustal_file(args.output)