diff --git a/server/website/website/tasks/async_tasks.py b/server/website/website/tasks/async_tasks.py
index c2133ed..22cf3d1 100644
--- a/server/website/website/tasks/async_tasks.py
+++ b/server/website/website/tasks/async_tasks.py
@@ -8,6 +8,7 @@ import queue
 import numpy as np
 import tensorflow as tf
 import gpflow
+import time
 from pyDOE import lhs
 from scipy.stats import uniform
 
@@ -26,11 +27,14 @@ from analysis.gpr.optimize import tf_optimize
 from analysis.gpr.predict import gpflow_predict
 from analysis.preprocessing import Bin, DummyEncoder
 from analysis.constraints import ParamConstraintHelper
-from website.models import PipelineData, PipelineRun, Result, Workload, SessionKnob, MetricCatalog
+from django.utils.datetime_safe import datetime
+from pytz import timezone
+from website.models import (ExecutionTime, MetricCatalog, PipelineData, PipelineRun,
+                            Result, SessionKnob, Workload)
 from website import db
 from website.types import PipelineTaskType, AlgorithmType, VarType
 from website.utils import DataUtil, JSONUtil
-from website.settings import ENABLE_DUMMY_ENCODER
+from website.settings import ENABLE_DUMMY_ENCODER, TIME_ZONE
 
 LOG = get_task_logger(__name__)
 
@@ -170,9 +174,17 @@ def clean_metric_data(metric_matrix, metric_labels, session):
         del metric_labels[i]
     return matrix, metric_labels
 
 
+def save_execution_time(start_ts, fn, result):
+    end_ts = time.time()
+    exec_time = end_ts - start_ts
+    start_time = datetime.fromtimestamp(int(start_ts), timezone(TIME_ZONE))
+    ExecutionTime.objects.create(module="celery.async_tasks", function=fn, tag="",
+                                 start_time=start_time, execution_time=exec_time, result=result)
+
 @shared_task(base=IgnoreResultTask, name='aggregate_target_results')
 def aggregate_target_results(result_id, algorithm):
+    start_ts = time.time()
     # Check that we've completed the background tasks at least once. We need
     # this data in order to make a configuration recommendation (until we
     # implement a sampling technique to generate new training data).
@@ -236,7 +248,7 @@ def aggregate_target_results(result_id, algorithm):
 
     LOG.debug('%s: Finished aggregating target results.\n\n',
               AlgorithmType.name(algorithm))
-
+    save_execution_time(start_ts, "aggregate_target_results", Result.objects.get(pk=result_id))
     return agg_data, algorithm
 
 
@@ -311,6 +323,7 @@ def gen_lhs_samples(knobs, nsamples):
 
 @shared_task(base=IgnoreResultTask, name='train_ddpg')
 def train_ddpg(result_id):
+    start_ts = time.time()
     LOG.info('Add training data to ddpg and train ddpg')
     result = Result.objects.get(pk=result_id)
     session = Result.objects.get(pk=result_id).session
@@ -409,6 +422,7 @@ def train_ddpg(result_id):
     ddpg.update()
     session.ddpg_actor_model, session.ddpg_critic_model = ddpg.get_model()
     session.ddpg_reply_memory = ddpg.replay_memory.get()
+    save_execution_time(start_ts, "train_ddpg", result)
     session.save()
     return result_info
 
@@ -432,6 +446,7 @@ def create_and_save_recommendation(recommended_knobs, result, status, **kwargs):
 
 @shared_task(base=ConfigurationRecommendation, name='configuration_recommendation_ddpg')
 def configuration_recommendation_ddpg(result_info):  # pylint: disable=invalid-name
+    start_ts = time.time()
     LOG.info('Use ddpg to recommend configuration')
     result_id = result_info['newest_result_id']
     result_list = Result.objects.filter(pk=result_id)
@@ -465,6 +480,7 @@ def configuration_recommendation_ddpg(result_info):  # pylint: disable=invalid-n
 
     conf_map_res = create_and_save_recommendation(recommended_knobs=conf_map, result=result,
                                                   status='good', info='INFO: ddpg')
+    save_execution_time(start_ts, "configuration_recommendation_ddpg", result)
     return conf_map_res
 
 
@@ -659,6 +675,7 @@ def combine_workload(target_data):
 
 @shared_task(base=ConfigurationRecommendation, name='configuration_recommendation')
 def configuration_recommendation(recommendation_input):
+    start_ts = time.time()
     target_data, algorithm = recommendation_input
     LOG.info('configuration_recommendation called')
     newest_result = Result.objects.get(pk=target_data['newest_result_id'])
@@ -786,6 +803,7 @@ def configuration_recommendation(recommendation_input):
 
     LOG.debug('%s: Finished selecting the next config.\n\ndata=%s\n',
               AlgorithmType.name(algorithm), JSONUtil.dumps(conf_map_res, pprint=True))
+    save_execution_time(start_ts, "configuration_recommendation", newest_result)
     return conf_map_res
 
 
@@ -798,6 +816,7 @@ def load_data_helper(filtered_pipeline_data, workload, task_type):
 
 @shared_task(base=MapWorkloadTask, name='map_workload')
 def map_workload(map_workload_input):
+    start_ts = time.time()
     target_data, algorithm = map_workload_input
 
     if target_data['bad']:
@@ -979,4 +998,6 @@ def map_workload(map_workload_input):
 
     LOG.debug('%s: Finished mapping the workload.\n\ndata=%s\n',
               AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
+    save_execution_time(start_ts, "map_workload",
+                        Result.objects.get(pk=target_data['newest_result_id']))
     return target_data, algorithm
diff --git a/server/website/website/tasks/periodic_tasks.py b/server/website/website/tasks/periodic_tasks.py
index 87cad2d..572c9dd 100644
--- a/server/website/website/tasks/periodic_tasks.py
+++ b/server/website/website/tasks/periodic_tasks.py
@@ -5,6 +5,7 @@
 #
 import copy
 import numpy as np
+import time
 
 from celery import shared_task
 from celery.utils.log import get_task_logger
@@ -17,8 +18,10 @@ from analysis.lasso import LassoPath
 from analysis.preprocessing import (Bin, get_shuffle_indices,
                                     DummyEncoder,
                                     consolidate_columnlabels)
-from website.models import PipelineData, PipelineRun, Result, Workload
-from website.settings import ENABLE_DUMMY_ENCODER
+from django.utils.datetime_safe import datetime
+from pytz import timezone
+from website.models import ExecutionTime, PipelineData, PipelineRun, Result, Workload
+from website.settings import ENABLE_DUMMY_ENCODER, TIME_ZONE
 from website.types import PipelineTaskType, WorkloadStatusType
 from website.utils import DataUtil, JSONUtil
 
@@ -27,9 +30,16 @@ LOG = get_task_logger(__name__)
 # Only process workload containing this minimum amount of results
 MIN_WORKLOAD_RESULTS_COUNT = 5
 
+def save_execution_time(start_ts, fn):
+    end_ts = time.time()
+    exec_time = end_ts - start_ts
+    start_time = datetime.fromtimestamp(int(start_ts), timezone(TIME_ZONE))
+    ExecutionTime.objects.create(module="celery.periodic_tasks", function=fn, tag="",
+                                 start_time=start_time, execution_time=exec_time, result=None)
 
 @shared_task(name="run_background_tasks")
 def run_background_tasks():
+    start_ts = time.time()
     LOG.debug("Starting background tasks")
     # Find modified and not modified workloads, we only have to calculate for the
     # modified workloads.
@@ -147,6 +157,7 @@ def run_background_tasks():
     pipeline_run_obj.end_time = now()
     pipeline_run_obj.save()
     LOG.debug("Finished background tasks")
+    save_execution_time(start_ts, "run_background_tasks")
 
 
 def aggregate_data(wkld_results):
@@ -171,6 +182,7 @@ def aggregate_data(wkld_results):
     #   columns in the knob_data matrix
     # - 'y_columnlabels': a list of the metric names corresponding to the
     #   columns in the metric_data matrix
+    start_ts = time.time()
     aggregated_data = DataUtil.aggregate_data(wkld_results)
 
     # Separate knob & workload data into two "standard" dictionaries of the
@@ -188,6 +200,7 @@ def aggregate_data(wkld_results):
     }
 
     # Return the knob & metric data
+    save_execution_time(start_ts, "aggregate_data")
     return knob_data, metric_data
 
 
@@ -201,6 +214,7 @@ def run_workload_characterization(metric_data):
     # - 'rowlabels': a list of identifiers for the rows in the matrix
     # - 'columnlabels': a list of the metric names corresponding to
     #   the columns in the data matrix
+    start_ts = time.time()
     matrix = metric_data['data']
     columnlabels = metric_data['columnlabels']
 
@@ -250,6 +264,7 @@ def run_workload_characterization(metric_data):
     pruned_metrics = kmeans_models.cluster_map_[gapk.optimal_num_clusters_].get_closest_samples()
 
     # Return pruned metrics
+    save_execution_time(start_ts, "run_workload_characterization")
     return pruned_metrics
 
 
@@ -268,6 +283,7 @@ def run_knob_identification(knob_data, metric_data, dbms):
     # When running the lasso algorithm, the knob_data matrix is set of
     # independent variables (X) and the metric_data is the set of
     # dependent variables (y).
+    start_ts = time.time()
     knob_matrix = knob_data['data']
     knob_columnlabels = knob_data['columnlabels']
 
@@ -330,4 +346,5 @@ def run_knob_identification(knob_data, metric_data, dbms):
     encoded_knobs = lasso_model.get_ranked_features()
     consolidated_knobs = consolidate_columnlabels(encoded_knobs)
 
+    save_execution_time(start_ts, "run_knob_identification")
     return consolidated_knobs
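
Note: this patch assumes an ExecutionTime model already exists in website/models.py; its definition is not part of this diff. A minimal sketch of the fields the new save_execution_time() helpers rely on might look like the following (field names and types are inferred from the objects.create(...) calls above; the actual model may differ):

    # website/models.py (sketch, not part of this patch)
    from django.db import models

    class ExecutionTime(models.Model):
        module = models.CharField(max_length=32)       # e.g. "celery.async_tasks"
        function = models.CharField(max_length=64)     # task function name, e.g. "train_ddpg"
        tag = models.CharField(max_length=64, blank=True)
        start_time = models.DateTimeField()            # tz-aware task start time
        execution_time = models.FloatField()           # wall-clock duration in seconds
        result = models.ForeignKey('Result', null=True, blank=True,
                                   on_delete=models.CASCADE)

The periodic tasks pass result=None (they are not tied to a single Result), so the foreign key must be nullable for this patch to work.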