skip workload mapping when there is no workload

This commit is contained in:
yangdsh 2020-02-18 16:44:25 +00:00 committed by Dana Van Aken
parent 59b723ee83
commit f7e22ff5bb
1 changed files with 51 additions and 38 deletions

View File

@ -65,7 +65,7 @@ class MapWorkloadTask(BaseTask): # pylint: disable=abstract-method
new_res = None
# Replace result with formatted result
if not args[0][0]['bad']:
if not args[0][0]['bad'] and args[0][0]['mapped_workload'] is not None:
new_res = {
'scores': sorted(args[0][0]['scores'].items()),
'mapped_workload_id': args[0][0]['mapped_workload'],
@ -469,34 +469,40 @@ def configuration_recommendation_ddpg(result_info): # pylint: disable=invalid-n
def combine_workload(target_data):
# Load mapped workload data
mapped_workload_id = target_data['mapped_workload'][0]
latest_pipeline_run = PipelineRun.objects.get(pk=target_data['pipeline_run'])
mapped_workload = Workload.objects.get(pk=mapped_workload_id)
workload_knob_data = PipelineData.objects.get(
pipeline_run=latest_pipeline_run,
workload=mapped_workload,
task_type=PipelineTaskType.KNOB_DATA)
workload_knob_data = JSONUtil.loads(workload_knob_data.data)
workload_metric_data = PipelineData.objects.get(
pipeline_run=latest_pipeline_run,
workload=mapped_workload,
task_type=PipelineTaskType.METRIC_DATA)
workload_metric_data = JSONUtil.loads(workload_metric_data.data)
newest_result = Result.objects.get(pk=target_data['newest_result_id'])
latest_pipeline_run = PipelineRun.objects.get(pk=target_data['pipeline_run'])
session = newest_result.session
params = JSONUtil.loads(session.hyperparameters)
cleaned_workload_knob_data = clean_knob_data(workload_knob_data["data"],
workload_knob_data["columnlabels"],
newest_result.session)
X_workload = np.array(cleaned_workload_knob_data[0])
X_columnlabels = np.array(cleaned_workload_knob_data[1])
y_workload = np.array(workload_metric_data['data'])
y_columnlabels = np.array(workload_metric_data['columnlabels'])
rowlabels_workload = np.array(workload_metric_data['rowlabels'])
# Load mapped workload data
if target_data['mapped_workload'] is not None:
mapped_workload_id = target_data['mapped_workload'][0]
mapped_workload = Workload.objects.get(pk=mapped_workload_id)
workload_knob_data = PipelineData.objects.get(
pipeline_run=latest_pipeline_run,
workload=mapped_workload,
task_type=PipelineTaskType.KNOB_DATA)
workload_knob_data = JSONUtil.loads(workload_knob_data.data)
workload_metric_data = PipelineData.objects.get(
pipeline_run=latest_pipeline_run,
workload=mapped_workload,
task_type=PipelineTaskType.METRIC_DATA)
workload_metric_data = JSONUtil.loads(workload_metric_data.data)
cleaned_workload_knob_data = clean_knob_data(workload_knob_data["data"],
workload_knob_data["columnlabels"],
newest_result.session)
X_workload = np.array(cleaned_workload_knob_data[0])
X_columnlabels = np.array(cleaned_workload_knob_data[1])
y_workload = np.array(workload_metric_data['data'])
y_columnlabels = np.array(workload_metric_data['columnlabels'])
rowlabels_workload = np.array(workload_metric_data['rowlabels'])
else:
# combine the target_data with itself is actually adding nothing to the target_data
X_workload = np.array(target_data['X_matrix'])
X_columnlabels = np.array(target_data['X_columnlabels'])
y_workload = np.array(target_data['y_matrix'])
y_columnlabels = np.array(target_data['y_columnlabels'])
rowlabels_workload = np.array(target_data['rowlabels'])
# Target workload data
newest_result = Result.objects.get(pk=target_data['newest_result_id'])
@ -513,16 +519,17 @@ def combine_workload(target_data):
'identical y columnlabels (sorted metric names)'),
y_columnlabels, target_data['y_columnlabels'])
# Filter Xs by top 10 ranked knobs
ranked_knobs = PipelineData.objects.get(
pipeline_run=latest_pipeline_run,
workload=mapped_workload,
task_type=PipelineTaskType.RANKED_KNOBS)
ranked_knobs = JSONUtil.loads(ranked_knobs.data)[:params['IMPORTANT_KNOB_NUMBER']]
ranked_knob_idxs = [i for i, cl in enumerate(X_columnlabels) if cl in ranked_knobs]
X_workload = X_workload[:, ranked_knob_idxs]
X_target = X_target[:, ranked_knob_idxs]
X_columnlabels = X_columnlabels[ranked_knob_idxs]
if target_data['mapped_workload'] is not None:
# Filter Xs by top 'IMPORTANT_KNOB_NUMBER' ranked knobs
ranked_knobs = PipelineData.objects.get(
pipeline_run=latest_pipeline_run,
workload=mapped_workload,
task_type=PipelineTaskType.RANKED_KNOBS)
ranked_knobs = JSONUtil.loads(ranked_knobs.data)[:params['IMPORTANT_KNOB_NUMBER']]
ranked_knob_idxs = [i for i, cl in enumerate(X_columnlabels) if cl in ranked_knobs]
X_workload = X_workload[:, ranked_knob_idxs]
X_target = X_target[:, ranked_knob_idxs]
X_columnlabels = X_columnlabels[ranked_knob_idxs]
# Filter ys by current target objective metric
target_objective = newest_result.session.target_objective
@ -562,7 +569,7 @@ def combine_workload(target_data):
# Dummy encode categorial variables
if ENABLE_DUMMY_ENCODER:
categorical_info = DataUtil.dummy_encoder_helper(X_columnlabels,
mapped_workload.dbms)
newest_result.dbms)
dummy_encoder = DummyEncoder(categorical_info['n_values'],
categorical_info['categorical_features'],
categorical_info['cat_columnlabels'],
@ -829,10 +836,16 @@ def map_workload(map_workload_input):
ranked_knob_idxs = None
pruned_metric_idxs = None
# Compute workload mapping data for each unique workload
unique_workloads = pipeline_data.values_list('workload', flat=True).distinct()
assert len(unique_workloads) > 0
if unique_workloads == 0:
# The background task that aggregates the data has not finished running yet
target_data.update(mapped_workload=None, scores=None)
LOG.debug('%s: Skipping workload mapping because there is no workload.\n',
AlgorithmType.name(algorithm))
return target_data, algorithm
workload_data = {}
# Compute workload mapping data for each unique workload
for unique_workload in unique_workloads:
workload_obj = Workload.objects.get(pk=unique_workload)