Create, format, and save the recommended config for the latest result before the on_success method runs.

This commit is contained in:
dvanaken 2020-01-07 16:10:30 -05:00 committed by Dana Van Aken
parent 8001b658c9
commit 8cec62160f
3 changed files with 66 additions and 78 deletions

View File

@ -109,19 +109,9 @@ class ConfigurationRecommendation(UpdateTask): # pylint: disable=abstract-metho
def on_success(self, retval, task_id, args, kwargs):
    """Celery success hook: format the recommended knobs and persist them.

    ``retval`` is the dict returned by the recommendation task and must
    contain 'result_id' and 'recommendation' (raw knob values).
    The formatted configuration is written both to the Celery TaskMeta row
    and to ``Result.next_configuration`` so clients polling the result can
    fetch the next config to try.
    """
    super(ConfigurationRecommendation, self).on_success(retval, task_id, args, kwargs)
    result_id = retval['result_id']
    result = Result.objects.get(pk=result_id)
    # Replace result with formatted result
    formatted_params = db.parser.format_dbms_knobs(result.dbms.pk, retval['recommendation'])
    # Create next configuration to try
    config = db.parser.create_knob_configuration(result.dbms.pk, formatted_params)
    task_meta = TaskMeta.objects.get(task_id=task_id)
    # Overwrite the raw recommendation with the fully formatted config
    # before saving it in both places.
    retval['recommendation'] = config
    task_meta.result = retval
    task_meta.save()
    result.next_configuration = JSONUtil.dumps(retval)
    result.save()
def clean_knob_data(knob_matrix, knob_labels, session):
@ -419,13 +409,31 @@ def train_ddpg(result_id):
return result_info
def create_and_save_recommendation(recommended_knobs, result, status, **kwargs):
    """Format ``recommended_knobs`` for the result's DBMS and persist them.

    The formatted configuration, ``status``, the result's pk, and any extra
    keyword arguments (e.g. ``info``, ``pipeline_run``) are combined into a
    payload dict, JSON-serialized onto ``result.next_configuration``, saved,
    and returned.
    """
    dbms_id = result.dbms.pk
    # Convert the raw knob values into DBMS-specific settings, then into a
    # full knob configuration blob.
    formatted_knobs = db.parser.format_dbms_knobs(dbms_id, recommended_knobs)
    config = db.parser.create_knob_configuration(dbms_id, formatted_knobs)
    # Explicit keywords take precedence over any same-named entries in kwargs.
    payload = dict(
        kwargs,
        status=status,
        result_id=result.pk,
        recommendation=config,
    )
    result.next_configuration = JSONUtil.dumps(payload)
    result.save()
    return payload
@task(base=ConfigurationRecommendation, name='configuration_recommendation_ddpg')
def configuration_recommendation_ddpg(result_info): # pylint: disable=invalid-name
LOG.info('Use ddpg to recommend configuration')
result_id = result_info['newest_result_id']
result = Result.objects.filter(pk=result_id)
session = Result.objects.get(pk=result_id).session
agg_data = DataUtil.aggregate_data(result)
result_list = Result.objects.filter(pk=result_id)
result = result_list.first()
session = result.session
agg_data = DataUtil.aggregate_data(result_list)
metric_data, _ = clean_metric_data(agg_data['y_matrix'], agg_data['y_columnlabels'], session)
metric_data = metric_data.flatten()
metric_scalar = MinMaxScaler().fit(metric_data.reshape(1, -1))
@ -447,11 +455,10 @@ def configuration_recommendation_ddpg(result_info): # pylint: disable=invalid-n
knob_bounds = np.vstack(DataUtil.get_knob_bounds(knob_labels, session))
knob_data = MinMaxScaler().fit(knob_bounds).inverse_transform(knob_data.reshape(1, -1))[0]
conf_map = {k: knob_data[i] for i, k in enumerate(knob_labels)}
conf_map_res = {}
conf_map_res['status'] = 'good'
conf_map_res['result_id'] = result_id
conf_map_res['recommendation'] = conf_map
conf_map_res['info'] = 'INFO: ddpg'
conf_map_res = create_and_save_recommendation(recommended_knobs=conf_map, result=result,
status='good', info='INFO: ddpg')
return conf_map_res
@ -638,20 +645,18 @@ def combine_workload(target_data):
def configuration_recommendation(recommendation_input):
target_data, algorithm = recommendation_input
LOG.info('configuration_recommendation called')
newest_result = Result.objects.get(pk=target_data['newest_result_id'])
if target_data['bad'] is True:
target_data_res = dict(
status='bad',
result_id=target_data['newest_result_id'],
info='WARNING: no training data, the config is generated randomly',
recommendation=target_data['config_recommend'],
target_data_res = create_and_save_recommendation(
recommended_knobs=target_data['config_recommend'], result=newest_result,
status='bad', info='WARNING: no training data, the config is generated randomly',
pipeline_run=target_data['pipeline_run'])
LOG.debug('%s: Skipping configuration recommendation.\n\ndata=%s\n',
AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
return target_data_res
latest_pipeline_run = PipelineRun.objects.get(pk=target_data['pipeline_run'])
newest_result = Result.objects.get(pk=target_data['newest_result_id'])
X_columnlabels, X_scaler, X_scaled, y_scaled, X_max, X_min = combine_workload(target_data)
@ -756,12 +761,10 @@ def configuration_recommendation(recommendation_input):
best_config = np.maximum(best_config, X_min_inv)
conf_map = {k: best_config[i] for i, k in enumerate(X_columnlabels)}
conf_map_res = dict(
status='good',
result_id=target_data['newest_result_id'],
recommendation=conf_map,
info='INFO: training data size is {}'.format(X_scaled.shape[0]),
pipeline_run=latest_pipeline_run.pk)
conf_map_res = create_and_save_recommendation(
recommended_knobs=conf_map, result=newest_result,
status='good', info='INFO: training data size is {}'.format(X_scaled.shape[0]),
pipeline_run=target_data['pipeline_run'])
LOG.debug('%s: Finished selecting the next config.\n\ndata=%s\n',
AlgorithmType.name(algorithm), JSONUtil.dumps(conf_map_res, pprint=True))

View File

@ -17,6 +17,7 @@ from random import choice
import numpy as np
from django.contrib.auth.models import User
from django.db.models import Case, When
from django.utils.text import capfirst
from django_db_logger.models import StatusLog
from djcelery.models import TaskMeta
@ -72,34 +73,34 @@ class MediaUtil(object):
class TaskUtil(object):
@staticmethod
def get_tasks(tasks):
if not tasks:
return []
task_ids = tasks.split(',')
res = []
for task_id in task_ids:
task = TaskMeta.objects.filter(task_id=task_id)
if len(task) == 0:
continue # Task Not Finished
res.append(task[0])
return res
def get_tasks(task_ids):
    """Return the TaskMeta rows for ``task_ids``, preserving input order.

    ``task_ids`` may be None/empty, a comma-separated string, or an
    iterable of task-id strings. Ids with no TaskMeta row yet (task not
    finished) are simply absent from the returned queryset.
    """
    task_ids = task_ids or []
    if isinstance(task_ids, str):
        task_ids = task_ids.split(',')
    # Map each task_id to its position so order_by() reproduces the
    # caller's ordering rather than the database's.
    # NOTE(review): with an empty task_ids list this builds Case() with no
    # When clauses -- confirm Django accepts that (the filter already
    # yields an empty queryset, so the ordering would be moot).
    preserved = Case(*[
        When(task_id=task_id, then=pos) for pos, task_id in enumerate(task_ids)])
    return TaskMeta.objects.filter(task_id__in=task_ids).order_by(preserved)
@staticmethod
def get_task_status(tasks):
if len(tasks) == 0:
if not tasks:
return None, 0
overall_status = 'SUCCESS'
num_completed = 0
for task in tasks:
status = task.status
if status == "SUCCESS":
num_completed += 1
elif status in ['FAILURE', 'REVOKED', 'RETRY']:
elif status in ('FAILURE', 'REVOKED', 'RETRY'):
overall_status = status
break
else:
assert status in ['PENDING', 'RECEIVED', 'STARTED']
if status not in ('PENDING', 'RECEIVED', 'STARTED'):
LOG.warning("Task %s: invalid task status: '%s' (task_id=%s)",
task.id, status, task.task_id)
overall_status = status
return overall_status, num_completed

View File

@ -1067,14 +1067,6 @@ def get_timeline_data(request):
# get the latest result
def give_result(request, upload_code): # pylint: disable=unused-argument
def _failed_response(_latest_result, _tasks, _num_completed, _status, _msg):
    """Build a 400 response describing a failed recommendation pipeline.

    Appends the traceback of the task most likely responsible for the
    failure: the one just after the last completed task, clamped to the
    final task in the list.
    """
    _msg = "{}\nSTATUS: {}\nRESULT ID: {}\n".format(_msg, _status, _latest_result)
    # Bug fix: the original tested the enclosing scope's ``tasks`` instead
    # of the ``_tasks`` parameter, defeating the point of passing it in.
    if _tasks:
        _failed_task_idx = min(len(_tasks) - 1, _num_completed + 1)
        _failed_task = _tasks[_failed_task_idx]
        _msg += "TRACEBACK: {}".format(_failed_task.traceback)
    return HttpResponse(_msg, status=400)
try:
session = Session.objects.get(upload_code=upload_code)
except Session.DoesNotExist:
@ -1084,39 +1076,31 @@ def give_result(request, upload_code): # pylint: disable=unused-argument
latest_result = Result.objects.filter(session=session).latest('creation_time')
tasks = TaskUtil.get_tasks(latest_result.task_ids)
overall_status, num_completed = TaskUtil.get_task_status(tasks)
response = {
'celery_status': overall_status,
'result_id': latest_result.pk,
'message': '',
'errors': [],
}
if overall_status == 'SUCCESS':
# The task status is set to SUCCESS before the next config is saved in
# the latest result so we must wait for it to be updated
max_wait_sec = 20
elapsed_sec = 0
while not latest_result.next_configuration and elapsed_sec <= max_wait_sec:
time.sleep(5)
elapsed_sec += 5
latest_result = Result.objects.get(id=latest_result.pk)
LOG.debug("Waiting for the next config for result %s to be updated... "
"(elapsed: %ss): %s", latest_result.pk, elapsed_sec,
model_to_dict(latest_result))
if not latest_result.next_configuration:
LOG.warning(
"Failed to get the next configuration from the latest result after %ss: %s",
elapsed_sec, model_to_dict(latest_result))
overall_status = 'FAILURE'
response = _failed_response(latest_result, tasks, num_completed, overall_status,
'Failed to get the next configuration.')
else:
response = HttpResponse(JSONUtil.dumps(latest_result.next_configuration),
content_type='application/json')
response.update(JSONUtil.loads(latest_result.next_configuration),
message='Celery successfully recommended the next configuration')
status_code = 200
elif overall_status in ('FAILURE', 'REVOKED', 'RETRY'):
response = _failed_response(latest_result, tasks, num_completed, overall_status,
'Celery failed to get the next configuration.')
task_errors = [t.traceback for t in tasks if t.traceback]
if task_errors:
LOG.warning('\n\n'.join(task_errors))
response.update(message='Celery failed to get the next configuration', errors=task_errors)
status_code = 400
else: # overall_status in ('PENDING', 'RECEIVED', 'STARTED'):
response = HttpResponse("{}: Result not ready".format(overall_status), status=202)
response.update(message='Result not ready')
status_code = 202
return response
return HttpResponse(JSONUtil.dumps(response, pprint=True), status=status_code,
content_type='application/json')
# get the latest result