diff --git a/server/website/website/settings/common.py b/server/website/website/settings/common.py
index c431e62..5c71a8e 100644
--- a/server/website/website/settings/common.py
+++ b/server/website/website/settings/common.py
@@ -326,15 +326,20 @@ LOGGING = {
             'propagate': False,
         },
         'celery': {
-            'handlers': ['celery', 'dblog'],
+            'handlers': ['celery', 'dblog', 'console'],
             'level': 'DEBUG',
             'propogate': True,
         },
         'celery.task': {
-            'handlers': ['celery', 'dblog'],
+            'handlers': ['celery', 'dblog', 'console'],
             'level': 'DEBUG',
             'propogate': True,
         },
+        'celery.beat': {
+            'handlers': ['celery', 'dblog', 'console'],
+            'level': 'INFO',
+            'propagate': True,
+        },
         # Uncomment to email admins after encountering an error (and debug=False)
         # 'django.request': {
         #     'handlers': ['mail_admins'],
diff --git a/server/website/website/tasks/async_tasks.py b/server/website/website/tasks/async_tasks.py
index a3b2eff..b0026ea 100644
--- a/server/website/website/tasks/async_tasks.py
+++ b/server/website/website/tasks/async_tasks.py
@@ -120,8 +120,9 @@ def clean_knob_data(knob_matrix, knob_labels, session):
                 else:
                     default_val = float(default_val)
             except ValueError:
+                LOG.warning("Error parsing knob '%s' default value: %s. Setting default to 0.",
+                            knob_name, default_val, exc_info=True)
                 default_val = 0
-                LOG.exception("Error parsing knob default: %s=%s", knob_name, default_val)
             new_col = np.ones((nrows, 1), dtype=float) * default_val
             new_lab = knob_name
         else:
@@ -133,8 +134,7 @@ def clean_knob_data(knob_matrix, knob_labels, session):
         new_columns.append(new_col)
 
     new_matrix = np.hstack(new_columns).reshape(nrows, -1)
-    LOG.debug("Cleaned matrix: %s, knobs (%s): %s", new_matrix.shape,
-              len(new_labels), new_labels)
+    LOG.debug("Cleaned matrix: %s, knobs (%s): %s", new_matrix.shape, len(new_labels), new_labels)
 
     assert new_labels == knob_cat, \
         "Expected knobs: {}\nActual knobs: {}\n".format(
@@ -154,7 +154,7 @@ def clean_metric_data(metric_matrix, metric_labels, session):
         metric_cat.append(metric_obj.name)
     missing_columns = sorted(set(metric_cat) - set(metric_labels))
     unused_columns = set(metric_labels) - set(metric_cat)
-    LOG.debug("clean_metric_data added %d metrics and removed %d metric.", len(missing_columns),
+    LOG.debug("clean_metric_data: added %d metrics and removed %d metrics.", len(missing_columns),
               len(unused_columns))
     default_val = 0
     metric_cat_size = len(metric_cat)
@@ -166,7 +166,7 @@ def clean_metric_data(metric_matrix, metric_labels, session):
         if metric_name in metric_labels_dict:
             index = metric_labels_dict[metric_name]
             matrix[:, i] = metric_matrix[:, index]
-    LOG.debug(matrix.shape)
+    LOG.debug("clean_metric_data: final matrix: %s, labels: %s", matrix.shape, len(metric_cat))
     return matrix, metric_cat
 
 
@@ -176,6 +176,39 @@ def save_execution_time(start_ts, fn, result):
     start_time = datetime.fromtimestamp(int(start_ts), timezone(TIME_ZONE))
     ExecutionTime.objects.create(module="celery.async_tasks", function=fn, tag="",
                                  start_time=start_time, execution_time=exec_time, result=result)
+    return exec_time
+
+
+def _get_task_name(session, result_id):
+    if session.tuning_session == 'lhs':
+        algo_name = 'LHS'
+    elif session.tuning_session == 'randomly_generate':
+        algo_name = 'RANDOM'
+    elif session.tuning_session == 'tuning_session':
+        algo_name = AlgorithmType.short_name(session.algorithm)
+    else:
+        LOG.warning("Unhandled session type: %s", session.tuning_session)
+        algo_name = session.tuning_session
+    return '{}.{}@{}#{}'.format(session.project.name, session.name, algo_name, result_id)
+
+
+def _task_result_tostring(task_result):
+    if isinstance(task_result, dict):
+        task_dict = type(task_result)()
+        for k, v in task_result.items():
+            if k.startswith('X_') or k.startswith('y_') or k == 'rowlabels':
+                if isinstance(v, np.ndarray):
+                    v = str(v.shape)
+                elif isinstance(v, list):
+                    v = len(v)
+                else:
+                    LOG.warning("Unhandled type: k=%s, type(v)=%s, v=%s", k, type(v), v)
+                    v = str(v)
+            task_dict[k] = v
+        task_str = JSONUtil.dumps(task_dict, pprint=True)
+    else:
+        task_str = str(task_result)
+    return task_str
 
 
 def choose_value_in_range(num1, num2):
@@ -252,8 +285,8 @@ def calc_next_knob_range(algorithm, knob_info, newest_result, good_val, bad_val,
         target_data['status'] = 'range_test'
         target_data['config_recommend'] = next_config
         LOG.debug('%s: Generated a config to test %s of %s.\n\ndata=%s\n',
-                  AlgorithmType.name(algorithm), mode, knob.name,
-                  JSONUtil.dumps(target_data, pprint=True))
+                  _get_task_name(session, newest_result.pk), mode, knob.name,
+                  _task_result_tostring(target_data))
         return True, target_data
 
 
@@ -265,6 +298,8 @@ def preprocessing(result_id, algorithm):
     newest_result = Result.objects.get(pk=result_id)
     session = newest_result.session
     knobs = SessionKnob.objects.get_knobs_for_session(session)
+    task_name = _get_task_name(session, result_id)
+    LOG.info("%s: Preprocessing data...", task_name)
 
     # Check that the minvals of tunable knobs are all decided
     for knob_info in knobs:
@@ -276,7 +311,10 @@ def preprocessing(result_id, algorithm):
                 successful, target_data = calc_next_knob_range(
                     algorithm, knob_info, newest_result, minval, lowerbound, 'lowerbound')
                 if successful:
-                    save_execution_time(start_ts, "preprocessing", newest_result)
+                    exec_time = save_execution_time(start_ts, "preprocessing", newest_result)
+                    LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(target_data))
+                    LOG.info("%s: Done processing. Returning config for lowerbound knob search"
+                             " (%.1f seconds).", task_name, exec_time)
                     return result_id, algorithm, target_data
 
     # Check that the maxvals of tunable knobs are all decided
@@ -289,7 +327,10 @@ def preprocessing(result_id, algorithm):
                 successful, target_data = calc_next_knob_range(
                     algorithm, knob_info, newest_result, maxval, upperbound, 'upperbound')
                 if successful:
-                    save_execution_time(start_ts, "preprocessing", newest_result)
+                    exec_time = save_execution_time(start_ts, "preprocessing", newest_result)
+                    LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(target_data))
+                    LOG.info("%s: Done processing. Returning config for upperbound knob search"
+                             " (%.1f seconds).", task_name, exec_time)
                     return result_id, algorithm, target_data
 
     # Check that we've completed the background tasks at least once. We need
@@ -304,39 +345,44 @@ def preprocessing(result_id, algorithm):
             results_cnt -= 1
             if i == len(session_results) - 1 and algorithm == AlgorithmType.DDPG:
                 skip_ddpg = True
-    if not has_pipeline_data or results_cnt == 0 or skip_ddpg or session.tuning_session == 'lhs':
+
+    LOG.debug("%s: workload=%s, has_pipeline_data: %s, # results: %s, results_cnt: %s, "
+              "skip_ddpg: %s", task_name, newest_result.workload, has_pipeline_data,
+              len(session_results), results_cnt, skip_ddpg)
+
+    if session.tuning_session == 'randomly_generate':
+        # generate a config randomly
+        random_knob_result = gen_random_data(knobs)
+        target_data['status'] = 'random'
+        target_data['config_recommend'] = random_knob_result
+        LOG.debug('%s: Generated a random config.', task_name)
+
+    elif not has_pipeline_data or results_cnt == 0 or skip_ddpg or session.tuning_session == 'lhs':
         if not has_pipeline_data and session.tuning_session == 'tuning_session':
-            LOG.debug("Background tasks haven't ran for this workload yet, picking data with lhs.")
+            LOG.info("%s: Background tasks haven't run for this workload yet, "
+                     "picking data with lhs.", task_name)
         if results_cnt == 0 and session.tuning_session == 'tuning_session':
-            LOG.debug("Not enough data in this session, picking data with lhs.")
+            LOG.info("%s: Not enough data in this session, picking data with lhs.", task_name)
         if skip_ddpg:
-            LOG.debug("The most recent result cannot be used by DDPG, picking data with lhs.")
+            LOG.info("%s: The most recent result cannot be used by DDPG, picking data with lhs.",
+                     task_name)
 
         all_samples = JSONUtil.loads(session.lhs_samples)
         if len(all_samples) == 0:
-            if session.tuning_session == 'lhs':
-                all_samples = gen_lhs_samples(knobs, 100)
-            else:
-                all_samples = gen_lhs_samples(knobs, 10)
-            LOG.debug('%s: Generated LHS.\n\ndata=%s\n',
-                      AlgorithmType.name(algorithm), JSONUtil.dumps(all_samples[:5], pprint=True))
+            num_lhs_samples = 100 if session.tuning_session == 'lhs' else 10
+            all_samples = gen_lhs_samples(knobs, num_lhs_samples)
+            LOG.debug('%s: Generated %s LHS samples (LHS data: %s).', task_name, num_lhs_samples,
+                      len(all_samples))
         samples = all_samples.pop()
         target_data['status'] = 'lhs'
         target_data['config_recommend'] = samples
         session.lhs_samples = JSONUtil.dumps(all_samples)
         session.save()
-        LOG.debug('%s: Got LHS config.\n\ndata=%s\n',
-                  AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
+        LOG.debug('%s: Got LHS config.', task_name)
 
-    elif session.tuning_session == 'randomly_generate':
-        # generate a config randomly
-        random_knob_result = gen_random_data(knobs)
-        target_data['status'] = 'random'
-        target_data['config_recommend'] = random_knob_result
-        LOG.debug('%s: Generated a random config.\n\ndata=%s\n',
-                  AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
-
-    save_execution_time(start_ts, "preprocessing", newest_result)
+    exec_time = save_execution_time(start_ts, "preprocessing", newest_result)
+    LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(target_data))
+    LOG.info("%s: Done preprocessing data (%.1f seconds).", task_name, exec_time)
     return result_id, algorithm, target_data
 
 
@@ -344,36 +390,45 @@ def aggregate_target_results(aggregate_target_results_input):
     start_ts = time.time()
     result_id, algorithm, target_data = aggregate_target_results_input
+    newest_result = Result.objects.get(pk=result_id)
+    session = newest_result.session
+    task_name = _get_task_name(session, result_id)
+
     # If the preprocessing method has already generated a config, bypass this method.
     if 'config_recommend' in target_data:
         assert 'newest_result_id' in target_data and 'status' in target_data
-        LOG.debug('%s: Skipping aggregate_target_results.\nData:\n%s\n\n',
-                  AlgorithmType.name(algorithm), target_data)
+        LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(target_data))
+        LOG.info('%s: Skipping aggregate_target_results (status=%s).', task_name,
+                 target_data.get('status', ''))
        return target_data, algorithm
 
-    newest_result = Result.objects.get(pk=result_id)
-    session = newest_result.session
+    LOG.info("%s: Aggregating target results...", task_name)
+
     # Aggregate all knob config results tried by the target so far in this
     # tuning session and this tuning workload.
     target_results = Result.objects.filter(session=session, dbms=newest_result.dbms,
                                            workload=newest_result.workload)
+    LOG.debug("%s: # results: %s", task_name, len(target_results))
     if len(target_results) == 0:
         raise Exception('Cannot find any results for session_id={}, dbms_id={}'
                         .format(session, newest_result.dbms))
 
     agg_data = DataUtil.aggregate_data(target_results)
+    LOG.debug("%s ~ INITIAL: X_matrix=%s, X_columnlabels=%s", task_name,
+              agg_data['X_matrix'].shape, len(agg_data['X_columnlabels']))
     agg_data['newest_result_id'] = result_id
     agg_data['status'] = 'good'
 
     # Clean knob data
-    cleaned_agg_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'],
-                                       session)
+    cleaned_agg_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'], session)
     agg_data['X_matrix'] = np.array(cleaned_agg_data[0])
     agg_data['X_columnlabels'] = np.array(cleaned_agg_data[1])
+    LOG.debug("%s ~ FINAL: X_matrix=%s, X_columnlabels=%s", task_name,
+              agg_data['X_matrix'].shape, len(agg_data['X_columnlabels']))
 
-    LOG.debug('%s: Finished aggregating target results.\n\n',
-              AlgorithmType.name(algorithm))
-    save_execution_time(start_ts, "aggregate_target_results", Result.objects.get(pk=result_id))
+    exec_time = save_execution_time(start_ts, "aggregate_target_results", newest_result)
+    LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(agg_data))
+    LOG.info('%s: Finished aggregating target results (%.1f seconds).', task_name, exec_time)
     return agg_data, algorithm
 
 
@@ -440,7 +495,8 @@ def gen_lhs_samples(knobs, nsamples):
             elif types[fidx] == VarType.REAL:
                 lhs_samples[-1][names[fidx]] = float(samples[sidx][fidx])
             else:
-                LOG.debug("LHS type not supported: %s", types[fidx])
+                LOG.warning("LHS: vartype not supported: %s (knob name: %s).",
+                            VarType.name(types[fidx]), names[fidx])
     random.shuffle(lhs_samples)
 
     return lhs_samples
@@ -450,18 +506,23 @@ def train_ddpg(train_ddpg_input):
     start_ts = time.time()
     result_id, algorithm, target_data = train_ddpg_input
+    result = Result.objects.get(pk=result_id)
+    session = result.session
+    task_name = _get_task_name(session, result_id)
+
     # If the preprocessing method has already generated a config, bypass this method.
     if 'config_recommend' in target_data:
         assert 'newest_result_id' in target_data and 'status' in target_data
-        LOG.debug('Skipping training DDPG.\nData:\n%s\n\n', target_data)
+        LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(target_data))
+        LOG.info("%s: Skipping train DDPG (status=%s).", task_name, target_data['status'])
         return target_data, algorithm
 
-    LOG.info('Add training data to ddpg and train ddpg')
-    result = Result.objects.get(pk=result_id)
-    session = result.session
+    LOG.info('%s: Add training data to ddpg and train ddpg...', task_name)
+
     params = JSONUtil.loads(session.hyperparameters)
     session_results = Result.objects.filter(session=session,
                                             creation_time__lt=result.creation_time)
+    results_cnt = len(session_results)
     first_valid_result = -1
     for i, result in enumerate(session_results):
@@ -506,16 +567,17 @@ def train_ddpg(train_ddpg_input):
     knob_data = MinMaxScaler().fit(knob_bounds).transform(knob_data)[0]
     knob_num = len(knob_data)
     metric_num = len(metric_data)
-    LOG.info('knob_num: %d, metric_num: %d', knob_num, metric_num)
+    LOG.debug('%s: knob_num: %d, metric_num: %d', task_name, knob_num, metric_num)
 
     # Filter ys by current target objective metric
     target_obj_idx = [i for i, n in enumerate(metric_labels) if n == target_objective]
     if len(target_obj_idx) == 0:
-        raise Exception(('Could not find target objective in metrics '
-                         '(target_obj={})').format(target_objective))
+        raise Exception(('[{}] Could not find target objective in metrics '
+                         '(target_obj={})').format(task_name, target_objective))
     elif len(target_obj_idx) > 1:
-        raise Exception(('Found {} instances of target objective in '
-                         'metrics (target_obj={})').format(len(target_obj_idx),
+        raise Exception(('[{}] Found {} instances of target objective in '
+                         'metrics (target_obj={})').format(task_name,
+                                                           len(target_obj_idx),
                                                            target_objective))
 
     objective = metric_data[target_obj_idx]
     base_objective = base_metric_data[prev_obj_idx]
@@ -543,7 +605,7 @@ def train_ddpg(train_ddpg_input):
     else:  # negative reward
         reward = -(np.square((2 * base_objective - objective) / base_objective) - 1)\
             * abs(2 * prev_objective - objective) / prev_objective
-    LOG.info('reward: %f', reward)
+    LOG.info('%s: reward: %f', task_name, reward)
 
     # Update ddpg
     ddpg = DDPG(n_actions=knob_num, n_states=metric_num, alr=params['DDPG_ACTOR_LEARNING_RATE'],
@@ -561,8 +623,10 @@ def train_ddpg(train_ddpg_input):
         ddpg.update()
     session.ddpg_actor_model, session.ddpg_critic_model = ddpg.get_model()
     session.ddpg_reply_memory = ddpg.replay_memory.get()
-    save_execution_time(start_ts, "train_ddpg", Result.objects.get(pk=result_id))
     session.save()
+    exec_time = save_execution_time(start_ts, "train_ddpg", result)
+    LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(target_data))
+    LOG.info('%s: Done training ddpg (%.1f seconds).', task_name, exec_time)
     return target_data, algorithm
 
 
@@ -598,8 +662,8 @@ def check_early_return(target_data, algorithm):
         target_data_res = create_and_save_recommendation(
             recommended_knobs=target_data['config_recommend'], result=newest_result,
             status=target_data['status'], info=info, pipeline_run=None)
-        LOG.debug('%s: Skipping configuration recommendation.\nData:\n%s\n\n',
-                  AlgorithmType.name(algorithm), target_data)
+        LOG.debug('%s: Skipping configuration recommendation (status=%s).',
+                  _get_task_name(newest_result.session, result_id), target_data_res['status'])
         return True, target_data_res
 
     return False, None
@@ -607,17 +671,21 @@
 @shared_task(base=ConfigurationRecommendation, name='configuration_recommendation_ddpg')
 def configuration_recommendation_ddpg(recommendation_ddpg_input):  # pylint: disable=invalid-name
     start_ts = time.time()
-    LOG.info('configuration_recommendation called (DDPG)')
     target_data, algorithm = recommendation_ddpg_input
-
-    early_return, target_data_res = check_early_return(target_data, algorithm)
-    if early_return:
-        return target_data_res
-
     result_id = target_data['newest_result_id']
     result_list = Result.objects.filter(pk=result_id)
     result = result_list.first()
     session = result.session
+    task_name = _get_task_name(session, result_id)
+
+    early_return, target_data_res = check_early_return(target_data, algorithm)
+    if early_return:
+        LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(target_data_res))
+        LOG.info("%s: Returning early from config recommendation (DDPG).", task_name)
+        return target_data_res
+
+    LOG.info('%s: Recommending the next configuration (DDPG)...', task_name)
+
     params = JSONUtil.loads(session.hyperparameters)
     agg_data = DataUtil.aggregate_data(result_list)
     metric_data, _ = clean_metric_data(agg_data['y_matrix'], agg_data['y_columnlabels'], session)
@@ -645,8 +713,11 @@ def configuration_recommendation_ddpg(recommendation_ddpg_input):  # pylint: dis
     target_data_res = create_and_save_recommendation(recommended_knobs=conf_map, result=result,
                                                      status='good', info='INFO: ddpg')
 
+    exec_time = save_execution_time(start_ts, "configuration_recommendation_ddpg", result)
+    LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(target_data_res))
+    LOG.info("%s: Done recommending the next configuration (DDPG, %.1f seconds).",
+             task_name, exec_time)
-    save_execution_time(start_ts, "configuration_recommendation_ddpg", result)
     return target_data_res
 
 
@@ -850,14 +921,17 @@ def combine_workload(target_data):
 def configuration_recommendation(recommendation_input):
     start_ts = time.time()
     target_data, algorithm = recommendation_input
-    LOG.info('configuration_recommendation called')
+    newest_result = Result.objects.get(pk=target_data['newest_result_id'])
+    session = newest_result.session
+    task_name = _get_task_name(session, target_data['newest_result_id'])
 
     early_return, target_data_res = check_early_return(target_data, algorithm)
     if early_return:
+        LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(target_data_res))
+        LOG.info("%s: Returning early from config recommendation.", task_name)
         return target_data_res
 
-    newest_result = Result.objects.get(pk=target_data['newest_result_id'])
-    session = newest_result.session
+    LOG.info("%s: Recommending the next configuration...", task_name)
     params = JSONUtil.loads(session.hyperparameters)
 
     X_columnlabels, X_scaler, X_scaled, y_scaled, X_max, X_min,\
@@ -915,6 +989,7 @@ def configuration_recommendation(recommendation_input):
     elif algorithm == AlgorithmType.GPR:
         # default gpr model
         if params['GPR_USE_GPFLOW']:
+            LOG.debug("%s: Running GPR with GPFLOW.", task_name)
             model_kwargs = {}
             model_kwargs['model_learning_rate'] = params['GPR_HP_LEARNING_RATE']
             model_kwargs['model_maxiter'] = params['GPR_HP_MAX_ITER']
@@ -933,6 +1008,7 @@ def configuration_recommendation(recommendation_input):
                                         **model_kwargs)
             res = tf_optimize(m.model, X_samples, **opt_kwargs)
         else:
+            LOG.debug("%s: Running GPR with GPRGD.", task_name)
             model = GPRGD(length_scale=params['GPR_LENGTH_SCALE'],
                           magnitude=params['GPR_MAGNITUDE'],
                           max_train_size=params['GPR_MAX_TRAIN_SIZE'],
@@ -973,17 +1049,19 @@ def configuration_recommendation(recommendation_input):
         recommended_knobs=conf_map, result=newest_result, status='good',
         info='INFO: training data size is {}'.format(X_scaled.shape[0]),
         pipeline_run=target_data['pipeline_run'])
-    LOG.debug('%s: Finished selecting the next config.\n\ndata=%s\n',
-              AlgorithmType.name(algorithm), JSONUtil.dumps(conf_map_res, pprint=True))
 
-    save_execution_time(start_ts, "configuration_recommendation", newest_result)
+    exec_time = save_execution_time(start_ts, "configuration_recommendation", newest_result)
+    LOG.debug("\n%s: Result = %s\n", task_name, _task_result_tostring(conf_map_res))
+    LOG.info("%s: Done recommending the next configuration (%.1f seconds).", task_name, exec_time)
     return conf_map_res
 
 
 def load_data_helper(filtered_pipeline_data, workload, task_type):
     pipeline_data = filtered_pipeline_data.get(workload=workload, task_type=task_type)
-    LOG.debug("PIPELINE DATA: %s", str(pipeline_data.data))
+    LOG.debug("PIPELINE DATA: pipeline_run=%s, workload=%s, type=%s, data=%s...",
+              pipeline_data.pipeline_run.pk, workload, PipelineTaskType.name(task_type),
+              str(pipeline_data.data)[:100])
     return JSONUtil.loads(pipeline_data.data)
@@ -991,12 +1069,14 @@ def load_data_helper(filtered_pipeline_data, workload, task_type):
 def map_workload(map_workload_input):
     start_ts = time.time()
     target_data, algorithm = map_workload_input
+    newest_result = Result.objects.get(pk=target_data['newest_result_id'])
+    session = newest_result.session
+    task_name = _get_task_name(session, target_data['newest_result_id'])
 
     assert target_data is not None
     if target_data['status'] != 'good':
-        LOG.debug('%s: Skipping workload mapping.\n\ndata=%s\n',
-                  AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
-
+        LOG.debug('\n%s: Result = %s\n', task_name, _task_result_tostring(target_data))
+        LOG.info("%s: Skipping workload mapping (status: %s).", task_name, target_data['status'])
         return target_data, algorithm
 
     # Get the latest version of pipeline data that's been computed so far.
@@ -1004,8 +1084,8 @@ def map_workload(map_workload_input):
     assert latest_pipeline_run is not None
     target_data['pipeline_run'] = latest_pipeline_run.pk
 
-    newest_result = Result.objects.get(pk=target_data['newest_result_id'])
-    session = newest_result.session
+    LOG.info("%s: Mapping the workload...", task_name)
+
     params = JSONUtil.loads(session.hyperparameters)
     target_workload = newest_result.workload
     X_columnlabels = np.array(target_data['X_columnlabels'])
@@ -1086,8 +1166,9 @@ def map_workload(map_workload_input):
     if len(workload_data) == 0:
         # The background task that aggregates the data has not finished running yet
         target_data.update(mapped_workload=None, scores=None)
-        LOG.debug('%s: Skipping workload mapping because there is no parsed workload.\n',
-                  AlgorithmType.name(algorithm))
+        LOG.debug('%s: Result = %s\n', task_name, _task_result_tostring(target_data))
+        LOG.info('%s: Skipping workload mapping because no workload data is available.',
+                 task_name)
         return target_data, algorithm
 
     # Stack all X & y matrices for preprocessing
@@ -1166,8 +1247,8 @@ def map_workload(map_workload_input):
         scores_info[workload_id] = (workload_name, similarity_score)
     target_data.update(mapped_workload=(best_workload_id, best_workload_name, best_score),
                        scores=scores_info)
-    LOG.debug('%s: Finished mapping the workload.\n\ndata=%s\n',
-              AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
+    exec_time = save_execution_time(start_ts, "map_workload", newest_result)
+    LOG.debug('\n%s: Result = %s\n', task_name, _task_result_tostring(target_data))
+    LOG.info('%s: Done mapping the workload (%.1f seconds).', task_name, exec_time)
 
-    save_execution_time(start_ts, "map_workload", newest_result)
     return target_data, algorithm
diff --git a/server/website/website/tasks/periodic_tasks.py b/server/website/website/tasks/periodic_tasks.py
index f4cb141..29273e2 100644
--- a/server/website/website/tasks/periodic_tasks.py
+++ b/server/website/website/tasks/periodic_tasks.py
@@ -41,17 +41,21 @@ def save_execution_time(start_ts, fn):
 @shared_task(name="run_background_tasks")
 def run_background_tasks():
     start_ts = time.time()
-    LOG.debug("Starting background tasks")
+    LOG.info("Starting background tasks...")
     # Find modified and not modified workloads, we only have to calculate for the
     # modified workloads.
     modified_workloads = Workload.objects.filter(status=WorkloadStatusType.MODIFIED)
+    num_modified = modified_workloads.count()
     non_modified_workloads = Workload.objects.filter(status=WorkloadStatusType.PROCESSED)
     non_modified_workloads = list(non_modified_workloads.values_list('pk', flat=True))
     last_pipeline_run = PipelineRun.objects.get_latest()
+    LOG.debug("Workloads: # modified: %s, # processed: %s, # total: %s",
+              num_modified, len(non_modified_workloads),
+              Workload.objects.all().count())
 
-    if len(modified_workloads) == 0:
+    if num_modified == 0:
         # No previous workload data yet. Try again later.
-        LOG.debug("No workload data yet. Ending background tasks")
+        LOG.info("No modified workload data yet. Ending background tasks.")
         return
 
     # Create new entry in PipelineRun table to store the output of each of
@@ -59,29 +63,39 @@
     pipeline_run_obj = PipelineRun(start_time=now(), end_time=None)
     pipeline_run_obj.save()
 
-    for workload in modified_workloads:
-
+    for i, workload in enumerate(modified_workloads):
+        workload.status = WorkloadStatusType.PROCESSING
+        workload.save()
         wkld_results = Result.objects.filter(workload=workload)
-        if wkld_results.exists() is False:
+        num_wkld_results = wkld_results.count()
+        workload_name = '{}@{}.{}'.format(workload.dbms.key, workload.project.name, workload.name)
+
+        LOG.info("Starting workload %s (%s/%s, # results: %s)...", workload_name,
+                 i + 1, num_modified, num_wkld_results)
+
+        if num_wkld_results == 0:
             # delete the workload
-            LOG.debug("Deleting workload %d because it has no results.", workload.id)
+            LOG.info("Deleting workload %s because it has no results.", workload_name)
             workload.delete()
             continue
 
-        # Check that there are enough results in the workload
-        if wkld_results.count() < MIN_WORKLOAD_RESULTS_COUNT:
-            LOG.debug("Not enough results in workload %d (only %d results).", workload.id,
-                      wkld_results.count())
+        elif num_wkld_results < MIN_WORKLOAD_RESULTS_COUNT:
+            # Check that there are enough results in the workload
+            LOG.info("Not enough results in workload %s (# results: %s, # required: %s).",
+                     workload_name, num_wkld_results, MIN_WORKLOAD_RESULTS_COUNT)
             continue
 
-        workload.status = WorkloadStatusType.PROCESSING
-        workload.save()
-        LOG.debug("Aggregating data for workload %d", workload.id)
+        LOG.info("Aggregating data for workload %s...", workload_name)
 
         # Aggregate the knob & metric data for this workload
         knob_data, metric_data = aggregate_data(wkld_results)
-        # LOG.debug("knob_data: %s", str(knob_data))
-        # LOG.debug("metric_data: %s", str(metric_data))
+        LOG.debug("Aggregated knob data: rowlabels=%s, columnlabels=%s, data=%s.",
+                  len(knob_data['rowlabels']), len(knob_data['columnlabels']),
+                  knob_data['data'].shape)
+        LOG.debug("Aggregated metric data: rowlabels=%s, columnlabels=%s, data=%s.",
+                  len(metric_data['rowlabels']), len(metric_data['columnlabels']),
+                  metric_data['data'].shape)
+        LOG.info("Done aggregating data for workload %s.", workload_name)
 
         # Knob_data and metric_data are 2D numpy arrays. Convert them into a
         # JSON-friendly (nested) lists and then save them as new PipelineData
@@ -109,9 +123,11 @@ def run_background_tasks():
         # Execute the Workload Characterization task to compute the list of
         # pruned metrics for this workload and save them in a new PipelineData
         # object.
-        LOG.debug("Pruning metrics for workload %d.", workload.id)
+        LOG.info("Pruning metrics for workload %s...", workload_name)
         pruned_metrics = run_workload_characterization(metric_data=metric_data)
-        LOG.debug("pruned_metrics: %s", str(pruned_metrics))
+        LOG.info("Done pruning metrics for workload %s (# pruned metrics: %s).\n\n"
+                 "Pruned metrics: %s\n", workload_name, len(pruned_metrics),
+                 pruned_metrics)
         pruned_metrics_entry = PipelineData(pipeline_run=pipeline_run_obj,
                                             task_type=PipelineTaskType.PRUNED_METRICS,
                                             workload=workload,
@@ -131,11 +147,12 @@ def run_background_tasks():
         # Execute the Knob Identification task to compute an ordered list of knobs
         # ranked by their impact on the DBMS's performance. Save them in a new
         # PipelineData object.
- LOG.debug("Ranking knobs for workload %d.", workload.id) + LOG.info("Ranking knobs for workload %s...", workload_name) ranked_knobs = run_knob_identification(knob_data=knob_data, metric_data=pruned_metric_data, dbms=workload.dbms) - LOG.debug("ranked_knobs: %s", str(ranked_knobs)) + LOG.info("Done ranking knobs for workload %s (# ranked knobs: %s).\n\n" + "Ranked knobs: %s\n", workload_name, len(ranked_knobs), ranked_knobs) ranked_knobs_entry = PipelineData(pipeline_run=pipeline_run_obj, task_type=PipelineTaskType.RANKED_KNOBS, workload=workload, @@ -145,7 +162,10 @@ def run_background_tasks(): workload.status = WorkloadStatusType.PROCESSED workload.save() - LOG.debug("Finished processing modified workloads") + LOG.info("Done processing workload %s (%s/%s).", workload_name, i + 1, + num_modified) + + LOG.info("Finished processing %s modified workloads.", num_modified) non_modified_workloads = Workload.objects.filter(pk__in=non_modified_workloads) # Update the latest pipeline data for the non modified workloads to have this pipeline run @@ -157,8 +177,8 @@ def run_background_tasks(): # the background tasks pipeline_run_obj.end_time = now() pipeline_run_obj.save() - LOG.debug("Finished background tasks") save_execution_time(start_ts, "run_background_tasks") + LOG.info("Finished background tasks (%.0f seconds).", time.time() - start_ts) def aggregate_data(wkld_results): diff --git a/server/website/website/types.py b/server/website/website/types.py index c8c39f8..e452f2f 100644 --- a/server/website/website/types.py +++ b/server/website/website/types.py @@ -191,6 +191,16 @@ class AlgorithmType(BaseType): DNN: 'Deep Neural Network', } + SHORT_NAMES = { + GPR: 'GPR', + DDPG: 'DDPG', + DNN: 'DNN', + } + + @classmethod + def short_name(cls, ctype): + return cls.SHORT_NAMES[ctype] + class StorageType(BaseType): SSD = 5 diff --git a/server/website/website/utils.py b/server/website/website/utils.py index 5a45d2b..bd29b26 100644 --- a/server/website/website/utils.py +++ b/server/website/website/utils.py @@ -315,7 +315,7 @@ class ConversionUtil(object): return value min_suffix = system[i + 1][1] - LOG.warning('The value is smaller than the min factor: %s < %s%s. ' + LOG.warning('The value is smaller than the min factor: %s < %s (1%s). ' 'Setting min_suffix=%s...', value, factor, suffix, min_suffix) else: min_factor = factor