ottertune/server/website/website/tasks/async_tasks.py

1155 lines
52 KiB
Python

#
# OtterTune - async_tasks.py
#
# Copyright (c) 2017-18, Carnegie Mellon University Database Group
#
import random
import queue
import time
import numpy as np
import tensorflow as tf
import gpflow
from pyDOE import lhs
from scipy.stats import uniform
from pytz import timezone
from celery import shared_task, Task
from celery.utils.log import get_task_logger
from djcelery.models import TaskMeta
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from django.utils.datetime_safe import datetime
from analysis.ddpg.ddpg import DDPG
from analysis.gp import GPRNP
from analysis.gp_tf import GPRGD
from analysis.nn_tf import NeuralNet
from analysis.gpr import gpr_models
from analysis.gpr import ucb
from analysis.gpr.optimize import tf_optimize
from analysis.gpr.predict import gpflow_predict
from analysis.preprocessing import Bin, DummyEncoder
from analysis.constraints import ParamConstraintHelper
from website.models import (PipelineData, PipelineRun, Result, Workload, SessionKnob,
MetricCatalog, ExecutionTime, KnobCatalog)
from website import db
from website.types import PipelineTaskType, AlgorithmType, VarType
from website.utils import DataUtil, JSONUtil
from website.settings import ENABLE_DUMMY_ENCODER, TIME_ZONE
LOG = get_task_logger(__name__)
class BaseTask(Task): # pylint: disable=abstract-method
abstract = True
def __init__(self):
self.max_retries = 0
class IgnoreResultTask(BaseTask): # pylint: disable=abstract-method
abstract = True
def on_success(self, retval, task_id, args, kwargs):
super().on_success(retval, task_id, args, kwargs)
# Completely delete this result because it's huge and not interesting
task_meta = TaskMeta.objects.get(task_id=task_id)
task_meta.result = None
task_meta.save()
class MapWorkloadTask(BaseTask): # pylint: disable=abstract-method
abstract = True
def on_success(self, retval, task_id, args, kwargs):
super().on_success(retval, task_id, args, kwargs)
task_meta = TaskMeta.objects.get(task_id=task_id)
new_res = None
# Replace result with formatted result
if args[0][0]['status'] == 'good' and args[0][0]['mapped_workload'] is not None:
new_res = {
'scores': sorted(args[0][0]['scores'].items()),
'mapped_workload_id': args[0][0]['mapped_workload'],
}
task_meta.result = new_res
task_meta.save()
class ConfigurationRecommendation(BaseTask): # pylint: disable=abstract-method
def on_success(self, retval, task_id, args, kwargs):
super(ConfigurationRecommendation, self).on_success(retval, task_id, args, kwargs)
task_meta = TaskMeta.objects.get(task_id=task_id)
task_meta.result = retval
task_meta.save()
def clean_knob_data(knob_matrix, knob_labels, session):
# Filter and amend knob_matrix and knob_labels according to the tunable knobs in the session
knob_matrix = np.array(knob_matrix)
session_knobs = SessionKnob.objects.get_knobs_for_session(session)
knob_cat = [k['name'] for k in session_knobs]
if knob_cat == knob_labels:
# Nothing to do!
return knob_matrix, knob_labels
LOG.info("session_knobs: %s, knob_labels: %s, missing: %s, extra: %s", len(knob_cat),
len(knob_labels), len(set(knob_cat) - set(knob_labels)),
len(set(knob_labels) - set(knob_cat)))
nrows = knob_matrix.shape[0] # pylint: disable=unsubscriptable-object
new_labels = []
new_columns = []
for knob in session_knobs:
knob_name = knob['name']
if knob_name not in knob_labels:
# Add missing column initialized to knob's default value
default_val = knob['default']
try:
if knob['vartype'] == VarType.ENUM:
default_val = knob['enumvals'].split(',').index(default_val)
elif knob['vartype'] == VarType.BOOL:
default_val = str(default_val).lower() in ("on", "true", "yes", "0")
else:
default_val = float(default_val)
except ValueError:
default_val = 0
LOG.exception("Error parsing knob default: %s=%s", knob_name, default_val)
new_col = np.ones((nrows, 1), dtype=float) * default_val
new_lab = knob_name
else:
index = knob_labels.index(knob_name)
new_col = knob_matrix[:, index]
new_lab = knob_labels[index]
new_labels.append(new_lab)
new_columns.append(new_col)
new_matrix = np.hstack(new_columns).reshape(nrows, -1)
LOG.debug("Cleaned matrix: %s, knobs (%s): %s", new_matrix.shape,
len(new_labels), new_labels)
assert new_labels == knob_cat, \
"Expected knobs: {}\nActual knobs: {}\n".format(
knob_cat, new_labels)
assert new_matrix.shape == (nrows, len(knob_cat)), \
"Expected shape: {}, Actual shape: {}".format(
(nrows, len(knob_cat)), new_matrix.shape)
return new_matrix, new_labels
def clean_metric_data(metric_matrix, metric_labels, session):
# Makes sure that all knobs in the dbms are included in the knob_matrix and knob_labels
metric_objs = MetricCatalog.objects.filter(dbms=session.dbms)
metric_cat = [session.target_objective]
for metric_obj in metric_objs:
metric_cat.append(metric_obj.name)
missing_columns = sorted(set(metric_cat) - set(metric_labels))
unused_columns = set(metric_labels) - set(metric_cat)
LOG.debug("clean_metric_data added %d metrics and removed %d metric.", len(missing_columns),
len(unused_columns))
default_val = 0
metric_cat_size = len(metric_cat)
matrix = np.ones((len(metric_matrix), metric_cat_size)) * default_val
metric_labels_dict = {n: i for i, n in enumerate(metric_labels)}
# column labels in matrix has the same order as ones in metric catalog
# missing values are filled with default_val
for i, metric_name in enumerate(metric_cat):
if metric_name in metric_labels_dict:
index = metric_labels_dict[metric_name]
matrix[:, i] = metric_matrix[:, index]
LOG.debug(matrix.shape)
return matrix, metric_cat
def save_execution_time(start_ts, fn, result):
end_ts = time.time()
exec_time = end_ts - start_ts
start_time = datetime.fromtimestamp(int(start_ts), timezone(TIME_ZONE))
ExecutionTime.objects.create(module="celery.async_tasks", function=fn, tag="",
start_time=start_time, execution_time=exec_time, result=result)
def choose_value_in_range(num1, num2):
if num1 > 10 * num2 or num2 > 10 * num1:
# It is important to add 1 to avoid log(0)
log_num1 = np.log(num1 + 1)
log_num2 = np.log(num2 + 1)
mean = np.exp((log_num1 + log_num2) / 2)
else:
mean = (num1 + num2) / 2
return mean
def calc_next_knob_range(algorithm, knob_info, newest_result, good_val, bad_val, mode):
session = newest_result.session
knob = KnobCatalog.objects.get(name=knob_info['name'], dbms=session.dbms)
knob_file = newest_result.knob_data
knob_values = JSONUtil.loads(knob_file.data)
last_value = float(knob_values[knob.name])
session_knob = SessionKnob.objects.get(session=session, knob=knob)
# The collected knob value may be different from the expected value
# We use the expected value to set the knob range
expected_value = choose_value_in_range(good_val, bad_val)
session_results = Result.objects.filter(session=session).order_by("-id")
last_conf_value = ''
if len(session_results) > 1:
last_conf = session_results[1].next_configuration
if last_conf is not None:
last_conf = JSONUtil.loads(last_conf)["recommendation"]
# The names cannot be matched directly because of the 'global.' prefix
for name in last_conf.keys():
if name in knob.name:
last_conf_value = last_conf[name]
fomatted_expect_value = db.parser.format_dbms_knobs(
session.dbms.pk, {knob.name: expected_value})[knob.name]
# The last result was testing the of this knob
if last_conf_value == fomatted_expect_value:
# Fixme: '*' is a special symbol indicating that the knob setting is invalid
# In the future we can add a field to indicate if the knob setting is invalid
if '*' in knob_file.name:
if mode == 'lowerbound':
session_knob.lowerbound = str(int(expected_value))
else:
session_knob.upperbound = str(int(expected_value))
next_value = choose_value_in_range(expected_value, good_val)
else:
if mode == 'lowerbound':
session_knob.minval = str(int(expected_value))
# Terminate the search if the observed value is very different from the set one
if expected_value < last_value / 10:
session_knob.minval = str(int(last_value))
session_knob.lowerbound = str(int(last_value))
session_knob.save()
# The return value means we will not generate next config to test this knob
return False, None
else:
session_knob.maxval = str(int(expected_value))
next_value = choose_value_in_range(expected_value, bad_val)
session_knob.save()
else:
next_value = expected_value
if mode == 'lowerbound':
next_config = {knob.name: next_value}
else:
next_config = {knob.name: next_value}
target_data = {}
target_data['newest_result_id'] = newest_result.pk
target_data['status'] = 'range_test'
target_data['config_recommend'] = next_config
LOG.debug('%s: Generated a config to test %s of %s.\n\ndata=%s\n',
AlgorithmType.name(algorithm), mode, knob.name,
JSONUtil.dumps(target_data, pprint=True))
return True, target_data
@shared_task(base=IgnoreResultTask, name='preprocessing')
def preprocessing(result_id, algorithm):
start_ts = time.time()
target_data = {}
target_data['newest_result_id'] = result_id
newest_result = Result.objects.get(pk=result_id)
session = newest_result.session
knobs = SessionKnob.objects.get_knobs_for_session(session)
# Check that the minvals of tunable knobs are all decided
for knob_info in knobs:
if knob_info.get('lowerbound', None) is not None:
lowerbound = float(knob_info['lowerbound'])
minval = float(knob_info['minval'])
if lowerbound < minval * 0.7:
# We need to do binary search to determine the minval of this knob
successful, target_data = calc_next_knob_range(
algorithm, knob_info, newest_result, minval, lowerbound, 'lowerbound')
if successful:
save_execution_time(start_ts, "preprocessing", newest_result)
return result_id, algorithm, target_data
# Check that the maxvals of tunable knobs are all decided
for knob_info in knobs:
if knob_info.get('upperbound', None) is not None:
upperbound = float(knob_info['upperbound'])
maxval = float(knob_info['maxval'])
if upperbound > maxval * 1.3:
# We need to do binary search to determine the maxval of this knob
successful, target_data = calc_next_knob_range(
algorithm, knob_info, newest_result, maxval, upperbound, 'upperbound')
if successful:
save_execution_time(start_ts, "preprocessing", newest_result)
return result_id, algorithm, target_data
# Check that we've completed the background tasks at least once. We need
# this data in order to make a configuration recommendation (until we
# implement a sampling technique to generate new training data).
has_pipeline_data = PipelineData.objects.filter(workload=newest_result.workload).exists()
if not has_pipeline_data or session.tuning_session == 'lhs':
if not has_pipeline_data and session.tuning_session == 'tuning_session':
LOG.debug("Background tasks haven't ran for this workload yet, picking data with lhs.")
all_samples = JSONUtil.loads(session.lhs_samples)
if len(all_samples) == 0:
if session.tuning_session == 'lhs':
all_samples = gen_lhs_samples(knobs, 100)
else:
all_samples = gen_lhs_samples(knobs, 10)
LOG.debug('%s: Generated LHS.\n\ndata=%s\n',
AlgorithmType.name(algorithm), JSONUtil.dumps(all_samples[:5], pprint=True))
samples = all_samples.pop()
target_data['status'] = 'lhs'
target_data['config_recommend'] = samples
session.lhs_samples = JSONUtil.dumps(all_samples)
session.save()
LOG.debug('%s: Got LHS config.\n\ndata=%s\n',
AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
elif session.tuning_session == 'randomly_generate':
# generate a config randomly
random_knob_result = gen_random_data(knobs)
target_data['status'] = 'random'
target_data['config_recommend'] = random_knob_result
LOG.debug('%s: Generated a random config.\n\ndata=%s\n',
AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
save_execution_time(start_ts, "preprocessing", newest_result)
return result_id, algorithm, target_data
@shared_task(base=IgnoreResultTask, name='aggregate_target_results')
def aggregate_target_results(aggregate_target_results_input):
start_ts = time.time()
result_id, algorithm, target_data = aggregate_target_results_input
# If the preprocessing method has already generated a config, bypass this method.
if 'config_recommend' in target_data:
assert 'newest_result_id' in target_data and 'status' in target_data
LOG.debug('%s: Skipping aggregate_target_results.\nData:\n%s\n\n',
AlgorithmType.name(algorithm), target_data)
return target_data, algorithm
newest_result = Result.objects.get(pk=result_id)
session = newest_result.session
# Aggregate all knob config results tried by the target so far in this
# tuning session and this tuning workload.
target_results = Result.objects.filter(session=session,
dbms=newest_result.dbms,
workload=newest_result.workload)
if len(target_results) == 0:
raise Exception('Cannot find any results for session_id={}, dbms_id={}'
.format(session, newest_result.dbms))
agg_data = DataUtil.aggregate_data(target_results)
agg_data['newest_result_id'] = result_id
agg_data['status'] = 'good'
# Clean knob data
cleaned_agg_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'],
session)
agg_data['X_matrix'] = np.array(cleaned_agg_data[0])
agg_data['X_columnlabels'] = np.array(cleaned_agg_data[1])
LOG.debug('%s: Finished aggregating target results.\n\n',
AlgorithmType.name(algorithm))
save_execution_time(start_ts, "aggregate_target_results", Result.objects.get(pk=result_id))
return agg_data, algorithm
def gen_random_data(knobs):
random_knob_result = {}
for knob in knobs:
name = knob["name"]
if knob["vartype"] == VarType.BOOL:
flag = random.randint(0, 1)
if flag == 0:
random_knob_result[name] = False
else:
random_knob_result[name] = True
elif knob["vartype"] == VarType.ENUM:
enumvals = knob["enumvals"].split(',')
enumvals_len = len(enumvals)
rand_idx = random.randint(0, enumvals_len - 1)
random_knob_result[name] = rand_idx
elif knob["vartype"] == VarType.INTEGER:
random_knob_result[name] = random.randint(int(knob["minval"]), int(knob["maxval"]))
elif knob["vartype"] == VarType.REAL:
random_knob_result[name] = random.uniform(
float(knob["minval"]), float(knob["maxval"]))
elif knob["vartype"] == VarType.STRING:
random_knob_result[name] = "None"
elif knob["vartype"] == VarType.TIMESTAMP:
random_knob_result[name] = "None"
else:
raise Exception(
'Unknown variable type: {}'.format(knob["vartype"]))
return random_knob_result
def gen_lhs_samples(knobs, nsamples):
names = []
maxvals = []
minvals = []
types = []
for knob in knobs:
names.append(knob['name'])
maxvals.append(float(knob['maxval']))
minvals.append(float(knob['minval']))
types.append(knob['vartype'])
nfeats = len(knobs)
samples = lhs(nfeats, samples=nsamples, criterion='maximin')
maxvals = np.array(maxvals)
minvals = np.array(minvals)
scales = maxvals - minvals
for fidx in range(nfeats):
samples[:, fidx] = uniform(loc=minvals[fidx], scale=scales[fidx]).ppf(samples[:, fidx])
lhs_samples = []
for sidx in range(nsamples):
lhs_samples.append(dict())
for fidx in range(nfeats):
if types[fidx] == VarType.INTEGER:
lhs_samples[-1][names[fidx]] = int(round(samples[sidx][fidx]))
elif types[fidx] == VarType.BOOL:
lhs_samples[-1][names[fidx]] = int(round(samples[sidx][fidx]))
elif types[fidx] == VarType.ENUM:
lhs_samples[-1][names[fidx]] = int(round(samples[sidx][fidx]))
elif types[fidx] == VarType.REAL:
lhs_samples[-1][names[fidx]] = float(samples[sidx][fidx])
else:
LOG.debug("LHS type not supported: %s", types[fidx])
random.shuffle(lhs_samples)
return lhs_samples
@shared_task(base=IgnoreResultTask, name='train_ddpg')
def train_ddpg(train_ddpg_input):
start_ts = time.time()
result_id, algorithm, target_data = train_ddpg_input
# If the preprocessing method has already generated a config, bypass this method.
if 'config_recommend' in target_data:
assert 'newest_result_id' in target_data and 'status' in target_data
LOG.debug('Skipping training DDPG.\nData:\n%s\n\n', target_data)
return target_data, algorithm
LOG.info('Add training data to ddpg and train ddpg')
result = Result.objects.get(pk=result_id)
session = result.session
params = JSONUtil.loads(session.hyperparameters)
session_results = Result.objects.filter(session=session,
creation_time__lt=result.creation_time)
for i, result in enumerate(session_results):
if 'range_test' in result.metric_data.name:
session_results.pop(i)
target_data = {}
target_data['newest_result_id'] = result_id
# Extract data from result and previous results
result = Result.objects.filter(pk=result_id)
if len(session_results) == 0:
base_result_id = result_id
prev_result_id = result_id
else:
base_result_id = session_results[0].pk
prev_result_id = session_results[len(session_results) - 1].pk
base_result = Result.objects.filter(pk=base_result_id)
prev_result = Result.objects.filter(pk=prev_result_id)
agg_data = DataUtil.aggregate_data(result)
base_metric_data = (DataUtil.aggregate_data(base_result))['y_matrix'].flatten()
prev_metric_data = (DataUtil.aggregate_data(prev_result))['y_matrix'].flatten()
target_objective = session.target_objective
prev_obj_idx = [i for i, n in enumerate(agg_data['y_columnlabels']) if n == target_objective]
# Clean metric data
metric_data, metric_labels = clean_metric_data(agg_data['y_matrix'],
agg_data['y_columnlabels'], session)
metric_data = metric_data.flatten()
metric_scalar = MinMaxScaler().fit(metric_data.reshape(1, -1))
normalized_metric_data = metric_scalar.transform(metric_data.reshape(1, -1))[0]
# Clean knob data
cleaned_knob_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'], session)
knob_data = np.array(cleaned_knob_data[0])
knob_labels = np.array(cleaned_knob_data[1])
knob_bounds = np.vstack(DataUtil.get_knob_bounds(knob_labels.flatten(), session))
knob_data = MinMaxScaler().fit(knob_bounds).transform(knob_data)[0]
knob_num = len(knob_data)
metric_num = len(metric_data)
LOG.info('knob_num: %d, metric_num: %d', knob_num, metric_num)
# Filter ys by current target objective metric
target_obj_idx = [i for i, n in enumerate(metric_labels) if n == target_objective]
if len(target_obj_idx) == 0:
raise Exception(('Could not find target objective in metrics '
'(target_obj={})').format(target_objective))
elif len(target_obj_idx) > 1:
raise Exception(('Found {} instances of target objective in '
'metrics (target_obj={})').format(len(target_obj_idx),
target_objective))
objective = metric_data[target_obj_idx]
base_objective = base_metric_data[prev_obj_idx]
prev_objective = prev_metric_data[prev_obj_idx]
metric_meta = db.target_objectives.get_metric_metadata(session.dbms.pk,
session.target_objective)
# Calculate the reward
if params['DDPG_SIMPLE_REWARD']:
objective = objective / base_objective
if metric_meta[target_objective].improvement == '(less is better)':
reward = -objective
else:
reward = objective
else:
if metric_meta[target_objective].improvement == '(less is better)':
if objective - base_objective <= 0: # positive reward
reward = (np.square((2 * base_objective - objective) / base_objective) - 1)\
* abs(2 * prev_objective - objective) / prev_objective
else: # negative reward
reward = -(np.square(objective / base_objective) - 1) * objective / prev_objective
else:
if objective - base_objective > 0: # positive reward
reward = (np.square(objective / base_objective) - 1) * objective / prev_objective
else: # negative reward
reward = -(np.square((2 * base_objective - objective) / base_objective) - 1)\
* abs(2 * prev_objective - objective) / prev_objective
LOG.info('reward: %f', reward)
# Update ddpg
ddpg = DDPG(n_actions=knob_num, n_states=metric_num, alr=params['DDPG_ACTOR_LEARNING_RATE'],
clr=params['DDPG_CRITIC_LEARNING_RATE'], gamma=params['DDPG_GAMMA'],
batch_size=params['DDPG_BATCH_SIZE'],
a_hidden_sizes=params['DDPG_ACTOR_HIDDEN_SIZES'],
c_hidden_sizes=params['DDPG_CRITIC_HIDDEN_SIZES'],
use_default=params['DDPG_USE_DEFAULT'])
if session.ddpg_actor_model and session.ddpg_critic_model:
ddpg.set_model(session.ddpg_actor_model, session.ddpg_critic_model)
if session.ddpg_reply_memory:
ddpg.replay_memory.set(session.ddpg_reply_memory)
ddpg.add_sample(normalized_metric_data, knob_data, reward, normalized_metric_data)
for _ in range(params['DDPG_UPDATE_EPOCHS']):
ddpg.update()
session.ddpg_actor_model, session.ddpg_critic_model = ddpg.get_model()
session.ddpg_reply_memory = ddpg.replay_memory.get()
save_execution_time(start_ts, "train_ddpg", Result.objects.get(pk=result_id))
session.save()
return target_data, algorithm
def create_and_save_recommendation(recommended_knobs, result, status, **kwargs):
dbms_id = result.dbms.pk
formatted_knobs = db.parser.format_dbms_knobs(dbms_id, recommended_knobs)
config = db.parser.create_knob_configuration(dbms_id, formatted_knobs)
retval = dict(**kwargs)
retval.update(
status=status,
result_id=result.pk,
recommendation=config,
)
result.next_configuration = JSONUtil.dumps(retval)
result.save()
return retval
def check_early_return(target_data, algorithm):
result_id = target_data['newest_result_id']
newest_result = Result.objects.get(pk=result_id)
if target_data.get('status', 'good') != 'good': # No status or status is not 'good'
if target_data['status'] == 'random':
info = 'The config is generated by Random'
elif target_data['status'] == 'lhs':
info = 'The config is generated by LHS'
elif target_data['status'] == 'range_test':
info = 'Searching for valid knob ranges'
else:
info = 'Unknown'
target_data_res = create_and_save_recommendation(
recommended_knobs=target_data['config_recommend'], result=newest_result,
status=target_data['status'], info=info, pipeline_run=None)
LOG.debug('%s: Skipping configuration recommendation.\nData:\n%s\n\n',
AlgorithmType.name(algorithm), target_data)
return True, target_data_res
return False, None
@shared_task(base=ConfigurationRecommendation, name='configuration_recommendation_ddpg')
def configuration_recommendation_ddpg(recommendation_ddpg_input): # pylint: disable=invalid-name
start_ts = time.time()
LOG.info('configuration_recommendation called (DDPG)')
target_data, algorithm = recommendation_ddpg_input
early_return, target_data_res = check_early_return(target_data, algorithm)
if early_return:
return target_data_res
result_id = target_data['newest_result_id']
result_list = Result.objects.filter(pk=result_id)
result = result_list.first()
session = result.session
params = JSONUtil.loads(session.hyperparameters)
agg_data = DataUtil.aggregate_data(result_list)
metric_data, _ = clean_metric_data(agg_data['y_matrix'], agg_data['y_columnlabels'], session)
metric_data = metric_data.flatten()
metric_scalar = MinMaxScaler().fit(metric_data.reshape(1, -1))
normalized_metric_data = metric_scalar.transform(metric_data.reshape(1, -1))[0]
cleaned_knob_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'], session)
knob_labels = np.array(cleaned_knob_data[1]).flatten()
knob_num = len(knob_labels)
metric_num = len(metric_data)
ddpg = DDPG(n_actions=knob_num, n_states=metric_num,
a_hidden_sizes=params['DDPG_ACTOR_HIDDEN_SIZES'],
c_hidden_sizes=params['DDPG_CRITIC_HIDDEN_SIZES'],
use_default=params['DDPG_USE_DEFAULT'])
if session.ddpg_actor_model is not None and session.ddpg_critic_model is not None:
ddpg.set_model(session.ddpg_actor_model, session.ddpg_critic_model)
if session.ddpg_reply_memory is not None:
ddpg.replay_memory.set(session.ddpg_reply_memory)
knob_data = ddpg.choose_action(normalized_metric_data)
knob_bounds = np.vstack(DataUtil.get_knob_bounds(knob_labels, session))
knob_data = MinMaxScaler().fit(knob_bounds).inverse_transform(knob_data.reshape(1, -1))[0]
conf_map = {k: knob_data[i] for i, k in enumerate(knob_labels)}
target_data_res = create_and_save_recommendation(recommended_knobs=conf_map, result=result,
status='good', info='INFO: ddpg')
save_execution_time(start_ts, "configuration_recommendation_ddpg", result)
return target_data_res
def combine_workload(target_data):
newest_result = Result.objects.get(pk=target_data['newest_result_id'])
latest_pipeline_run = PipelineRun.objects.get(pk=target_data['pipeline_run'])
session = newest_result.session
params = JSONUtil.loads(session.hyperparameters)
pipeline_data_knob = None
pipeline_data_metric = None
# Load mapped workload data
if target_data['mapped_workload'] is not None:
mapped_workload_id = target_data['mapped_workload'][0]
mapped_workload = Workload.objects.get(pk=mapped_workload_id)
workload_knob_data = PipelineData.objects.get(
pipeline_run=latest_pipeline_run,
workload=mapped_workload,
task_type=PipelineTaskType.KNOB_DATA)
workload_knob_data = JSONUtil.loads(workload_knob_data.data)
workload_metric_data = PipelineData.objects.get(
pipeline_run=latest_pipeline_run,
workload=mapped_workload,
task_type=PipelineTaskType.METRIC_DATA)
workload_metric_data = JSONUtil.loads(workload_metric_data.data)
cleaned_workload_knob_data = clean_knob_data(workload_knob_data["data"],
workload_knob_data["columnlabels"],
newest_result.session)
X_workload = np.array(cleaned_workload_knob_data[0])
X_columnlabels = np.array(cleaned_workload_knob_data[1])
y_workload = np.array(workload_metric_data['data'])
y_columnlabels = np.array(workload_metric_data['columnlabels'])
rowlabels_workload = np.array(workload_metric_data['rowlabels'])
else:
# combine the target_data with itself is actually adding nothing to the target_data
X_workload = np.array(target_data['X_matrix'])
X_columnlabels = np.array(target_data['X_columnlabels'])
y_workload = np.array(target_data['y_matrix'])
y_columnlabels = np.array(target_data['y_columnlabels'])
rowlabels_workload = np.array(target_data['rowlabels'])
# Target workload data
newest_result = Result.objects.get(pk=target_data['newest_result_id'])
X_target = target_data['X_matrix']
y_target = target_data['y_matrix']
rowlabels_target = np.array(target_data['rowlabels'])
if not np.array_equal(X_columnlabels, target_data['X_columnlabels']):
raise Exception(('The workload and target data should have '
'identical X columnlabels (sorted knob names)'),
X_columnlabels, target_data['X_columnlabels'])
if not np.array_equal(y_columnlabels, target_data['y_columnlabels']):
raise Exception(('The workload and target data should have '
'identical y columnlabels (sorted metric names)'),
y_columnlabels, target_data['y_columnlabels'])
if target_data['mapped_workload'] is not None:
# Filter Xs by top 'IMPORTANT_KNOB_NUMBER' ranked knobs
ranked_knobs = PipelineData.objects.get(
pipeline_run=latest_pipeline_run,
workload=mapped_workload,
task_type=PipelineTaskType.RANKED_KNOBS)
pipeline_data_knob = ranked_knobs
pipeline_data_metric = PipelineData.objects.get(
pipeline_run=latest_pipeline_run,
workload=mapped_workload,
task_type=PipelineTaskType.PRUNED_METRICS)
ranked_knobs = JSONUtil.loads(ranked_knobs.data)[:params['IMPORTANT_KNOB_NUMBER']]
ranked_knob_idxs = [i for i, cl in enumerate(X_columnlabels) if cl in ranked_knobs]
X_workload = X_workload[:, ranked_knob_idxs]
X_target = X_target[:, ranked_knob_idxs]
X_columnlabels = X_columnlabels[ranked_knob_idxs]
# Filter ys by current target objective metric
target_objective = newest_result.session.target_objective
target_obj_idx = [i for i, cl in enumerate(y_columnlabels) if cl == target_objective]
if len(target_obj_idx) == 0:
raise Exception(('Could not find target objective in metrics '
'(target_obj={})').format(target_objective))
elif len(target_obj_idx) > 1:
raise Exception(('Found {} instances of target objective in '
'metrics (target_obj={})').format(len(target_obj_idx),
target_objective))
y_workload = y_workload[:, target_obj_idx]
y_target = y_target[:, target_obj_idx]
y_columnlabels = y_columnlabels[target_obj_idx]
# Combine duplicate rows in the target/workload data (separately)
X_workload, y_workload, rowlabels_workload = DataUtil.combine_duplicate_rows(
X_workload, y_workload, rowlabels_workload)
X_target, y_target, rowlabels_target = DataUtil.combine_duplicate_rows(
X_target, y_target, rowlabels_target)
# Delete any rows that appear in both the workload data and the target
# data from the workload data
dups_filter = np.ones(X_workload.shape[0], dtype=bool)
target_row_tups = [tuple(row) for row in X_target]
for i, row in enumerate(X_workload):
if tuple(row) in target_row_tups:
dups_filter[i] = False
X_workload = X_workload[dups_filter, :]
y_workload = y_workload[dups_filter, :]
rowlabels_workload = rowlabels_workload[dups_filter]
# Combine target & workload Xs for preprocessing
X_matrix = np.vstack([X_target, X_workload])
# Dummy encode categorial variables
if ENABLE_DUMMY_ENCODER:
categorical_info = DataUtil.dummy_encoder_helper(X_columnlabels,
newest_result.dbms)
dummy_encoder = DummyEncoder(categorical_info['n_values'],
categorical_info['categorical_features'],
categorical_info['cat_columnlabels'],
categorical_info['noncat_columnlabels'])
X_matrix = dummy_encoder.fit_transform(X_matrix)
binary_encoder = categorical_info['binary_vars']
# below two variables are needed for correctly determing max/min on dummies
binary_index_set = set(categorical_info['binary_vars'])
total_dummies = dummy_encoder.total_dummies()
else:
dummy_encoder = None
binary_encoder = None
binary_index_set = []
total_dummies = 0
# Scale to N(0, 1)
X_scaler = StandardScaler()
X_scaled = X_scaler.fit_transform(X_matrix)
if y_target.shape[0] < 5: # FIXME
# FIXME (dva): if there are fewer than 5 target results so far
# then scale the y values (metrics) using the workload's
# y_scaler. I'm not sure if 5 is the right cutoff.
y_target_scaler = None
y_workload_scaler = StandardScaler()
y_matrix = np.vstack([y_target, y_workload])
y_scaled = y_workload_scaler.fit_transform(y_matrix)
else:
# FIXME (dva): otherwise try to compute a separate y_scaler for
# the target and scale them separately.
try:
y_target_scaler = StandardScaler()
y_workload_scaler = StandardScaler()
y_target_scaled = y_target_scaler.fit_transform(y_target)
y_workload_scaled = y_workload_scaler.fit_transform(y_workload)
y_scaled = np.vstack([y_target_scaled, y_workload_scaled])
except ValueError:
y_target_scaler = None
y_workload_scaler = StandardScaler()
y_scaled = y_workload_scaler.fit_transform(y_target)
metric_meta = db.target_objectives.get_metric_metadata(
newest_result.session.dbms.pk, newest_result.session.target_objective)
lessisbetter = metric_meta[target_objective].improvement == db.target_objectives.LESS_IS_BETTER
# Maximize the throughput, moreisbetter
# Use gradient descent to minimize -throughput
if not lessisbetter:
y_scaled = -y_scaled
# Set up constraint helper
constraint_helper = ParamConstraintHelper(scaler=X_scaler,
encoder=dummy_encoder,
binary_vars=binary_encoder,
init_flip_prob=params['INIT_FLIP_PROB'],
flip_prob_decay=params['FLIP_PROB_DECAY'])
# FIXME (dva): check if these are good values for the ridge
# ridge = np.empty(X_scaled.shape[0])
# ridge[:X_target.shape[0]] = 0.01
# ridge[X_target.shape[0]:] = 0.1
X_min = np.empty(X_scaled.shape[1])
X_max = np.empty(X_scaled.shape[1])
X_scaler_matrix = np.zeros([1, X_scaled.shape[1]])
session_knobs = SessionKnob.objects.get_knobs_for_session(newest_result.session)
# Set min/max for knob values
for i in range(X_scaled.shape[1]):
if i < total_dummies or i in binary_index_set:
col_min = 0
col_max = 1
else:
col_min = X_scaled[:, i].min()
col_max = X_scaled[:, i].max()
for knob in session_knobs:
if X_columnlabels[i] == knob["name"]:
X_scaler_matrix[0][i] = knob["minval"]
col_min = X_scaler.transform(X_scaler_matrix)[0][i]
X_scaler_matrix[0][i] = knob["maxval"]
col_max = X_scaler.transform(X_scaler_matrix)[0][i]
X_min[i] = col_min
X_max[i] = col_max
return X_columnlabels, X_scaler, X_scaled, y_scaled, X_max, X_min,\
dummy_encoder, constraint_helper, pipeline_data_knob, pipeline_data_metric
@shared_task(base=ConfigurationRecommendation, name='configuration_recommendation')
def configuration_recommendation(recommendation_input):
start_ts = time.time()
target_data, algorithm = recommendation_input
LOG.info('configuration_recommendation called')
early_return, target_data_res = check_early_return(target_data, algorithm)
if early_return:
return target_data_res
newest_result = Result.objects.get(pk=target_data['newest_result_id'])
session = newest_result.session
params = JSONUtil.loads(session.hyperparameters)
X_columnlabels, X_scaler, X_scaled, y_scaled, X_max, X_min,\
dummy_encoder, constraint_helper, pipeline_knobs,\
pipeline_metrics = combine_workload(target_data)
# FIXME: we should generate more samples and use a smarter sampling technique
num_samples = params['NUM_SAMPLES']
X_samples = np.empty((num_samples, X_scaled.shape[1]))
for i in range(X_scaled.shape[1]):
X_samples[:, i] = np.random.rand(num_samples) * (X_max[i] - X_min[i]) + X_min[i]
q = queue.PriorityQueue()
for x in range(0, y_scaled.shape[0]):
q.put((y_scaled[x][0], x))
i = 0
while i < params['TOP_NUM_CONFIG']:
try:
item = q.get_nowait()
# Tensorflow get broken if we use the training data points as
# starting points for GPRGD. We add a small bias for the
# starting points. GPR_EPS default value is 0.001
# if the starting point is X_max, we minus a small bias to
# make sure it is within the range.
dist = sum(np.square(X_max - X_scaled[item[1]]))
if dist < 0.001:
X_samples = np.vstack((X_samples, X_scaled[item[1]] - abs(params['GPR_EPS'])))
else:
X_samples = np.vstack((X_samples, X_scaled[item[1]] + abs(params['GPR_EPS'])))
i = i + 1
except queue.Empty:
break
res = None
if algorithm == AlgorithmType.DNN:
# neural network model
model_nn = NeuralNet(n_input=X_samples.shape[1],
batch_size=X_samples.shape[0],
explore_iters=params['DNN_EXPLORE_ITER'],
noise_scale_begin=params['DNN_NOISE_SCALE_BEGIN'],
noise_scale_end=params['DNN_NOISE_SCALE_END'],
debug=params['DNN_DEBUG'],
debug_interval=params['DNN_DEBUG_INTERVAL'])
if session.dnn_model is not None:
model_nn.set_weights_bin(session.dnn_model)
model_nn.fit(X_scaled, y_scaled, fit_epochs=params['DNN_TRAIN_ITER'])
res = model_nn.recommend(X_samples, X_min, X_max,
explore=params['DNN_EXPLORE'],
recommend_epochs=params['DNN_GD_ITER'])
session.dnn_model = model_nn.get_weights_bin()
session.save()
elif algorithm == AlgorithmType.GPR:
# default gpr model
if params['GPR_USE_GPFLOW']:
model_kwargs = {}
model_kwargs['model_learning_rate'] = params['GPR_HP_LEARNING_RATE']
model_kwargs['model_maxiter'] = params['GPR_HP_MAX_ITER']
opt_kwargs = {}
opt_kwargs['learning_rate'] = params['GPR_LEARNING_RATE']
opt_kwargs['maxiter'] = params['GPR_MAX_ITER']
opt_kwargs['bounds'] = [X_min, X_max]
opt_kwargs['debug'] = params['GPR_DEBUG']
opt_kwargs['ucb_beta'] = ucb.get_ucb_beta(params['GPR_UCB_BETA'],
scale=params['GPR_UCB_SCALE'],
t=i + 1., ndim=X_scaled.shape[1])
tf.reset_default_graph()
graph = tf.get_default_graph()
gpflow.reset_default_session(graph=graph)
m = gpr_models.create_model(params['GPR_MODEL_NAME'], X=X_scaled, y=y_scaled,
**model_kwargs)
res = tf_optimize(m.model, X_samples, **opt_kwargs)
else:
model = GPRGD(length_scale=params['GPR_LENGTH_SCALE'],
magnitude=params['GPR_MAGNITUDE'],
max_train_size=params['GPR_MAX_TRAIN_SIZE'],
batch_size=params['GPR_BATCH_SIZE'],
num_threads=params['TF_NUM_THREADS'],
learning_rate=params['GPR_LEARNING_RATE'],
epsilon=params['GPR_EPSILON'],
max_iter=params['GPR_MAX_ITER'],
sigma_multiplier=params['GPR_SIGMA_MULTIPLIER'],
mu_multiplier=params['GPR_MU_MULTIPLIER'],
ridge=params['GPR_RIDGE'])
model.fit(X_scaled, y_scaled, X_min, X_max)
res = model.predict(X_samples, constraint_helper=constraint_helper)
best_config_idx = np.argmin(res.minl.ravel())
best_config = res.minl_conf[best_config_idx, :]
best_config = X_scaler.inverse_transform(best_config)
if ENABLE_DUMMY_ENCODER:
# Decode one-hot encoding into categorical knobs
best_config = dummy_encoder.inverse_transform(best_config)
# Although we have max/min limits in the GPRGD training session, it may
# lose some precisions. e.g. 0.99..99 >= 1.0 may be True on the scaled data,
# when we inversely transform the scaled data, the different becomes much larger
# and cannot be ignored. Here we check the range on the original data
# directly, and make sure the recommended config lies within the range
X_min_inv = X_scaler.inverse_transform(X_min)
X_max_inv = X_scaler.inverse_transform(X_max)
best_config = np.minimum(best_config, X_max_inv)
best_config = np.maximum(best_config, X_min_inv)
conf_map = {k: best_config[i] for i, k in enumerate(X_columnlabels)}
newest_result.pipeline_knobs = pipeline_knobs
newest_result.pipeline_metrics = pipeline_metrics
conf_map_res = create_and_save_recommendation(
recommended_knobs=conf_map, result=newest_result,
status='good', info='INFO: training data size is {}'.format(X_scaled.shape[0]),
pipeline_run=target_data['pipeline_run'])
LOG.debug('%s: Finished selecting the next config.\n\ndata=%s\n',
AlgorithmType.name(algorithm), JSONUtil.dumps(conf_map_res, pprint=True))
save_execution_time(start_ts, "configuration_recommendation", newest_result)
return conf_map_res
def load_data_helper(filtered_pipeline_data, workload, task_type):
pipeline_data = filtered_pipeline_data.get(workload=workload,
task_type=task_type)
LOG.debug("PIPELINE DATA: %s", str(pipeline_data.data))
return JSONUtil.loads(pipeline_data.data)
@shared_task(base=MapWorkloadTask, name='map_workload')
def map_workload(map_workload_input):
start_ts = time.time()
target_data, algorithm = map_workload_input
assert target_data is not None
if target_data['status'] != 'good':
LOG.debug('%s: Skipping workload mapping.\n\ndata=%s\n',
AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
return target_data, algorithm
# Get the latest version of pipeline data that's been computed so far.
latest_pipeline_run = PipelineRun.objects.get_latest()
assert latest_pipeline_run is not None
target_data['pipeline_run'] = latest_pipeline_run.pk
newest_result = Result.objects.get(pk=target_data['newest_result_id'])
session = newest_result.session
params = JSONUtil.loads(session.hyperparameters)
target_workload = newest_result.workload
X_columnlabels = np.array(target_data['X_columnlabels'])
y_columnlabels = np.array(target_data['y_columnlabels'])
# Find all pipeline data belonging to the latest version with the same
# DBMS and hardware as the target
pipeline_data = PipelineData.objects.filter(
pipeline_run=latest_pipeline_run,
workload__dbms=target_workload.dbms,
workload__hardware=target_workload.hardware,
workload__project=target_workload.project)
# FIXME (dva): we should also compute the global (i.e., overall) ranked_knobs
# and pruned metrics but we just use those from the first workload for now
initialized = False
global_ranked_knobs = None
global_pruned_metrics = None
ranked_knob_idxs = None
pruned_metric_idxs = None
unique_workloads = pipeline_data.values_list('workload', flat=True).distinct()
workload_data = {}
# Compute workload mapping data for each unique workload
for unique_workload in unique_workloads:
workload_obj = Workload.objects.get(pk=unique_workload)
wkld_results = Result.objects.filter(workload=workload_obj)
if wkld_results.exists() is False:
# delete the workload
workload_obj.delete()
continue
# Load knob & metric data for this workload
knob_data = load_data_helper(pipeline_data, unique_workload, PipelineTaskType.KNOB_DATA)
knob_data["data"], knob_data["columnlabels"] = clean_knob_data(knob_data["data"],
knob_data["columnlabels"],
newest_result.session)
metric_data = load_data_helper(pipeline_data, unique_workload, PipelineTaskType.METRIC_DATA)
X_matrix = np.array(knob_data["data"])
y_matrix = np.array(metric_data["data"])
rowlabels = np.array(knob_data["rowlabels"])
assert np.array_equal(rowlabels, metric_data["rowlabels"])
if not initialized:
# For now set ranked knobs & pruned metrics to be those computed
# for the first workload
global_ranked_knobs = load_data_helper(
pipeline_data, unique_workload,
PipelineTaskType.RANKED_KNOBS)[:params['IMPORTANT_KNOB_NUMBER']]
global_pruned_metrics = load_data_helper(
pipeline_data, unique_workload, PipelineTaskType.PRUNED_METRICS)
ranked_knob_idxs = [i for i in range(X_matrix.shape[1]) if X_columnlabels[
i] in global_ranked_knobs]
pruned_metric_idxs = [i for i in range(y_matrix.shape[1]) if y_columnlabels[
i] in global_pruned_metrics]
# Filter X & y columnlabels by top ranked_knobs & pruned_metrics
X_columnlabels = X_columnlabels[ranked_knob_idxs]
y_columnlabels = y_columnlabels[pruned_metric_idxs]
initialized = True
# Filter X & y matrices by top ranked_knobs & pruned_metrics
X_matrix = X_matrix[:, ranked_knob_idxs]
y_matrix = y_matrix[:, pruned_metric_idxs]
# Combine duplicate rows (rows with same knob settings)
X_matrix, y_matrix, rowlabels = DataUtil.combine_duplicate_rows(
X_matrix, y_matrix, rowlabels)
workload_data[unique_workload] = {
'X_matrix': X_matrix,
'y_matrix': y_matrix,
'rowlabels': rowlabels,
}
if len(workload_data) == 0:
# The background task that aggregates the data has not finished running yet
target_data.update(mapped_workload=None, scores=None)
LOG.debug('%s: Skipping workload mapping because there is no parsed workload.\n',
AlgorithmType.name(algorithm))
return target_data, algorithm
# Stack all X & y matrices for preprocessing
Xs = np.vstack([entry['X_matrix'] for entry in list(workload_data.values())])
ys = np.vstack([entry['y_matrix'] for entry in list(workload_data.values())])
# Scale the X & y values, then compute the deciles for each column in y
X_scaler = StandardScaler(copy=False)
X_scaler.fit(Xs)
y_scaler = StandardScaler(copy=False)
y_scaler.fit_transform(ys)
y_binner = Bin(bin_start=1, axis=0)
y_binner.fit(ys)
del Xs
del ys
# Filter the target's X & y data by the ranked knobs & pruned metrics.
X_target = target_data['X_matrix'][:, ranked_knob_idxs]
y_target = target_data['y_matrix'][:, pruned_metric_idxs]
# Now standardize the target's data and bin it by the deciles we just
# calculated
X_target = X_scaler.transform(X_target)
y_target = y_scaler.transform(y_target)
y_target = y_binner.transform(y_target)
scores = {}
for workload_id, workload_entry in list(workload_data.items()):
predictions = np.empty_like(y_target)
X_workload = workload_entry['X_matrix']
X_scaled = X_scaler.transform(X_workload)
y_workload = workload_entry['y_matrix']
y_scaled = y_scaler.transform(y_workload)
for j, y_col in enumerate(y_scaled.T):
# Using this workload's data, train a Gaussian process model
# and then predict the performance of each metric for each of
# the knob configurations attempted so far by the target.
y_col = y_col.reshape(-1, 1)
if params['GPR_USE_GPFLOW']:
model_kwargs = {'lengthscales': params['GPR_LENGTH_SCALE'],
'variance': params['GPR_MAGNITUDE'],
'noise_variance': params['GPR_RIDGE']}
tf.reset_default_graph()
graph = tf.get_default_graph()
gpflow.reset_default_session(graph=graph)
m = gpr_models.create_model(params['GPR_MODEL_NAME'], X=X_scaled, y=y_col,
**model_kwargs)
gpr_result = gpflow_predict(m.model, X_target)
else:
model = GPRNP(length_scale=params['GPR_LENGTH_SCALE'],
magnitude=params['GPR_MAGNITUDE'],
max_train_size=params['GPR_MAX_TRAIN_SIZE'],
batch_size=params['GPR_BATCH_SIZE'])
model.fit(X_scaled, y_col, ridge=params['GPR_RIDGE'])
gpr_result = model.predict(X_target)
predictions[:, j] = gpr_result.ypreds.ravel()
# Bin each of the predicted metric columns by deciles and then
# compute the score (i.e., distance) between the target workload
# and each of the known workloads
predictions = y_binner.transform(predictions)
dists = np.sqrt(np.sum(np.square(
np.subtract(predictions, y_target)), axis=1))
scores[workload_id] = np.mean(dists)
# Find the best (minimum) score
best_score = np.inf
best_workload_id = None
best_workload_name = None
scores_info = {}
for workload_id, similarity_score in list(scores.items()):
workload_name = Workload.objects.get(pk=workload_id).name
if similarity_score < best_score:
best_score = similarity_score
best_workload_id = workload_id
best_workload_name = workload_name
scores_info[workload_id] = (workload_name, similarity_score)
target_data.update(mapped_workload=(best_workload_id, best_workload_name, best_score),
scores=scores_info)
LOG.debug('%s: Finished mapping the workload.\n\ndata=%s\n',
AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
save_execution_time(start_ts, "map_workload", newest_result)
return target_data, algorithm