diff --git a/server/website/website/forms.py b/server/website/website/forms.py
index d4e0ecb..a8ccc0a 100644
--- a/server/website/website/forms.py
+++ b/server/website/website/forms.py
@@ -158,4 +158,4 @@ class SessionKnobForm(forms.ModelForm):
     class Meta:  # pylint: disable=no-init
         model = SessionKnob
-        fields = ['session', 'knob', 'minval', 'maxval', 'tunable']
+        fields = ['session', 'knob', 'minval', 'maxval', 'lowerbound', 'upperbound', 'tunable']
diff --git a/server/website/website/tasks/async_tasks.py b/server/website/website/tasks/async_tasks.py
index 35989b1..b5d4157 100644
--- a/server/website/website/tasks/async_tasks.py
+++ b/server/website/website/tasks/async_tasks.py
@@ -179,17 +179,19 @@ def save_execution_time(start_ts, fn, result):
 
 
 def choose_value_in_range(num1, num2):
-    if num1 < 1:
-        num1 = num1 + 1
-    if num2 < 1:
-        num2 = num2 + 1
-    log_num1 = np.log(num1)
-    log_num2 = np.log(num2)
+    # Add 1 to avoid log(0); this returns the geometric mean of the two (shifted) values
+    log_num1 = np.log(num1 + 1)
+    log_num2 = np.log(num2 + 1)
     return np.exp((log_num1 + log_num2) / 2)
 
 
-def test_knob_range(knob_info, newest_result, good_val, bad_val, mode):
+def calc_next_knob_range(algorithm, knob_info, newest_result, good_val, bad_val, mode):
     session = newest_result.session
+    # This metric should not be used for learning because the driver did not run the workload.
+    # We tag the metric as invalid so that it is later set to the worst observed result.
+    metric_file = newest_result.metric_data
+    metric_file.name = metric_file.name + '*'
+    metric_file.save()
     knob = KnobCatalog.objects.get(name=knob_info['name'], dbms=session.dbms)
     knob_file = newest_result.knob_data
     knob_values = JSONUtil.loads(knob_file.data)
@@ -225,9 +227,13 @@ def test_knob_range(knob_info, newest_result, good_val, bad_val, mode):
     else:
         if mode == 'lowerbound':
             session_knob.minval = str(int(expected_value))
+            # Terminate the search if the observed value differs greatly from the value we set
             if expected_value < last_value / 10:
                 session_knob.minval = str(int(last_value))
                 session_knob.lowerbound = str(int(last_value))
+                session_knob.save()
+                # Returning False means we will not generate another config to test this knob
+                return False, None
         else:
             session_knob.maxval = str(int(expected_value))
     next_value = choose_value_in_range(expected_value, bad_val)
@@ -241,43 +247,50 @@ def test_knob_range(knob_info, newest_result, good_val, bad_val, mode):
     knobs = SessionKnob.objects.get_knobs_for_session(session)
     next_config = gen_test_maxval_data(knobs, knob.name, next_value)
 
-    agg_data = DataUtil.aggregate_data(Result.objects.filter(pk=newest_result.pk))
-    agg_data['newest_result_id'] = newest_result.pk
-    agg_data['status'] = 'range_test'
-    agg_data['config_recommend'] = next_config
-    LOG.debug('Testing %s of %s.\n\ndata=%s\n', mode, knob.name,
-              JSONUtil.dumps(agg_data, pprint=True))
-    save_execution_time(time.time(), "aggregate_target_results", newest_result)
-    return agg_data
+    target_data = {}
+    target_data['newest_result_id'] = newest_result.pk
+    target_data['status'] = 'range_test'
+    target_data['config_recommend'] = next_config
+    LOG.debug('%s: Generated a config to test %s of %s.\n\ndata=%s\n',
+              AlgorithmType.name(algorithm), mode, knob.name,
+              JSONUtil.dumps(target_data, pprint=True))
+    return True, target_data
 
 
-@shared_task(base=IgnoreResultTask, name='aggregate_target_results')
-def aggregate_target_results(result_id, algorithm):
+@shared_task(base=IgnoreResultTask, name='preprocessing')
+def preprocessing(result_id, algorithm):
     start_ts = time.time()
-    agg_data = DataUtil.aggregate_data(Result.objects.filter(pk=result_id))
+    target_data = {}
+    target_data['newest_result_id'] = result_id
     newest_result = Result.objects.get(pk=result_id)
     session = newest_result.session
     knobs = SessionKnob.objects.get_knobs_for_session(session)
 
     # Check that the minvals of tunable knobs are all decided
     for knob_info in knobs:
-        if 'lowerbound' in knob_info and knob_info['lowerbound'] is not None:
+        if knob_info.get('lowerbound', None) is not None:
             lowerbound = float(knob_info['lowerbound'])
             minval = float(knob_info['minval'])
             if lowerbound < minval * 0.7:
                 # We need to do binary search to determine the minval of this knob
-                return test_knob_range(knob_info,
-                                       newest_result, minval, lowerbound, 'lowerbound'), algorithm
+                successful, target_data = calc_next_knob_range(
+                    algorithm, knob_info, newest_result, minval, lowerbound, 'lowerbound')
+                if successful:
+                    save_execution_time(start_ts, "preprocessing", newest_result)
+                    return result_id, algorithm, target_data
 
     # Check that the maxvals of tunable knobs are all decided
     for knob_info in knobs:
-        if 'upperbound' in knob_info and knob_info['upperbound'] is not None:
+        if knob_info.get('upperbound', None) is not None:
             upperbound = float(knob_info['upperbound'])
             maxval = float(knob_info['maxval'])
-            if upperbound > maxval * 1.5:
+            if upperbound > maxval * 1.3:
                 # We need to do binary search to determine the maxval of this knob
-                return test_knob_range(knob_info,
-                                       newest_result, maxval, upperbound, 'upperbound'), algorithm
+                successful, target_data = calc_next_knob_range(
+                    algorithm, knob_info, newest_result, maxval, upperbound, 'upperbound')
+                if successful:
+                    save_execution_time(start_ts, "preprocessing", newest_result)
+                    return result_id, algorithm, target_data
 
     # Check that we've completed the background tasks at least once. We need
     # this data in order to make a configuration recommendation (until we
@@ -296,44 +309,58 @@ def aggregate_target_results(result_id, algorithm):
         LOG.debug('%s: Generated LHS.\n\ndata=%s\n', AlgorithmType.name(algorithm),
                   JSONUtil.dumps(all_samples[:5], pprint=True))
         samples = all_samples.pop()
-        agg_data['newest_result_id'] = result_id
-        agg_data['status'] = 'lhs'
-        agg_data['config_recommend'] = samples
+        target_data['status'] = 'lhs'
+        target_data['config_recommend'] = samples
         session.lhs_samples = JSONUtil.dumps(all_samples)
         session.save()
         LOG.debug('%s: Got LHS config.\n\ndata=%s\n',
-                  AlgorithmType.name(algorithm), JSONUtil.dumps(agg_data, pprint=True))
+                  AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
 
     elif session.tuning_session == 'randomly_generate':
         # generate a config randomly
         random_knob_result = gen_random_data(knobs)
-        agg_data['newest_result_id'] = result_id
-        agg_data['status'] = 'random'
-        agg_data['config_recommend'] = random_knob_result
-        LOG.debug('%s: Finished generating a random config.\n\ndata=%s\n',
-                  AlgorithmType.name(algorithm), JSONUtil.dumps(agg_data, pprint=True))
+        target_data['status'] = 'random'
+        target_data['config_recommend'] = random_knob_result
+        LOG.debug('%s: Generated a random config.\n\ndata=%s\n',
+                  AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
 
-    else:
-        # Aggregate all knob config results tried by the target so far in this
-        # tuning session and this tuning workload.
-        target_results = Result.objects.filter(session=session,
-                                               dbms=newest_result.dbms,
-                                               workload=newest_result.workload)
-        if len(target_results) == 0:
-            raise Exception('Cannot find any results for session_id={}, dbms_id={}'
-                            .format(session, newest_result.dbms))
-        agg_data = DataUtil.aggregate_data(target_results)
-        agg_data['newest_result_id'] = result_id
-        agg_data['status'] = 'good'
+    save_execution_time(start_ts, "preprocessing", newest_result)
+    return result_id, algorithm, target_data
 
-        # Clean knob data
-        cleaned_agg_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'],
-                                           session)
-        agg_data['X_matrix'] = np.array(cleaned_agg_data[0])
-        agg_data['X_columnlabels'] = np.array(cleaned_agg_data[1])
-        LOG.debug('%s: Finished aggregating target results.\n\n',
-                  AlgorithmType.name(algorithm))
 
+@shared_task(base=IgnoreResultTask, name='aggregate_target_results')
+def aggregate_target_results(aggregate_target_results_input):
+    start_ts = time.time()
+    result_id, algorithm, target_data = aggregate_target_results_input
+    # If the preprocessing method has already generated a config, bypass this method.
+    if 'config_recommend' in target_data:
+        assert 'newest_result_id' in target_data and 'status' in target_data
+        LOG.debug('%s: Skipping aggregate_target_results.\nData:\n%s\n\n',
+                  AlgorithmType.name(algorithm), target_data)
+        return target_data, algorithm
+
+    newest_result = Result.objects.get(pk=result_id)
+    session = newest_result.session
+    # Aggregate all knob config results tried by the target so far in this
+    # tuning session and this tuning workload.
+    target_results = Result.objects.filter(session=session,
+                                           dbms=newest_result.dbms,
+                                           workload=newest_result.workload)
+    if len(target_results) == 0:
+        raise Exception('Cannot find any results for session_id={}, dbms_id={}'
+                        .format(session, newest_result.dbms))
+    agg_data = DataUtil.aggregate_data(target_results)
+    agg_data['newest_result_id'] = result_id
+    agg_data['status'] = 'good'
+
+    # Clean knob data
+    cleaned_agg_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'],
+                                       session)
+    agg_data['X_matrix'] = np.array(cleaned_agg_data[0])
+    agg_data['X_columnlabels'] = np.array(cleaned_agg_data[1])
+
+    LOG.debug('%s: Finished aggregating target results.\n\n',
+              AlgorithmType.name(algorithm))
     save_execution_time(start_ts, "aggregate_target_results", Result.objects.get(pk=result_id))
     return agg_data, algorithm
@@ -433,16 +460,23 @@ def gen_lhs_samples(knobs, nsamples):
 
 
 @shared_task(base=IgnoreResultTask, name='train_ddpg')
-def train_ddpg(result_id):
+def train_ddpg(train_ddpg_input):
     start_ts = time.time()
+    result_id, algorithm, target_data = train_ddpg_input
+    # If the preprocessing method has already generated a config, bypass this method.
+    if 'config_recommend' in target_data:
+        assert 'newest_result_id' in target_data and 'status' in target_data
+        LOG.debug('Skipping training DDPG.\nData:\n%s\n\n', target_data)
+        return target_data, algorithm
+
     LOG.info('Add training data to ddpg and train ddpg')
     result = Result.objects.get(pk=result_id)
-    session = Result.objects.get(pk=result_id).session
+    session = result.session
     params = JSONUtil.loads(session.hyperparameters)
     session_results = Result.objects.filter(session=session,
                                             creation_time__lt=result.creation_time)
-    result_info = {}
-    result_info['newest_result_id'] = result_id
+    target_data = {}
+    target_data['newest_result_id'] = result_id
 
     # Extract data from result and previous results
     result = Result.objects.filter(pk=result_id)
@@ -459,8 +493,7 @@ def train_ddpg(train_ddpg_input):
     base_metric_data = (DataUtil.aggregate_data(base_result))['y_matrix'].flatten()
     prev_metric_data = (DataUtil.aggregate_data(prev_result))['y_matrix'].flatten()
 
-    result = Result.objects.get(pk=result_id)
-    target_objective = result.session.target_objective
+    target_objective = session.target_objective
     prev_obj_idx = [i for i, n in enumerate(agg_data['y_columnlabels']) if n == target_objective]
 
     # Clean metric data
@@ -492,8 +525,8 @@
     objective = metric_data[target_obj_idx]
     base_objective = base_metric_data[prev_obj_idx]
     prev_objective = prev_metric_data[prev_obj_idx]
-    metric_meta = db.target_objectives.get_metric_metadata(
-        result.session.dbms.pk, result.session.target_objective)
+    metric_meta = db.target_objectives.get_metric_metadata(session.dbms.pk,
+                                                           session.target_objective)
 
     # Calculate the reward
     if params['DDPG_SIMPLE_REWARD']:
@@ -535,7 +568,7 @@
     session.ddpg_reply_memory = ddpg.replay_memory.get()
     save_execution_time(start_ts, "train_ddpg", Result.objects.get(pk=result_id))
     session.save()
-    return result_info
+    return target_data, algorithm
 
 
 def create_and_save_recommendation(recommended_knobs, result, status, **kwargs):
@@ -555,11 +588,38 @@ def create_and_save_recommendation(recommended_knobs, result, status, **kwargs):
     return retval
 
 
+def check_early_return(target_data, algorithm):
+    result_id = target_data['newest_result_id']
+    newest_result = Result.objects.get(pk=result_id)
+    if target_data.get('status', 'good') != 'good':  # A status is set and it is not 'good'
+        if target_data['status'] == 'random':
+            info = 'The config is generated by Random'
+        elif target_data['status'] == 'lhs':
+            info = 'The config is generated by LHS'
+        elif target_data['status'] == 'range_test':
+            info = 'Searching for valid knob ranges'
+        else:
+            info = 'Unknown'
+        target_data_res = create_and_save_recommendation(
+            recommended_knobs=target_data['config_recommend'], result=newest_result,
+            status=target_data['status'], info=info, pipeline_run=None)
+        LOG.debug('%s: Skipping configuration recommendation.\nData:\n%s\n\n',
+                  AlgorithmType.name(algorithm), target_data)
+        return True, target_data_res
+    return False, None
+
+
 @shared_task(base=ConfigurationRecommendation, name='configuration_recommendation_ddpg')
-def configuration_recommendation_ddpg(result_info):  # pylint: disable=invalid-name
+def configuration_recommendation_ddpg(recommendation_ddpg_input):  # pylint: disable=invalid-name
     start_ts = time.time()
-    LOG.info('Use ddpg to recommend configuration')
-    result_id = result_info['newest_result_id']
+    LOG.info('configuration_recommendation called (DDPG)')
+    target_data, algorithm = recommendation_ddpg_input
+
+    early_return, target_data_res = check_early_return(target_data, algorithm)
+    if early_return:
+        return target_data_res
+
+    result_id = target_data['newest_result_id']
     result_list = Result.objects.filter(pk=result_id)
     result = result_list.first()
     session = result.session
@@ -588,11 +648,11 @@
     knob_data = MinMaxScaler().fit(knob_bounds).inverse_transform(knob_data.reshape(1, -1))[0]
     conf_map = {k: knob_data[i] for i, k in enumerate(knob_labels)}
 
-    conf_map_res = create_and_save_recommendation(recommended_knobs=conf_map, result=result,
-                                                  status='good', info='INFO: ddpg')
+    target_data_res = create_and_save_recommendation(recommended_knobs=conf_map, result=result,
+                                                     status='good', info='INFO: ddpg')
 
     save_execution_time(start_ts, "configuration_recommendation_ddpg", result)
-    return conf_map_res
+    return target_data_res
 
 
 def combine_workload(target_data):
@@ -796,28 +856,15 @@ def configuration_recommendation(recommendation_input):
     start_ts = time.time()
     target_data, algorithm = recommendation_input
     LOG.info('configuration_recommendation called')
+
+    early_return, target_data_res = check_early_return(target_data, algorithm)
+    if early_return:
+        return target_data_res
+
     newest_result = Result.objects.get(pk=target_data['newest_result_id'])
     session = newest_result.session
     params = JSONUtil.loads(session.hyperparameters)
 
-    if target_data['status'] != 'good':
-        LOG.info(target_data['status'])
-        if target_data['status'] == 'random':
-            info = 'The config is generated by Random'
-        elif target_data['status'] == 'lhs':
-            info = 'The config is generated by LHS'
-        elif target_data['status'] == 'range_test':
-            info = 'Searching for the valid ranges of knobs'
-        else:
-            info = 'Unknown'
-        target_data_res = create_and_save_recommendation(
-            recommended_knobs=target_data['config_recommend'], result=newest_result,
-            status=target_data['status'], info=info,
-            pipeline_run=target_data['pipeline_run'])
-        LOG.debug('%s: Skipping configuration recommendation.\nData:\n%s\n\n',
-                  AlgorithmType.name(algorithm), target_data)
-        return target_data_res
-
     X_columnlabels, X_scaler, X_scaled, y_scaled, X_max, X_min,\
         dummy_encoder, constraint_helper, pipeline_knobs,\
         pipeline_metrics = combine_workload(target_data)
@@ -950,9 +997,8 @@ def map_workload(map_workload_input):
     start_ts = time.time()
     target_data, algorithm = map_workload_input
 
+    assert target_data is not None
     if target_data['status'] != 'good':
-        assert target_data is not None
-        target_data['pipeline_run'] = None
         LOG.debug('%s: Skipping workload mapping.\n\ndata=%s\n',
                   AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
diff --git a/server/website/website/templates/edit_knobs.html b/server/website/website/templates/edit_knobs.html
index 94b621c..53bf8a6 100644
--- a/server/website/website/templates/edit_knobs.html
+++ b/server/website/website/templates/edit_knobs.html
@@ -9,8 +9,10 @@
       style="display:block; overflow-y:scroll; height:500px">
         <th>Name</th>
-        <th>Min</th>
+        <th>Min</th>
         <th>Max</th>
+        <th>Lowerbound</th>
+        <th>Upperbound</th>
         <th>Tunable</th>
@@ -21,6 +23,8 @@
         <td>{{ form.name }}</td>
         <td>{{ form.minval }}</td>
         <td>{{ form.maxval }}</td>
+        <td>{{ form.lowerbound }}</td>
+        <td>{{ form.upperbound }}</td>
         <td>{{ form.tunable }}</td>
diff --git a/server/website/website/views.py b/server/website/website/views.py
index a6ab667..2043dfd 100644
--- a/server/website/website/views.py
+++ b/server/website/website/views.py
@@ -484,41 +484,42 @@ def handle_result_files(session, files, execution_times=None):
     # Load the contents of the controller's summary file
     summary = JSONUtil.loads(files['summary'])
 
+    # Find the worst value of the target objective so far
+    past_metrics = MetricData.objects.filter(session=session)
+    metric_meta = target_objectives.get_instance(session.dbms.pk, session.target_objective)
+    worst_target_value = None
+    worst_metric = None
+    for past_metric in past_metrics:
+        if '*' in past_metric.name:
+            continue
+        target_value = JSONUtil.loads(past_metric.data)[session.target_objective]
+        if metric_meta.improvement == target_objectives.MORE_IS_BETTER:
+            if worst_target_value is None or target_value < worst_target_value:
+                worst_target_value = target_value
+                worst_metric = past_metric
+        else:
+            if worst_target_value is None or target_value > worst_target_value:
+                worst_target_value = target_value
+                worst_metric = past_metric
+    LOG.debug("Worst target value so far is: %d", worst_target_value)
+    penalty_factor = JSONUtil.loads(session.hyperparameters).get('PENALTY_FACTOR', 2)
+    if metric_meta.improvement == target_objectives.MORE_IS_BETTER:
+        penalty_target_value = worst_target_value / penalty_factor
+    else:
+        penalty_target_value = worst_target_value * penalty_factor
+
+    # Update the past invalid results
+    for past_metric in past_metrics:
+        if '*' in past_metric.name:
+            past_metric_data = JSONUtil.loads(past_metric.data)
+            past_metric_data[session.target_objective] = penalty_target_value
+            past_metric.data = JSONUtil.dumps(past_metric_data)
+            past_metric.save()
+
     # If database crashed on restart, pull latest result and worst throughput so far
     if 'error' in summary and summary['error'] == "DB_RESTART_ERROR":
         LOG.debug("Error in restarting database")
-        # Find worst throughput
-        past_metrics = MetricData.objects.filter(session=session)
-        metric_meta = target_objectives.get_instance(session.dbms.pk, session.target_objective)
-        worst_target_value = None
-        worst_metric = None
-        for past_metric in past_metrics:
-            if '*' in past_metric.name:
-                continue
-            target_value = JSONUtil.loads(past_metric.data)[session.target_objective]
-            if metric_meta.improvement == target_objectives.MORE_IS_BETTER:
-                if worst_target_value is None or target_value < worst_target_value:
-                    worst_target_value = target_value
-                    worst_metric = past_metric
-            else:
-                if worst_target_value is None or target_value > worst_target_value:
-                    worst_target_value = target_value
-                    worst_metric = past_metric
-        LOG.debug("Worst target value so far is: %d", worst_target_value)
-        penalty_factor = JSONUtil.loads(session.hyperparameters).get('PENALTY_FACTOR', 2)
-        if metric_meta.improvement == target_objectives.MORE_IS_BETTER:
-            penalty_target_value = worst_target_value / penalty_factor
-        else:
-            penalty_target_value = worst_target_value * penalty_factor
-
-        # Update the past invalid results
-        for past_metric in past_metrics:
-            if '*' in past_metric.name:
-                past_metric_data = JSONUtil.loads(past_metric.data)
-                past_metric_data[session.target_objective] = penalty_target_value
-                past_metric.data = JSONUtil.dumps(past_metric_data)
-                past_metric.save()
 
         worst_result = Result.objects.filter(metric_data=worst_metric).first()
         last_result = Result.objects.filter(session=session).order_by("-id").first()
@@ -677,18 +678,21 @@ def handle_result_files(session, files, execution_times=None):
     response = None
     if session.algorithm == AlgorithmType.GPR:
        subtask_list = [
-            ('aggregate_target_results', (result_id, session.algorithm)),
+            ('preprocessing', (result_id, session.algorithm)),
+            ('aggregate_target_results', ()),
             ('map_workload', ()),
             ('configuration_recommendation', ()),
         ]
     elif session.algorithm == AlgorithmType.DDPG:
         subtask_list = [
-            ('train_ddpg', (result_id,)),
+            ('preprocessing', (result_id, session.algorithm)),
+            ('train_ddpg', ()),
             ('configuration_recommendation_ddpg', ()),
         ]
     elif session.algorithm == AlgorithmType.DNN:
         subtask_list = [
-            ('aggregate_target_results', (result_id, session.algorithm)),
+            ('preprocessing', (result_id, session.algorithm)),
+            ('aggregate_target_results', ()),
             ('map_workload', ()),
             ('configuration_recommendation', ()),
         ]
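
Note for reviewers: the knob-range search that `preprocessing` drives through `calc_next_knob_range` repeatedly probes values produced by `choose_value_in_range`, which bisects in log space rather than arithmetically. Below is a minimal standalone sketch of that behavior; it is not part of the patch, the knob values are made up, and the real task re-probes the DBMS between steps instead of assuming every probe succeeds:

```python
import numpy as np

def choose_value_in_range(num1, num2):
    # Midpoint of the two values in log space (their geometric mean);
    # the +1 guards against log(0) when a knob's value can be zero.
    log_num1 = np.log(num1 + 1)
    log_num2 = np.log(num2 + 1)
    return np.exp((log_num1 + log_num2) / 2)

# Hypothetical search for a usable maxval between a known-good setting
# (1 MiB) and a user-supplied upperbound (1 TiB).
good, upperbound = 2 ** 20, 2 ** 40
for step in range(5):
    good = choose_value_in_range(good, upperbound)
    print('step %d: probe %.3e' % (step, good))
```

The first probe lands near the geometric mean (about 2^30, i.e. 1 GiB) rather than the arithmetic midpoint (about 0.5 TiB), which suits knobs whose plausible values span several orders of magnitude; each later step halves the remaining gap in log space.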