Store execution times

This commit is contained in:
arifiorino 2020-02-19 02:37:44 +00:00 committed by Dana Van Aken
parent f7e22ff5bb
commit a2a77f9a75
2 changed files with 37 additions and 3 deletions

View File

@ -8,6 +8,7 @@ import queue
import numpy as np
import tensorflow as tf
import gpflow
import time
from pyDOE import lhs
from scipy.stats import uniform
@ -26,11 +27,12 @@ from analysis.gpr.optimize import tf_optimize
from analysis.gpr.predict import gpflow_predict
from analysis.preprocessing import Bin, DummyEncoder
from analysis.constraints import ParamConstraintHelper
from django.utils.datetime_safe import datetime
from website.models import PipelineData, PipelineRun, Result, Workload, SessionKnob, MetricCatalog
from website import db
from website.types import PipelineTaskType, AlgorithmType, VarType
from website.utils import DataUtil, JSONUtil
from website.settings import ENABLE_DUMMY_ENCODER
from website.settings import ENABLE_DUMMY_ENCODER, TIME_ZONE
LOG = get_task_logger(__name__)
@ -170,9 +172,17 @@ def clean_metric_data(metric_matrix, metric_labels, session):
del metric_labels[i]
return matrix, metric_labels
def save_execution_time(start_ts, fn, result):
end_ts = time.time()
exec_time = end_ts - start_ts
start_time = datetime.fromtimestamp(int(start_ts), timezone(TIME_ZONE))
ExecutionTime.objects.create(module="celery.async_tasks", function=fn, tag="",
start_time=start_time, execution_time=exec_time, result=result)
@shared_task(base=IgnoreResultTask, name='aggregate_target_results')
def aggregate_target_results(result_id, algorithm):
start_ts = time.time()
# Check that we've completed the background tasks at least once. We need
# this data in order to make a configuration recommendation (until we
# implement a sampling technique to generate new training data).
@ -236,7 +246,7 @@ def aggregate_target_results(result_id, algorithm):
LOG.debug('%s: Finished aggregating target results.\n\n',
AlgorithmType.name(algorithm))
save_execution_time(start_ts, "aggregate_target_results", Result.objects.get(pk=result_id))
return agg_data, algorithm
@ -311,6 +321,7 @@ def gen_lhs_samples(knobs, nsamples):
@shared_task(base=IgnoreResultTask, name='train_ddpg')
def train_ddpg(result_id):
start_ts = time.time()
LOG.info('Add training data to ddpg and train ddpg')
result = Result.objects.get(pk=result_id)
session = Result.objects.get(pk=result_id).session
@ -409,6 +420,7 @@ def train_ddpg(result_id):
ddpg.update()
session.ddpg_actor_model, session.ddpg_critic_model = ddpg.get_model()
session.ddpg_reply_memory = ddpg.replay_memory.get()
save_execution_time(start_ts, "train_ddpg", Result.objects.get(pk=result_id))
session.save()
return result_info
@ -432,6 +444,7 @@ def create_and_save_recommendation(recommended_knobs, result, status, **kwargs):
@shared_task(base=ConfigurationRecommendation, name='configuration_recommendation_ddpg')
def configuration_recommendation_ddpg(result_info): # pylint: disable=invalid-name
start_ts = time.time()
LOG.info('Use ddpg to recommend configuration')
result_id = result_info['newest_result_id']
result_list = Result.objects.filter(pk=result_id)
@ -465,6 +478,7 @@ def configuration_recommendation_ddpg(result_info): # pylint: disable=invalid-n
conf_map_res = create_and_save_recommendation(recommended_knobs=conf_map, result=result,
status='good', info='INFO: ddpg')
save_execution_time(start_ts, "configuration_recommendation_ddpg", Result.objects.get(pk=result_id))
return conf_map_res
@ -659,6 +673,7 @@ def combine_workload(target_data):
@shared_task(base=ConfigurationRecommendation, name='configuration_recommendation')
def configuration_recommendation(recommendation_input):
start_ts = time.time()
target_data, algorithm = recommendation_input
LOG.info('configuration_recommendation called')
newest_result = Result.objects.get(pk=target_data['newest_result_id'])
@ -786,6 +801,7 @@ def configuration_recommendation(recommendation_input):
LOG.debug('%s: Finished selecting the next config.\n\ndata=%s\n',
AlgorithmType.name(algorithm), JSONUtil.dumps(conf_map_res, pprint=True))
save_execution_time(start_ts, "configuration_recommendation", Result.objects.get(pk=result_id))
return conf_map_res
@ -798,6 +814,7 @@ def load_data_helper(filtered_pipeline_data, workload, task_type):
@shared_task(base=MapWorkloadTask, name='map_workload')
def map_workload(map_workload_input):
start_ts = time.time()
target_data, algorithm = map_workload_input
if target_data['bad']:
@ -979,4 +996,5 @@ def map_workload(map_workload_input):
LOG.debug('%s: Finished mapping the workload.\n\ndata=%s\n',
AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
save_execution_time(start_ts, "map_workload", Result.objects.get(pk=result_id))
return target_data, algorithm

View File

@ -5,6 +5,7 @@
#
import copy
import numpy as np
import time
from celery import shared_task
from celery.utils.log import get_task_logger
@ -17,8 +18,9 @@ from analysis.lasso import LassoPath
from analysis.preprocessing import (Bin, get_shuffle_indices,
DummyEncoder,
consolidate_columnlabels)
from django.utils.datetime_safe import datetime
from website.models import PipelineData, PipelineRun, Result, Workload
from website.settings import ENABLE_DUMMY_ENCODER
from website.settings import ENABLE_DUMMY_ENCODER, TIME_ZONE
from website.types import PipelineTaskType, WorkloadStatusType
from website.utils import DataUtil, JSONUtil
@ -27,9 +29,16 @@ LOG = get_task_logger(__name__)
# Only process workload containing this minimum amount of results
MIN_WORKLOAD_RESULTS_COUNT = 5
def save_execution_time(start_ts, fn):
end_ts = time.time()
exec_time = end_ts - start_ts
start_time = datetime.fromtimestamp(int(start_ts), timezone(TIME_ZONE))
ExecutionTime.objects.create(module="celery.periodic_tasks", function=fn, tag="",
start_time=start_time, execution_time=exec_time, result=None)
@shared_task(name="run_background_tasks")
def run_background_tasks():
start_ts = time.time()
LOG.debug("Starting background tasks")
# Find modified and not modified workloads, we only have to calculate for the
# modified workloads.
@ -147,6 +156,7 @@ def run_background_tasks():
pipeline_run_obj.end_time = now()
pipeline_run_obj.save()
LOG.debug("Finished background tasks")
save_execution_time(start_ts, "run_background_tasks")
def aggregate_data(wkld_results):
@ -171,6 +181,7 @@ def aggregate_data(wkld_results):
# columns in the knob_data matrix
# - 'y_columnlabels': a list of the metric names corresponding to the
# columns in the metric_data matrix
start_ts = time.time()
aggregated_data = DataUtil.aggregate_data(wkld_results)
# Separate knob & workload data into two "standard" dictionaries of the
@ -188,6 +199,7 @@ def aggregate_data(wkld_results):
}
# Return the knob & metric data
save_execution_time(start_ts, "aggregate_data")
return knob_data, metric_data
@ -201,6 +213,7 @@ def run_workload_characterization(metric_data):
# - 'rowlabels': a list of identifiers for the rows in the matrix
# - 'columnlabels': a list of the metric names corresponding to
# the columns in the data matrix
start_ts = time.time()
matrix = metric_data['data']
columnlabels = metric_data['columnlabels']
@ -250,6 +263,7 @@ def run_workload_characterization(metric_data):
pruned_metrics = kmeans_models.cluster_map_[gapk.optimal_num_clusters_].get_closest_samples()
# Return pruned metrics
save_execution_time(start_ts, "run_workload_characterization")
return pruned_metrics
@ -268,6 +282,7 @@ def run_knob_identification(knob_data, metric_data, dbms):
# When running the lasso algorithm, the knob_data matrix is set of
# independent variables (X) and the metric_data is the set of
# dependent variables (y).
start_ts = time.time()
knob_matrix = knob_data['data']
knob_columnlabels = knob_data['columnlabels']
@ -330,4 +345,5 @@ def run_knob_identification(knob_data, metric_data, dbms):
encoded_knobs = lasso_model.get_ranked_features()
consolidated_knobs = consolidate_columnlabels(encoded_knobs)
save_execution_time(start_ts, "run_knob_identification")
return consolidated_knobs