diff --git a/server/website/website/tasks/async_tasks.py b/server/website/website/tasks/async_tasks.py index ca8e533..c2133ed 100644 --- a/server/website/website/tasks/async_tasks.py +++ b/server/website/website/tasks/async_tasks.py @@ -65,7 +65,7 @@ class MapWorkloadTask(BaseTask): # pylint: disable=abstract-method new_res = None # Replace result with formatted result - if not args[0][0]['bad']: + if not args[0][0]['bad'] and args[0][0]['mapped_workload'] is not None: new_res = { 'scores': sorted(args[0][0]['scores'].items()), 'mapped_workload_id': args[0][0]['mapped_workload'], @@ -469,34 +469,40 @@ def configuration_recommendation_ddpg(result_info): # pylint: disable=invalid-n def combine_workload(target_data): - # Load mapped workload data - mapped_workload_id = target_data['mapped_workload'][0] - - latest_pipeline_run = PipelineRun.objects.get(pk=target_data['pipeline_run']) - mapped_workload = Workload.objects.get(pk=mapped_workload_id) - workload_knob_data = PipelineData.objects.get( - pipeline_run=latest_pipeline_run, - workload=mapped_workload, - task_type=PipelineTaskType.KNOB_DATA) - workload_knob_data = JSONUtil.loads(workload_knob_data.data) - workload_metric_data = PipelineData.objects.get( - pipeline_run=latest_pipeline_run, - workload=mapped_workload, - task_type=PipelineTaskType.METRIC_DATA) - workload_metric_data = JSONUtil.loads(workload_metric_data.data) - newest_result = Result.objects.get(pk=target_data['newest_result_id']) + latest_pipeline_run = PipelineRun.objects.get(pk=target_data['pipeline_run']) session = newest_result.session params = JSONUtil.loads(session.hyperparameters) - cleaned_workload_knob_data = clean_knob_data(workload_knob_data["data"], - workload_knob_data["columnlabels"], - newest_result.session) - X_workload = np.array(cleaned_workload_knob_data[0]) - X_columnlabels = np.array(cleaned_workload_knob_data[1]) - y_workload = np.array(workload_metric_data['data']) - y_columnlabels = np.array(workload_metric_data['columnlabels']) - rowlabels_workload = np.array(workload_metric_data['rowlabels']) + # Load mapped workload data + if target_data['mapped_workload'] is not None: + mapped_workload_id = target_data['mapped_workload'][0] + mapped_workload = Workload.objects.get(pk=mapped_workload_id) + workload_knob_data = PipelineData.objects.get( + pipeline_run=latest_pipeline_run, + workload=mapped_workload, + task_type=PipelineTaskType.KNOB_DATA) + workload_knob_data = JSONUtil.loads(workload_knob_data.data) + workload_metric_data = PipelineData.objects.get( + pipeline_run=latest_pipeline_run, + workload=mapped_workload, + task_type=PipelineTaskType.METRIC_DATA) + workload_metric_data = JSONUtil.loads(workload_metric_data.data) + cleaned_workload_knob_data = clean_knob_data(workload_knob_data["data"], + workload_knob_data["columnlabels"], + newest_result.session) + X_workload = np.array(cleaned_workload_knob_data[0]) + X_columnlabels = np.array(cleaned_workload_knob_data[1]) + y_workload = np.array(workload_metric_data['data']) + y_columnlabels = np.array(workload_metric_data['columnlabels']) + rowlabels_workload = np.array(workload_metric_data['rowlabels']) + else: + # combine the target_data with itself is actually adding nothing to the target_data + X_workload = np.array(target_data['X_matrix']) + X_columnlabels = np.array(target_data['X_columnlabels']) + y_workload = np.array(target_data['y_matrix']) + y_columnlabels = np.array(target_data['y_columnlabels']) + rowlabels_workload = np.array(target_data['rowlabels']) # Target workload data newest_result = Result.objects.get(pk=target_data['newest_result_id']) @@ -513,16 +519,17 @@ def combine_workload(target_data): 'identical y columnlabels (sorted metric names)'), y_columnlabels, target_data['y_columnlabels']) - # Filter Xs by top 10 ranked knobs - ranked_knobs = PipelineData.objects.get( - pipeline_run=latest_pipeline_run, - workload=mapped_workload, - task_type=PipelineTaskType.RANKED_KNOBS) - ranked_knobs = JSONUtil.loads(ranked_knobs.data)[:params['IMPORTANT_KNOB_NUMBER']] - ranked_knob_idxs = [i for i, cl in enumerate(X_columnlabels) if cl in ranked_knobs] - X_workload = X_workload[:, ranked_knob_idxs] - X_target = X_target[:, ranked_knob_idxs] - X_columnlabels = X_columnlabels[ranked_knob_idxs] + if target_data['mapped_workload'] is not None: + # Filter Xs by top 'IMPORTANT_KNOB_NUMBER' ranked knobs + ranked_knobs = PipelineData.objects.get( + pipeline_run=latest_pipeline_run, + workload=mapped_workload, + task_type=PipelineTaskType.RANKED_KNOBS) + ranked_knobs = JSONUtil.loads(ranked_knobs.data)[:params['IMPORTANT_KNOB_NUMBER']] + ranked_knob_idxs = [i for i, cl in enumerate(X_columnlabels) if cl in ranked_knobs] + X_workload = X_workload[:, ranked_knob_idxs] + X_target = X_target[:, ranked_knob_idxs] + X_columnlabels = X_columnlabels[ranked_knob_idxs] # Filter ys by current target objective metric target_objective = newest_result.session.target_objective @@ -562,7 +569,7 @@ def combine_workload(target_data): # Dummy encode categorial variables if ENABLE_DUMMY_ENCODER: categorical_info = DataUtil.dummy_encoder_helper(X_columnlabels, - mapped_workload.dbms) + newest_result.dbms) dummy_encoder = DummyEncoder(categorical_info['n_values'], categorical_info['categorical_features'], categorical_info['cat_columnlabels'], @@ -829,10 +836,16 @@ def map_workload(map_workload_input): ranked_knob_idxs = None pruned_metric_idxs = None - # Compute workload mapping data for each unique workload unique_workloads = pipeline_data.values_list('workload', flat=True).distinct() - assert len(unique_workloads) > 0 + if unique_workloads == 0: + # The background task that aggregates the data has not finished running yet + target_data.update(mapped_workload=None, scores=None) + LOG.debug('%s: Skipping workload mapping because there is no workload.\n', + AlgorithmType.name(algorithm)) + return target_data, algorithm + workload_data = {} + # Compute workload mapping data for each unique workload for unique_workload in unique_workloads: workload_obj = Workload.objects.get(pk=unique_workload)