-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathparse_acnums.py
150 lines (128 loc) · 4.99 KB
/
parse_acnums.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# Prefix written in front of every rendered accession number and stripped
# from the input text before numbers are parsed.
former_str = 'EPI_ISL_'
# Separator between the two ends of a range, both when rendering a pair and
# when splitting a "start-end" token.
middle_str = '-'
# Separator placed between rendered pairs in the output string.
sep = ', '
def num_pairs_to_str(nums_list):
    """Render a flat list of range endpoints as one accession-number string.

    ``nums_list`` is read two items at a time as ``(start, end)`` pairs; a
    trailing unpaired item is ignored.  A pair whose two ends are equal is
    written as ``former_str + start``; any other pair is written as
    ``former_str + start + middle_str + end``.  The rendered pairs are joined
    with ``sep``.

    Args:
        nums_list: Flat sequence ``[start0, end0, start1, end1, ...]``.

    Returns:
        The joined string, or ``''`` when ``nums_list`` holds no full pair.
    """
    def num_pair_to_str(num1, num2):
        # A single number is written without the range separator.
        if num1 == num2:
            return former_str + str(num1)
        return former_str + str(num1) + middle_str + str(num2)

    # zip() over the even and odd slices stops at the shorter one, which
    # drops an unpaired trailing item.  Joining avoids both the repeated
    # string concatenation and the need to trim a trailing separator
    # (str.rstrip would treat `sep` as a set of characters, not a suffix).
    pairs = zip(nums_list[0::2], nums_list[1::2])
    return sep.join(num_pair_to_str(start, end) for start, end in pairs)
def str_to_num_pair(string):
    """Convert one accession token (prefix already removed) to a list of ints.

    A token without ``middle_str`` is a single number ``n`` and yields
    ``[n, n]``.  A token containing ``middle_str`` is split on it and every
    piece is converted, so ``"3-7"`` yields ``[3, 7]``; a token with more than
    one ``middle_str`` yields a list with more than two items.

    Raises:
        ValueError: If any piece is not a valid integer literal.
    """
    if middle_str not in string:
        value = int(string)
        return [value, value]
    return [int(piece) for piece in string.split(middle_str)]
def transform_to_nums(ac_str, default_num=5000):
    """Parse accession-number text into a flat list of range endpoints.

    The layout is chosen by whether ``ac_str`` contains a comma:

    * No comma: the text is treated as csv-style content with one accession
      number per line.  Every number ``n`` contributes the pair ``n, n``.
    * With commas: the text is a comma-separated list of single numbers and
      ``start-end`` ranges.  A range longer than ``default_num`` is cut into
      consecutive chunks of ``default_num`` numbers, followed by one shorter
      chunk for the remainder when there is one.

    ``former_str`` is removed wherever it occurs before parsing.

    Args:
        ac_str: The raw text to parse.
        default_num: Largest number of accession numbers one emitted range
            may span in the comma-separated layout.

    Returns:
        A flat list ``[start0, end0, start1, end1, ...]`` with inclusive ends.

    Raises:
        ValueError: If a token is not a valid integer literal.
    """
    if ',' not in ac_str:
        # Handle the contents of a csv file: one number per line.
        lines = ac_str.replace(former_str, '').split('\n')
        new_nums = []
        for line in lines:
            # Skip blank and whitespace-only lines (e.g. a lone '\r').
            if line.strip():
                number = int(line)
                new_nums += [number, number]
        return new_nums

    for char in ('\n', '\r', ' '):
        ac_str = ac_str.replace(char, '')
    ac_str = ac_str.replace(former_str, '').rstrip(',')

    ac_list2 = []
    for item in ac_str.split(','):
        if not item:
            # Tolerate doubled or leading commas.
            continue
        num_pair = str_to_num_pair(item)
        size = num_pair[1] - num_pair[0] + 1
        if size <= default_num:
            ac_list2 += num_pair
            continue
        full_chunks, remainder = divmod(size, default_num)
        for i in range(full_chunks):
            chunk_start = num_pair[0] + default_num * i
            ac_list2 += [chunk_start, chunk_start + default_num - 1]
        # Only emit the tail chunk when something is left over; otherwise a
        # range that divides evenly would gain an empty pair [end + 1, end].
        if remainder:
            ac_list2 += [num_pair[1] - remainder + 1, num_pair[1]]
    return ac_list2
class AcNumAnalysis:
    """Hand out a collection of accession-number ranges in batches.

    Pending ranges are held in ``ac_nums`` as a flat list
    ``[start0, end0, start1, end1, ...]`` with inclusive ends.  Each call to
    ``analysis`` removes at most ``default_num`` numbers from that list and
    returns them rendered as a string.
    """

    # Class-level defaults, kept so the names stay readable on the class
    # itself.  Every instance gets its own values in __init__, so the list
    # below is never shared between instances.
    ac_nums = []
    full_length = 0

    def __init__(self):
        # Maximum number of accession numbers in one batch.
        self.default_num = 5000
        # A batch is closed once it is within this many numbers of
        # default_num.
        self.default_sep = 0
        self.ac_nums = []
        self.full_length = 0

    def refresh(self, ac):
        """Load a new set of ranges and recompute ``full_length``.

        Args:
            ac: A falsy value (clears the pending ranges), a string (parsed
                with ``transform_to_nums``; text without a comma is treated
                as file content and additionally merged with
                ``zip_ac_nums``), or a flat list of range endpoints.  Any
                other type leaves ``ac_nums`` unchanged.
        """
        if not ac:
            self.ac_nums = []
        elif isinstance(ac, str):
            from_file = ',' not in ac
            self.ac_nums = transform_to_nums(ac)
            if from_file:
                self.zip_ac_nums()
        elif isinstance(ac, list):
            # Copy: analysis() mutates ac_nums and must not touch the
            # caller's list.
            self.ac_nums = list(ac)
        self.full_length = self.get_length()

    def analysis(self):
        """Remove the next batch from ``ac_nums`` and return it.

        If the first pending range spans more than ``default_num`` numbers,
        exactly ``default_num`` numbers are taken off its front.  Otherwise
        the pending ranges are scanned in order and every range that still
        fits into the batch is taken, until the batch holds at least
        ``default_num - default_sep`` numbers.

        Returns:
            A tuple ``(text, count)``: the batch rendered by
            ``num_pairs_to_str`` and how many accession numbers it covers.

        Raises:
            IndexError: If ``ac_nums`` holds fewer than two items.
        """
        limit = self.default_num
        first_start, first_end = self.ac_nums[0], self.ac_nums[1]
        if first_end - first_start >= limit:
            to_return = [first_start, first_start + limit - 1]
            # Leave the rest of the oversized range pending.
            self.ac_nums[0] = first_start + limit
            print('\rOutput {} Accession Nums'.format(str(limit)), end='')
            return num_pairs_to_str(to_return), limit

        count = 0
        taken = []
        kept = []
        pair_total = len(self.ac_nums) // 2
        # Index of the first pair the scan did not look at.
        scanned = pair_total
        for i in range(pair_total):
            pair = self.ac_nums[2 * i:2 * i + 2]
            size = pair[1] - pair[0] + 1
            if count + size <= limit:
                count += size
                taken += pair
            else:
                # Does not fit in this batch; stays pending.
                kept += pair
            if count >= limit - self.default_sep:
                scanned = i + 1
                break
        print('\rOutput {} Accession Nums'.format(count), end='')
        # Rebuild by position instead of list.remove(value): removal by
        # value deletes the first equal item, which may belong to another
        # pair, and rescans the list for every endpoint.
        kept += self.ac_nums[2 * scanned:]
        self.ac_nums[:] = kept
        return num_pairs_to_str(taken), count

    def get(self):
        """Yield ``analysis()`` results until no ranges are pending."""
        while self.ac_nums:
            yield self.analysis()

    def get_whole_list(self):
        """Return the text of every remaining batch as a list of strings."""
        ans_list = []
        while self.ac_nums:
            ans_list.append(self.analysis()[0])
        return ans_list

    def get_length(self):
        """Return how many accession numbers the pending ranges cover."""
        pairs = zip(self.ac_nums[0::2], self.ac_nums[1::2])
        return sum(end - start + 1 for start, end in pairs)

    def zip_ac_nums(self):
        """Merge directly adjacent ranges in ``ac_nums``.

        A range is folded into the one before it when it starts exactly one
        past the previous end and the merged range would cover at most
        ``default_num`` numbers.
        """
        new_ac_nums = []
        current_num_pair = []
        for start, end in zip(self.ac_nums[0::2], self.ac_nums[1::2]):
            if not current_num_pair:
                current_num_pair = [start, end]
                continue
            merged_size = end - current_num_pair[0] + 1
            if start == current_num_pair[1] + 1 and merged_size <= self.default_num:
                # Contiguous and still within the batch size: extend.
                current_num_pair[1] = end
            else:
                new_ac_nums += current_num_pair
                current_num_pair = [start, end]
        new_ac_nums += current_num_pair
        self.ac_nums = new_ac_nums
if __name__ == '__main__':
    # Demo run: read an id export, split it into batches and print the
    # first batch.
    analyser = AcNumAnalysis()
    with open('GISAID_hcov-19_ids_2022_07_20_12_50.csv') as csv_file:
        content = csv_file.read()
    analyser.refresh(content)
    batches = analyser.get_whole_list()
    print(batches[0])