Updated handling of celery task status in views

This commit is contained in:
dvanaken 2020-02-06 14:25:57 -05:00 committed by Dana Van Aken
parent d9e2806b9e
commit ec9066e5b2
1 changed files with 74 additions and 35 deletions

View File

@ -12,6 +12,8 @@ import shutil
from collections import OrderedDict from collections import OrderedDict
from io import StringIO from io import StringIO
import celery
from celery import chain, signature, uuid
from django.contrib.auth import authenticate, login, logout from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.decorators import login_required from django.contrib.auth.decorators import login_required
from django.contrib.auth import update_session_auth_hash from django.contrib.auth import update_session_auth_hash
@ -466,7 +468,6 @@ def new_result(request):
def handle_result_files(session, files, execution_times=None): def handle_result_files(session, files, execution_times=None):
from celery import chain
# Combine into contiguous files # Combine into contiguous files
files = {k: b''.join(v.chunks()).decode() for k, v in list(files.items())} files = {k: b''.join(v.chunks()).decode() for k, v in list(files.items())}
@ -637,26 +638,50 @@ def handle_result_files(session, files, execution_times=None):
result_id = result.pk result_id = result.pk
response = None response = None
if session.algorithm == AlgorithmType.GPR: if session.algorithm == AlgorithmType.GPR:
response = chain(aggregate_target_results.s(result.pk, session.algorithm), subtask_list = [
map_workload.s(), ('aggregate_target_results', (result_id, session.algorithm)),
configuration_recommendation.s()).apply_async() ('map_workload', ()),
('configuration_recommendation', ()),
]
#response = chain(aggregate_target_results.s(result.pk, session.algorithm),
# map_workload.s(),
# configuration_recommendation.s()).apply_async()
elif session.algorithm == AlgorithmType.DDPG: elif session.algorithm == AlgorithmType.DDPG:
response = chain(train_ddpg.s(result.pk), subtask_list = [
configuration_recommendation_ddpg.s()).apply_async() ('train_ddpg', (result_id,)),
('configuration_recommendation_ddpg', ()),
]
#response = chain(train_ddpg.s(result.pk),
# configuration_recommendation_ddpg.s()).apply_async()
elif session.algorithm == AlgorithmType.DNN: elif session.algorithm == AlgorithmType.DNN:
response = chain(aggregate_target_results.s(result.pk, session.algorithm), subtask_list = [
map_workload.s(), ('aggregate_target_results', (result_id, session.algorithm)),
configuration_recommendation.s()).apply_async() ('map_workload', ()),
('configuration_recommendation', ()),
]
#response = chain(aggregate_target_results.s(result.pk, session.algorithm),
# map_workload.s(),
# configuration_recommendation.s()).apply_async()
taskmeta_ids = [] subtasks = []
current_task = response for name, args in subtask_list:
while current_task: task_id = '{}-{}'.format(name, uuid())
taskmeta_ids.insert(0, current_task.id) s = signature(name, args=args, options={'task_id': task_id})
current_task = current_task.parent subtasks.append(s)
result.task_ids = ','.join(taskmeta_ids) response = chain(*subtasks).apply_async()
result.task_ids = JSONUtil.dumps(response.as_tuple())
result.save() result.save()
#taskmeta_ids = []
#current_task = response
#while current_task:
# taskmeta_ids.insert(0, current_task.id)
# current_task = current_task.parent
#result.task_ids = ','.join(taskmeta_ids)
#result.save()
if execution_times: if execution_times:
try: try:
batch = [] batch = []
@ -1137,31 +1162,45 @@ def give_result(request, upload_code): # pylint: disable=unused-argument
return HttpResponse("Invalid upload code: " + upload_code, status=400) return HttpResponse("Invalid upload code: " + upload_code, status=400)
latest_result = Result.objects.filter(session=session).latest('creation_time') latest_result = Result.objects.filter(session=session).latest('creation_time')
task_ids = [t for t in (latest_result.task_ids or '').split(',') if t.strip() != ''] task_tuple = JSONUtil.loads(latest_result.task_ids)
tasks = TaskUtil.get_tasks(task_ids) task_res = celery.result.result_from_tuple(task_tuple)
overall_status, num_completed = TaskUtil.get_task_status(tasks, len(task_ids))
task_list = []
task = task_res
while task is not None:
task_list.append(task)
task = task.parent
group_res = celery.result.GroupResult(task_res.task_id, results=task_list)
next_config = latest_result.next_configuration next_config = latest_result.next_configuration
LOG.debug("result_id: %s, overall_status: %s, tasks_completed: %s/%s, " LOG.debug("result_id: %s, succeeded: %s, failed: %s, ready: %s, tasks_completed: %s/%s, "
"next_config: %s\n", latest_result.pk, overall_status, num_completed, "next_config: %s\n", latest_result.pk, group_res.successful(),
len(task_ids), next_config) group_res.failed(), group_res.ready(), group_res.completed_count(),
len(group_res), next_config)
response = dict(celery_status=overall_status, result_id=latest_result.pk, message='', errors=[]) response = dict(celery_status='', result_id=latest_result.pk, message='', errors=[])
if overall_status == 'SUCCESS':
if group_res.ready():
if group_res.failed():
errors = [t.traceback for t in task_list if t.traceback]
if errors:
LOG.warning('\n\n'.join(errors))
response.update(
celery_status='FAILURE', errors=errors,
message='Celery failed to get the next configuration')
status_code = 400
else:
assert group_res.successful()
next_config = JSONUtil.loads(next_config) next_config = JSONUtil.loads(next_config)
response.update(next_config, response.update(
next_config, celery_status='SUCCESS',
message='Celery successfully recommended the next configuration') message='Celery successfully recommended the next configuration')
status_code = 200 status_code = 200
elif overall_status in ('FAILURE', 'REVOKED', 'RETRY'): else: # One or more tasks are still waiting to execute
task_errors = [t.traceback for t in tasks if t.traceback] response.update(celery_status='PENDING', message='Result not ready')
if task_errors:
LOG.warning('\n\n'.join(task_errors))
response.update(message='Celery failed to get the next configuration', errors=task_errors)
status_code = 400
else: # overall_status in ('PENDING', 'RECEIVED', 'STARTED'):
response.update(message='Result not ready')
status_code = 202 status_code = 202
return HttpResponse(JSONUtil.dumps(response, pprint=True), status=status_code, return HttpResponse(JSONUtil.dumps(response, pprint=True), status=status_code,