clean metrics before pruning
commit e7d5b0338a
parent 8da203fdc3
@@ -24,3 +24,6 @@ KNOB_IDENT_USE_PRUNED_METRICS = False
 # The background tasks only process workloads containing this minimum amount of results
 MIN_WORKLOAD_RESULTS_COUNT = 5
 
+# The views used for metrics pruning
+VIEWS_FOR_PRUNING = ['dba_hist_osstat', 'dba_hist_sysstat', 'dba_hist_system_event']
+
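The three entries look like Oracle AWR view names; as the pruning code further down shows, a metric label is kept when any one of these strings occurs inside it as a substring. A tiny illustration with a made-up label (the real label format depends on the metric collector):

    # Hypothetical metric label; real labels come from the collected Oracle data.
    label = 'dba_hist_sysstat.user commits'
    views = ['dba_hist_osstat', 'dba_hist_sysstat', 'dba_hist_system_event']
    print(any(view in label for view in views))  # True -> this metric would survive pruning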
@@ -89,30 +89,6 @@ class ConfigurationRecommendation(BaseTask):  # pylint: disable=abstract-method
         task_meta.save()
 
 
-def clean_metric_data(metric_matrix, metric_labels, session):
-    # Makes sure that all knobs in the dbms are included in the knob_matrix and knob_labels
-    metric_objs = MetricCatalog.objects.filter(dbms=session.dbms)
-    metric_cat = [session.target_objective]
-    for metric_obj in metric_objs:
-        metric_cat.append(metric_obj.name)
-    missing_columns = sorted(set(metric_cat) - set(metric_labels))
-    unused_columns = set(metric_labels) - set(metric_cat)
-    LOG.debug("clean_metric_data: added %d metrics and removed %d metric.", len(missing_columns),
-              len(unused_columns))
-    default_val = 0
-    metric_cat_size = len(metric_cat)
-    matrix = np.ones((len(metric_matrix), metric_cat_size)) * default_val
-    metric_labels_dict = {n: i for i, n in enumerate(metric_labels)}
-    # column labels in matrix has the same order as ones in metric catalog
-    # missing values are filled with default_val
-    for i, metric_name in enumerate(metric_cat):
-        if metric_name in metric_labels_dict:
-            index = metric_labels_dict[metric_name]
-            matrix[:, i] = metric_matrix[:, index]
-    LOG.debug("clean_metric_data: final ~ matrix: %s, labels: %s", matrix.shape, len(metric_cat))
-    return matrix, metric_cat
-
-
 def save_execution_time(start_ts, fn, result):
     end_ts = time.time()
     exec_time = end_ts - start_ts
@@ -513,12 +489,12 @@ def train_ddpg(train_ddpg_input):
     prev_objective = prev_metric_data[target_obj_idx]
 
     # Clean metric data
-    metric_data, metric_labels = clean_metric_data(agg_data['y_matrix'],
-                                                   agg_data['y_columnlabels'], session)
+    metric_data, _ = DataUtil.clean_metric_data(agg_data['y_matrix'],
+                                                agg_data['y_columnlabels'], session)
     metric_data = metric_data.flatten()
     metric_scalar = MinMaxScaler().fit(metric_data.reshape(1, -1))
     normalized_metric_data = metric_scalar.transform(metric_data.reshape(1, -1))[0]
-    prev_metric_data, _ = clean_metric_data(prev_agg_data['y_matrix'],
-                                            prev_agg_data['y_columnlabels'], session)
+    prev_metric_data, _ = DataUtil.clean_metric_data(prev_agg_data['y_matrix'],
+                                                     prev_agg_data['y_columnlabels'], session)
     prev_metric_data = prev_metric_data.flatten()
     prev_metric_scalar = MinMaxScaler().fit(prev_metric_data.reshape(1, -1))
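For orientation, a small shape walk-through of the clean/flatten/scale pattern used in this hunk, with an arbitrary 2 x 3 stand-in for the cleaned metric matrix (sizes and values are invented):

    import numpy as np
    from sklearn.preprocessing import MinMaxScaler

    metric_data = np.array([[1.0, 5.0, 9.0],
                            [2.0, 6.0, 8.0]])         # stand-in for clean_metric_data output
    flat = metric_data.flatten()                       # shape (6,)
    scaler = MinMaxScaler().fit(flat.reshape(1, -1))   # fitted on one sample with 6 features
    normalized = scaler.transform(flat.reshape(1, -1))[0]
    print(flat.shape, normalized.shape)                # (6,) (6,)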
@@ -642,7 +618,8 @@ def configuration_recommendation_ddpg(recommendation_ddpg_input):  # pylint: dis
 
     params = JSONUtil.loads(session.hyperparameters)
     agg_data = DataUtil.aggregate_data(result_list)
-    metric_data, _ = clean_metric_data(agg_data['y_matrix'], agg_data['y_columnlabels'], session)
+    metric_data, _ = DataUtil.clean_metric_data(agg_data['y_matrix'],
+                                                agg_data['y_columnlabels'], session)
     metric_data = metric_data.flatten()
     metric_scalar = MinMaxScaler().fit(metric_data.reshape(1, -1))
     normalized_metric_data = metric_scalar.transform(metric_data.reshape(1, -1))[0]
@@ -22,7 +22,7 @@ from analysis.preprocessing import (Bin, get_shuffle_indices,
                                     consolidate_columnlabels)
 from website.models import PipelineData, PipelineRun, Result, Workload, ExecutionTime
 from website.settings import (ENABLE_DUMMY_ENCODER, KNOB_IDENT_USE_PRUNED_METRICS,
-                              MIN_WORKLOAD_RESULTS_COUNT, TIME_ZONE)
+                              MIN_WORKLOAD_RESULTS_COUNT, TIME_ZONE, VIEWS_FOR_PRUNING)
 from website.types import PipelineTaskType, WorkloadStatusType
 from website.utils import DataUtil, JSONUtil
 
@@ -82,8 +82,6 @@ def run_background_tasks():
             # Check that there are enough results in the workload
             LOG.info("Not enough results in workload %s (# results: %s, # required: %s).",
                      workload_name, num_wkld_results, MIN_WORKLOAD_RESULTS_COUNT)
-            workload.status = WorkloadStatusType.PROCESSED
-            workload.save()
             continue
 
         LOG.info("Aggregating data for workload %s...", workload_name)
@@ -92,18 +90,11 @@ def run_background_tasks():
         LOG.debug("Aggregated knob data: rowlabels=%s, columnlabels=%s, data=%s.",
                   len(knob_data['rowlabels']), len(knob_data['columnlabels']),
                   knob_data['data'].shape)
+        LOG.debug("Aggregated metric data: rowlabels=%s, columnlabels=%s, data=%s.",
+                  len(metric_data['rowlabels']), len(metric_data['columnlabels']),
+                  metric_data['data'].shape)
         LOG.info("Done aggregating data for workload %s.", workload_name)
 
-        num_valid_results = knob_data['data'].shape[0]  # pylint: disable=unsubscriptable-object
-        if num_valid_results < MIN_WORKLOAD_RESULTS_COUNT:
-            # Check that there are enough valid results in the workload
-            LOG.info("Not enough valid results in workload %s (# valid results: "
-                     "%s, # required: %s).", workload_name, num_valid_results,
-                     MIN_WORKLOAD_RESULTS_COUNT)
-            workload.status = WorkloadStatusType.PROCESSED
-            workload.save()
-            continue
-
         # Knob_data and metric_data are 2D numpy arrays. Convert them into a
         # JSON-friendly (nested) lists and then save them as new PipelineData
         # objects.
@@ -264,6 +255,14 @@ def run_workload_characterization(metric_data):
     matrix = metric_data['data']
     columnlabels = metric_data['columnlabels']
     LOG.debug("Workload characterization ~ initial data size: %s", matrix.shape)
+    useful_labels = []
+    for label in columnlabels:
+        for view in VIEWS_FOR_PRUNING:
+            if view in label:
+                useful_labels.append(label)
+                break
+    matrix, columnlabels = DataUtil.clean_metric_data(matrix, columnlabels, None, useful_labels)
+    LOG.debug("Workload characterization ~ cleaned data size: %s", matrix.shape)
 
     # Bin each column (metric) in the matrix by its decile
     binner = Bin(bin_start=1, axis=0)
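A self-contained sketch of the filter added here, with made-up column labels in a '<view>.<metric>' shape (assumed for illustration only). The plain column slice at the end stands in for the DataUtil.clean_metric_data call in the hunk, which gives the same result when every useful label is already present in the data:

    import numpy as np

    VIEWS_FOR_PRUNING = ['dba_hist_osstat', 'dba_hist_sysstat', 'dba_hist_system_event']
    columnlabels = ['dba_hist_osstat.busy_time',          # kept: matches dba_hist_osstat
                    'dba_hist_sqlstat.executions_delta',  # dropped: view not whitelisted
                    'dba_hist_sysstat.user commits']      # kept: matches dba_hist_sysstat
    matrix = np.arange(6, dtype=float).reshape(2, 3)      # 2 results x 3 metrics

    useful_labels = []
    for label in columnlabels:
        for view in VIEWS_FOR_PRUNING:
            if view in label:          # substring match against the view name
                useful_labels.append(label)
                break

    keep = [columnlabels.index(lab) for lab in useful_labels]
    pruned = matrix[:, keep]
    print(useful_labels)   # ['dba_hist_osstat.busy_time', 'dba_hist_sysstat.user commits']
    print(pruned.shape)    # (2, 2)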
|
@ -298,6 +297,7 @@ def run_workload_characterization(metric_data):
|
||||||
|
|
||||||
# Components: metrics * factors
|
# Components: metrics * factors
|
||||||
components = fa_model.components_.T.copy()
|
components = fa_model.components_.T.copy()
|
||||||
|
LOG.info("Workload characterization first part costs %.0f seconds.", time.time() - start_ts)
|
||||||
|
|
||||||
# Run Kmeans for # clusters k in range(1, num_nonduplicate_metrics - 1)
|
# Run Kmeans for # clusters k in range(1, num_nonduplicate_metrics - 1)
|
||||||
# K should be much smaller than n_cols in detK, For now max_cluster <= 20
|
# K should be much smaller than n_cols in detK, For now max_cluster <= 20
|
||||||
|
|
|
@@ -25,7 +25,7 @@ from django.utils.text import capfirst
 from django_db_logger.models import StatusLog
 from djcelery.models import TaskMeta
 
-from .models import DBMSCatalog, KnobCatalog, Result, Session, SessionKnob
+from .models import DBMSCatalog, MetricCatalog, KnobCatalog, Result, Session, SessionKnob
 from .settings import common
 from .types import LabelStyleType, VarType
 
@@ -211,6 +211,35 @@ class DataUtil(object):
                 rowlabels_unique[i] = tuple(rowlabels[dup_idxs])
         return X_unique, y_unique, rowlabels_unique
 
+    @staticmethod
+    def clean_metric_data(metric_matrix, metric_labels, session, useful_labels=None):
+        # Make metric_labels identical to useful_labels (if given).
+        # If useful_labels is not given, let it be all metrics in the catalog.
+        if useful_labels is None:
+            metric_objs = MetricCatalog.objects.filter(dbms=session.dbms)
+            metric_cat = [session.target_objective]
+            for metric_obj in metric_objs:
+                metric_cat.append(metric_obj.name)
+        else:
+            metric_cat = useful_labels
+        missing_columns = sorted(set(metric_cat) - set(metric_labels))
+        unused_columns = set(metric_labels) - set(metric_cat)
+        LOG.debug("clean_metric_data: added %d metrics and removed %d metrics.",
+                  len(missing_columns), len(unused_columns))
+        default_val = 0
+        metric_cat_size = len(metric_cat)
+        matrix = np.ones((len(metric_matrix), metric_cat_size)) * default_val
+        metric_labels_dict = {n: i for i, n in enumerate(metric_labels)}
+        # Column labels in the matrix have the same order as the ones in the metric catalog;
+        # missing values are filled with default_val.
+        for i, metric_name in enumerate(metric_cat):
+            if metric_name in metric_labels_dict:
+                index = metric_labels_dict[metric_name]
+                matrix[:, i] = metric_matrix[:, index]
+        LOG.debug("clean_metric_data: final ~ matrix: %s, labels: %s", matrix.shape,
+                  len(metric_cat))
+        return matrix, metric_cat
+
     @staticmethod
     def clean_knob_data(knob_matrix, knob_labels, sessions):
         # Filter and amend knob_matrix and knob_labels according to the tunable knobs in the session
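A standalone sketch of the useful_labels branch of the new method (the session branch needs the Django MetricCatalog model, so it is skipped here): output columns follow the order of useful_labels, existing columns are copied over, and any requested label missing from the input is filled with the default value 0. The metric names below are hypothetical:

    import numpy as np

    def clean_metric_data_sketch(metric_matrix, metric_labels, useful_labels):
        # Mirrors DataUtil.clean_metric_data when useful_labels is given.
        metric_cat = list(useful_labels)
        matrix = np.zeros((len(metric_matrix), len(metric_cat)))   # default_val = 0
        label_to_idx = {name: i for i, name in enumerate(metric_labels)}
        for i, name in enumerate(metric_cat):
            if name in label_to_idx:
                matrix[:, i] = metric_matrix[:, label_to_idx[name]]
        return matrix, metric_cat

    raw = np.array([[1.0, 2.0],
                    [3.0, 4.0]])                  # 2 results x 2 metrics
    labels = ['metric_b', 'metric_a']             # hypothetical metric names
    cleaned, cat = clean_metric_data_sketch(raw, labels, ['metric_a', 'metric_missing'])
    print(cleaned)    # [[2. 0.]
                      #  [4. 0.]]  -- columns reordered, missing column zero-filled
    print(cat)        # ['metric_a', 'metric_missing']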