deal with invalid results

Author: yangdsh, 2020-03-22 20:32:17 +00:00 (committed by Dana Van Aken)
Parent: e123158eb6
Commit: 56860d6364
4 changed files with 55 additions and 29 deletions

View File

@@ -179,7 +179,9 @@ def save_execution_time(start_ts, fn, result):
 def choose_value_in_range(num1, num2):
-    if num1 > 10 * num2 or num2 > 10 * num1:
+    if num2 < 10 and num1 < 10:
+        mean = min(num1, num2)
+    elif num1 > 10 * num2 or num2 > 10 * num1:
         # It is important to add 1 to avoid log(0)
         log_num1 = np.log(num1 + 1)
         log_num2 = np.log(num2 + 1)
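For context: the hunk shows the new branch heads but not the full bodies. A minimal sketch of how the function plausibly behaves after this change, assuming the `elif` branch averages in log space (inferred from the `log(0)` comment; the log-branch result and the final `else` are not shown in the diff):

```python
import numpy as np

def choose_value_in_range(num1, num2):
    # Sketch only: branch conditions are from the diff, branch bodies
    # beyond the first are assumptions.
    if num2 < 10 and num1 < 10:
        # Both values are small: conservatively take the minimum.
        mean = min(num1, num2)
    elif num1 > 10 * num2 or num2 > 10 * num1:
        # Values differ by more than 10x: average in log space so the
        # larger one does not dominate. Add 1 to avoid log(0).
        log_num1 = np.log(num1 + 1)
        log_num2 = np.log(num2 + 1)
        mean = np.exp((log_num1 + log_num2) / 2) - 1
    else:
        # Comparable magnitudes: plain arithmetic mean.
        mean = (num1 + num2) / 2
    return mean
```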
@@ -294,9 +296,16 @@ def preprocessing(result_id, algorithm):
     # this data in order to make a configuration recommendation (until we
     # implement a sampling technique to generate new training data).
     has_pipeline_data = PipelineData.objects.filter(workload=newest_result.workload).exists()
-    if not has_pipeline_data or session.tuning_session == 'lhs':
+    session_results = Result.objects.filter(session=session)
+    useful_results_cnt = len(session_results)
+    for result in session_results:
+        if 'range_test' in result.metric_data.name or 'default' in result.metric_data.name:
+            useful_results_cnt -= 1
+    if not has_pipeline_data or useful_results_cnt == 0 or session.tuning_session == 'lhs':
         if not has_pipeline_data and session.tuning_session == 'tuning_session':
             LOG.debug("Background tasks haven't ran for this workload yet, picking data with lhs.")
+        if useful_results_cnt == 0 and session.tuning_session == 'tuning_session':
+            LOG.debug("Not enough data in this session, picking data with lhs.")
         all_samples = JSONUtil.loads(session.lhs_samples)
         if len(all_samples) == 0:
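The counting logic above, in isolation: a session result is "useful" unless its metric data was renamed with a `range_test` or `default` tag by the upload path (see the `handle_result_files` hunks below). A standalone sketch with hypothetical names:

```python
def count_useful_results(metric_names):
    # Results tagged as range tests or default (no-workload) runs
    # must not count toward "enough data to tune".
    return sum(
        1 for name in metric_names
        if 'range_test' not in name and 'default' not in name
    )

# Hypothetical metric-data names as they look after tagging:
names = ['metric_0', 'range_test_metric_1*', 'default_metric_2', 'metric_3']
assert count_useful_results(names) == 2
```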
@@ -448,20 +457,25 @@ def train_ddpg(train_ddpg_input):
     params = JSONUtil.loads(session.hyperparameters)
     session_results = Result.objects.filter(session=session,
                                             creation_time__lt=result.creation_time)
+    useful_results_cnt = len(session_results)
+    first_valid_result = -1
     for i, result in enumerate(session_results):
-        if 'range_test' in result.metric_data.name:
-            session_results.pop(i)
+        if 'range_test' in result.metric_data.name or 'default' in result.metric_data.name:
+            useful_results_cnt -= 1
+        else:
+            last_valid_result = i
+            first_valid_result = i if first_valid_result == -1 else first_valid_result
     target_data = {}
     target_data['newest_result_id'] = result_id
     # Extract data from result and previous results
     result = Result.objects.filter(pk=result_id)
-    if len(session_results) == 0:
+    if useful_results_cnt == 0:
         base_result_id = result_id
         prev_result_id = result_id
     else:
-        base_result_id = session_results[0].pk
-        prev_result_id = session_results[len(session_results) - 1].pk
+        base_result_id = session_results[first_valid_result].pk
+        prev_result_id = session_results[last_valid_result].pk
     base_result = Result.objects.filter(pk=base_result_id)
     prev_result = Result.objects.filter(pk=prev_result_id)
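This also fixes a real bug: the old code called `session_results.pop(i)` while enumerating, which skips the element after each removal (and `pop()` is not available on a Django `QuerySet` anyway). The replacement leaves the sequence intact and just tracks the first and last valid indices. The same bookkeeping over plain data:

```python
def first_and_last_valid(names, invalid_markers=('range_test', 'default')):
    # Returns (-1, -1) if every result is tagged invalid; mirrors
    # first_valid_result / last_valid_result in the hunk above.
    first_valid = last_valid = -1
    for i, name in enumerate(names):
        if any(marker in name for marker in invalid_markers):
            continue
        if first_valid == -1:
            first_valid = i
        last_valid = i
    return first_valid, last_valid

assert first_and_last_valid(['range_test_a*', 'b', 'default_c', 'd']) == (1, 3)
```

Note that `last_valid_result` is only assigned when at least one valid result exists, which the `useful_results_cnt == 0` guard ensures before it is read.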

View File

@@ -80,8 +80,8 @@ def run_background_tasks():
         LOG.debug("Aggregating data for workload %d", workload.id)
         # Aggregate the knob & metric data for this workload
         knob_data, metric_data = aggregate_data(wkld_results)
-        LOG.debug("knob_data: %s", str(knob_data))
-        LOG.debug("metric_data: %s", str(metric_data))
+        # LOG.debug("knob_data: %s", str(knob_data))
+        # LOG.debug("metric_data: %s", str(metric_data))
         # Knob_data and metric_data are 2D numpy arrays. Convert them into a
         # JSON-friendly (nested) lists and then save them as new PipelineData
@@ -184,7 +184,7 @@ def aggregate_data(wkld_results):
     # - 'y_columnlabels': a list of the metric names corresponding to the
     #                     columns in the metric_data matrix
     start_ts = time.time()
-    aggregated_data = DataUtil.aggregate_data(wkld_results)
+    aggregated_data = DataUtil.aggregate_data(wkld_results, ignore=['range_test', 'default', '*'])
     # Separate knob & workload data into two "standard" dictionaries of the
     # same form
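Note that the explicit `ignore` list here includes the bare `'*'` token in addition to the two name prefixes, so aggregation also drops any metric tagged with the invalid marker alone. The matching rule is a substring test per token:

```python
def is_ignored(metric_name, ignore=('range_test', 'default', '*')):
    # A result is skipped if any ignore token appears anywhere in the name.
    return any(token in metric_name for token in ignore)

assert is_ignored('range_test_throughput*')
assert is_ignored('some_metric*')            # caught by the bare '*' token
assert not is_ignored('throughput_txn_per_sec')
```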

View File

@@ -145,15 +145,17 @@ class DataUtil(object):
         return np.array(minvals), np.array(maxvals)

     @staticmethod
-    def aggregate_data(results):
+    def aggregate_data(results, ignore=None):
+        if ignore is None:
+            ignore = ['range_test', 'default']
         knob_labels = list(JSONUtil.loads(results[0].knob_data.data).keys())
         metric_labels = list(JSONUtil.loads(results[0].metric_data.data).keys())
-        X_matrix = np.empty((len(results), len(knob_labels)), dtype=float)
-        y_matrix = np.empty((len(results), len(metric_labels)), dtype=float)
-        rowlabels = np.empty(len(results), dtype=int)
-        for i, result in enumerate(results):
-            if 'range_test' in result.metric_data.name:
+        X_matrix = []
+        y_matrix = []
+        rowlabels = []
+        for result in results:
+            if any(symbol in result.metric_data.name for symbol in ignore):
                 continue
             param_data = JSONUtil.loads(result.knob_data.data)
             if len(param_data) != len(knob_labels):
@@ -167,13 +169,13 @@ class DataUtil(object):
                     ("Incorrect number of metrics "
                      "(expected={}, actual={})").format(len(metric_labels),
                                                         len(metric_data)))
-            X_matrix[i, :] = [param_data[l] for l in knob_labels]
-            y_matrix[i, :] = [metric_data[l] for l in metric_labels]
-            rowlabels[i] = result.pk
+            X_matrix.append([param_data[l] for l in knob_labels])
+            y_matrix.append([metric_data[l] for l in metric_labels])
+            rowlabels.append(result.pk)
         return {
-            'X_matrix': X_matrix,
-            'y_matrix': y_matrix,
-            'rowlabels': rowlabels.tolist(),
+            'X_matrix': np.array(X_matrix, dtype=np.float64),
+            'y_matrix': np.array(y_matrix, dtype=np.float64),
+            'rowlabels': rowlabels,
             'X_columnlabels': knob_labels,
             'y_columnlabels': metric_labels,
         }
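The switch from preallocated `np.empty` buffers to plain lists matters because skipped results used to leave uninitialized rows in the matrices (`np.empty` does not zero its memory). A compact illustration of the failure mode and the fix, using made-up rows:

```python
import numpy as np

results = [('a', [1.0, 2.0]), ('range_test_b*', [3.0, 4.0]), ('c', [5.0, 6.0])]

# Old approach: rows for skipped results are never written, so they
# keep whatever garbage np.empty left behind.
bad = np.empty((len(results), 2), dtype=float)
for i, (name, row) in enumerate(results):
    if 'range_test' in name:
        continue  # row 1 of `bad` stays uninitialized
    bad[i, :] = row

# New approach: append only the rows we keep, convert once at the end.
kept = [row for name, row in results if 'range_test' not in name]
good = np.array(kept, dtype=np.float64)
assert good.shape == (2, 2)  # exactly the two valid results
```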

View File

@@ -495,13 +495,17 @@ def handle_result_files(session, files, execution_times=None):
                 continue
             target_value = JSONUtil.loads(past_metric.data)[session.target_objective]
             if metric_meta.improvement == target_objectives.MORE_IS_BETTER:
-                if worst_target_value is None or target_value < worst_target_value:
+                if '*' in worst_metric.name or target_value < worst_target_value:
                     worst_target_value = target_value
                     worst_metric = past_metric
             else:
-                if worst_target_value is None or target_value > worst_target_value:
+                if '*' in worst_metric.name or target_value > worst_target_value:
                     worst_target_value = target_value
                     worst_metric = past_metric
+        if '*' in worst_metric.name:
+            LOG.debug("All previous results are invalid")
+            penalty_target_value = worst_target_value
+        else:
             LOG.debug("Worst target value so far is: %d", worst_target_value)
             penalty_factor = JSONUtil.loads(session.hyperparameters).get('PENALTY_FACTOR', 2)
             if metric_meta.improvement == target_objectives.MORE_IS_BETTER:
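The `'*' in worst_metric.name` test replaces the old `None` sentinel: the search is evidently seeded with the invalid metric itself, so the marker survives only when no valid previous result was found, in which case the invalid value is reused directly instead of being scaled. A sketch of the rule (the scaling directions are assumptions; the hunk does not show how `PENALTY_FACTOR` is applied):

```python
def penalty_value(worst_name, worst_value, more_is_better, penalty_factor=2):
    if '*' in worst_name:
        # All previous results are invalid: nothing real to scale from.
        return worst_value
    if more_is_better:
        return worst_value / penalty_factor  # assumed: shrink a higher-is-better score
    return worst_value * penalty_factor      # assumed: inflate a lower-is-better score

assert penalty_value('metric*', 100.0, more_is_better=True) == 100.0
assert penalty_value('throughput', 100.0, more_is_better=True) == 50.0
```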
@@ -647,6 +651,11 @@ def handle_result_files(session, files, execution_times=None):
             # We tag the metric as invalid, so later they will be set to the worst result
             metric_data.name = 'range_test_' + metric_data.name + '*'
             metric_data.save()
+        if 'status' in summary and summary['status'] == "default":
+            # The metric should not be used for learning because the driver did not run workload
+            # We tag the metric as invalid, so later they will be set to the worst result
+            metric_data.name = 'default_' + metric_data.name
+            metric_data.save()

         # Create a new workload if this one does not already exist
         workload = Workload.objects.create_workload(
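Together with the `range_test` branch above, the commit establishes a small naming convention for unusable results: range-test runs get a `range_test_` prefix plus a `'*'` suffix, and runs where the driver never executed the workload get a `default_` prefix. Downstream code matches on substrings, so the whole convention fits in one hypothetical helper:

```python
def tag_invalid(name, status):
    # Sketch of the naming convention; the `status` values mirror the
    # driver summary statuses handled in this function.
    if status == 'range_test':
        return 'range_test_' + name + '*'  # '*' marks the result invalid everywhere
    if status == 'default':
        return 'default_' + name           # driver did not run the workload
    return name

assert tag_invalid('metric_data_7', 'range_test') == 'range_test_metric_data_7*'
assert tag_invalid('metric_data_8', 'default') == 'default_metric_data_8'
```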
@@ -1239,6 +1248,7 @@ def give_result(request, upload_code):  # pylint: disable=unused-argument
     elif group_res.ready():
         assert group_res.successful()
+        latest_result = Result.objects.filter(session=session).latest('creation_time')
         next_config = JSONUtil.loads(latest_result.next_configuration)
         response.update(
             next_config, celery_status='SUCCESS',
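The added line makes the success branch self-contained: instead of relying on a `latest_result` bound elsewhere on this code path, it re-fetches the newest result, since the recommended configuration is attached to whichever result was uploaded last. A Django-free sketch of the same lookup, with made-up data:

```python
def latest_by_creation_time(results):
    # Equivalent in spirit to Result.objects.filter(...).latest('creation_time').
    return max(results, key=lambda r: r['creation_time'])

results = [
    {'creation_time': 1, 'next_configuration': '{}'},
    {'creation_time': 2, 'next_configuration': '{"shared_buffers": "8GB"}'},
]
assert latest_by_creation_time(results)['creation_time'] == 2
```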