From 82a7d859c293dd06d161f5772c7d0dd3214f22e1 Mon Sep 17 00:00:00 2001 From: bohanjason Date: Sat, 28 Sep 2019 00:23:35 -0400 Subject: [PATCH] choose algorithm based on option --- server/website/website/tasks/async_tasks.py | 75 +++++++++++---------- server/website/website/views.py | 4 +- 2 files changed, 43 insertions(+), 36 deletions(-) diff --git a/server/website/website/tasks/async_tasks.py b/server/website/website/tasks/async_tasks.py index 7e67a97..3d0408d 100644 --- a/server/website/website/tasks/async_tasks.py +++ b/server/website/website/tasks/async_tasks.py @@ -78,10 +78,10 @@ class MapWorkload(UpdateTask): # pylint: disable=abstract-method super(MapWorkload, self).on_success(retval, task_id, args, kwargs) # Replace result with formatted result - if not args[0]['bad']: + if not args[0][0]['bad']: new_res = { - 'scores': sorted(args[0]['scores'].items()), - 'mapped_workload_id': args[0]['mapped_workload'], + 'scores': sorted(args[0][0]['scores'].items()), + 'mapped_workload_id': args[0][0]['mapped_workload'], } task_meta = TaskMeta.objects.get(task_id=task_id) task_meta.result = new_res # Only store scores @@ -97,7 +97,7 @@ class ConfigurationRecommendation(UpdateTask): # pylint: disable=abstract-metho def on_success(self, retval, task_id, args, kwargs): super(ConfigurationRecommendation, self).on_success(retval, task_id, args, kwargs) - result_id = args[0]['newest_result_id'] + result_id = args[0][0]['newest_result_id'] result = Result.objects.get(pk=result_id) # Replace result with formatted result @@ -141,7 +141,7 @@ def clean_knob_data(knob_matrix, knob_labels, session): @task(base=AggregateTargetResults, name='aggregate_target_results') -def aggregate_target_results(result_id): +def aggregate_target_results(result_id, algorithm='gpr'): # Check that we've completed the background tasks at least once. We need # this data in order to make a configuration recommendation (until we # implement a sampling technique to generate new training data). @@ -159,7 +159,7 @@ def aggregate_target_results(result_id): agg_data['newest_result_id'] = result_id agg_data['bad'] = True agg_data['config_recommend'] = random_knob_result - return agg_data + return agg_data, algorithm # Aggregate all knob config results tried by the target so far in this # tuning session and this tuning workload. @@ -179,7 +179,7 @@ def aggregate_target_results(result_id): agg_data['X_matrix'] = np.array(cleaned_agg_data[0]) agg_data['X_columnlabels'] = np.array(cleaned_agg_data[1]) - return agg_data + return agg_data, algorithm def gen_random_data(knobs): @@ -331,7 +331,8 @@ def configuration_recommendation_ddpg(result_info): # pylint: disable=invalid-n @task(base=ConfigurationRecommendation, name='configuration_recommendation') -def configuration_recommendation(target_data): +def configuration_recommendation(recommendation_input): + target_data, algorithm = recommendation_input LOG.info('configuration_recommendation called') latest_pipeline_run = PipelineRun.objects.get_latest() @@ -544,30 +545,34 @@ def configuration_recommendation(target_data): project = newest_result.session.project.pk full_path = os.path.join(MODEL_DIR, 'p' + str(project) + '_s' + str(session) + '_nn.weights') - # neural network model - # FIXME: choose algorithm based on the session option - model_nn = NeuralNet(weights_file=full_path, - n_input=X_samples.shape[1], - batch_size=X_samples.shape[0], - explore_iters=500, - noise_scale_begin=0.1, - noise_scale_end=0, - debug=True) - model_nn.fit(X_scaled, y_scaled) - res = model_nn.recommend(X_samples, X_min, X_max, explore=True) + res = None + assert algorithm in ['gpr', 'dnn'] - model = GPRGD(length_scale=DEFAULT_LENGTH_SCALE, - magnitude=DEFAULT_MAGNITUDE, - max_train_size=MAX_TRAIN_SIZE, - batch_size=BATCH_SIZE, - num_threads=NUM_THREADS, - learning_rate=DEFAULT_LEARNING_RATE, - epsilon=DEFAULT_EPSILON, - max_iter=MAX_ITER, - sigma_multiplier=DEFAULT_SIGMA_MULTIPLIER, - mu_multiplier=DEFAULT_MU_MULTIPLIER) - # model.fit(X_scaled, y_scaled, X_min, X_max, ridge=DEFAULT_RIDGE) - # res = model.predict(X_samples, constraint_helper=constraint_helper) + if algorithm == 'dnn': + # neural network model + model_nn = NeuralNet(weights_file=full_path, + n_input=X_samples.shape[1], + batch_size=X_samples.shape[0], + explore_iters=500, + noise_scale_begin=0.1, + noise_scale_end=0, + debug=True) + model_nn.fit(X_scaled, y_scaled) + res = model_nn.recommend(X_samples, X_min, X_max, explore=True) + elif algorithm == 'gpr': + # default gpr model + model = GPRGD(length_scale=DEFAULT_LENGTH_SCALE, + magnitude=DEFAULT_MAGNITUDE, + max_train_size=MAX_TRAIN_SIZE, + batch_size=BATCH_SIZE, + num_threads=NUM_THREADS, + learning_rate=DEFAULT_LEARNING_RATE, + epsilon=DEFAULT_EPSILON, + max_iter=MAX_ITER, + sigma_multiplier=DEFAULT_SIGMA_MULTIPLIER, + mu_multiplier=DEFAULT_MU_MULTIPLIER) + model.fit(X_scaled, y_scaled, X_min, X_max, ridge=DEFAULT_RIDGE) + res = model.predict(X_samples, constraint_helper=constraint_helper) best_config_idx = np.argmin(res.minl.ravel()) best_config = res.minl_conf[best_config_idx, :] @@ -601,12 +606,13 @@ def load_data_helper(filtered_pipeline_data, workload, task_type): @task(base=MapWorkload, name='map_workload') -def map_workload(target_data): +def map_workload(map_workload_input): + target_data, algorithm = map_workload_input # Get the latest version of pipeline data that's been computed so far. latest_pipeline_run = PipelineRun.objects.get_latest() if target_data['bad']: assert target_data is not None - return target_data + return target_data, algorithm assert latest_pipeline_run is not None newest_result = Result.objects.get(pk=target_data['newest_result_id']) @@ -753,5 +759,4 @@ def map_workload(target_data): target_data['mapped_workload'] = (best_workload_id, best_workload_name, best_score) target_data['scores'] = scores_info - return target_data -# + return target_data, algorithm diff --git a/server/website/website/views.py b/server/website/website/views.py index 3fdb363..793db20 100644 --- a/server/website/website/views.py +++ b/server/website/website/views.py @@ -536,7 +536,9 @@ def handle_result_files(session, files): response = chain(train_ddpg.s(result.pk), configuration_recommendation_ddpg.s()).apply_async() elif session.algorithm == AlgorithmType.DNN: - pass + response = chain(aggregate_target_results.s(result.pk, 'dnn'), + map_workload.s(), + configuration_recommendation.s()).apply_async() taskmeta_ids = [] current_task = response while current_task: