diff --git a/server/website/website/tasks/async_tasks.py b/server/website/website/tasks/async_tasks.py
index 81ea93e..3f00656 100644
--- a/server/website/website/tasks/async_tasks.py
+++ b/server/website/website/tasks/async_tasks.py
@@ -25,7 +25,8 @@ from analysis.gpr import ucb
 from analysis.gpr.optimize import tf_optimize
 from analysis.preprocessing import Bin, DummyEncoder
 from analysis.constraints import ParamConstraintHelper
-from website.models import PipelineData, PipelineRun, Result, Workload, KnobCatalog, SessionKnob
+from website.models import (PipelineData, PipelineRun, Result, Workload, KnobCatalog, SessionKnob,
+                            MetricCatalog)
 from website import db
 from website.types import PipelineTaskType, AlgorithmType
 from website.utils import DataUtil, JSONUtil
@@ -153,6 +154,35 @@ def clean_knob_data(knob_matrix, knob_labels, session):
     return matrix, knob_labels
 
 
+def clean_metric_data(metric_matrix, metric_labels, session):
+    # Makes sure that all metrics in the dbms are included in the metric_matrix and metric_labels
+    metric_objs = MetricCatalog.objects.filter(dbms=session.dbms)
+    metric_cat = [session.target_objective]
+    for metric_obj in metric_objs:
+        metric_cat.append(metric_obj.name)
+    matrix = np.array(metric_matrix)
+    missing_columns = sorted(set(metric_cat) - set(metric_labels))
+    unused_columns = set(metric_labels) - set(metric_cat)
+    LOG.debug("clean_metric_data added %d metrics and removed %d metrics.", len(missing_columns),
+              len(unused_columns))
+    # If columns are missing from the matrix
+    if missing_columns:
+        for metric in missing_columns:
+            index = metric_cat.index(metric)
+            default_val = 0
+            matrix = np.insert(matrix, index, default_val, axis=1)
+            metric_labels.insert(index, metric)
+            LOG.debug(matrix.shape)
+    # If there are unused columns in the matrix
+    if unused_columns:
+        indexes = [i for i, n in enumerate(metric_labels) if n in unused_columns]
+        # Delete unused columns
+        matrix = np.delete(matrix, indexes, 1)
+        for i in sorted(indexes, reverse=True):
+            del metric_labels[i]
+    return matrix, metric_labels
+
+
 @task(base=AggregateTargetResults, name='aggregate_target_results')
 def aggregate_target_results(result_id, algorithm):
     # Check that we've completed the background tasks at least once. We need
@@ -310,9 +340,17 @@ def train_ddpg(result_id):
     prev_result = Result.objects.filter(pk=prev_result_id)
     agg_data = DataUtil.aggregate_data(result)
-    metric_data = agg_data['y_matrix'].flatten()
     base_metric_data = (DataUtil.aggregate_data(base_result))['y_matrix'].flatten()
     prev_metric_data = (DataUtil.aggregate_data(prev_result))['y_matrix'].flatten()
+
+    result = Result.objects.get(pk=result_id)
+    target_objective = result.session.target_objective
+    prev_obj_idx = [i for i, n in enumerate(agg_data['y_columnlabels']) if n == target_objective]
+
+    # Clean metric data
+    metric_data, metric_labels = clean_metric_data(agg_data['y_matrix'],
+                                                   agg_data['y_columnlabels'], session)
+    metric_data = metric_data.flatten()
     metric_scalar = MinMaxScaler().fit(metric_data.reshape(1, -1))
     normalized_metric_data = metric_scalar.transform(metric_data.reshape(1, -1))[0]
@@ -327,9 +365,7 @@ def train_ddpg(result_id):
     LOG.info('knob_num: %d, metric_num: %d', knob_num, metric_num)
 
     # Filter ys by current target objective metric
-    result = Result.objects.get(pk=result_id)
-    target_objective = result.session.target_objective
-    target_obj_idx = [i for i, n in enumerate(agg_data['y_columnlabels']) if n == target_objective]
+    target_obj_idx = [i for i, n in enumerate(metric_labels) if n == target_objective]
     if len(target_obj_idx) == 0:
         raise Exception(('Could not find target objective in metrics '
                          '(target_obj={})').format(target_objective))
@@ -338,8 +374,8 @@ def train_ddpg(result_id):
                          'metrics (target_obj={})').format(len(target_obj_idx),
                                                            target_objective))
     objective = metric_data[target_obj_idx]
-    base_objective = base_metric_data[target_obj_idx]
-    prev_objective = prev_metric_data[target_obj_idx]
+    base_objective = base_metric_data[prev_obj_idx]
+    prev_objective = prev_metric_data[prev_obj_idx]
     metric_meta = db.target_objectives.get_metric_metadata(
         result.session.dbms.pk, result.session.target_objective)
@@ -390,7 +426,8 @@ def configuration_recommendation_ddpg(result_info):  # pylint: disable=invalid-name
     result = Result.objects.filter(pk=result_id)
     session = Result.objects.get(pk=result_id).session
     agg_data = DataUtil.aggregate_data(result)
-    metric_data = agg_data['y_matrix'].flatten()
+    metric_data, _ = clean_metric_data(agg_data['y_matrix'], agg_data['y_columnlabels'], session)
+    metric_data = metric_data.flatten()
     metric_scalar = MinMaxScaler().fit(metric_data.reshape(1, -1))
     normalized_metric_data = metric_scalar.transform(metric_data.reshape(1, -1))[0]
     cleaned_knob_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'],
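The core of clean_metric_data() is column alignment: pad any catalog metric missing from the labels with a zero-filled column, and drop any column the catalog does not contain. A minimal standalone sketch of that logic (align_columns and the metric names below are made up for illustration, not part of the patch):

# Standalone sketch of the pad/drop logic in clean_metric_data() above.
import numpy as np

def align_columns(matrix, labels, catalog):
    matrix = np.array(matrix)
    # Pad catalog columns that are missing from the labels with zeros.
    for name in sorted(set(catalog) - set(labels)):
        index = catalog.index(name)
        matrix = np.insert(matrix, index, 0, axis=1)
        labels.insert(index, name)
    # Drop columns that the catalog does not contain.
    unused = set(labels) - set(catalog)
    if unused:
        indexes = [i for i, n in enumerate(labels) if n in unused]
        matrix = np.delete(matrix, indexes, 1)
        for i in sorted(indexes, reverse=True):
            del labels[i]
    return matrix, labels

labels = ['throughput_txn_per_sec', 'not_in_catalog']
catalog = ['throughput_txn_per_sec', 'cache_hit_ratio', 'buffers_alloc']
matrix, labels = align_columns([[100.0, 7.0]], labels, catalog)
# matrix is now [[100., 0., 0.]] and labels is
# ['throughput_txn_per_sec', 'cache_hit_ratio', 'buffers_alloc']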
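In train_ddpg, base_metric_data and prev_metric_data are aggregated but not cleaned, so the target objective can sit at a different column there than in the cleaned metric_data. That is why the patch keeps two index lookups: prev_obj_idx against the original labels and target_obj_idx against the cleaned ones. A toy illustration (the label orders here are hypothetical):

# Hypothetical label orders before and after cleaning.
raw_labels = ['buffers_alloc', 'throughput_txn_per_sec']      # aggregate order
cleaned_labels = ['throughput_txn_per_sec', 'buffers_alloc']  # catalog order
target_objective = 'throughput_txn_per_sec'

# Index into the *uncleaned* base/prev metric vectors.
prev_obj_idx = [i for i, n in enumerate(raw_labels) if n == target_objective]        # [1]
# Index into the *cleaned* metric vector.
target_obj_idx = [i for i, n in enumerate(cleaned_labels) if n == target_objective]  # [0]
assert prev_obj_idx != target_obj_idx  # mixing them up reads the wrong metric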
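One more detail: clean_metric_data() mutates the label list it receives (insert()/del on metric_labels), so agg_data['y_columnlabels'] is changed by the call. This appears to be why prev_obj_idx is computed before clean_metric_data() runs; an index taken afterwards could be stale. A toy demonstration with made-up names:

# clean_metric_data() edits the passed-in label list in place, so indexes
# computed from agg_data['y_columnlabels'] must be taken before the call.
labels = ['metric_a', 'metric_b']
idx_before = labels.index('metric_b')  # 1
labels.insert(0, 'metric_c')           # the kind of in-place edit the helper makes
idx_after = labels.index('metric_b')   # 2 -- idx_before is now stale
assert idx_before != idx_after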