Choose the recommendation algorithm (GPR or DNN) based on the session's algorithm option

This commit is contained in:
bohanjason 2019-09-28 00:23:35 -04:00 committed by Dana Van Aken
parent be955cc812
commit 82a7d859c2
2 changed files with 43 additions and 36 deletions

View File

@ -78,10 +78,10 @@ class MapWorkload(UpdateTask): # pylint: disable=abstract-method
super(MapWorkload, self).on_success(retval, task_id, args, kwargs) super(MapWorkload, self).on_success(retval, task_id, args, kwargs)
# Replace result with formatted result # Replace result with formatted result
if not args[0]['bad']: if not args[0][0]['bad']:
new_res = { new_res = {
'scores': sorted(args[0]['scores'].items()), 'scores': sorted(args[0][0]['scores'].items()),
'mapped_workload_id': args[0]['mapped_workload'], 'mapped_workload_id': args[0][0]['mapped_workload'],
} }
task_meta = TaskMeta.objects.get(task_id=task_id) task_meta = TaskMeta.objects.get(task_id=task_id)
task_meta.result = new_res # Only store scores task_meta.result = new_res # Only store scores
@ -97,7 +97,7 @@ class ConfigurationRecommendation(UpdateTask): # pylint: disable=abstract-metho
def on_success(self, retval, task_id, args, kwargs): def on_success(self, retval, task_id, args, kwargs):
super(ConfigurationRecommendation, self).on_success(retval, task_id, args, kwargs) super(ConfigurationRecommendation, self).on_success(retval, task_id, args, kwargs)
result_id = args[0]['newest_result_id'] result_id = args[0][0]['newest_result_id']
result = Result.objects.get(pk=result_id) result = Result.objects.get(pk=result_id)
# Replace result with formatted result # Replace result with formatted result
@ -141,7 +141,7 @@ def clean_knob_data(knob_matrix, knob_labels, session):
@task(base=AggregateTargetResults, name='aggregate_target_results') @task(base=AggregateTargetResults, name='aggregate_target_results')
def aggregate_target_results(result_id): def aggregate_target_results(result_id, algorithm='gpr'):
# Check that we've completed the background tasks at least once. We need # Check that we've completed the background tasks at least once. We need
# this data in order to make a configuration recommendation (until we # this data in order to make a configuration recommendation (until we
# implement a sampling technique to generate new training data). # implement a sampling technique to generate new training data).
@ -159,7 +159,7 @@ def aggregate_target_results(result_id):
agg_data['newest_result_id'] = result_id agg_data['newest_result_id'] = result_id
agg_data['bad'] = True agg_data['bad'] = True
agg_data['config_recommend'] = random_knob_result agg_data['config_recommend'] = random_knob_result
return agg_data return agg_data, algorithm
# Aggregate all knob config results tried by the target so far in this # Aggregate all knob config results tried by the target so far in this
# tuning session and this tuning workload. # tuning session and this tuning workload.
@ -179,7 +179,7 @@ def aggregate_target_results(result_id):
agg_data['X_matrix'] = np.array(cleaned_agg_data[0]) agg_data['X_matrix'] = np.array(cleaned_agg_data[0])
agg_data['X_columnlabels'] = np.array(cleaned_agg_data[1]) agg_data['X_columnlabels'] = np.array(cleaned_agg_data[1])
return agg_data return agg_data, algorithm
def gen_random_data(knobs): def gen_random_data(knobs):
@ -331,7 +331,8 @@ def configuration_recommendation_ddpg(result_info): # pylint: disable=invalid-n
@task(base=ConfigurationRecommendation, name='configuration_recommendation') @task(base=ConfigurationRecommendation, name='configuration_recommendation')
def configuration_recommendation(target_data): def configuration_recommendation(recommendation_input):
target_data, algorithm = recommendation_input
LOG.info('configuration_recommendation called') LOG.info('configuration_recommendation called')
latest_pipeline_run = PipelineRun.objects.get_latest() latest_pipeline_run = PipelineRun.objects.get_latest()
@ -544,8 +545,11 @@ def configuration_recommendation(target_data):
project = newest_result.session.project.pk project = newest_result.session.project.pk
full_path = os.path.join(MODEL_DIR, 'p' + str(project) + '_s' + str(session) + '_nn.weights') full_path = os.path.join(MODEL_DIR, 'p' + str(project) + '_s' + str(session) + '_nn.weights')
res = None
assert algorithm in ['gpr', 'dnn']
if algorithm == 'dnn':
# neural network model # neural network model
# FIXME: choose algorithm based on the session option
model_nn = NeuralNet(weights_file=full_path, model_nn = NeuralNet(weights_file=full_path,
n_input=X_samples.shape[1], n_input=X_samples.shape[1],
batch_size=X_samples.shape[0], batch_size=X_samples.shape[0],
@ -555,7 +559,8 @@ def configuration_recommendation(target_data):
debug=True) debug=True)
model_nn.fit(X_scaled, y_scaled) model_nn.fit(X_scaled, y_scaled)
res = model_nn.recommend(X_samples, X_min, X_max, explore=True) res = model_nn.recommend(X_samples, X_min, X_max, explore=True)
elif algorithm == 'gpr':
# default gpr model
model = GPRGD(length_scale=DEFAULT_LENGTH_SCALE, model = GPRGD(length_scale=DEFAULT_LENGTH_SCALE,
magnitude=DEFAULT_MAGNITUDE, magnitude=DEFAULT_MAGNITUDE,
max_train_size=MAX_TRAIN_SIZE, max_train_size=MAX_TRAIN_SIZE,
@ -566,8 +571,8 @@ def configuration_recommendation(target_data):
max_iter=MAX_ITER, max_iter=MAX_ITER,
sigma_multiplier=DEFAULT_SIGMA_MULTIPLIER, sigma_multiplier=DEFAULT_SIGMA_MULTIPLIER,
mu_multiplier=DEFAULT_MU_MULTIPLIER) mu_multiplier=DEFAULT_MU_MULTIPLIER)
# model.fit(X_scaled, y_scaled, X_min, X_max, ridge=DEFAULT_RIDGE) model.fit(X_scaled, y_scaled, X_min, X_max, ridge=DEFAULT_RIDGE)
# res = model.predict(X_samples, constraint_helper=constraint_helper) res = model.predict(X_samples, constraint_helper=constraint_helper)
best_config_idx = np.argmin(res.minl.ravel()) best_config_idx = np.argmin(res.minl.ravel())
best_config = res.minl_conf[best_config_idx, :] best_config = res.minl_conf[best_config_idx, :]
@ -601,12 +606,13 @@ def load_data_helper(filtered_pipeline_data, workload, task_type):
@task(base=MapWorkload, name='map_workload') @task(base=MapWorkload, name='map_workload')
def map_workload(target_data): def map_workload(map_workload_input):
target_data, algorithm = map_workload_input
# Get the latest version of pipeline data that's been computed so far. # Get the latest version of pipeline data that's been computed so far.
latest_pipeline_run = PipelineRun.objects.get_latest() latest_pipeline_run = PipelineRun.objects.get_latest()
if target_data['bad']: if target_data['bad']:
assert target_data is not None assert target_data is not None
return target_data return target_data, algorithm
assert latest_pipeline_run is not None assert latest_pipeline_run is not None
newest_result = Result.objects.get(pk=target_data['newest_result_id']) newest_result = Result.objects.get(pk=target_data['newest_result_id'])
@ -753,5 +759,4 @@ def map_workload(target_data):
target_data['mapped_workload'] = (best_workload_id, best_workload_name, best_score) target_data['mapped_workload'] = (best_workload_id, best_workload_name, best_score)
target_data['scores'] = scores_info target_data['scores'] = scores_info
return target_data return target_data, algorithm
#

View File

@ -536,7 +536,9 @@ def handle_result_files(session, files):
response = chain(train_ddpg.s(result.pk), response = chain(train_ddpg.s(result.pk),
configuration_recommendation_ddpg.s()).apply_async() configuration_recommendation_ddpg.s()).apply_async()
elif session.algorithm == AlgorithmType.DNN: elif session.algorithm == AlgorithmType.DNN:
pass response = chain(aggregate_target_results.s(result.pk, 'dnn'),
map_workload.s(),
configuration_recommendation.s()).apply_async()
taskmeta_ids = [] taskmeta_ids = []
current_task = response current_task = response
while current_task: while current_task: