diff --git a/server/website/website/tasks/async_tasks.py b/server/website/website/tasks/async_tasks.py
index 737ebe6..2aa76ab 100644
--- a/server/website/website/tasks/async_tasks.py
+++ b/server/website/website/tasks/async_tasks.py
@@ -109,19 +109,9 @@ class ConfigurationRecommendation(UpdateTask):  # pylint: disable=abstract-metho
     def on_success(self, retval, task_id, args, kwargs):
         super(ConfigurationRecommendation, self).on_success(retval, task_id, args, kwargs)
 
-        result_id = retval['result_id']
-        result = Result.objects.get(pk=result_id)
-
-        # Replace result with formatted result
-        formatted_params = db.parser.format_dbms_knobs(result.dbms.pk, retval['recommendation'])
-        # Create next configuration to try
-        config = db.parser.create_knob_configuration(result.dbms.pk, formatted_params)
         task_meta = TaskMeta.objects.get(task_id=task_id)
-        retval['recommendation'] = config
         task_meta.result = retval
         task_meta.save()
-        result.next_configuration = JSONUtil.dumps(retval)
-        result.save()
 
 
 def clean_knob_data(knob_matrix, knob_labels, session):
@@ -419,13 +409,31 @@ def train_ddpg(result_id):
     return result_info
 
 
+def create_and_save_recommendation(recommended_knobs, result, status, **kwargs):
+    dbms_id = result.dbms.pk
+    formatted_knobs = db.parser.format_dbms_knobs(dbms_id, recommended_knobs)
+    config = db.parser.create_knob_configuration(dbms_id, formatted_knobs)
+
+    retval = dict(**kwargs)
+    retval.update(
+        status=status,
+        result_id=result.pk,
+        recommendation=config,
+    )
+    result.next_configuration = JSONUtil.dumps(retval)
+    result.save()
+
+    return retval
+
+
 @task(base=ConfigurationRecommendation, name='configuration_recommendation_ddpg')
 def configuration_recommendation_ddpg(result_info):  # pylint: disable=invalid-name
     LOG.info('Use ddpg to recommend configuration')
     result_id = result_info['newest_result_id']
-    result = Result.objects.filter(pk=result_id)
-    session = Result.objects.get(pk=result_id).session
-    agg_data = DataUtil.aggregate_data(result)
+    result_list = Result.objects.filter(pk=result_id)
+    result = result_list.first()
+    session = result.session
+    agg_data = DataUtil.aggregate_data(result_list)
     metric_data, _ = clean_metric_data(agg_data['y_matrix'], agg_data['y_columnlabels'], session)
     metric_data = metric_data.flatten()
     metric_scalar = MinMaxScaler().fit(metric_data.reshape(1, -1))
@@ -447,11 +455,10 @@ def configuration_recommendation_ddpg(result_info):  # pylint: disable=invalid-n
     knob_bounds = np.vstack(DataUtil.get_knob_bounds(knob_labels, session))
     knob_data = MinMaxScaler().fit(knob_bounds).inverse_transform(knob_data.reshape(1, -1))[0]
     conf_map = {k: knob_data[i] for i, k in enumerate(knob_labels)}
-    conf_map_res = {}
-    conf_map_res['status'] = 'good'
-    conf_map_res['result_id'] = result_id
-    conf_map_res['recommendation'] = conf_map
-    conf_map_res['info'] = 'INFO: ddpg'
+
+    conf_map_res = create_and_save_recommendation(recommended_knobs=conf_map, result=result,
+                                                  status='good', info='INFO: ddpg')
+
     return conf_map_res
 
 
@@ -638,20 +645,18 @@ def combine_workload(target_data):
 def configuration_recommendation(recommendation_input):
     target_data, algorithm = recommendation_input
     LOG.info('configuration_recommendation called')
+    newest_result = Result.objects.get(pk=target_data['newest_result_id'])
 
     if target_data['bad'] is True:
-        target_data_res = dict(
-            status='bad',
-            result_id=target_data['newest_result_id'],
-            info='WARNING: no training data, the config is generated randomly',
-            recommendation=target_data['config_recommend'],
+        target_data_res = create_and_save_recommendation(
+            recommended_knobs=target_data['config_recommend'], result=newest_result,
+            status='bad', info='WARNING: no training data, the config is generated randomly',
             pipeline_run=target_data['pipeline_run'])
         LOG.debug('%s: Skipping configuration recommendation.\n\ndata=%s\n',
                   AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
         return target_data_res
 
     latest_pipeline_run = PipelineRun.objects.get(pk=target_data['pipeline_run'])
-    newest_result = Result.objects.get(pk=target_data['newest_result_id'])
 
     X_columnlabels, X_scaler, X_scaled, y_scaled, X_max, X_min = combine_workload(target_data)
 
@@ -756,12 +761,10 @@ def configuration_recommendation(recommendation_input):
     best_config = np.maximum(best_config, X_min_inv)
 
     conf_map = {k: best_config[i] for i, k in enumerate(X_columnlabels)}
-    conf_map_res = dict(
-        status='good',
-        result_id=target_data['newest_result_id'],
-        recommendation=conf_map,
-        info='INFO: training data size is {}'.format(X_scaled.shape[0]),
-        pipeline_run=latest_pipeline_run.pk)
+    conf_map_res = create_and_save_recommendation(
+        recommended_knobs=conf_map, result=newest_result,
+        status='good', info='INFO: training data size is {}'.format(X_scaled.shape[0]),
+        pipeline_run=target_data['pipeline_run'])
 
     LOG.debug('%s: Finished selecting the next config.\n\ndata=%s\n',
               AlgorithmType.name(algorithm), JSONUtil.dumps(conf_map_res, pprint=True))
diff --git a/server/website/website/utils.py b/server/website/website/utils.py
index b32fa09..eb29860 100644
--- a/server/website/website/utils.py
+++ b/server/website/website/utils.py
@@ -17,6 +17,7 @@ from random import choice
 
 import numpy as np
 from django.contrib.auth.models import User
+from django.db.models import Case, When
 from django.utils.text import capfirst
 from django_db_logger.models import StatusLog
 from djcelery.models import TaskMeta
@@ -72,34 +73,34 @@ class MediaUtil(object):
 class TaskUtil(object):
 
     @staticmethod
-    def get_tasks(tasks):
-        if not tasks:
-            return []
-        task_ids = tasks.split(',')
-        res = []
-        for task_id in task_ids:
-            task = TaskMeta.objects.filter(task_id=task_id)
-            if len(task) == 0:
-                continue  # Task Not Finished
-            res.append(task[0])
-        return res
+    def get_tasks(task_ids):
+        task_ids = task_ids or []
+        if isinstance(task_ids, str):
+            task_ids = task_ids.split(',')
+        preserved = Case(*[
+            When(task_id=task_id, then=pos) for pos, task_id in enumerate(task_ids)])
+        return TaskMeta.objects.filter(task_id__in=task_ids).order_by(preserved)
 
     @staticmethod
     def get_task_status(tasks):
-        if len(tasks) == 0:
+        if not tasks:
             return None, 0
+
         overall_status = 'SUCCESS'
         num_completed = 0
         for task in tasks:
             status = task.status
             if status == "SUCCESS":
                 num_completed += 1
-            elif status in ['FAILURE', 'REVOKED', 'RETRY']:
+            elif status in ('FAILURE', 'REVOKED', 'RETRY'):
                 overall_status = status
                 break
             else:
-                assert status in ['PENDING', 'RECEIVED', 'STARTED']
+                if status not in ('PENDING', 'RECEIVED', 'STARTED'):
+                    LOG.warning("Task %s: invalid task status: '%s' (task_id=%s)",
+                                task.id, status, task.task_id)
                 overall_status = status
+
         return overall_status, num_completed
 
 
diff --git a/server/website/website/views.py b/server/website/website/views.py
index 945fff6..3026671 100644
--- a/server/website/website/views.py
+++ b/server/website/website/views.py
@@ -1067,14 +1067,6 @@ def get_timeline_data(request):
 
 # get the lastest result
 def give_result(request, upload_code):  # pylint: disable=unused-argument
-    def _failed_response(_latest_result, _tasks, _num_completed, _status, _msg):
-        _msg = "{}\nSTATUS: {}\nRESULT ID: {}\n".format(_msg, _status, _latest_result)
-        if tasks:
-            _failed_task_idx = min(len(_tasks) - 1, _num_completed + 1)
-            _failed_task = _tasks[_failed_task_idx]
-            _msg += "TRACEBACK: {}".format(_failed_task.traceback)
-        return HttpResponse(_msg, status=400)
-
     try:
         session = Session.objects.get(upload_code=upload_code)
     except Session.DoesNotExist:
@@ -1084,39 +1076,31 @@ def give_result(request, upload_code):  # pylint: disable=unused-argument
     latest_result = Result.objects.filter(session=session).latest('creation_time')
     tasks = TaskUtil.get_tasks(latest_result.task_ids)
     overall_status, num_completed = TaskUtil.get_task_status(tasks)
+    response = {
+        'celery_status': overall_status,
+        'result_id': latest_result.pk,
+        'message': '',
+        'errors': [],
+    }
 
     if overall_status == 'SUCCESS':
-        # The task status is set to SUCCESS before the next config is saved in
-        # the latest result so we must wait for it to be updated
-        max_wait_sec = 20
-        elapsed_sec = 0
-        while not latest_result.next_configuration and elapsed_sec <= max_wait_sec:
-            time.sleep(5)
-            elapsed_sec += 5
-            latest_result = Result.objects.get(id=latest_result.pk)
-            LOG.debug("Waiting for the next config for result %s to be updated... "
-                      "(elapsed: %ss): %s", latest_result.pk, elapsed_sec,
-                      model_to_dict(latest_result))
-
-        if not latest_result.next_configuration:
-            LOG.warning(
-                "Failed to get the next configuration from the latest result after %ss: %s",
-                elapsed_sec, model_to_dict(latest_result))
-            overall_status = 'FAILURE'
-            response = _failed_response(latest_result, tasks, num_completed, overall_status,
-                                        'Failed to get the next configuration.')
-        else:
-            response = HttpResponse(JSONUtil.dumps(latest_result.next_configuration),
-                                    content_type='application/json')
+        response.update(JSONUtil.loads(latest_result.next_configuration),
+                        message='Celery successfully recommended the next configuration')
+        status_code = 200
 
     elif overall_status in ('FAILURE', 'REVOKED', 'RETRY'):
-        response = _failed_response(latest_result, tasks, num_completed, overall_status,
-                                    'Celery failed to get the next configuration.')
+        task_errors = [t.traceback for t in tasks if t.traceback]
+        if task_errors:
+            LOG.warning('\n\n'.join(task_errors))
+        response.update(message='Celery failed to get the next configuration', errors=task_errors)
+        status_code = 400
 
     else:  # overall_status in ('PENDING', 'RECEIVED', 'STARTED'):
-        response = HttpResponse("{}: Result not ready".format(overall_status), status=202)
+        response.update(message='Result not ready')
+        status_code = 202
 
-    return response
+    return HttpResponse(JSONUtil.dumps(response, pprint=True), status=status_code,
+                        content_type='application/json')
 
 
 # get the lastest result