diff --git a/server/website/website/tasks/async_tasks.py b/server/website/website/tasks/async_tasks.py
index 9b09fc2..b4dfc36 100644
--- a/server/website/website/tasks/async_tasks.py
+++ b/server/website/website/tasks/async_tasks.py
@@ -179,7 +179,9 @@ def save_execution_time(start_ts, fn, result):
 
 
 def choose_value_in_range(num1, num2):
-    if num1 > 10 * num2 or num2 > 10 * num1:
+    if num2 < 10 and num1 < 10:
+        mean = min(num1, num2)
+    elif num1 > 10 * num2 or num2 > 10 * num1:
         # It is important to add 1 to avoid log(0)
         log_num1 = np.log(num1 + 1)
         log_num2 = np.log(num2 + 1)
@@ -294,9 +296,16 @@ def preprocessing(result_id, algorithm):
     # this data in order to make a configuration recommendation (until we
     # implement a sampling technique to generate new training data).
     has_pipeline_data = PipelineData.objects.filter(workload=newest_result.workload).exists()
-    if not has_pipeline_data or session.tuning_session == 'lhs':
+    session_results = Result.objects.filter(session=session)
+    useful_results_cnt = len(session_results)
+    for result in session_results:
+        if 'range_test' in result.metric_data.name or 'default' in result.metric_data.name:
+            useful_results_cnt -= 1
+    if not has_pipeline_data or useful_results_cnt == 0 or session.tuning_session == 'lhs':
         if not has_pipeline_data and session.tuning_session == 'tuning_session':
             LOG.debug("Background tasks haven't ran for this workload yet, picking data with lhs.")
+        if useful_results_cnt == 0 and session.tuning_session == 'tuning_session':
+            LOG.debug("Not enough data in this session, picking data with lhs.")
 
         all_samples = JSONUtil.loads(session.lhs_samples)
         if len(all_samples) == 0:
@@ -448,20 +457,25 @@ def train_ddpg(train_ddpg_input):
     params = JSONUtil.loads(session.hyperparameters)
     session_results = Result.objects.filter(session=session,
                                             creation_time__lt=result.creation_time)
+    useful_results_cnt = len(session_results)
+    first_valid_result = -1
     for i, result in enumerate(session_results):
-        if 'range_test' in result.metric_data.name:
-            session_results.pop(i)
+        if 'range_test' in result.metric_data.name or 'default' in result.metric_data.name:
+            useful_results_cnt -= 1
+        else:
+            last_valid_result = i
+            first_valid_result = i if first_valid_result == -1 else first_valid_result
 
     target_data = {}
     target_data['newest_result_id'] = result_id
 
     # Extract data from result and previous results
     result = Result.objects.filter(pk=result_id)
-    if len(session_results) == 0:
+    if useful_results_cnt == 0:
         base_result_id = result_id
         prev_result_id = result_id
     else:
-        base_result_id = session_results[0].pk
-        prev_result_id = session_results[len(session_results) - 1].pk
+        base_result_id = session_results[first_valid_result].pk
+        prev_result_id = session_results[last_valid_result].pk
     base_result = Result.objects.filter(pk=base_result_id)
     prev_result = Result.objects.filter(pk=prev_result_id)
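
For reference, a minimal standalone sketch of the bookkeeping the train_ddpg hunk above introduces: results whose metric name contains 'range_test' or 'default' are excluded, and the first/last remaining results become the base and previous result. FakeResult and pick_base_and_prev are made-up stand-ins for the Django Result model and the in-place index tracking, not code from this patch.

from collections import namedtuple

FakeResult = namedtuple('FakeResult', ['pk', 'metric_name'])

IGNORE_TAGS = ('range_test', 'default')

def pick_base_and_prev(session_results, fallback_pk):
    """Return (base_pk, prev_pk): the first and last usable results in order,
    falling back to fallback_pk when every result is tagged as invalid."""
    valid = [r for r in session_results
             if not any(tag in r.metric_name for tag in IGNORE_TAGS)]
    if not valid:
        return fallback_pk, fallback_pk
    return valid[0].pk, valid[-1].pk

results = [
    FakeResult(1, 'range_test_metrics*'),  # range-test run, skipped
    FakeResult(2, 'default_metrics'),      # default-config run, skipped
    FakeResult(3, 'metrics_a'),            # usable
    FakeResult(4, 'metrics_b'),            # usable
]
print(pick_base_and_prev(results, fallback_pk=99))  # -> (3, 4)
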
diff --git a/server/website/website/tasks/periodic_tasks.py b/server/website/website/tasks/periodic_tasks.py
index ca22b19..f4cb141 100644
--- a/server/website/website/tasks/periodic_tasks.py
+++ b/server/website/website/tasks/periodic_tasks.py
@@ -80,8 +80,8 @@ def run_background_tasks():
         LOG.debug("Aggregating data for workload %d", workload.id)
         # Aggregate the knob & metric data for this workload
         knob_data, metric_data = aggregate_data(wkld_results)
-        LOG.debug("knob_data: %s", str(knob_data))
-        LOG.debug("metric_data: %s", str(metric_data))
+        # LOG.debug("knob_data: %s", str(knob_data))
+        # LOG.debug("metric_data: %s", str(metric_data))
 
         # Knob_data and metric_data are 2D numpy arrays. Convert them into a
         # JSON-friendly (nested) lists and then save them as new PipelineData
@@ -184,7 +184,7 @@ def aggregate_data(wkld_results):
     #   - 'y_columnlabels': a list of the metric names corresponding to the
     #                       columns in the metric_data matrix
     start_ts = time.time()
-    aggregated_data = DataUtil.aggregate_data(wkld_results)
+    aggregated_data = DataUtil.aggregate_data(wkld_results, ignore=['range_test', 'default', '*'])
 
     # Separate knob & workload data into two "standard" dictionaries of the
     # same form
diff --git a/server/website/website/utils.py b/server/website/website/utils.py
index 8fc20ac..5a45d2b 100644
--- a/server/website/website/utils.py
+++ b/server/website/website/utils.py
@@ -145,15 +145,17 @@ class DataUtil(object):
         return np.array(minvals), np.array(maxvals)
 
     @staticmethod
-    def aggregate_data(results):
+    def aggregate_data(results, ignore=None):
+        if ignore is None:
+            ignore = ['range_test', 'default']
         knob_labels = list(JSONUtil.loads(results[0].knob_data.data).keys())
         metric_labels = list(JSONUtil.loads(results[0].metric_data.data).keys())
-        X_matrix = np.empty((len(results), len(knob_labels)), dtype=float)
-        y_matrix = np.empty((len(results), len(metric_labels)), dtype=float)
-        rowlabels = np.empty(len(results), dtype=int)
+        X_matrix = []
+        y_matrix = []
+        rowlabels = []
 
-        for i, result in enumerate(results):
-            if 'range_test' in result.metric_data.name:
+        for result in results:
+            if any(symbol in result.metric_data.name for symbol in ignore):
                 continue
             param_data = JSONUtil.loads(result.knob_data.data)
             if len(param_data) != len(knob_labels):
@@ -167,13 +169,13 @@ class DataUtil(object):
                     ("Incorrect number of metrics "
                      "(expected={}, actual={})").format(len(metric_labels),
                                                         len(metric_data)))
-            X_matrix[i, :] = [param_data[l] for l in knob_labels]
-            y_matrix[i, :] = [metric_data[l] for l in metric_labels]
-            rowlabels[i] = result.pk
+            X_matrix.append([param_data[l] for l in knob_labels])
+            y_matrix.append([metric_data[l] for l in metric_labels])
+            rowlabels.append(result.pk)
         return {
-            'X_matrix': X_matrix,
-            'y_matrix': y_matrix,
-            'rowlabels': rowlabels.tolist(),
+            'X_matrix': np.array(X_matrix, dtype=np.float64),
+            'y_matrix': np.array(y_matrix, dtype=np.float64),
+            'rowlabels': rowlabels,
             'X_columnlabels': knob_labels,
             'y_columnlabels': metric_labels,
         }
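
A rough standalone sketch of the list-then-convert aggregation shown in the utils.py hunks above. The tuple-based rows input and the aggregate helper are illustrative stand-ins, not DataUtil.aggregate_data itself; the point is that skipped (tagged) results never produce a row, whereas preallocating with np.empty left uninitialized rows wherever a result was skipped.

import numpy as np

def aggregate(rows, ignore=('range_test', 'default')):
    # rows: (name, knob_dict, metric_dict) tuples standing in for Result objects
    x_rows, y_rows, labels = [], [], []
    for name, knobs, metrics in rows:
        if any(tag in name for tag in ignore):
            continue  # tagged results never reach the matrices
        x_rows.append(list(knobs.values()))
        y_rows.append(list(metrics.values()))
        labels.append(name)
    # Convert at the end so the row count equals the number of kept results.
    return np.array(x_rows, dtype=np.float64), np.array(y_rows, dtype=np.float64), labels

x, y, labels = aggregate([
    ('range_test_m*', {'knob_a': 1}, {'tps': 10.0}),
    ('m_2', {'knob_a': 2}, {'tps': 42.0}),
])
print(x.shape, y.shape, labels)  # (1, 1) (1, 1) ['m_2']
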
diff --git a/server/website/website/views.py b/server/website/website/views.py
index 37a6739..71e8c71 100644
--- a/server/website/website/views.py
+++ b/server/website/website/views.py
@@ -495,19 +495,23 @@ def handle_result_files(session, files, execution_times=None):
                 continue
             target_value = JSONUtil.loads(past_metric.data)[session.target_objective]
             if metric_meta.improvement == target_objectives.MORE_IS_BETTER:
-                if worst_target_value is None or target_value < worst_target_value:
+                if '*' in worst_metric.name or target_value < worst_target_value:
                     worst_target_value = target_value
                     worst_metric = past_metric
             else:
-                if worst_target_value is None or target_value > worst_target_value:
+                if '*' in worst_metric.name or target_value > worst_target_value:
                     worst_target_value = target_value
                     worst_metric = past_metric
 
-        LOG.debug("Worst target value so far is: %d", worst_target_value)
-        penalty_factor = JSONUtil.loads(session.hyperparameters).get('PENALTY_FACTOR', 2)
-        if metric_meta.improvement == target_objectives.MORE_IS_BETTER:
-            penalty_target_value = worst_target_value / penalty_factor
+        if '*' in worst_metric.name:
+            LOG.debug("All previous results are invalid")
+            penalty_target_value = worst_target_value
         else:
-            penalty_target_value = worst_target_value * penalty_factor
+            LOG.debug("Worst target value so far is: %d", worst_target_value)
+            penalty_factor = JSONUtil.loads(session.hyperparameters).get('PENALTY_FACTOR', 2)
+            if metric_meta.improvement == target_objectives.MORE_IS_BETTER:
+                penalty_target_value = worst_target_value / penalty_factor
+            else:
+                penalty_target_value = worst_target_value * penalty_factor
         # Update the past invalid results
         for past_metric in past_metrics:
@@ -647,6 +651,11 @@ def handle_result_files(session, files, execution_times=None):
         # We tag the metric as invalid, so later they will be set to the worst result
         metric_data.name = 'range_test_' + metric_data.name + '*'
         metric_data.save()
+    if 'status' in summary and summary['status'] == "default":
+        # The metric should not be used for learning because the driver did not run workload
+        # We tag the metric as invalid, so later they will be set to the worst result
+        metric_data.name = 'default_' + metric_data.name
+        metric_data.save()
 
     # Create a new workload if this one does not already exist
     workload = Workload.objects.create_workload(
@@ -1239,6 +1248,7 @@ def give_result(request, upload_code): # pylint: disable=unused-argument
     elif group_res.ready():
         assert group_res.successful()
 
+        latest_result = Result.objects.filter(session=session).latest('creation_time')
         next_config = JSONUtil.loads(latest_result.next_configuration)
        response.update(
            next_config, celery_status='SUCCESS',
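
Finally, a short sketch of the penalty-value selection that the first handle_result_files hunk above introduces. penalty_value and the MORE_IS_BETTER constant are simplified stand-ins for the real metric metadata and session hyperparameters; the branching mirrors the hunk: when even the worst previous result is tagged invalid ('*' in its name), the worst value is used unscaled, otherwise it is penalized by the factor in the appropriate direction.

MORE_IS_BETTER = 'more_is_better'  # stand-in for target_objectives.MORE_IS_BETTER

def penalty_value(worst_value, worst_name, improvement, penalty_factor=2):
    if '*' in worst_name:
        # every earlier result was itself invalid, so there is no usable
        # baseline and the worst value is used unscaled
        return worst_value
    if improvement == MORE_IS_BETTER:
        return worst_value / penalty_factor  # push a higher-is-better metric lower
    return worst_value * penalty_factor      # push a lower-is-better metric higher

print(penalty_value(1000.0, 'metrics_1', MORE_IS_BETTER))          # 500.0
print(penalty_value(30.0, 'metrics_2', 'less_is_better'))          # 60.0
print(penalty_value(30.0, 'range_test_metrics*', MORE_IS_BETTER))  # 30.0
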