From e7d5b0338a1382631a531e109bb41fe77212f1bb Mon Sep 17 00:00:00 2001 From: yangdsh Date: Wed, 22 Apr 2020 02:51:20 +0000 Subject: [PATCH] clean metrics before pruning --- server/website/website/settings/constants.py | 3 ++ server/website/website/tasks/async_tasks.py | 35 ++++--------------- .../website/website/tasks/periodic_tasks.py | 26 +++++++------- server/website/website/utils.py | 31 +++++++++++++++- 4 files changed, 52 insertions(+), 43 deletions(-) diff --git a/server/website/website/settings/constants.py b/server/website/website/settings/constants.py index f078417..cd7abcd 100644 --- a/server/website/website/settings/constants.py +++ b/server/website/website/settings/constants.py @@ -24,3 +24,6 @@ KNOB_IDENT_USE_PRUNED_METRICS = False # The background tasks only process workloads containing this minimum amount of results MIN_WORKLOAD_RESULTS_COUNT = 5 + +# The views used for metrics pruning +VIEWS_FOR_PRUNING = ['dba_hist_osstat', 'dba_hist_sysstat', 'dba_hist_system_event'] diff --git a/server/website/website/tasks/async_tasks.py b/server/website/website/tasks/async_tasks.py index c7b4471..d4c37f4 100644 --- a/server/website/website/tasks/async_tasks.py +++ b/server/website/website/tasks/async_tasks.py @@ -89,30 +89,6 @@ class ConfigurationRecommendation(BaseTask): # pylint: disable=abstract-method task_meta.save() -def clean_metric_data(metric_matrix, metric_labels, session): - # Makes sure that all knobs in the dbms are included in the knob_matrix and knob_labels - metric_objs = MetricCatalog.objects.filter(dbms=session.dbms) - metric_cat = [session.target_objective] - for metric_obj in metric_objs: - metric_cat.append(metric_obj.name) - missing_columns = sorted(set(metric_cat) - set(metric_labels)) - unused_columns = set(metric_labels) - set(metric_cat) - LOG.debug("clean_metric_data: added %d metrics and removed %d metric.", len(missing_columns), - len(unused_columns)) - default_val = 0 - metric_cat_size = len(metric_cat) - matrix = np.ones((len(metric_matrix), metric_cat_size)) * default_val - metric_labels_dict = {n: i for i, n in enumerate(metric_labels)} - # column labels in matrix has the same order as ones in metric catalog - # missing values are filled with default_val - for i, metric_name in enumerate(metric_cat): - if metric_name in metric_labels_dict: - index = metric_labels_dict[metric_name] - matrix[:, i] = metric_matrix[:, index] - LOG.debug("clean_metric_data: final ~ matrix: %s, labels: %s", matrix.shape, len(metric_cat)) - return matrix, metric_cat - - def save_execution_time(start_ts, fn, result): end_ts = time.time() exec_time = end_ts - start_ts @@ -513,13 +489,13 @@ def train_ddpg(train_ddpg_input): prev_objective = prev_metric_data[target_obj_idx] # Clean metric data - metric_data, metric_labels = clean_metric_data(agg_data['y_matrix'], - agg_data['y_columnlabels'], session) + metric_data, _ = DataUtil.clean_metric_data(agg_data['y_matrix'], + agg_data['y_columnlabels'], session) metric_data = metric_data.flatten() metric_scalar = MinMaxScaler().fit(metric_data.reshape(1, -1)) normalized_metric_data = metric_scalar.transform(metric_data.reshape(1, -1))[0] - prev_metric_data, _ = clean_metric_data(prev_agg_data['y_matrix'], - prev_agg_data['y_columnlabels'], session) + prev_metric_data, _ = DataUtil.clean_metric_data(prev_agg_data['y_matrix'], + prev_agg_data['y_columnlabels'], session) prev_metric_data = prev_metric_data.flatten() prev_metric_scalar = MinMaxScaler().fit(prev_metric_data.reshape(1, -1)) prev_normalized_metric_data = prev_metric_scalar.transform(prev_metric_data.reshape(1, -1))[0] @@ -642,7 +618,8 @@ def configuration_recommendation_ddpg(recommendation_ddpg_input): # pylint: dis params = JSONUtil.loads(session.hyperparameters) agg_data = DataUtil.aggregate_data(result_list) - metric_data, _ = clean_metric_data(agg_data['y_matrix'], agg_data['y_columnlabels'], session) + metric_data, _ = DataUtil.clean_metric_data(agg_data['y_matrix'], + agg_data['y_columnlabels'], session) metric_data = metric_data.flatten() metric_scalar = MinMaxScaler().fit(metric_data.reshape(1, -1)) normalized_metric_data = metric_scalar.transform(metric_data.reshape(1, -1))[0] diff --git a/server/website/website/tasks/periodic_tasks.py b/server/website/website/tasks/periodic_tasks.py index fbe508d..8afeeaa 100644 --- a/server/website/website/tasks/periodic_tasks.py +++ b/server/website/website/tasks/periodic_tasks.py @@ -22,7 +22,7 @@ from analysis.preprocessing import (Bin, get_shuffle_indices, consolidate_columnlabels) from website.models import PipelineData, PipelineRun, Result, Workload, ExecutionTime from website.settings import (ENABLE_DUMMY_ENCODER, KNOB_IDENT_USE_PRUNED_METRICS, - MIN_WORKLOAD_RESULTS_COUNT, TIME_ZONE) + MIN_WORKLOAD_RESULTS_COUNT, TIME_ZONE, VIEWS_FOR_PRUNING) from website.types import PipelineTaskType, WorkloadStatusType from website.utils import DataUtil, JSONUtil @@ -82,8 +82,6 @@ def run_background_tasks(): # Check that there are enough results in the workload LOG.info("Not enough results in workload %s (# results: %s, # required: %s).", workload_name, num_wkld_results, MIN_WORKLOAD_RESULTS_COUNT) - workload.status = WorkloadStatusType.PROCESSED - workload.save() continue LOG.info("Aggregating data for workload %s...", workload_name) @@ -92,18 +90,11 @@ def run_background_tasks(): LOG.debug("Aggregated knob data: rowlabels=%s, columnlabels=%s, data=%s.", len(knob_data['rowlabels']), len(knob_data['columnlabels']), knob_data['data'].shape) + LOG.debug("Aggregated metric data: rowlabels=%s, columnlabels=%s, data=%s.", + len(metric_data['rowlabels']), len(metric_data['columnlabels']), + metric_data['data'].shape) LOG.info("Done aggregating data for workload %s.", workload_name) - num_valid_results = knob_data['data'].shape[0] # pylint: disable=unsubscriptable-object - if num_valid_results < MIN_WORKLOAD_RESULTS_COUNT: - # Check that there are enough valid results in the workload - LOG.info("Not enough valid results in workload %s (# valid results: " - "%s, # required: %s).", workload_name, num_valid_results, - MIN_WORKLOAD_RESULTS_COUNT) - workload.status = WorkloadStatusType.PROCESSED - workload.save() - continue - # Knob_data and metric_data are 2D numpy arrays. Convert them into a # JSON-friendly (nested) lists and then save them as new PipelineData # objects. @@ -264,6 +255,14 @@ def run_workload_characterization(metric_data): matrix = metric_data['data'] columnlabels = metric_data['columnlabels'] LOG.debug("Workload characterization ~ initial data size: %s", matrix.shape) + useful_labels = [] + for label in columnlabels: + for view in VIEWS_FOR_PRUNING: + if view in label: + useful_labels.append(label) + break + matrix, columnlabels = DataUtil.clean_metric_data(matrix, columnlabels, None, useful_labels) + LOG.debug("Workload characterization ~ cleaned data size: %s", matrix.shape) # Bin each column (metric) in the matrix by its decile binner = Bin(bin_start=1, axis=0) @@ -298,6 +297,7 @@ def run_workload_characterization(metric_data): # Components: metrics * factors components = fa_model.components_.T.copy() + LOG.info("Workload characterization first part costs %.0f seconds.", time.time() - start_ts) # Run Kmeans for # clusters k in range(1, num_nonduplicate_metrics - 1) # K should be much smaller than n_cols in detK, For now max_cluster <= 20 diff --git a/server/website/website/utils.py b/server/website/website/utils.py index c6d673d..9d59a27 100644 --- a/server/website/website/utils.py +++ b/server/website/website/utils.py @@ -25,7 +25,7 @@ from django.utils.text import capfirst from django_db_logger.models import StatusLog from djcelery.models import TaskMeta -from .models import DBMSCatalog, KnobCatalog, Result, Session, SessionKnob +from .models import DBMSCatalog, MetricCatalog, KnobCatalog, Result, Session, SessionKnob from .settings import common from .types import LabelStyleType, VarType @@ -211,6 +211,35 @@ class DataUtil(object): rowlabels_unique[i] = tuple(rowlabels[dup_idxs]) return X_unique, y_unique, rowlabels_unique + @staticmethod + def clean_metric_data(metric_matrix, metric_labels, session, useful_labels=None): + # Make metric_labels identical to useful_labels (if given) + # If useful_labels is not given, let it to be all metrics in the catalog. + if useful_labels is None: + metric_objs = MetricCatalog.objects.filter(dbms=session.dbms) + metric_cat = [session.target_objective] + for metric_obj in metric_objs: + metric_cat.append(metric_obj.name) + else: + metric_cat = useful_labels + missing_columns = sorted(set(metric_cat) - set(metric_labels)) + unused_columns = set(metric_labels) - set(metric_cat) + LOG.debug("clean_metric_data: added %d metrics and removed %d metric.", + len(missing_columns), len(unused_columns)) + default_val = 0 + metric_cat_size = len(metric_cat) + matrix = np.ones((len(metric_matrix), metric_cat_size)) * default_val + metric_labels_dict = {n: i for i, n in enumerate(metric_labels)} + # column labels in matrix has the same order as ones in metric catalog + # missing values are filled with default_val + for i, metric_name in enumerate(metric_cat): + if metric_name in metric_labels_dict: + index = metric_labels_dict[metric_name] + matrix[:, i] = metric_matrix[:, index] + LOG.debug("clean_metric_data: final ~ matrix: %s, labels: %s", matrix.shape, + len(metric_cat)) + return matrix, metric_cat + @staticmethod def clean_knob_data(knob_matrix, knob_labels, sessions): # Filter and amend knob_matrix and knob_labels according to the tunable knobs in the session