-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathortho_select.py
More file actions
59 lines (52 loc) · 2.29 KB
/
ortho_select.py
File metadata and controls
59 lines (52 loc) · 2.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# -*- coding: utf-8 -*-
# ortho_select
# Version: 1.0
# Selects orthogroups from OrthoFinder output and sorts them into "small" and "big" groups.
import argparse
import os
import shutil
import time
def get_args(argv=None):
    """Parse the command-line options of ortho_select.

    Args:
        argv: Optional list of argument strings to parse instead of the
            process command line. The default ``None`` makes argparse read
            ``sys.argv[1:]``, so existing callers are unaffected.

    Returns:
        An ``argparse.Namespace`` with the attributes ``least`` (int,
        required), ``big`` (int, default 20), ``file`` (str, required) and
        ``output`` (str, required).
    """
    parser = argparse.ArgumentParser(description="This is used to select orthogroup from output of orthofinder.")
    parser.add_argument("-l", "--least", type=int, required=True, help="Least number of species concluding.")
    parser.add_argument("-b", "--big", default=20, type=int, help="Orthogroup considered as a big ortho when a species have more than xx sequences. default = 20")
    # NOTE: despite its name, this path is listed as a directory by run_select.
    parser.add_argument("-f", "--file", type=str, required=True, help="The path to the orthofinder og file.")
    parser.add_argument("-o", "--output", type=str, required=True, help="The output path of dir.")
    return parser.parse_args(argv)
def mkdir(outputdir):
    """Create the ``orthogroup_small`` and ``orthogroup_big`` directories.

    Missing parent directories are created as well, and directories that
    already exist are left untouched, so the script can be re-run against
    the same output path.

    Args:
        outputdir: Directory under which the two result directories are
            created.

    Returns:
        None. Failures are reported on stdout instead of being raised.
    """
    # Each directory gets its own try block so that a failure on one does
    # not prevent the other from being created.
    for name in ('orthogroup_small', 'orthogroup_big'):
        try:
            os.makedirs(os.path.join(outputdir, name), exist_ok=True)
        except OSError as e:
            print(f"Error creating directories: {e}")
            print("Please check the outputdir.")
def select_seq(seq, outputdir, least, big):
    """Classify one sequence file and copy it to the matching directory.

    Every header line (a line starting with ``>``) is attributed to a
    species, taken as the text between ``>`` and the first underscore with
    surrounding whitespace removed. The file is then handled as follows:

    * fewer than ``least`` distinct species: a message is printed and the
      file is not copied;
    * every species has at most ``big`` sequences: the file is copied to
      ``<outputdir>/orthogroup_small``;
    * otherwise: the file is copied to ``<outputdir>/orthogroup_big``.

    Args:
        seq: Path to the sequence file to inspect.
        outputdir: Directory containing (or to contain) the two result
            directories.
        least: Minimum number of distinct species required to keep the file.
        big: Largest per-species sequence count still considered "small".

    Returns:
        None.
    """
    species_seq = {}
    with open(seq, "r") as s:
        for line in s:
            if line.startswith(">"):
                # strip() keeps a header without an underscore from carrying
                # its trailing newline into the species name.
                species = line[1:].split("_", 1)[0].strip()
                species_seq[species] = species_seq.get(species, 0) + 1

    if len(species_seq) < least:
        print(f"{seq}'s num of species is less than {least} and so abandoned.")
        return

    if all(count <= big for count in species_seq.values()):
        category = 'orthogroup_small'
    else:
        category = 'orthogroup_big'

    target_dir = os.path.join(outputdir, category)
    os.makedirs(target_dir, exist_ok=True)
    # Copy through the standard library instead of a shell `cp` command so
    # that paths containing spaces or shell metacharacters are handled
    # correctly and the script does not depend on a POSIX shell.
    shutil.copy(seq, os.path.join(target_dir, os.path.basename(seq)))
def run_select(inputdir, outputdir, least, big):
    """Run ``select_seq`` on every regular file directly inside ``inputdir``.

    Args:
        inputdir: Directory whose files are classified.
        outputdir: Output directory forwarded to ``select_seq``.
        least: Minimum species count forwarded to ``select_seq``.
        big: Per-species sequence threshold forwarded to ``select_seq``.

    Returns:
        None.
    """
    inputdir = os.path.abspath(inputdir)
    # Sorted so files are processed in a deterministic order.
    for name in sorted(os.listdir(inputdir)):
        path = os.path.join(inputdir, name)
        # os.listdir also returns sub-directories (for example the result
        # directories when the output path is the input directory); opening
        # one as a sequence file would fail, so only regular files are
        # processed.
        if not os.path.isfile(path):
            continue
        select_seq(path, outputdir, least, big)
def main():
    """Entry point: parse the options, prepare the output tree, run the selection."""
    options = get_args()
    # The output path is made absolute once and reused for both steps.
    destination = os.path.abspath(options.output)
    mkdir(destination)
    run_select(options.file, destination, options.least, options.big)
# Run the selection only when executed as a script, and report the wall-clock time.
if __name__ == "__main__":
    started_at = time.time()
    main()
    elapsed = time.time() - started_at
    print(f'Total time used: {elapsed}s\nFinished!')