-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
187 lines (149 loc) · 6.56 KB
/
app.py
File metadata and controls
187 lines (149 loc) · 6.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# Module metadata: author, release version and maintainer contact
__author__ = "Junhee Yoon"
__version__ = "1.0.4"
__maintainer__ = "Junhee Yoon"
__email__ = "swiri021@gmail.com"
# Flask related libraries
from flask import Flask, request, render_template, redirect, url_for, session
from flask_bootstrap import Bootstrap
from models import InitForm, SnakeMakeForm
# Flask apps libraries
from flask_nav import Nav
from flask_nav.elements import Navbar, View
from flask_wtf.csrf import CSRFProtect
# Additional function libraries
import uuid
import os
# Custom form making
from wtforms.validators import InputRequired
from wtforms import StringField, SubmitField, BooleanField
# Celery running
import json
from celery import Celery, current_task
from celery.result import AsyncResult
from subprocess import Popen, PIPE
# Internal libraries
from libraries.handlers import yamlHandlers
app = Flask(__name__)

# Key used to sign the session cookie and CSRF tokens.
# A key generated with os.urandom() differs in every process and after every
# restart, which invalidates existing sessions and CSRF tokens. Set the
# SECRET_KEY environment variable to keep the key stable; the random key
# remains the fallback when the variable is unset or empty.
SECRET_KEY = os.environ.get('SECRET_KEY') or os.urandom(32)
app.config['SECRET_KEY'] = SECRET_KEY
# No expiry time limit on CSRF tokens
app.config['WTF_CSRF_TIME_LIMIT'] = None

## Celery setting
app.config.update(
    CELERY_BROKER_URL='redis://redis:6379/0',  # Redis docker
    CELERY_RESULT_BACKEND='redis://redis:6379/0',
)
def make_celery(app):
    """
    Create the Celery application used for background jobs.

    The application is named after ``app.import_name``. The result backend
    and broker URLs are read from the ``CELERY_RESULT_BACKEND`` and
    ``CELERY_BROKER_URL`` entries of ``app.config``, and the whole Flask
    config is then copied into the Celery configuration.

    :param app: Flask application providing the name and configuration
    :return: configured Celery instance
    """
    name = app.import_name
    settings = app.config
    result_backend = settings['CELERY_RESULT_BACKEND']
    broker_url = settings['CELERY_BROKER_URL']

    worker_app = Celery(name, backend=result_backend, broker=broker_url)
    worker_app.conf.update(settings)
    return worker_app
# Celery application built from this Flask app's configuration
celery = make_celery(app)
# Enable Flask-Bootstrap on the app
Bootstrap(app)
# Navigation bar extension; the bar itself is registered via @nav.navigation()
nav = Nav(app)
@nav.navigation()
def mynavbar():
    """
    Build the navigation bar: a 'Workflow Controller' title with a single
    'Home' entry pointing at the ``home`` endpoint.
    """
    home_link = View('Home', 'home')
    return Navbar('Workflow Controller', home_link)
# Register the navigation extension on the app
# NOTE(review): `nav` was already constructed with `app`; confirm whether this second init is needed
nav.init_app(app)
# Enable CSRF protection for the app
csrf = CSRFProtect()
csrf.init_app(app)
######### Routes and Celery tasks ###########
# View functions and background tasks
@app.route("/", methods=['GET', 'POST'])
def home():
    """
    Landing page where the user picks the pipeline to control.

    Every request stores a fresh random id in the session, used to tell
    different snakemake runs apart. When the form validates, the chosen
    pipeline is saved in the session and the user is redirected to the
    yaml creator; otherwise the landing page is rendered.
    """
    # NOTE: clearing the session on return to this page is currently disabled
    session['_id'] = uuid.uuid4().hex
    form = InitForm(request.form)

    if not form.validate_on_submit():
        return render_template('home.html', title="Snakemake controller", form=form)

    # Pass the selection to the next view through the session
    session['selected_pipeline'] = form.pipeline_name.data
    next_page = url_for('config_yaml_creator')
    return redirect(next_page)
@app.route("/yamlcreator", methods=['GET', 'POST'])
def config_yaml_creator():
    """
    Build a form from the selected pipeline's config.yaml and store the answers.

    One required text field is created per yaml key (the yaml value is shown
    as placeholder), plus a 'dag' check box and a submit button. On a valid
    submit the entered values are written out through
    ``yamlHandlers._reform_yamlFile``, the resulting file and the DAG choice
    are stored in the session, and the user is redirected to the status page.
    If no pipeline has been selected in this session, the user is redirected
    back to the landing page.
    """
    val = session.get('selected_pipeline', None)  # selected pipeline, passed on to the yaml handlers
    if val is None:
        # Nothing selected yet (e.g. the URL was opened directly)
        return redirect(url_for('home'))

    yaml_data = yamlHandlers._parsing_yamlFile(val)  # Parsing yaml data

    # Attach the dynamic fields to a per-request subclass. Setting them on
    # SnakeMakeForm itself would keep fields from previously selected
    # pipelines on the shared class for every later request.
    class PipelineForm(SnakeMakeForm):
        pass

    for key, value in yaml_data.items():
        # Field name and label are the yaml key; the yaml value is the placeholder
        setattr(
            PipelineForm,
            key,
            StringField(key, validators=[InputRequired()], render_kw={'placeholder': value}),
        )
    setattr(PipelineForm, 'dag', BooleanField('Draw DAG ?'))  # Check box for drawing DAG
    setattr(PipelineForm, 'submit', SubmitField('Submit'))  # Submit button at the bottom

    form = PipelineForm(request.form)
    if form.validate_on_submit():
        # Look fields up by name so the result does not depend on field order
        result_yaml_data = {key: form[key].data for key in yaml_data}

        yaml_output = yamlHandlers._reform_yamlFile(
            val, result_yaml_data, str(session.get('_id', None))
        )  # make result yaml file
        session['yaml_output'] = yaml_output
        session['dag_call'] = form.dag.data
        return redirect(url_for('workflow_status'))

    return render_template('config_yaml_creator.html', form=form)
@celery.task(bind=True)  # bind=True passes the task instance as 'self' so state can be reported
def workflow_running(self, pipeline_path, yaml_file, dag_call):
    """
    Run snakemake for a pipeline, or draw its DAG, inside the conda environment.

    :param pipeline_path: directory holding the pipeline's ``Snakefile``
    :param yaml_file: config file passed to snakemake via ``--configfile``
    :param dag_call: ``False`` runs the pipeline; any other value draws the
        DAG into ``dag.svg`` inside ``pipeline_path``
    :return: the captured stderr text with newlines replaced by ``<br />``
    """
    snakefile = os.path.join(pipeline_path, 'Snakefile')
    base_cmd = [
        'conda', 'run', '-vv', '-n', 'pipeline_controller_base',
        'snakemake', '--snakefile', snakefile,
    ]

    # Only an explicit False selects the pipeline run, as before
    if dag_call == False:  # noqa: E712
        ## Snakemake call
        cmd = base_cmd + ['--cores', str(3), '--directory', pipeline_path, '--configfile', yaml_file]
        proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
        msg_txt = proc.communicate()[1].decode('utf8', errors='replace').strip()
    else:
        ## DAG call
        # Popen does not interpret '|' or '>' without a shell, so the DAG text
        # is captured here and fed to a separate 'dot' process explicitly.
        cmd = base_cmd + ['--directory', pipeline_path, '--configfile', yaml_file, '--dag']
        proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
        dag_dot, dag_err = proc.communicate()
        msg_txt = dag_err.decode('utf8', errors='replace').strip()

        # NOTE(review): 'dot' is resolved from the worker's PATH, not from the conda env
        with open(os.path.join(pipeline_path, 'dag.svg'), 'wb') as svg_file:
            dot_proc = Popen(['dot', '-Tsvg'], stdin=PIPE, stdout=svg_file, stderr=PIPE)
            dot_err = dot_proc.communicate(input=dag_dot)[1]
        dot_msg = dot_err.decode('utf8', errors='replace').strip()
        if dot_msg:
            msg_txt = (msg_txt + '\n' + dot_msg).strip()

    # TODO: stream stderr line by line; for now the state is reported once,
    # after the process has finished
    self.update_state(state='PROGRESS', meta={'msg': msg_txt})
    return msg_txt.replace('\n', '<br />')
@celery.task(bind=True)  # bound task: receives the task instance as 'self'
def dag_running(self):
    """Placeholder task; it currently only prints an empty line."""
    print('')
@app.route("/workflow_progress")
def workflow_progress():
    """
    Report the state of a background job as a JSON string.

    Reads the ``jobid`` request value and looks the job up in Celery.
    Returns ``{"state", "msg"}`` for the PROGRESS, SUCCESS and FAILURE states
    (plus ``"svg": "dag.svg"`` on SUCCESS when the session's ``dag_call`` is
    true), and ``'{}'`` when no job id was given or the job is in any other
    state.
    """
    jobid = request.values.get('jobid')
    if not jobid:
        return '{}'

    job = AsyncResult(jobid, app=celery)
    # Read the state once: each access may hit the result backend again, and
    # the job can finish between two reads.
    state = job.state

    if state == 'PROGRESS':
        info = job.info
        # If the job finished after the state was read, info is already the
        # final result rather than the progress dict.
        msg = info.get('msg', '') if isinstance(info, dict) else info
        return json.dumps(dict(state=state, msg=str(msg)))

    if state == 'SUCCESS':
        payload = dict(state=state, msg=str(job.get()))
        if session.get('dag_call', None) == True:  # noqa: E712
            payload['svg'] = 'dag.svg'
        return json.dumps(payload)

    if state == 'FAILURE':
        return json.dumps(dict(state=state, msg="failure"))  ## return somewhere to exit

    return '{}'
@app.route("/status")
def workflow_status():
    """
    Start the background workflow job and render the progress page.

    The pipeline, the generated yaml file and the DAG choice are taken from
    the session. If the pipeline or the yaml file is missing (for example the
    page was opened directly), no job is started and the user is redirected
    to the landing page.
    """
    pipeline_path = session.get('selected_pipeline', None)  # Pipeline path
    yaml_file = session.get('yaml_output', None)  # yaml file
    dag_call = session.get('dag_call', None)  # DAG calling or not

    if pipeline_path is None or yaml_file is None:
        # Without these the job could only fail; send the user back to start
        return redirect(url_for('home'))

    job = workflow_running.delay(pipeline_path, yaml_file, dag_call)
    return render_template('progress.html', JOBID=job.id)
######### End of routes ###########
if __name__ == '__main__':
    # Started as a script: serve on all network interfaces
    app.run(host='0.0.0.0')