Fixed overly verbose celery log messages and updated the Django logging options to also log messages to the console.

dvanaken 2020-04-03 01:13:21 -04:00 committed by Dana Van Aken
parent 423140169f
commit 66ad361654
5 changed files with 218 additions and 102 deletions

View File

@@ -326,15 +326,20 @@ LOGGING = {
             'propagate': False,
         },
         'celery': {
-            'handlers': ['celery', 'dblog'],
+            'handlers': ['celery', 'dblog', 'console'],
             'level': 'DEBUG',
             'propogate': True,
         },
         'celery.task': {
-            'handlers': ['celery', 'dblog'],
+            'handlers': ['celery', 'dblog', 'console'],
             'level': 'DEBUG',
             'propogate': True,
         },
+        'celery.beat': {
+            'handlers': ['celery', 'dblog', 'console'],
+            'level': 'INFO',
+            'propogate': True,
+        },
         # Uncomment to email admins after encountering an error (and debug=False)
         # 'django.request': {
         #     'handlers': ['mail_admins'],
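
Note: the 'console' handler that these logger entries now reference is defined in the handlers section of this settings file, which is not shown in this diff. For context, a minimal Django LOGGING configuration that wires a console handler to the celery logger looks roughly like the sketch below; the handler class and layout here are assumptions, not taken from this commit.

# Minimal sketch only -- the actual handler/formatter names in settings.py are not shown in this diff.
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',  # writes log records to the console (stderr)
        },
    },
    'loggers': {
        'celery': {
            'handlers': ['console'],
            'level': 'DEBUG',
            'propagate': True,
        },
    },
}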

View File

@@ -120,8 +120,9 @@ def clean_knob_data(knob_matrix, knob_labels, session):
                 else:
                     default_val = float(default_val)
             except ValueError:
+                LOG.warning("Error parsing knob '%s' default value: %s. Setting default to 0.",
+                            knob_name, default_val, exc_info=True)
                 default_val = 0
-                LOG.exception("Error parsing knob default: %s=%s", knob_name, default_val)
             new_col = np.ones((nrows, 1), dtype=float) * default_val
             new_lab = knob_name
         else:
@@ -133,8 +134,7 @@ def clean_knob_data(knob_matrix, knob_labels, session):
         new_columns.append(new_col)
     new_matrix = np.hstack(new_columns).reshape(nrows, -1)
-    LOG.debug("Cleaned matrix: %s, knobs (%s): %s", new_matrix.shape,
-              len(new_labels), new_labels)
+    LOG.debug("Cleaned matrix: %s, knobs (%s): %s", new_matrix.shape, len(new_labels), new_labels)
     assert new_labels == knob_cat, \
         "Expected knobs: {}\nActual knobs: {}\n".format(
@@ -154,7 +154,7 @@ def clean_metric_data(metric_matrix, metric_labels, session):
         metric_cat.append(metric_obj.name)
     missing_columns = sorted(set(metric_cat) - set(metric_labels))
     unused_columns = set(metric_labels) - set(metric_cat)
-    LOG.debug("clean_metric_data added %d metrics and removed %d metric.", len(missing_columns),
+    LOG.debug("clean_metric_data: added %d metrics and removed %d metric.", len(missing_columns),
               len(unused_columns))
     default_val = 0
     metric_cat_size = len(metric_cat)
@@ -166,7 +166,7 @@ def clean_metric_data(metric_matrix, metric_labels, session):
         if metric_name in metric_labels_dict:
             index = metric_labels_dict[metric_name]
             matrix[:, i] = metric_matrix[:, index]
-    LOG.debug(matrix.shape)
+    LOG.debug("clean_metric_data: final ~ matrix: %s, labels: %s", matrix.shape, len(metric_cat))
     return matrix, metric_cat
@@ -176,6 +176,39 @@ def save_execution_time(start_ts, fn, result):
     start_time = datetime.fromtimestamp(int(start_ts), timezone(TIME_ZONE))
     ExecutionTime.objects.create(module="celery.async_tasks", function=fn, tag="",
                                  start_time=start_time, execution_time=exec_time, result=result)
+    return exec_time
+
+
+def _get_task_name(session, result_id):
+    if session.tuning_session == 'lhs':
+        algo_name = 'LHS'
+    elif session.tuning_session == 'randomly_generate':
+        algo_name = 'RANDOM'
+    elif session.tuning_session == 'tuning_session':
+        algo_name = AlgorithmType.short_name(session.algorithm)
+    else:
+        LOG.warning("Unhandled session type: %s", session.tuning_session)
+        algo_name = session.tuning_session
+    return '{}.{}@{}#{}'.format(session.project.name, session.name, algo_name, result_id)
+
+
+def _task_result_tostring(task_result):
+    if isinstance(task_result, dict):
+        task_dict = type(task_result)()
+        for k, v in task_result.items():
+            if k.startswith('X_') or k.startswith('y_') or k == 'rowlabels':
+                if isinstance(v, np.ndarray):
+                    v = str(v.shape)
+                elif isinstance(v, list):
+                    v = len(v)
+                else:
+                    LOG.warning("Unhandled type: k=%s, type(v)=%s, v=%s", k, type(v), v)
+                    v = str(v)
+            task_dict[k] = v
+        task_str = JSONUtil.dumps(task_dict, pprint=True)
+    else:
+        task_str = str(task_result)
+    return task_str
 
 
 def choose_value_in_range(num1, num2):
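
The two helpers added above drive the less verbose logging: _task_result_tostring summarizes large arrays and label lists by their shape or length instead of dumping their contents, and _get_task_name builds the prefix attached to every log line from the project name, session name, the algorithm's short name, and the result id. A small illustration of the resulting prefix, using made-up values that are not from this commit:

# Illustration with hypothetical values: project 'tpcc', session 'session1', result id 42.
project_name = 'tpcc'
session_name = 'session1'
algo_name = 'GPR'  # e.g. AlgorithmType.short_name(session.algorithm)
result_id = 42
task_name = '{}.{}@{}#{}'.format(project_name, session_name, algo_name, result_id)
print(task_name)  # -> tpcc.session1@GPR#42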
@@ -252,8 +285,8 @@ def calc_next_knob_range(algorithm, knob_info, newest_result, good_val, bad_val,
     target_data['status'] = 'range_test'
     target_data['config_recommend'] = next_config
     LOG.debug('%s: Generated a config to test %s of %s.\n\ndata=%s\n',
-              AlgorithmType.name(algorithm), mode, knob.name,
-              JSONUtil.dumps(target_data, pprint=True))
+              _get_task_name(session, newest_result.pk), mode, knob.name,
+              _task_result_tostring(target_data))
     return True, target_data
@@ -265,6 +298,8 @@ def preprocessing(result_id, algorithm):
     newest_result = Result.objects.get(pk=result_id)
     session = newest_result.session
     knobs = SessionKnob.objects.get_knobs_for_session(session)
+    task_name = _get_task_name(session, result_id)
+    LOG.info("%s: Preprocessing data...", task_name)
 
     # Check that the minvals of tunable knobs are all decided
     for knob_info in knobs:
@@ -276,7 +311,10 @@ def preprocessing(result_id, algorithm):
             successful, target_data = calc_next_knob_range(
                 algorithm, knob_info, newest_result, minval, lowerbound, 'lowerbound')
             if successful:
-                save_execution_time(start_ts, "preprocessing", newest_result)
+                exec_time = save_execution_time(start_ts, "preprocessing", newest_result)
+                LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(target_data))
+                LOG.info("%s: Done processing. Returning config for lowerbound knob search"
+                         " (%.1f seconds).", task_name, exec_time)
                 return result_id, algorithm, target_data
 
     # Check that the maxvals of tunable knobs are all decided
@@ -289,7 +327,10 @@ def preprocessing(result_id, algorithm):
             successful, target_data = calc_next_knob_range(
                 algorithm, knob_info, newest_result, maxval, upperbound, 'upperbound')
             if successful:
-                save_execution_time(start_ts, "preprocessing", newest_result)
+                exec_time = save_execution_time(start_ts, "preprocessing", newest_result)
+                LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(target_data))
+                LOG.info("%s: Done processing. Returning config for upperbound knob search"
+                         " (%.1f seconds).", task_name, exec_time)
                 return result_id, algorithm, target_data
 
     # Check that we've completed the background tasks at least once. We need
@@ -304,39 +345,44 @@ def preprocessing(result_id, algorithm):
             results_cnt -= 1
             if i == len(session_results) - 1 and algorithm == AlgorithmType.DDPG:
                 skip_ddpg = True
-    if not has_pipeline_data or results_cnt == 0 or skip_ddpg or session.tuning_session == 'lhs':
+
+    LOG.debug("%s: workload=%s, has_pipeline_data: %s, # results: %s, results_cnt: %s, "
+              "skip_ddpg: %s", task_name, newest_result.workload, has_pipeline_data,
+              len(session_results), results_cnt, skip_ddpg)
+
+    if session.tuning_session == 'randomly_generate':
+        # generate a config randomly
+        random_knob_result = gen_random_data(knobs)
+        target_data['status'] = 'random'
+        target_data['config_recommend'] = random_knob_result
+        LOG.debug('%s: Generated a random config.', task_name)
+
+    elif not has_pipeline_data or results_cnt == 0 or skip_ddpg or session.tuning_session == 'lhs':
         if not has_pipeline_data and session.tuning_session == 'tuning_session':
-            LOG.debug("Background tasks haven't ran for this workload yet, picking data with lhs.")
+            LOG.info("%s: Background tasks haven't ran for this workload yet, "
+                     "picking data with lhs.", task_name)
         if results_cnt == 0 and session.tuning_session == 'tuning_session':
-            LOG.debug("Not enough data in this session, picking data with lhs.")
+            LOG.info("%s: Not enough data in this session, picking data with lhs.", task_name)
         if skip_ddpg:
-            LOG.debug("The most recent result cannot be used by DDPG, picking data with lhs.")
+            LOG.info("%s: The most recent result cannot be used by DDPG, picking data with lhs.",
+                     task_name)
         all_samples = JSONUtil.loads(session.lhs_samples)
         if len(all_samples) == 0:
-            if session.tuning_session == 'lhs':
-                all_samples = gen_lhs_samples(knobs, 100)
-            else:
-                all_samples = gen_lhs_samples(knobs, 10)
-            LOG.debug('%s: Generated LHS.\n\ndata=%s\n',
-                      AlgorithmType.name(algorithm), JSONUtil.dumps(all_samples[:5], pprint=True))
+            num_lhs_samples = 100 if session.tuning_session == 'lhs' else 10
+            all_samples = gen_lhs_samples(knobs, num_lhs_samples)
+            LOG.debug('%s: Generated %s LHS samples (LHS data: %s).', task_name, num_lhs_samples,
+                      len(all_samples))
         samples = all_samples.pop()
         target_data['status'] = 'lhs'
         target_data['config_recommend'] = samples
         session.lhs_samples = JSONUtil.dumps(all_samples)
         session.save()
-        LOG.debug('%s: Got LHS config.\n\ndata=%s\n',
-                  AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
-    elif session.tuning_session == 'randomly_generate':
-        # generate a config randomly
-        random_knob_result = gen_random_data(knobs)
-        target_data['status'] = 'random'
-        target_data['config_recommend'] = random_knob_result
-        LOG.debug('%s: Generated a random config.\n\ndata=%s\n',
-                  AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
-    save_execution_time(start_ts, "preprocessing", newest_result)
+        LOG.debug('%s: Got LHS config.', task_name)
+
+    exec_time = save_execution_time(start_ts, "preprocessing", newest_result)
+    LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(target_data))
+    LOG.info("%s: Done preprocessing data (%.1f seconds).", task_name, exec_time)
     return result_id, algorithm, target_data
@@ -344,36 +390,45 @@ def preprocessing(result_id, algorithm):
 def aggregate_target_results(aggregate_target_results_input):
     start_ts = time.time()
     result_id, algorithm, target_data = aggregate_target_results_input
+    newest_result = Result.objects.get(pk=result_id)
+    session = newest_result.session
+    task_name = _get_task_name(session, result_id)
+
     # If the preprocessing method has already generated a config, bypass this method.
     if 'config_recommend' in target_data:
         assert 'newest_result_id' in target_data and 'status' in target_data
-        LOG.debug('%s: Skipping aggregate_target_results.\nData:\n%s\n\n',
-                  AlgorithmType.name(algorithm), target_data)
+        LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(target_data))
+        LOG.info('%s: Skipping aggregate_target_results (status=%s).', task_name,
+                 target_data.get('status', ''))
         return target_data, algorithm
 
-    newest_result = Result.objects.get(pk=result_id)
-    session = newest_result.session
+    LOG.info("%s: Aggregating target results...", task_name)
+
     # Aggregate all knob config results tried by the target so far in this
     # tuning session and this tuning workload.
     target_results = Result.objects.filter(session=session,
                                            dbms=newest_result.dbms,
                                            workload=newest_result.workload)
+    LOG.debug("%s: # results: %s", task_name, len(target_results))
     if len(target_results) == 0:
         raise Exception('Cannot find any results for session_id={}, dbms_id={}'
                         .format(session, newest_result.dbms))
+
     agg_data = DataUtil.aggregate_data(target_results)
+    LOG.debug("%s ~ INITIAL: X_matrix=%s, X_columnlabels=%s", task_name,
+              agg_data['X_matrix'].shape, len(agg_data['X_columnlabels']))
     agg_data['newest_result_id'] = result_id
     agg_data['status'] = 'good'
+
     # Clean knob data
-    cleaned_agg_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'],
-                                       session)
+    cleaned_agg_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'], session)
     agg_data['X_matrix'] = np.array(cleaned_agg_data[0])
     agg_data['X_columnlabels'] = np.array(cleaned_agg_data[1])
+    LOG.debug("%s ~ FINAL: X_matrix=%s, X_columnlabels=%s", task_name,
+              agg_data['X_matrix'].shape, len(agg_data['X_columnlabels']))
 
-    LOG.debug('%s: Finished aggregating target results.\n\n',
-              AlgorithmType.name(algorithm))
-    save_execution_time(start_ts, "aggregate_target_results", Result.objects.get(pk=result_id))
+    exec_time = save_execution_time(start_ts, "aggregate_target_results", newest_result)
+    LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(agg_data))
+    LOG.info('%s: Finished aggregating target results (%.1f seconds).', task_name, exec_time)
     return agg_data, algorithm
@@ -440,7 +495,8 @@ def gen_lhs_samples(knobs, nsamples):
         elif types[fidx] == VarType.REAL:
             lhs_samples[-1][names[fidx]] = float(samples[sidx][fidx])
         else:
-            LOG.debug("LHS type not supported: %s", types[fidx])
+            LOG.warning("LHS: vartype not supported: %s (knob name: %s).",
+                        VarType.name(types[fidx]), names[fidx])
     random.shuffle(lhs_samples)
     return lhs_samples
@@ -450,18 +506,23 @@ def gen_lhs_samples(knobs, nsamples):
 def train_ddpg(train_ddpg_input):
     start_ts = time.time()
     result_id, algorithm, target_data = train_ddpg_input
+    result = Result.objects.get(pk=result_id)
+    session = result.session
+    task_name = _get_task_name(session, result_id)
+
     # If the preprocessing method has already generated a config, bypass this method.
     if 'config_recommend' in target_data:
         assert 'newest_result_id' in target_data and 'status' in target_data
-        LOG.debug('Skipping training DDPG.\nData:\n%s\n\n', target_data)
+        LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(target_data))
+        LOG.info("%s: Skipping train DDPG (status=%s).", task_name, target_data['status'])
         return target_data, algorithm
 
-    LOG.info('Add training data to ddpg and train ddpg')
-    result = Result.objects.get(pk=result_id)
-    session = result.session
+    LOG.info('%s: Add training data to ddpg and train ddpg...', task_name)
     params = JSONUtil.loads(session.hyperparameters)
     session_results = Result.objects.filter(session=session,
                                             creation_time__lt=result.creation_time)
     results_cnt = len(session_results)
     first_valid_result = -1
     for i, result in enumerate(session_results):
@@ -506,16 +567,17 @@ def train_ddpg(train_ddpg_input):
         knob_data = MinMaxScaler().fit(knob_bounds).transform(knob_data)[0]
     knob_num = len(knob_data)
     metric_num = len(metric_data)
-    LOG.info('knob_num: %d, metric_num: %d', knob_num, metric_num)
+    LOG.debug('%s: knob_num: %d, metric_num: %d', task_name, knob_num, metric_num)
 
     # Filter ys by current target objective metric
     target_obj_idx = [i for i, n in enumerate(metric_labels) if n == target_objective]
     if len(target_obj_idx) == 0:
-        raise Exception(('Could not find target objective in metrics '
-                         '(target_obj={})').format(target_objective))
+        raise Exception(('[{}] Could not find target objective in metrics '
+                         '(target_obj={})').format(task_name, target_objective))
     elif len(target_obj_idx) > 1:
-        raise Exception(('Found {} instances of target objective in '
-                         'metrics (target_obj={})').format(len(target_obj_idx),
+        raise Exception(('[{}] Found {} instances of target objective in '
+                         'metrics (target_obj={})').format(task_name,
+                                                           len(target_obj_idx),
                                                            target_objective))
     objective = metric_data[target_obj_idx]
     base_objective = base_metric_data[prev_obj_idx]
@@ -543,7 +605,7 @@ def train_ddpg(train_ddpg_input):
     else:  # negative reward
         reward = -(np.square((2 * base_objective - objective) / base_objective) - 1)\
             * abs(2 * prev_objective - objective) / prev_objective
-    LOG.info('reward: %f', reward)
+    LOG.info('%s: reward: %f', task_name, reward)
 
     # Update ddpg
     ddpg = DDPG(n_actions=knob_num, n_states=metric_num, alr=params['DDPG_ACTOR_LEARNING_RATE'],
@@ -561,8 +623,10 @@ def train_ddpg(train_ddpg_input):
     ddpg.update()
     session.ddpg_actor_model, session.ddpg_critic_model = ddpg.get_model()
     session.ddpg_reply_memory = ddpg.replay_memory.get()
-    save_execution_time(start_ts, "train_ddpg", Result.objects.get(pk=result_id))
     session.save()
+    exec_time = save_execution_time(start_ts, "train_ddpg", result)
+    LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(target_data))
+    LOG.info('%s: Done training ddpg (%.1f seconds).', task_name, exec_time)
     return target_data, algorithm
@@ -598,8 +662,8 @@ def check_early_return(target_data, algorithm):
         target_data_res = create_and_save_recommendation(
             recommended_knobs=target_data['config_recommend'], result=newest_result,
             status=target_data['status'], info=info, pipeline_run=None)
-        LOG.debug('%s: Skipping configuration recommendation.\nData:\n%s\n\n',
-                  AlgorithmType.name(algorithm), target_data)
+        LOG.debug('%s: Skipping configuration recommendation (status=%s).',
+                  _get_task_name(newest_result.session, result_id), target_data_res['status'])
         return True, target_data_res
 
     return False, None
@@ -607,17 +671,21 @@ def check_early_return(target_data, algorithm):
 @shared_task(base=ConfigurationRecommendation, name='configuration_recommendation_ddpg')
 def configuration_recommendation_ddpg(recommendation_ddpg_input):  # pylint: disable=invalid-name
     start_ts = time.time()
-    LOG.info('configuration_recommendation called (DDPG)')
     target_data, algorithm = recommendation_ddpg_input
-    early_return, target_data_res = check_early_return(target_data, algorithm)
-    if early_return:
-        return target_data_res
-
     result_id = target_data['newest_result_id']
     result_list = Result.objects.filter(pk=result_id)
     result = result_list.first()
     session = result.session
+    task_name = _get_task_name(session, result_id)
+
+    early_return, target_data_res = check_early_return(target_data, algorithm)
+    if early_return:
+        LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(target_data_res))
+        LOG.info("%s: Returning early from config recommendation (DDPG).", task_name)
+        return target_data_res
+
+    LOG.info('%s: Recommending the next configuration (DDPG)...', task_name)
     params = JSONUtil.loads(session.hyperparameters)
     agg_data = DataUtil.aggregate_data(result_list)
     metric_data, _ = clean_metric_data(agg_data['y_matrix'], agg_data['y_columnlabels'], session)
@@ -645,8 +713,11 @@ def configuration_recommendation_ddpg(recommendation_ddpg_input):  # pylint: dis
     target_data_res = create_and_save_recommendation(recommended_knobs=conf_map, result=result,
                                                      status='good', info='INFO: ddpg')
 
-    save_execution_time(start_ts, "configuration_recommendation_ddpg", result)
+    exec_time = save_execution_time(start_ts, "configuration_recommendation_ddpg", result)
+    LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(target_data_res))
+    LOG.info("%s: Done recommending the next configuration (DDPG, %.1f seconds).",
+             task_name, exec_time)
     return target_data_res
@@ -850,14 +921,17 @@ def combine_workload(target_data):
 def configuration_recommendation(recommendation_input):
     start_ts = time.time()
     target_data, algorithm = recommendation_input
-    LOG.info('configuration_recommendation called')
+    newest_result = Result.objects.get(pk=target_data['newest_result_id'])
+    session = newest_result.session
+    task_name = _get_task_name(session, target_data['newest_result_id'])
 
     early_return, target_data_res = check_early_return(target_data, algorithm)
     if early_return:
+        LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(target_data_res))
+        LOG.info("%s: Returning early from config recommendation.", task_name)
        return target_data_res
 
-    newest_result = Result.objects.get(pk=target_data['newest_result_id'])
-    session = newest_result.session
+    LOG.info("%s: Recommending the next configuration...", task_name)
     params = JSONUtil.loads(session.hyperparameters)
 
     X_columnlabels, X_scaler, X_scaled, y_scaled, X_max, X_min,\
@@ -915,6 +989,7 @@ def configuration_recommendation(recommendation_input):
     elif algorithm == AlgorithmType.GPR:
         # default gpr model
         if params['GPR_USE_GPFLOW']:
+            LOG.debug("%s: Running GPR with GPFLOW.", task_name)
             model_kwargs = {}
             model_kwargs['model_learning_rate'] = params['GPR_HP_LEARNING_RATE']
             model_kwargs['model_maxiter'] = params['GPR_HP_MAX_ITER']
@@ -933,6 +1008,7 @@ def configuration_recommendation(recommendation_input):
                              **model_kwargs)
             res = tf_optimize(m.model, X_samples, **opt_kwargs)
         else:
+            LOG.debug("%s: Running GPR with GPRGD.", task_name)
             model = GPRGD(length_scale=params['GPR_LENGTH_SCALE'],
                           magnitude=params['GPR_MAGNITUDE'],
                           max_train_size=params['GPR_MAX_TRAIN_SIZE'],
@@ -973,17 +1049,19 @@ def configuration_recommendation(recommendation_input):
         recommended_knobs=conf_map, result=newest_result,
         status='good', info='INFO: training data size is {}'.format(X_scaled.shape[0]),
         pipeline_run=target_data['pipeline_run'])
-    LOG.debug('%s: Finished selecting the next config.\n\ndata=%s\n',
-              AlgorithmType.name(algorithm), JSONUtil.dumps(conf_map_res, pprint=True))
 
-    save_execution_time(start_ts, "configuration_recommendation", newest_result)
+    exec_time = save_execution_time(start_ts, "configuration_recommendation", newest_result)
+    LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(conf_map_res))
+    LOG.info("%s: Done recommending the next configuration (%.1f seconds).", task_name, exec_time)
     return conf_map_res
 
 
 def load_data_helper(filtered_pipeline_data, workload, task_type):
     pipeline_data = filtered_pipeline_data.get(workload=workload,
                                                task_type=task_type)
-    LOG.debug("PIPELINE DATA: %s", str(pipeline_data.data))
+    LOG.debug("PIPELINE DATA: pipeline_run=%s, workload=%s, type=%s, data=%s...",
+              pipeline_data.pipeline_run.pk, workload, PipelineTaskType.name(task_type),
+              str(pipeline_data.data)[:100])
     return JSONUtil.loads(pipeline_data.data)
@@ -991,12 +1069,14 @@ def load_data_helper(filtered_pipeline_data, workload, task_type):
 def map_workload(map_workload_input):
     start_ts = time.time()
     target_data, algorithm = map_workload_input
+    newest_result = Result.objects.get(pk=target_data['newest_result_id'])
+    session = newest_result.session
+    task_name = _get_task_name(session, target_data['newest_result_id'])
 
     assert target_data is not None
     if target_data['status'] != 'good':
-        LOG.debug('%s: Skipping workload mapping.\n\ndata=%s\n',
-                  AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
+        LOG.debug('\n%s: Result = %s\n', task_name, _task_result_tostring(target_data))
+        LOG.info("%s: Skipping workload mapping (status: %s).", task_name, target_data['status'])
        return target_data, algorithm
 
     # Get the latest version of pipeline data that's been computed so far.
@@ -1004,8 +1084,8 @@ def map_workload(map_workload_input):
     assert latest_pipeline_run is not None
     target_data['pipeline_run'] = latest_pipeline_run.pk
 
-    newest_result = Result.objects.get(pk=target_data['newest_result_id'])
-    session = newest_result.session
+    LOG.info("%s: Mapping the workload...", task_name)
     params = JSONUtil.loads(session.hyperparameters)
     target_workload = newest_result.workload
     X_columnlabels = np.array(target_data['X_columnlabels'])
@@ -1086,8 +1166,9 @@ def map_workload(map_workload_input):
     if len(workload_data) == 0:
         # The background task that aggregates the data has not finished running yet
         target_data.update(mapped_workload=None, scores=None)
-        LOG.debug('%s: Skipping workload mapping because there is no parsed workload.\n',
-                  AlgorithmType.name(algorithm))
+        LOG.debug('%s: Result = %s\n', task_name, _task_result_tostring(target_data))
+        LOG.info('%s: Skipping workload mapping because no workload data is available.',
+                 task_name)
         return target_data, algorithm
 
     # Stack all X & y matrices for preprocessing
@@ -1166,8 +1247,8 @@ def map_workload(map_workload_input):
         scores_info[workload_id] = (workload_name, similarity_score)
     target_data.update(mapped_workload=(best_workload_id, best_workload_name, best_score),
                        scores=scores_info)
-    LOG.debug('%s: Finished mapping the workload.\n\ndata=%s\n',
-              AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
 
-    save_execution_time(start_ts, "map_workload", newest_result)
+    exec_time = save_execution_time(start_ts, "map_workload", newest_result)
+    LOG.debug('\n%s: Result = %s\n', task_name, _task_result_tostring(target_data))
+    LOG.info('%s: Done mapping the workload (%.1f seconds).', task_name, exec_time)
     return target_data, algorithm

View File

@@ -41,17 +41,21 @@ def save_execution_time(start_ts, fn):
 @shared_task(name="run_background_tasks")
 def run_background_tasks():
     start_ts = time.time()
-    LOG.debug("Starting background tasks")
+    LOG.info("Starting background tasks...")
     # Find modified and not modified workloads, we only have to calculate for the
     # modified workloads.
     modified_workloads = Workload.objects.filter(status=WorkloadStatusType.MODIFIED)
+    num_modified = modified_workloads.count()
     non_modified_workloads = Workload.objects.filter(status=WorkloadStatusType.PROCESSED)
     non_modified_workloads = list(non_modified_workloads.values_list('pk', flat=True))
     last_pipeline_run = PipelineRun.objects.get_latest()
+    LOG.debug("Workloads: # modified: %s, # processed: %s, # total: %s",
+              num_modified, len(non_modified_workloads),
+              Workload.objects.all().count())
 
-    if len(modified_workloads) == 0:
+    if num_modified == 0:
         # No previous workload data yet. Try again later.
-        LOG.debug("No workload data yet. Ending background tasks")
+        LOG.info("No modified workload data yet. Ending background tasks.")
         return
 
     # Create new entry in PipelineRun table to store the output of each of
@@ -59,29 +63,39 @@ def run_background_tasks():
     pipeline_run_obj = PipelineRun(start_time=now(), end_time=None)
     pipeline_run_obj.save()
 
-    for workload in modified_workloads:
+    for i, workload in enumerate(modified_workloads):
+        workload.status = WorkloadStatusType.PROCESSING
+        workload.save()
         wkld_results = Result.objects.filter(workload=workload)
-        if wkld_results.exists() is False:
+        num_wkld_results = wkld_results.count()
+        workload_name = '{}@{}.{}'.format(workload.dbms.key, workload.project.name, workload.name)
+        LOG.info("Starting workload %s (%s/%s, # results: %s)...", workload_name,
+                 i + 1, num_modified, num_wkld_results)
+
+        if num_wkld_results == 0:
             # delete the workload
-            LOG.debug("Deleting workload %d because it has no results.", workload.id)
+            LOG.info("Deleting workload %s because it has no results.", workload_name)
             workload.delete()
             continue
 
-        # Check that there are enough results in the workload
-        if wkld_results.count() < MIN_WORKLOAD_RESULTS_COUNT:
-            LOG.debug("Not enough results in workload %d (only %d results).", workload.id,
-                      wkld_results.count())
+        elif num_wkld_results < MIN_WORKLOAD_RESULTS_COUNT:
+            # Check that there are enough results in the workload
+            LOG.info("Not enough results in workload %s (# results: %s, # required: %s).",
+                     workload_name, num_wkld_results, MIN_WORKLOAD_RESULTS_COUNT)
             continue
 
-        workload.status = WorkloadStatusType.PROCESSING
-        workload.save()
-
-        LOG.debug("Aggregating data for workload %d", workload.id)
+        LOG.info("Aggregating data for workload %s...", workload_name)
         # Aggregate the knob & metric data for this workload
         knob_data, metric_data = aggregate_data(wkld_results)
-        # LOG.debug("knob_data: %s", str(knob_data))
-        # LOG.debug("metric_data: %s", str(metric_data))
+        LOG.debug("Aggregated knob data: rowlabels=%s, columnlabels=%s, data=%s.",
+                  len(knob_data['rowlabels']), len(knob_data['columnlabels']),
+                  knob_data['data'].shape)
+        LOG.debug("Aggregated metric data: rowlabels=%s, columnlabels=%s, data=%s.",
+                  len(metric_data['rowlabels']), len(metric_data['columnlabels']),
+                  metric_data['data'].shape)
+        LOG.info("Done aggregating data for workload %s.", workload_name)
 
         # Knob_data and metric_data are 2D numpy arrays. Convert them into a
         # JSON-friendly (nested) lists and then save them as new PipelineData
@@ -109,9 +123,11 @@ def run_background_tasks():
         # Execute the Workload Characterization task to compute the list of
         # pruned metrics for this workload and save them in a new PipelineData
         # object.
-        LOG.debug("Pruning metrics for workload %d.", workload.id)
+        LOG.info("Pruning metrics for workload %s...", workload_name)
         pruned_metrics = run_workload_characterization(metric_data=metric_data)
-        LOG.debug("pruned_metrics: %s", str(pruned_metrics))
+        LOG.info("Done pruning metrics for workload %s (# pruned metrics: %s).\n\n"
+                 "Pruned metrics: %s\n", workload_name, len(pruned_metrics),
+                 pruned_metrics)
         pruned_metrics_entry = PipelineData(pipeline_run=pipeline_run_obj,
                                             task_type=PipelineTaskType.PRUNED_METRICS,
                                             workload=workload,
@@ -131,11 +147,12 @@ def run_background_tasks():
         # Execute the Knob Identification task to compute an ordered list of knobs
         # ranked by their impact on the DBMS's performance. Save them in a new
         # PipelineData object.
-        LOG.debug("Ranking knobs for workload %d.", workload.id)
+        LOG.info("Ranking knobs for workload %s...", workload_name)
         ranked_knobs = run_knob_identification(knob_data=knob_data,
                                                metric_data=pruned_metric_data,
                                                dbms=workload.dbms)
-        LOG.debug("ranked_knobs: %s", str(ranked_knobs))
+        LOG.info("Done ranking knobs for workload %s (# ranked knobs: %s).\n\n"
+                 "Ranked knobs: %s\n", workload_name, len(ranked_knobs), ranked_knobs)
         ranked_knobs_entry = PipelineData(pipeline_run=pipeline_run_obj,
                                           task_type=PipelineTaskType.RANKED_KNOBS,
                                           workload=workload,
@@ -145,7 +162,10 @@ def run_background_tasks():
         workload.status = WorkloadStatusType.PROCESSED
         workload.save()
-    LOG.debug("Finished processing modified workloads")
+        LOG.info("Done processing workload %s (%s/%s).", workload_name, i + 1,
+                 num_modified)
+
+    LOG.info("Finished processing %s modified workloads.", num_modified)
 
     non_modified_workloads = Workload.objects.filter(pk__in=non_modified_workloads)
     # Update the latest pipeline data for the non modified workloads to have this pipeline run
@@ -157,8 +177,8 @@ def run_background_tasks():
     # the background tasks
     pipeline_run_obj.end_time = now()
     pipeline_run_obj.save()
-    LOG.debug("Finished background tasks")
     save_execution_time(start_ts, "run_background_tasks")
+    LOG.info("Finished background tasks (%.0f seconds).", time.time() - start_ts)
 
 
 def aggregate_data(wkld_results):

View File

@@ -191,6 +191,16 @@ class AlgorithmType(BaseType):
         DNN: 'Deep Neural Network',
     }
 
+    SHORT_NAMES = {
+        GPR: 'GPR',
+        DDPG: 'DDPG',
+        DNN: 'DNN',
+    }
+
+    @classmethod
+    def short_name(cls, ctype):
+        return cls.SHORT_NAMES[ctype]
+
 
 class StorageType(BaseType):
     SSD = 5
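
A quick usage sketch for the new short_name() helper, per the SHORT_NAMES mapping added above; the import path is an assumption based on where this class lives in the project, not something shown in this diff.

# Assumes AlgorithmType is importable from the project's types module, e.g.:
# from website.types import AlgorithmType
assert AlgorithmType.short_name(AlgorithmType.GPR) == 'GPR'
assert AlgorithmType.short_name(AlgorithmType.DDPG) == 'DDPG'
assert AlgorithmType.short_name(AlgorithmType.DNN) == 'DNN'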

View File

@@ -315,7 +315,7 @@ class ConversionUtil(object):
                     return value
                 min_suffix = system[i + 1][1]
-                LOG.warning('The value is smaller than the min factor: %s < %s%s. '
+                LOG.warning('The value is smaller than the min factor: %s < %s (1%s). '
                             'Setting min_suffix=%s...', value, factor, suffix, min_suffix)
             else:
                 min_factor = factor