From ec9066e5b2bdadc4f1aa1dfd075303dbc3155705 Mon Sep 17 00:00:00 2001 From: dvanaken Date: Thu, 6 Feb 2020 14:25:57 -0500 Subject: [PATCH] Updated handling of celery task status in views --- server/website/website/views.py | 109 ++++++++++++++++++++++---------- 1 file changed, 74 insertions(+), 35 deletions(-) diff --git a/server/website/website/views.py b/server/website/website/views.py index 18d74b0..65c5f2a 100644 --- a/server/website/website/views.py +++ b/server/website/website/views.py @@ -12,6 +12,8 @@ import shutil from collections import OrderedDict from io import StringIO +import celery +from celery import chain, signature, uuid from django.contrib.auth import authenticate, login, logout from django.contrib.auth.decorators import login_required from django.contrib.auth import update_session_auth_hash @@ -466,7 +468,6 @@ def new_result(request): def handle_result_files(session, files, execution_times=None): - from celery import chain # Combine into contiguous files files = {k: b''.join(v.chunks()).decode() for k, v in list(files.items())} @@ -637,26 +638,50 @@ def handle_result_files(session, files, execution_times=None): result_id = result.pk response = None if session.algorithm == AlgorithmType.GPR: - response = chain(aggregate_target_results.s(result.pk, session.algorithm), - map_workload.s(), - configuration_recommendation.s()).apply_async() + subtask_list = [ + ('aggregate_target_results', (result_id, session.algorithm)), + ('map_workload', ()), + ('configuration_recommendation', ()), + ] + #response = chain(aggregate_target_results.s(result.pk, session.algorithm), + # map_workload.s(), + # configuration_recommendation.s()).apply_async() elif session.algorithm == AlgorithmType.DDPG: - response = chain(train_ddpg.s(result.pk), - configuration_recommendation_ddpg.s()).apply_async() + subtask_list = [ + ('train_ddpg', (result_id,)), + ('configuration_recommendation_ddpg', ()), + ] + #response = chain(train_ddpg.s(result.pk), + # configuration_recommendation_ddpg.s()).apply_async() elif session.algorithm == AlgorithmType.DNN: - response = chain(aggregate_target_results.s(result.pk, session.algorithm), - map_workload.s(), - configuration_recommendation.s()).apply_async() + subtask_list = [ + ('aggregate_target_results', (result_id, session.algorithm)), + ('map_workload', ()), + ('configuration_recommendation', ()), + ] + #response = chain(aggregate_target_results.s(result.pk, session.algorithm), + # map_workload.s(), + # configuration_recommendation.s()).apply_async() - taskmeta_ids = [] - current_task = response - while current_task: - taskmeta_ids.insert(0, current_task.id) - current_task = current_task.parent + subtasks = [] + for name, args in subtask_list: + task_id = '{}-{}'.format(name, uuid()) + s = signature(name, args=args, options={'task_id': task_id}) + subtasks.append(s) - result.task_ids = ','.join(taskmeta_ids) + response = chain(*subtasks).apply_async() + result.task_ids = JSONUtil.dumps(response.as_tuple()) result.save() + #taskmeta_ids = [] + #current_task = response + #while current_task: + # taskmeta_ids.insert(0, current_task.id) + # current_task = current_task.parent + + #result.task_ids = ','.join(taskmeta_ids) + #result.save() + if execution_times: try: batch = [] @@ -1137,31 +1162,45 @@ def give_result(request, upload_code): # pylint: disable=unused-argument return HttpResponse("Invalid upload code: " + upload_code, status=400) latest_result = Result.objects.filter(session=session).latest('creation_time') - task_ids = [t for t in (latest_result.task_ids or '').split(',') if t.strip() != ''] - tasks = TaskUtil.get_tasks(task_ids) - overall_status, num_completed = TaskUtil.get_task_status(tasks, len(task_ids)) + task_tuple = JSONUtil.loads(latest_result.task_ids) + task_res = celery.result.result_from_tuple(task_tuple) + + task_list = [] + task = task_res + while task is not None: + task_list.append(task) + task = task.parent + + group_res = celery.result.GroupResult(task_res.task_id, results=task_list) next_config = latest_result.next_configuration - LOG.debug("result_id: %s, overall_status: %s, tasks_completed: %s/%s, " - "next_config: %s\n", latest_result.pk, overall_status, num_completed, - len(task_ids), next_config) + LOG.debug("result_id: %s, succeeded: %s, failed: %s, ready: %s, tasks_completed: %s/%s, " + "next_config: %s\n", latest_result.pk, group_res.successful(), + group_res.failed(), group_res.ready(), group_res.completed_count(), + len(group_res), next_config) - response = dict(celery_status=overall_status, result_id=latest_result.pk, message='', errors=[]) - if overall_status == 'SUCCESS': - next_config = JSONUtil.loads(next_config) - response.update(next_config, - message='Celery successfully recommended the next configuration') - status_code = 200 + response = dict(celery_status='', result_id=latest_result.pk, message='', errors=[]) - elif overall_status in ('FAILURE', 'REVOKED', 'RETRY'): - task_errors = [t.traceback for t in tasks if t.traceback] - if task_errors: - LOG.warning('\n\n'.join(task_errors)) - response.update(message='Celery failed to get the next configuration', errors=task_errors) - status_code = 400 + if group_res.ready(): + if group_res.failed(): + errors = [t.traceback for t in task_list if t.traceback] + if errors: + LOG.warning('\n\n'.join(errors)) + response.update( + celery_status='FAILURE', errors=errors, + message='Celery failed to get the next configuration') + status_code = 400 - else: # overall_status in ('PENDING', 'RECEIVED', 'STARTED'): - response.update(message='Result not ready') + else: + assert group_res.successful() + next_config = JSONUtil.loads(next_config) + response.update( + next_config, celery_status='SUCCESS', + message='Celery successfully recommended the next configuration') + status_code = 200 + + else: # One or more tasks are still waiting to execute + response.update(celery_status='PENDING', message='Result not ready') status_code = 202 return HttpResponse(JSONUtil.dumps(response, pprint=True), status=status_code,