Refactoring: move preprocessing to a new celery task

This commit is contained in:
yangdsh 2020-03-06 01:55:13 +00:00 committed by Dana Van Aken
parent cebc958666
commit 029ad0f633
4 changed files with 178 additions and 124 deletions

View File

@ -158,4 +158,4 @@ class SessionKnobForm(forms.ModelForm):
class Meta: # pylint: disable=no-init
model = SessionKnob
fields = ['session', 'knob', 'minval', 'maxval', 'tunable']
fields = ['session', 'knob', 'minval', 'maxval', 'lowerbound', 'upperbound', 'tunable']

View File

@ -179,17 +179,19 @@ def save_execution_time(start_ts, fn, result):
def choose_value_in_range(num1, num2):
if num1 < 1:
num1 = num1 + 1
if num2 < 1:
num2 = num2 + 1
log_num1 = np.log(num1)
log_num2 = np.log(num2)
# It is important to add 1 to avoid log(0)
log_num1 = np.log(num1 + 1)
log_num2 = np.log(num2 + 1)
return np.exp((log_num1 + log_num2) / 2)
def test_knob_range(knob_info, newest_result, good_val, bad_val, mode):
def calc_next_knob_range(algorithm, knob_info, newest_result, good_val, bad_val, mode):
session = newest_result.session
# The metric should not be used for learning because the driver runs nothing
# We tag the metric as invalid, so later they will be set to the worst result
metric_file = newest_result.metric_data
metric_file.name = metric_file.name + '*'
metric_file.save()
knob = KnobCatalog.objects.get(name=knob_info['name'], dbms=session.dbms)
knob_file = newest_result.knob_data
knob_values = JSONUtil.loads(knob_file.data)
@ -225,9 +227,13 @@ def test_knob_range(knob_info, newest_result, good_val, bad_val, mode):
else:
if mode == 'lowerbound':
session_knob.minval = str(int(expected_value))
# Terminate the search if the observed value is very different from the set one
if expected_value < last_value / 10:
session_knob.minval = str(int(last_value))
session_knob.lowerbound = str(int(last_value))
session_knob.save()
# The return value means we will not generate next config to test this knob
return False, None
else:
session_knob.maxval = str(int(expected_value))
next_value = choose_value_in_range(expected_value, bad_val)
@ -241,43 +247,50 @@ def test_knob_range(knob_info, newest_result, good_val, bad_val, mode):
knobs = SessionKnob.objects.get_knobs_for_session(session)
next_config = gen_test_maxval_data(knobs, knob.name, next_value)
agg_data = DataUtil.aggregate_data(Result.objects.filter(pk=newest_result.pk))
agg_data['newest_result_id'] = newest_result.pk
agg_data['status'] = 'range_test'
agg_data['config_recommend'] = next_config
LOG.debug('Testing %s of %s.\n\ndata=%s\n', mode, knob.name,
JSONUtil.dumps(agg_data, pprint=True))
save_execution_time(time.time(), "aggregate_target_results", newest_result)
return agg_data
target_data = {}
target_data['newest_result_id'] = newest_result.pk
target_data['status'] = 'range_test'
target_data['config_recommend'] = next_config
LOG.debug('%s: Generated a config to test %s of %s.\n\ndata=%s\n',
AlgorithmType.name(algorithm), mode, knob.name,
JSONUtil.dumps(target_data, pprint=True))
return True, target_data
@shared_task(base=IgnoreResultTask, name='aggregate_target_results')
def aggregate_target_results(result_id, algorithm):
@shared_task(base=IgnoreResultTask, name='preprocessing')
def preprocessing(result_id, algorithm):
start_ts = time.time()
agg_data = DataUtil.aggregate_data(Result.objects.filter(pk=result_id))
target_data = {}
target_data['newest_result_id'] = result_id
newest_result = Result.objects.get(pk=result_id)
session = newest_result.session
knobs = SessionKnob.objects.get_knobs_for_session(session)
# Check that the minvals of tunable knobs are all decided
for knob_info in knobs:
if 'lowerbound' in knob_info and knob_info['lowerbound'] is not None:
if knob_info.get('lowerbound', None) is not None:
lowerbound = float(knob_info['lowerbound'])
minval = float(knob_info['minval'])
if lowerbound < minval * 0.7:
# We need to do binary search to determine the minval of this knob
return test_knob_range(knob_info,
newest_result, minval, lowerbound, 'lowerbound'), algorithm
successful, target_data = calc_next_knob_range(
algorithm, knob_info, newest_result, minval, lowerbound, 'lowerbound')
if successful:
save_execution_time(start_ts, "preprocessing", newest_result)
return result_id, algorithm, target_data
# Check that the maxvals of tunable knobs are all decided
for knob_info in knobs:
if 'upperbound' in knob_info and knob_info['upperbound'] is not None:
if knob_info.get('upperbound', None) is not None:
upperbound = float(knob_info['upperbound'])
maxval = float(knob_info['maxval'])
if upperbound > maxval * 1.5:
if upperbound > maxval * 1.3:
# We need to do binary search to determine the maxval of this knob
return test_knob_range(knob_info,
newest_result, maxval, upperbound, 'upperbound'), algorithm
successful, target_data = calc_next_knob_range(
algorithm, knob_info, newest_result, maxval, upperbound, 'upperbound')
if successful:
save_execution_time(start_ts, "preprocessing", newest_result)
return result_id, algorithm, target_data
# Check that we've completed the background tasks at least once. We need
# this data in order to make a configuration recommendation (until we
@ -296,44 +309,58 @@ def aggregate_target_results(result_id, algorithm):
LOG.debug('%s: Generated LHS.\n\ndata=%s\n',
AlgorithmType.name(algorithm), JSONUtil.dumps(all_samples[:5], pprint=True))
samples = all_samples.pop()
agg_data['newest_result_id'] = result_id
agg_data['status'] = 'lhs'
agg_data['config_recommend'] = samples
target_data['status'] = 'lhs'
target_data['config_recommend'] = samples
session.lhs_samples = JSONUtil.dumps(all_samples)
session.save()
LOG.debug('%s: Got LHS config.\n\ndata=%s\n',
AlgorithmType.name(algorithm), JSONUtil.dumps(agg_data, pprint=True))
AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
elif session.tuning_session == 'randomly_generate':
# generate a config randomly
random_knob_result = gen_random_data(knobs)
agg_data['newest_result_id'] = result_id
agg_data['status'] = 'random'
agg_data['config_recommend'] = random_knob_result
LOG.debug('%s: Finished generating a random config.\n\ndata=%s\n',
AlgorithmType.name(algorithm), JSONUtil.dumps(agg_data, pprint=True))
target_data['status'] = 'random'
target_data['config_recommend'] = random_knob_result
LOG.debug('%s: Generated a random config.\n\ndata=%s\n',
AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
else:
# Aggregate all knob config results tried by the target so far in this
# tuning session and this tuning workload.
target_results = Result.objects.filter(session=session,
dbms=newest_result.dbms,
workload=newest_result.workload)
if len(target_results) == 0:
raise Exception('Cannot find any results for session_id={}, dbms_id={}'
.format(session, newest_result.dbms))
agg_data = DataUtil.aggregate_data(target_results)
agg_data['newest_result_id'] = result_id
agg_data['status'] = 'good'
save_execution_time(start_ts, "preprocessing", newest_result)
return result_id, algorithm, target_data
# Clean knob data
cleaned_agg_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'],
session)
agg_data['X_matrix'] = np.array(cleaned_agg_data[0])
agg_data['X_columnlabels'] = np.array(cleaned_agg_data[1])
LOG.debug('%s: Finished aggregating target results.\n\n',
AlgorithmType.name(algorithm))
@shared_task(base=IgnoreResultTask, name='aggregate_target_results')
def aggregate_target_results(aggregate_target_results_input):
start_ts = time.time()
result_id, algorithm, target_data = aggregate_target_results_input
# If the preprocessing method has already generated a config, bypass this method.
if 'config_recommend' in target_data:
assert 'newest_result_id' in target_data and 'status' in target_data
LOG.debug('%s: Skipping aggregate_target_results.\nData:\n%s\n\n',
AlgorithmType.name(algorithm), target_data)
return target_data, algorithm
newest_result = Result.objects.get(pk=result_id)
session = newest_result.session
# Aggregate all knob config results tried by the target so far in this
# tuning session and this tuning workload.
target_results = Result.objects.filter(session=session,
dbms=newest_result.dbms,
workload=newest_result.workload)
if len(target_results) == 0:
raise Exception('Cannot find any results for session_id={}, dbms_id={}'
.format(session, newest_result.dbms))
agg_data = DataUtil.aggregate_data(target_results)
agg_data['newest_result_id'] = result_id
agg_data['status'] = 'good'
# Clean knob data
cleaned_agg_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'],
session)
agg_data['X_matrix'] = np.array(cleaned_agg_data[0])
agg_data['X_columnlabels'] = np.array(cleaned_agg_data[1])
LOG.debug('%s: Finished aggregating target results.\n\n',
AlgorithmType.name(algorithm))
save_execution_time(start_ts, "aggregate_target_results", Result.objects.get(pk=result_id))
return agg_data, algorithm
@ -433,16 +460,23 @@ def gen_lhs_samples(knobs, nsamples):
@shared_task(base=IgnoreResultTask, name='train_ddpg')
def train_ddpg(result_id):
def train_ddpg(train_ddpg_input):
start_ts = time.time()
result_id, algorithm, target_data = train_ddpg_input
# If the preprocessing method has already generated a config, bypass this method.
if 'config_recommend' in target_data:
assert 'newest_result_id' in target_data and 'status' in target_data
LOG.debug('Skipping training DDPG.\nData:\n%s\n\n', target_data)
return target_data, algorithm
LOG.info('Add training data to ddpg and train ddpg')
result = Result.objects.get(pk=result_id)
session = Result.objects.get(pk=result_id).session
session = result.session
params = JSONUtil.loads(session.hyperparameters)
session_results = Result.objects.filter(session=session,
creation_time__lt=result.creation_time)
result_info = {}
result_info['newest_result_id'] = result_id
target_data = {}
target_data['newest_result_id'] = result_id
# Extract data from result and previous results
result = Result.objects.filter(pk=result_id)
@ -459,8 +493,7 @@ def train_ddpg(result_id):
base_metric_data = (DataUtil.aggregate_data(base_result))['y_matrix'].flatten()
prev_metric_data = (DataUtil.aggregate_data(prev_result))['y_matrix'].flatten()
result = Result.objects.get(pk=result_id)
target_objective = result.session.target_objective
target_objective = session.target_objective
prev_obj_idx = [i for i, n in enumerate(agg_data['y_columnlabels']) if n == target_objective]
# Clean metric data
@ -492,8 +525,8 @@ def train_ddpg(result_id):
objective = metric_data[target_obj_idx]
base_objective = base_metric_data[prev_obj_idx]
prev_objective = prev_metric_data[prev_obj_idx]
metric_meta = db.target_objectives.get_metric_metadata(
result.session.dbms.pk, result.session.target_objective)
metric_meta = db.target_objectives.get_metric_metadata(session.dbms.pk,
session.target_objective)
# Calculate the reward
if params['DDPG_SIMPLE_REWARD']:
@ -535,7 +568,7 @@ def train_ddpg(result_id):
session.ddpg_reply_memory = ddpg.replay_memory.get()
save_execution_time(start_ts, "train_ddpg", Result.objects.get(pk=result_id))
session.save()
return result_info
return target_data, algorithm
def create_and_save_recommendation(recommended_knobs, result, status, **kwargs):
@ -555,11 +588,38 @@ def create_and_save_recommendation(recommended_knobs, result, status, **kwargs):
return retval
def check_early_return(target_data, algorithm):
result_id = target_data['newest_result_id']
newest_result = Result.objects.get(pk=result_id)
if target_data.get('status', 'good') != 'good': # No status or status is not 'good'
if target_data['status'] == 'random':
info = 'The config is generated by Random'
elif target_data['status'] == 'lhs':
info = 'The config is generated by LHS'
elif target_data['status'] == 'range_test':
info = 'Searching for valid knob ranges'
else:
info = 'Unknown'
target_data_res = create_and_save_recommendation(
recommended_knobs=target_data['config_recommend'], result=newest_result,
status=target_data['status'], info=info, pipeline_run=None)
LOG.debug('%s: Skipping configuration recommendation.\nData:\n%s\n\n',
AlgorithmType.name(algorithm), target_data)
return True, target_data_res
return False, None
@shared_task(base=ConfigurationRecommendation, name='configuration_recommendation_ddpg')
def configuration_recommendation_ddpg(result_info): # pylint: disable=invalid-name
def configuration_recommendation_ddpg(recommendation_ddpg_input): # pylint: disable=invalid-name
start_ts = time.time()
LOG.info('Use ddpg to recommend configuration')
result_id = result_info['newest_result_id']
LOG.info('configuration_recommendation called (DDPG)')
target_data, algorithm = recommendation_ddpg_input
early_return, target_data_res = check_early_return(target_data, algorithm)
if early_return:
return target_data_res
result_id = target_data['newest_result_id']
result_list = Result.objects.filter(pk=result_id)
result = result_list.first()
session = result.session
@ -588,11 +648,11 @@ def configuration_recommendation_ddpg(result_info): # pylint: disable=invalid-n
knob_data = MinMaxScaler().fit(knob_bounds).inverse_transform(knob_data.reshape(1, -1))[0]
conf_map = {k: knob_data[i] for i, k in enumerate(knob_labels)}
conf_map_res = create_and_save_recommendation(recommended_knobs=conf_map, result=result,
status='good', info='INFO: ddpg')
target_data_res = create_and_save_recommendation(recommended_knobs=conf_map, result=result,
status='good', info='INFO: ddpg')
save_execution_time(start_ts, "configuration_recommendation_ddpg", result)
return conf_map_res
return target_data_res
def combine_workload(target_data):
@ -796,28 +856,15 @@ def configuration_recommendation(recommendation_input):
start_ts = time.time()
target_data, algorithm = recommendation_input
LOG.info('configuration_recommendation called')
early_return, target_data_res = check_early_return(target_data, algorithm)
if early_return:
return target_data_res
newest_result = Result.objects.get(pk=target_data['newest_result_id'])
session = newest_result.session
params = JSONUtil.loads(session.hyperparameters)
if target_data['status'] != 'good':
LOG.info(target_data['status'])
if target_data['status'] == 'random':
info = 'The config is generated by Random'
elif target_data['status'] == 'lhs':
info = 'The config is generated by LHS'
elif target_data['status'] == 'range_test':
info = 'Searching for the valid ranges of knobs'
else:
info = 'Unknown'
target_data_res = create_and_save_recommendation(
recommended_knobs=target_data['config_recommend'], result=newest_result,
status=target_data['status'], info=info,
pipeline_run=target_data['pipeline_run'])
LOG.debug('%s: Skipping configuration recommendation.\nData:\n%s\n\n',
AlgorithmType.name(algorithm), target_data)
return target_data_res
X_columnlabels, X_scaler, X_scaled, y_scaled, X_max, X_min,\
dummy_encoder, constraint_helper, pipeline_knobs,\
pipeline_metrics = combine_workload(target_data)
@ -950,9 +997,8 @@ def map_workload(map_workload_input):
start_ts = time.time()
target_data, algorithm = map_workload_input
assert target_data is not None
if target_data['status'] != 'good':
assert target_data is not None
target_data['pipeline_run'] = None
LOG.debug('%s: Skipping workload mapping.\n\ndata=%s\n',
AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))

View File

@ -9,8 +9,10 @@
style="display:block; overflow-y:scroll; height:500px">
<tr>
<td>Name</td>
<td>Min</td>
<td>Min</td>
<td>Max</td>
<td>Lowerbound</td>
<td>Upperbound</td>
<td>Tunable</td>
<td></td>
</tr>
@ -21,6 +23,8 @@
<td>{{ form.name }}</td>
<td>{{ form.minval }}</td>
<td>{{ form.maxval }}</td>
<td>{{ form.lowerbound }}</td>
<td>{{ form.upperbound }}</td>
<td>{{ form.tunable }}</td>
<td><button type="submit">Save</button></td>
</tr>

View File

@ -484,41 +484,42 @@ def handle_result_files(session, files, execution_times=None):
# Load the contents of the controller's summary file
summary = JSONUtil.loads(files['summary'])
# Find worst throughput
past_metrics = MetricData.objects.filter(session=session)
metric_meta = target_objectives.get_instance(session.dbms.pk, session.target_objective)
worst_target_value = None
worst_metric = None
for past_metric in past_metrics:
if '*' in past_metric.name:
continue
target_value = JSONUtil.loads(past_metric.data)[session.target_objective]
if metric_meta.improvement == target_objectives.MORE_IS_BETTER:
if worst_target_value is None or target_value < worst_target_value:
worst_target_value = target_value
worst_metric = past_metric
else:
if worst_target_value is None or target_value > worst_target_value:
worst_target_value = target_value
worst_metric = past_metric
LOG.debug("Worst target value so far is: %d", worst_target_value)
penalty_factor = JSONUtil.loads(session.hyperparameters).get('PENALTY_FACTOR', 2)
if metric_meta.improvement == target_objectives.MORE_IS_BETTER:
penalty_target_value = worst_target_value / penalty_factor
else:
penalty_target_value = worst_target_value * penalty_factor
# Update the past invalid results
for past_metric in past_metrics:
if '*' in past_metric.name:
past_metric_data = JSONUtil.loads(past_metric.data)
past_metric_data[session.target_objective] = penalty_target_value
past_metric.data = JSONUtil.dumps(past_metric_data)
past_metric.save()
# If database crashed on restart, pull latest result and worst throughput so far
if 'error' in summary and summary['error'] == "DB_RESTART_ERROR":
LOG.debug("Error in restarting database")
# Find worst throughput
past_metrics = MetricData.objects.filter(session=session)
metric_meta = target_objectives.get_instance(session.dbms.pk, session.target_objective)
worst_target_value = None
worst_metric = None
for past_metric in past_metrics:
if '*' in past_metric.name:
continue
target_value = JSONUtil.loads(past_metric.data)[session.target_objective]
if metric_meta.improvement == target_objectives.MORE_IS_BETTER:
if worst_target_value is None or target_value < worst_target_value:
worst_target_value = target_value
worst_metric = past_metric
else:
if worst_target_value is None or target_value > worst_target_value:
worst_target_value = target_value
worst_metric = past_metric
LOG.debug("Worst target value so far is: %d", worst_target_value)
penalty_factor = JSONUtil.loads(session.hyperparameters).get('PENALTY_FACTOR', 2)
if metric_meta.improvement == target_objectives.MORE_IS_BETTER:
penalty_target_value = worst_target_value / penalty_factor
else:
penalty_target_value = worst_target_value * penalty_factor
# Update the past invalid results
for past_metric in past_metrics:
if '*' in past_metric.name:
past_metric_data = JSONUtil.loads(past_metric.data)
past_metric_data[session.target_objective] = penalty_target_value
past_metric.data = JSONUtil.dumps(past_metric_data)
past_metric.save()
worst_result = Result.objects.filter(metric_data=worst_metric).first()
last_result = Result.objects.filter(session=session).order_by("-id").first()
@ -677,18 +678,21 @@ def handle_result_files(session, files, execution_times=None):
response = None
if session.algorithm == AlgorithmType.GPR:
subtask_list = [
('aggregate_target_results', (result_id, session.algorithm)),
('preprocessing', (result_id, session.algorithm)),
('aggregate_target_results', ()),
('map_workload', ()),
('configuration_recommendation', ()),
]
elif session.algorithm == AlgorithmType.DDPG:
subtask_list = [
('train_ddpg', (result_id,)),
('preprocessing', (result_id, session.algorithm)),
('train_ddpg', ()),
('configuration_recommendation_ddpg', ()),
]
elif session.algorithm == AlgorithmType.DNN:
subtask_list = [
('aggregate_target_results', (result_id, session.algorithm)),
('preprocessing', (result_id, session.algorithm)),
('aggregate_target_results', ()),
('map_workload', ()),
('configuration_recommendation', ()),
]