From 0bf2da738feb3df14777bd259a0cc5f329479193 Mon Sep 17 00:00:00 2001
From: yangdsh
Date: Thu, 16 Apr 2020 01:34:27 +0000
Subject: [PATCH] Do not allow mapping to the same workload; remove
 IMPORTANT_KNOB_NUMBER

Workload mapping now skips the target's own workload when computing
mapping scores and bails out unless at least two workloads have data.
The IMPORTANT_KNOB_NUMBER filter on ranked knobs is removed, and
clean_knob_data() moves from async_tasks.py into DataUtil so that it
can merge the tunable knobs of multiple sessions; combine_workload()
is renamed to process_training_data().
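As a rough illustration (not part of the patch), the multi-session knob
merge that DataUtil.clean_knob_data now starts with behaves like the
following self-contained sketch; the helper and knob names below are
made up for the example:

    # Keep the first occurrence of each knob name across the sessions,
    # mirroring the merge loop added to DataUtil.clean_knob_data.
    def merge_session_knobs(per_session_knobs):
        merged, seen = [], set()
        for knobs in per_session_knobs:
            for knob in knobs:
                if knob['name'] not in seen:
                    seen.add(knob['name'])
                    merged.append(knob)
        return merged

    # Illustrative input: two sessions tuning overlapping knob sets.
    session_a = [{'name': 'knob_a'}, {'name': 'knob_b'}]
    session_b = [{'name': 'knob_b'}, {'name': 'knob_c'}]
    assert [k['name'] for k in merge_session_knobs([session_a, session_b])] \
        == ['knob_a', 'knob_b', 'knob_c']

Knob columns missing from a matrix are then filled with each knob's
parsed default value, exactly as before.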
Setting default to 0.", - knob_name, default_val, exc_info=True) - default_val = 0 - new_col = np.ones((nrows, 1), dtype=float) * default_val - new_lab = knob_name - else: - index = knob_labels.index(knob_name) - new_col = knob_matrix[:, index].reshape(-1, 1) - new_lab = knob_labels[index] - - new_labels.append(new_lab) - new_columns.append(new_col) - - new_matrix = np.hstack(new_columns).reshape(nrows, -1) - LOG.debug("Cleaned matrix: %s, knobs (%s): %s", new_matrix.shape, len(new_labels), new_labels) - - assert new_labels == knob_cat, \ - "Expected knobs: {}\nActual knobs: {}\n".format( - knob_cat, new_labels) - assert new_matrix.shape == (nrows, len(knob_cat)), \ - "Expected shape: {}, Actual shape: {}".format( - (nrows, len(knob_cat)), new_matrix.shape) - - return new_matrix, new_labels - - def clean_metric_data(metric_matrix, metric_labels, session): # Makes sure that all knobs in the dbms are included in the knob_matrix and knob_labels metric_objs = MetricCatalog.objects.filter(dbms=session.dbms) @@ -421,7 +364,8 @@ def aggregate_target_results(aggregate_target_results_input): agg_data['status'] = 'good' # Clean knob data - cleaned_agg_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'], session) + cleaned_agg_data = DataUtil.clean_knob_data(agg_data['X_matrix'], + agg_data['X_columnlabels'], [session]) agg_data['X_matrix'] = np.array(cleaned_agg_data[0]) agg_data['X_columnlabels'] = np.array(cleaned_agg_data[1]) LOG.debug("%s ~ FINAL: X_matrix=%s, X_columnlabels=%s", task_name, @@ -581,7 +525,8 @@ def train_ddpg(train_ddpg_input): prev_normalized_metric_data = prev_metric_scalar.transform(prev_metric_data.reshape(1, -1))[0] # Clean knob data - cleaned_knob_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'], session) + cleaned_knob_data = DataUtil.clean_knob_data(agg_data['X_matrix'], + agg_data['X_columnlabels'], [session]) knob_data = np.array(cleaned_knob_data[0]) knob_labels = np.array(cleaned_knob_data[1]) knob_bounds = np.vstack(DataUtil.get_knob_bounds(knob_labels.flatten(), session)) @@ -701,7 +646,8 @@ def configuration_recommendation_ddpg(recommendation_ddpg_input): # pylint: dis metric_data = metric_data.flatten() metric_scalar = MinMaxScaler().fit(metric_data.reshape(1, -1)) normalized_metric_data = metric_scalar.transform(metric_data.reshape(1, -1))[0] - cleaned_knob_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'], session) + cleaned_knob_data = DataUtil.clean_knob_data(agg_data['X_matrix'], + agg_data['X_columnlabels'], [session]) knob_labels = np.array(cleaned_knob_data[1]).flatten() knob_num = len(knob_labels) metric_num = len(metric_data) @@ -730,7 +676,7 @@ def configuration_recommendation_ddpg(recommendation_ddpg_input): # pylint: dis return target_data_res -def combine_workload(target_data): +def process_training_data(target_data): newest_result = Result.objects.get(pk=target_data['newest_result_id']) latest_pipeline_run = PipelineRun.objects.get(pk=target_data['pipeline_run']) session = newest_result.session @@ -752,9 +698,9 @@ def combine_workload(target_data): workload=mapped_workload, task_type=PipelineTaskType.METRIC_DATA) workload_metric_data = JSONUtil.loads(workload_metric_data.data) - cleaned_workload_knob_data = clean_knob_data(workload_knob_data["data"], - workload_knob_data["columnlabels"], - newest_result.session) + cleaned_workload_knob_data = DataUtil.clean_knob_data(workload_knob_data["data"], + workload_knob_data["columnlabels"], + [newest_result.session]) X_workload = 
         X_columnlabels = np.array(cleaned_workload_knob_data[1])
         y_workload = np.array(workload_metric_data['data'])
@@ -783,23 +729,6 @@
                          'identical y columnlabels (sorted metric names)'),
                         y_columnlabels, target_data['y_columnlabels'])
 
-    if target_data['mapped_workload'] is not None:
-        # Filter Xs by top 'IMPORTANT_KNOB_NUMBER' ranked knobs
-        ranked_knobs = PipelineData.objects.get(
-            pipeline_run=latest_pipeline_run,
-            workload=mapped_workload,
-            task_type=PipelineTaskType.RANKED_KNOBS)
-        pipeline_data_knob = ranked_knobs
-        pipeline_data_metric = PipelineData.objects.get(
-            pipeline_run=latest_pipeline_run,
-            workload=mapped_workload,
-            task_type=PipelineTaskType.PRUNED_METRICS)
-        ranked_knobs = JSONUtil.loads(ranked_knobs.data)[:params['IMPORTANT_KNOB_NUMBER']]
-        ranked_knob_idxs = [i for i, cl in enumerate(X_columnlabels) if cl in ranked_knobs]
-        X_workload = X_workload[:, ranked_knob_idxs]
-        X_target = X_target[:, ranked_knob_idxs]
-        X_columnlabels = X_columnlabels[ranked_knob_idxs]
-
     # Filter ys by current target objective metric
     target_objective = newest_result.session.target_objective
     target_obj_idx = [i for i, cl in enumerate(y_columnlabels) if cl == target_objective]
@@ -945,7 +874,7 @@
 
     X_columnlabels, X_scaler, X_scaled, y_scaled, X_max, X_min,\
         dummy_encoder, constraint_helper, pipeline_knobs,\
-        pipeline_metrics = combine_workload(target_data)
+        pipeline_metrics = process_training_data(target_data)
 
     # FIXME: we should generate more samples and use a smarter sampling technique
     num_samples = params['NUM_SAMPLES']
@@ -1108,12 +1037,10 @@
         workload__hardware=target_workload.hardware,
         workload__project=target_workload.project)
 
-    # FIXME (dva): we should also compute the global (i.e., overall) ranked_knobs
-    # and pruned metrics but we just use those from the first workload for now
+    # FIXME (dva): we should also compute the global (i.e., overall)
+    # pruned metrics but we just use those from the first workload for now
     initialized = False
-    global_ranked_knobs = None
     global_pruned_metrics = None
-    ranked_knob_idxs = None
     pruned_metric_idxs = None
 
     unique_workloads = pipeline_data.values_list('workload', flat=True).distinct()
@@ -1131,9 +1058,9 @@
 
         # Load knob & metric data for this workload
         knob_data = load_data_helper(pipeline_data, unique_workload, PipelineTaskType.KNOB_DATA)
-        knob_data["data"], knob_data["columnlabels"] = clean_knob_data(knob_data["data"],
-                                                                       knob_data["columnlabels"],
-                                                                       newest_result.session)
+        knob_data["data"], knob_data["columnlabels"] =\
+            DataUtil.clean_knob_data(knob_data["data"], knob_data["columnlabels"],
+                                     [newest_result.session])
         metric_data = load_data_helper(pipeline_data, unique_workload, PipelineTaskType.METRIC_DATA)
         X_matrix = np.array(knob_data["data"])
         y_matrix = np.array(metric_data["data"])
@@ -1141,25 +1068,17 @@
         assert np.array_equal(rowlabels, metric_data["rowlabels"])
 
         if not initialized:
-            # For now set ranked knobs & pruned metrics to be those computed
-            # for the first workload
-            global_ranked_knobs = load_data_helper(
-                pipeline_data, unique_workload,
-                PipelineTaskType.RANKED_KNOBS)[:params['IMPORTANT_KNOB_NUMBER']]
+            # For now set pruned metrics to be those computed for the first workload
             global_pruned_metrics = load_data_helper(
                 pipeline_data, unique_workload,
                 PipelineTaskType.PRUNED_METRICS)
-            ranked_knob_idxs = [i for i in range(X_matrix.shape[1]) if X_columnlabels[
-                i] in global_ranked_knobs]
             pruned_metric_idxs = [i for i in range(y_matrix.shape[1]) if y_columnlabels[
                 i] in global_pruned_metrics]
-            # Filter X & y columnlabels by top ranked_knobs & pruned_metrics
-            X_columnlabels = X_columnlabels[ranked_knob_idxs]
+            # Filter y columnlabels by pruned_metrics
             y_columnlabels = y_columnlabels[pruned_metric_idxs]
             initialized = True
 
-        # Filter X & y matrices by top ranked_knobs & pruned_metrics
-        X_matrix = X_matrix[:, ranked_knob_idxs]
+        # Filter y matrices by pruned_metrics
         y_matrix = y_matrix[:, pruned_metric_idxs]
 
         # Combine duplicate rows (rows with same knob settings)
@@ -1172,11 +1091,11 @@
             'rowlabels': rowlabels,
         }
 
-    if len(workload_data) == 0:
+    if len(workload_data) < 2:
         # The background task that aggregates the data has not finished running yet
         target_data.update(mapped_workload=None, scores=None)
         LOG.debug('%s: Result = %s\n', task_name, _task_result_tostring(target_data))
-        LOG.info('%s: Skipping workload mapping because no workload data is available.',
+        LOG.info('%s: Skipping workload mapping because fewer than 2 workloads are available.',
                  task_name)
         return target_data, algorithm
 
@@ -1194,8 +1113,8 @@ def map_workload(map_workload_input):
     del Xs
     del ys
 
-    # Filter the target's X & y data by the ranked knobs & pruned metrics.
-    X_target = target_data['X_matrix'][:, ranked_knob_idxs]
+    X_target = target_data['X_matrix']
+    # Filter the target's y data by the pruned metrics.
     y_target = target_data['y_matrix'][:, pruned_metric_idxs]
 
     # Now standardize the target's data and bin it by the deciles we just
@@ -1206,6 +1125,9 @@
 
     scores = {}
     for workload_id, workload_entry in list(workload_data.items()):
+        LOG.info('%s: %s', newest_result.workload.pk, workload_id)
+        if newest_result.workload.pk == workload_id:
+            continue
         predictions = np.empty_like(y_target)
         X_workload = workload_entry['X_matrix']
         X_scaled = X_scaler.transform(X_workload)
diff --git a/server/website/website/tasks/periodic_tasks.py b/server/website/website/tasks/periodic_tasks.py
index 9cc6898..94241fc 100644
--- a/server/website/website/tasks/periodic_tasks.py
+++ b/server/website/website/tasks/periodic_tasks.py
@@ -156,7 +156,14 @@ def run_background_tasks():
         # PipelineData object.
LOG.info("Ranking knobs for workload %s (use pruned metric data: %s)...", workload_name, KNOB_IDENT_USE_PRUNED_METRICS) - ranked_knobs = run_knob_identification(knob_data=knob_data, + sessions = [] + for result in wkld_results: + if result.session not in sessions: + sessions.append(result.session) + rank_knob_data = copy.deepcopy(knob_data) + rank_knob_data['data'], rank_knob_data['columnlabels'] =\ + DataUtil.clean_knob_data(knob_data['data'], knob_data['columnlabels'], sessions) + ranked_knobs = run_knob_identification(knob_data=rank_knob_data, metric_data=ranked_metric_data, dbms=workload.dbms) LOG.info("Done ranking knobs for workload %s (# ranked knobs: %s).\n\n" diff --git a/server/website/website/utils.py b/server/website/website/utils.py index 123d121..43b9ef4 100644 --- a/server/website/website/utils.py +++ b/server/website/website/utils.py @@ -211,6 +211,70 @@ class DataUtil(object): rowlabels_unique[i] = tuple(rowlabels[dup_idxs]) return X_unique, y_unique, rowlabels_unique + @staticmethod + def clean_knob_data(knob_matrix, knob_labels, sessions): + # Filter and amend knob_matrix and knob_labels according to the tunable knobs in the session + knob_matrix = np.array(knob_matrix) + session_knobs = [] + knob_cat = [] + for session in sessions: + knobs_for_this_session = SessionKnob.objects.get_knobs_for_session(session) + for knob in knobs_for_this_session: + if knob['name'] not in knob_cat: + session_knobs.append(knob) + knob_cat = [k['name'] for k in session_knobs] + + if knob_cat == knob_labels: + # Nothing to do! + return knob_matrix, knob_labels + + LOG.info("session_knobs: %s, knob_labels: %s, missing: %s, extra: %s", len(knob_cat), + len(knob_labels), len(set(knob_cat) - set(knob_labels)), + len(set(knob_labels) - set(knob_cat))) + + nrows = knob_matrix.shape[0] # pylint: disable=unsubscriptable-object + new_labels = [] + new_columns = [] + + for knob in session_knobs: + knob_name = knob['name'] + if knob_name not in knob_labels: + # Add missing column initialized to knob's default value + default_val = knob['default'] + try: + if knob['vartype'] == VarType.ENUM: + default_val = knob['enumvals'].split(',').index(default_val) + elif knob['vartype'] == VarType.BOOL: + default_val = str(default_val).lower() in ("on", "true", "yes", "0") + else: + default_val = float(default_val) + except ValueError: + LOG.warning("Error parsing knob '%s' default value: %s. Setting default to 0.", + knob_name, default_val, exc_info=True) + default_val = 0 + new_col = np.ones((nrows, 1), dtype=float) * default_val + new_lab = knob_name + else: + index = knob_labels.index(knob_name) + new_col = knob_matrix[:, index].reshape(-1, 1) + new_lab = knob_labels[index] + + new_labels.append(new_lab) + new_columns.append(new_col) + + new_matrix = np.hstack(new_columns).reshape(nrows, -1) + LOG.debug("Cleaned matrix: %s, knobs (%s): %s", new_matrix.shape, + len(new_labels), new_labels) + + assert new_labels == knob_cat, \ + "Expected knobs: {}\nActual knobs: {}\n".format( + knob_cat, new_labels) + assert new_matrix.shape == (nrows, len(knob_cat)), \ + "Expected shape: {}, Actual shape: {}".format( + (nrows, len(knob_cat)), new_matrix.shape) + + return new_matrix, new_labels + @staticmethod def dummy_encoder_helper(featured_knobs, dbms): n_values = []