do not allow mapping to the same workload; remove important_knob_num

yangdsh 2020-04-16 01:34:27 +00:00 committed by Dana Van Aken
parent ec67e739d8
commit 0bf2da738f
3 changed files with 98 additions and 105 deletions

View File

@@ -89,63 +89,6 @@ class ConfigurationRecommendation(BaseTask):  # pylint: disable=abstract-method
         task_meta.save()
-def clean_knob_data(knob_matrix, knob_labels, session):
-    # Filter and amend knob_matrix and knob_labels according to the tunable knobs in the session
-    knob_matrix = np.array(knob_matrix)
-    session_knobs = SessionKnob.objects.get_knobs_for_session(session)
-    knob_cat = [k['name'] for k in session_knobs]
-    if knob_cat == knob_labels:
-        # Nothing to do!
-        return knob_matrix, knob_labels
-    LOG.info("session_knobs: %s, knob_labels: %s, missing: %s, extra: %s", len(knob_cat),
-             len(knob_labels), len(set(knob_cat) - set(knob_labels)),
-             len(set(knob_labels) - set(knob_cat)))
-    nrows = knob_matrix.shape[0]  # pylint: disable=unsubscriptable-object
-    new_labels = []
-    new_columns = []
-    for knob in session_knobs:
-        knob_name = knob['name']
-        if knob_name not in knob_labels:
-            # Add missing column initialized to knob's default value
-            default_val = knob['default']
-            try:
-                if knob['vartype'] == VarType.ENUM:
-                    default_val = knob['enumvals'].split(',').index(default_val)
-                elif knob['vartype'] == VarType.BOOL:
-                    default_val = str(default_val).lower() in ("on", "true", "yes", "1")
-                else:
-                    default_val = float(default_val)
-            except ValueError:
-                LOG.warning("Error parsing knob '%s' default value: %s. Setting default to 0.",
-                            knob_name, default_val, exc_info=True)
-                default_val = 0
-            new_col = np.ones((nrows, 1), dtype=float) * default_val
-            new_lab = knob_name
-        else:
-            index = knob_labels.index(knob_name)
-            new_col = knob_matrix[:, index].reshape(-1, 1)
-            new_lab = knob_labels[index]
-        new_labels.append(new_lab)
-        new_columns.append(new_col)
-    new_matrix = np.hstack(new_columns).reshape(nrows, -1)
-    LOG.debug("Cleaned matrix: %s, knobs (%s): %s", new_matrix.shape, len(new_labels), new_labels)
-    assert new_labels == knob_cat, \
-        "Expected knobs: {}\nActual knobs: {}\n".format(
-            knob_cat, new_labels)
-    assert new_matrix.shape == (nrows, len(knob_cat)), \
-        "Expected shape: {}, Actual shape: {}".format(
-            (nrows, len(knob_cat)), new_matrix.shape)
-    return new_matrix, new_labels
 def clean_metric_data(metric_matrix, metric_labels, session):
     # Makes sure that all metrics in the dbms are included in the metric_matrix and metric_labels
     metric_objs = MetricCatalog.objects.filter(dbms=session.dbms)
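
The helper deleted above reappears as a static method on DataUtil in the third file, generalized to take a list of sessions. Its core behavior is column selection plus default-value padding: columns for knobs already in the matrix are reused, and columns for missing knobs are filled with the knob's default. A minimal standalone sketch of that idea, using a hypothetical pad_missing_knobs helper with hard-coded defaults in place of the SessionKnob catalog:

    import numpy as np

    def pad_missing_knobs(matrix, labels, expected):
        # expected: list of (name, default) pairs in the desired output order.
        # Reuse an existing column when the knob is present; otherwise add a
        # column filled with the knob's default value.
        matrix = np.array(matrix, dtype=float)
        columns, out_labels = [], []
        for name, default in expected:
            if name in labels:
                columns.append(matrix[:, labels.index(name)].reshape(-1, 1))
            else:
                columns.append(np.full((matrix.shape[0], 1), float(default)))
            out_labels.append(name)
        return np.hstack(columns), out_labels

    X, names = pad_missing_knobs([[1.0], [2.0]], ['knob_a'],
                                 [('knob_a', 0.0), ('knob_b', 64.0)])
    # X -> [[1., 64.], [2., 64.]]; names -> ['knob_a', 'knob_b']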
@@ -421,7 +364,8 @@ def aggregate_target_results(aggregate_target_results_input):
     agg_data['status'] = 'good'
     # Clean knob data
-    cleaned_agg_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'], session)
+    cleaned_agg_data = DataUtil.clean_knob_data(agg_data['X_matrix'],
+                                                agg_data['X_columnlabels'], [session])
     agg_data['X_matrix'] = np.array(cleaned_agg_data[0])
     agg_data['X_columnlabels'] = np.array(cleaned_agg_data[1])
     LOG.debug("%s ~ FINAL: X_matrix=%s, X_columnlabels=%s", task_name,
@@ -581,7 +525,8 @@ def train_ddpg(train_ddpg_input):
     prev_normalized_metric_data = prev_metric_scalar.transform(prev_metric_data.reshape(1, -1))[0]
     # Clean knob data
-    cleaned_knob_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'], session)
+    cleaned_knob_data = DataUtil.clean_knob_data(agg_data['X_matrix'],
+                                                 agg_data['X_columnlabels'], [session])
     knob_data = np.array(cleaned_knob_data[0])
     knob_labels = np.array(cleaned_knob_data[1])
     knob_bounds = np.vstack(DataUtil.get_knob_bounds(knob_labels.flatten(), session))
@@ -701,7 +646,8 @@ def configuration_recommendation_ddpg(recommendation_ddpg_input):  # pylint: dis
     metric_data = metric_data.flatten()
     metric_scalar = MinMaxScaler().fit(metric_data.reshape(1, -1))
     normalized_metric_data = metric_scalar.transform(metric_data.reshape(1, -1))[0]
-    cleaned_knob_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'], session)
+    cleaned_knob_data = DataUtil.clean_knob_data(agg_data['X_matrix'],
+                                                 agg_data['X_columnlabels'], [session])
     knob_labels = np.array(cleaned_knob_data[1]).flatten()
     knob_num = len(knob_labels)
     metric_num = len(metric_data)
@@ -730,7 +676,7 @@ def configuration_recommendation_ddpg(recommendation_ddpg_input):  # pylint: dis
     return target_data_res
-def combine_workload(target_data):
+def process_training_data(target_data):
     newest_result = Result.objects.get(pk=target_data['newest_result_id'])
     latest_pipeline_run = PipelineRun.objects.get(pk=target_data['pipeline_run'])
     session = newest_result.session
@@ -752,9 +698,9 @@ def combine_workload(target_data):
         workload=mapped_workload,
         task_type=PipelineTaskType.METRIC_DATA)
     workload_metric_data = JSONUtil.loads(workload_metric_data.data)
-    cleaned_workload_knob_data = clean_knob_data(workload_knob_data["data"],
-                                                 workload_knob_data["columnlabels"],
-                                                 newest_result.session)
+    cleaned_workload_knob_data = DataUtil.clean_knob_data(workload_knob_data["data"],
+                                                          workload_knob_data["columnlabels"],
+                                                          [newest_result.session])
     X_workload = np.array(cleaned_workload_knob_data[0])
     X_columnlabels = np.array(cleaned_workload_knob_data[1])
     y_workload = np.array(workload_metric_data['data'])
@@ -783,23 +729,6 @@ def combine_workload(target_data):
                          'identical y columnlabels (sorted metric names)'),
                         y_columnlabels, target_data['y_columnlabels'])
-    if target_data['mapped_workload'] is not None:
-        # Filter Xs by top 'IMPORTANT_KNOB_NUMBER' ranked knobs
-        ranked_knobs = PipelineData.objects.get(
-            pipeline_run=latest_pipeline_run,
-            workload=mapped_workload,
-            task_type=PipelineTaskType.RANKED_KNOBS)
-        pipeline_data_knob = ranked_knobs
-        pipeline_data_metric = PipelineData.objects.get(
-            pipeline_run=latest_pipeline_run,
-            workload=mapped_workload,
-            task_type=PipelineTaskType.PRUNED_METRICS)
-        ranked_knobs = JSONUtil.loads(ranked_knobs.data)[:params['IMPORTANT_KNOB_NUMBER']]
-        ranked_knob_idxs = [i for i, cl in enumerate(X_columnlabels) if cl in ranked_knobs]
-        X_workload = X_workload[:, ranked_knob_idxs]
-        X_target = X_target[:, ranked_knob_idxs]
-        X_columnlabels = X_columnlabels[ranked_knob_idxs]
     # Filter ys by current target objective metric
     target_objective = newest_result.session.target_objective
     target_obj_idx = [i for i, cl in enumerate(y_columnlabels) if cl == target_objective]
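
For context, the deleted block above implemented the top-k knob filter driven by params['IMPORTANT_KNOB_NUMBER'], the second half of the commit title. In isolation it amounted to the following (standalone sketch with made-up data; in the real code the ranking came from the RANKED_KNOBS pipeline task):

    import numpy as np

    ranked_knobs = ['knob_b', 'knob_a', 'knob_c']   # ranked best-first
    important_knob_number = 2                       # was params['IMPORTANT_KNOB_NUMBER']
    top_knobs = ranked_knobs[:important_knob_number]

    X_columnlabels = np.array(['knob_a', 'knob_b', 'knob_c'])
    X_workload = np.arange(6.0).reshape(2, 3)

    ranked_knob_idxs = [i for i, cl in enumerate(X_columnlabels) if cl in top_knobs]
    X_workload = X_workload[:, ranked_knob_idxs]    # keeps the knob_a, knob_b columns
    X_columnlabels = X_columnlabels[ranked_knob_idxs]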
@@ -945,7 +874,7 @@ def configuration_recommendation(recommendation_input):
     X_columnlabels, X_scaler, X_scaled, y_scaled, X_max, X_min,\
         dummy_encoder, constraint_helper, pipeline_knobs,\
-        pipeline_metrics = combine_workload(target_data)
+        pipeline_metrics = process_training_data(target_data)
     # FIXME: we should generate more samples and use a smarter sampling technique
     num_samples = params['NUM_SAMPLES']
@@ -1108,12 +1037,10 @@ def map_workload(map_workload_input):
         workload__hardware=target_workload.hardware,
         workload__project=target_workload.project)
-    # FIXME (dva): we should also compute the global (i.e., overall) ranked_knobs
-    # and pruned metrics but we just use those from the first workload for now
+    # FIXME (dva): we should also compute the global (i.e., overall)
+    # pruned metrics but we just use those from the first workload for now
     initialized = False
-    global_ranked_knobs = None
     global_pruned_metrics = None
-    ranked_knob_idxs = None
     pruned_metric_idxs = None
     unique_workloads = pipeline_data.values_list('workload', flat=True).distinct()
@@ -1131,9 +1058,9 @@ def map_workload(map_workload_input):
         # Load knob & metric data for this workload
         knob_data = load_data_helper(pipeline_data, unique_workload, PipelineTaskType.KNOB_DATA)
-        knob_data["data"], knob_data["columnlabels"] = clean_knob_data(knob_data["data"],
-                                                                       knob_data["columnlabels"],
-                                                                       newest_result.session)
+        knob_data["data"], knob_data["columnlabels"] =\
+            DataUtil.clean_knob_data(knob_data["data"], knob_data["columnlabels"],
+                                     [newest_result.session])
         metric_data = load_data_helper(pipeline_data, unique_workload, PipelineTaskType.METRIC_DATA)
         X_matrix = np.array(knob_data["data"])
         y_matrix = np.array(metric_data["data"])
@@ -1141,25 +1068,17 @@ def map_workload(map_workload_input):
         assert np.array_equal(rowlabels, metric_data["rowlabels"])
         if not initialized:
-            # For now set ranked knobs & pruned metrics to be those computed
-            # for the first workload
-            global_ranked_knobs = load_data_helper(
-                pipeline_data, unique_workload,
-                PipelineTaskType.RANKED_KNOBS)[:params['IMPORTANT_KNOB_NUMBER']]
+            # For now set pruned metrics to be those computed for the first workload
             global_pruned_metrics = load_data_helper(
                 pipeline_data, unique_workload, PipelineTaskType.PRUNED_METRICS)
-            ranked_knob_idxs = [i for i in range(X_matrix.shape[1]) if X_columnlabels[
-                i] in global_ranked_knobs]
             pruned_metric_idxs = [i for i in range(y_matrix.shape[1]) if y_columnlabels[
                 i] in global_pruned_metrics]
-            # Filter X & y columnlabels by top ranked_knobs & pruned_metrics
-            X_columnlabels = X_columnlabels[ranked_knob_idxs]
+            # Filter y columnlabels by pruned_metrics
             y_columnlabels = y_columnlabels[pruned_metric_idxs]
             initialized = True
-        # Filter X & y matrices by top ranked_knobs & pruned_metrics
-        X_matrix = X_matrix[:, ranked_knob_idxs]
+        # Filter y matrices by pruned_metrics
         y_matrix = y_matrix[:, pruned_metric_idxs]
         # Combine duplicate rows (rows with same knob settings)
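
The trailing context line refers to DataUtil.combine_duplicate_rows (partially visible in the third file), which collapses rows with identical knob vectors before mapping. A standalone sketch of that step; aggregating duplicate rows' metrics with the median is an assumption made for this illustration:

    import numpy as np

    def combine_duplicate_rows(X, y, rowlabels):
        # Collapse rows with identical knob settings; aggregate their metrics.
        # (Median aggregation is an assumption for this sketch.)
        X_unique, idxs, invs, counts = np.unique(
            X, return_index=True, return_inverse=True, return_counts=True, axis=0)
        y_unique = np.empty((X_unique.shape[0], y.shape[1]))
        labels_unique = []
        for i, count in enumerate(counts):
            dup_idxs = np.flatnonzero(invs == i)
            y_unique[i, :] = np.median(y[dup_idxs, :], axis=0)
            labels_unique.append(tuple(rowlabels[j] for j in dup_idxs))
        return X_unique, y_unique, labels_unique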
@@ -1172,11 +1091,11 @@ def map_workload(map_workload_input):
             'rowlabels': rowlabels,
         }
-    if len(workload_data) == 0:
+    if len(workload_data) < 2:
         # The background task that aggregates the data has not finished running yet
         target_data.update(mapped_workload=None, scores=None)
         LOG.debug('%s: Result = %s\n', task_name, _task_result_tostring(target_data))
-        LOG.info('%s: Skipping workload mapping because no workload data is available.',
+        LOG.info('%s: Skipping workload mapping because fewer than two workloads are available.',
                  task_name)
         return target_data, algorithm
@@ -1194,8 +1113,8 @@ def map_workload(map_workload_input):
     del Xs
     del ys
-    # Filter the target's X & y data by the ranked knobs & pruned metrics.
-    X_target = target_data['X_matrix'][:, ranked_knob_idxs]
+    X_target = target_data['X_matrix']
+    # Filter the target's y data by the pruned metrics.
     y_target = target_data['y_matrix'][:, pruned_metric_idxs]
     # Now standardize the target's data and bin it by the deciles we just
@@ -1206,6 +1125,9 @@ def map_workload(map_workload_input):
     scores = {}
     for workload_id, workload_entry in list(workload_data.items()):
+        LOG.info('%s: %s', newest_result.workload.pk, workload_id)
+        if newest_result.workload.pk == workload_id:
+            continue
         predictions = np.empty_like(y_target)
         X_workload = workload_entry['X_matrix']
         X_scaled = X_scaler.transform(X_workload)
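
The three added lines implement the first half of the commit title: a workload can no longer be mapped to itself, because the target's own primary key now short-circuits the scoring loop. A toy sketch of the loop's new shape, with a squared-error stand-in for the real GPR-based, decile-binned score:

    import numpy as np

    target_workload_id = 7
    workload_data = {7: np.array([1.0, 2.0]),    # the target itself
                     8: np.array([1.5, 1.0])}
    y_target = np.array([1.0, 1.5])

    scores = {}
    for workload_id, y_workload in workload_data.items():
        if workload_id == target_workload_id:
            continue                             # never map a workload to itself
        scores[workload_id] = float(np.mean((y_workload - y_target) ** 2))
    best_workload = min(scores, key=scores.get)  # -> 8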

View File

@@ -156,7 +156,14 @@ def run_background_tasks():
             # PipelineData object.
             LOG.info("Ranking knobs for workload %s (use pruned metric data: %s)...",
                      workload_name, KNOB_IDENT_USE_PRUNED_METRICS)
-            ranked_knobs = run_knob_identification(knob_data=knob_data,
+            sessions = []
+            for result in wkld_results:
+                if result.session not in sessions:
+                    sessions.append(result.session)
+            rank_knob_data = copy.deepcopy(knob_data)
+            rank_knob_data['data'], rank_knob_data['columnlabels'] =\
+                DataUtil.clean_knob_data(knob_data['data'], knob_data['columnlabels'], sessions)
+            ranked_knobs = run_knob_identification(knob_data=rank_knob_data,
                                                    metric_data=ranked_metric_data,
                                                    dbms=workload.dbms)
             LOG.info("Done ranking knobs for workload %s (# ranked knobs: %s).\n\n"

View File

@@ -211,6 +211,70 @@ class DataUtil(object):
             rowlabels_unique[i] = tuple(rowlabels[dup_idxs])
         return X_unique, y_unique, rowlabels_unique
+    @staticmethod
+    def clean_knob_data(knob_matrix, knob_labels, sessions):
+        # Filter and amend knob_matrix and knob_labels according to the tunable knobs
+        # in the given sessions
+        knob_matrix = np.array(knob_matrix)
+        session_knobs = []
+        knob_cat = []
+        # Union of tunable knobs across sessions, keeping first-seen order
+        for session in sessions:
+            knobs_for_this_session = SessionKnob.objects.get_knobs_for_session(session)
+            for knob in knobs_for_this_session:
+                if knob['name'] not in knob_cat:
+                    session_knobs.append(knob)
+            knob_cat = [k['name'] for k in session_knobs]
+        if knob_cat == knob_labels:
+            # Nothing to do!
+            return knob_matrix, knob_labels
+        LOG.info("session_knobs: %s, knob_labels: %s, missing: %s, extra: %s", len(knob_cat),
+                 len(knob_labels), len(set(knob_cat) - set(knob_labels)),
+                 len(set(knob_labels) - set(knob_cat)))
+        nrows = knob_matrix.shape[0]  # pylint: disable=unsubscriptable-object
+        new_labels = []
+        new_columns = []
+        for knob in session_knobs:
+            knob_name = knob['name']
+            if knob_name not in knob_labels:
+                # Add missing column initialized to knob's default value
+                default_val = knob['default']
+                try:
+                    if knob['vartype'] == VarType.ENUM:
+                        default_val = knob['enumvals'].split(',').index(default_val)
+                    elif knob['vartype'] == VarType.BOOL:
+                        default_val = str(default_val).lower() in ("on", "true", "yes", "1")
+                    else:
+                        default_val = float(default_val)
+                except ValueError:
+                    LOG.warning("Error parsing knob '%s' default value: %s. Setting default to 0.",
+                                knob_name, default_val, exc_info=True)
+                    default_val = 0
+                new_col = np.ones((nrows, 1), dtype=float) * default_val
+                new_lab = knob_name
+            else:
+                index = knob_labels.index(knob_name)
+                new_col = knob_matrix[:, index].reshape(-1, 1)
+                new_lab = knob_labels[index]
+            new_labels.append(new_lab)
+            new_columns.append(new_col)
+        new_matrix = np.hstack(new_columns).reshape(nrows, -1)
+        LOG.debug("Cleaned matrix: %s, knobs (%s): %s", new_matrix.shape,
+                  len(new_labels), new_labels)
+        assert new_labels == knob_cat, \
+            "Expected knobs: {}\nActual knobs: {}\n".format(
+                knob_cat, new_labels)
+        assert new_matrix.shape == (nrows, len(knob_cat)), \
+            "Expected shape: {}, Actual shape: {}".format(
+                (nrows, len(knob_cat)), new_matrix.shape)
+        return new_matrix, new_labels
     @staticmethod
     def dummy_encoder_helper(featured_knobs, dbms):
         n_values = []
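
The main behavioral difference from the deleted module-level helper is the sessions loop at the top: the knob set becomes the union of tunable knobs across all given sessions, in first-seen order, so results from sessions tuning different knob subsets can share one matrix. A standalone sketch of just that merge, with plain dicts in place of SessionKnob query results:

    def union_session_knobs(per_session_knobs):
        # Merge knob lists across sessions, keeping the first-seen entry
        # for each knob name and preserving encounter order.
        seen, merged = set(), []
        for knobs in per_session_knobs:
            for knob in knobs:
                if knob['name'] not in seen:
                    seen.add(knob['name'])
                    merged.append(knob)
        return merged

    merged = union_session_knobs([[{'name': 'knob_a'}, {'name': 'knob_b'}],
                                  [{'name': 'knob_b'}, {'name': 'knob_c'}]])
    # -> entries for knob_a, knob_b, knob_c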