isolate workloads of different projects

This commit is contained in:
yangdsh 2019-12-15 07:14:51 +00:00 committed by Dana Van Aken
parent fc4cf0e18b
commit 7cc0c40d92
4 changed files with 78 additions and 38 deletions

View File

@@ -0,0 +1,26 @@
+# -*- coding: utf-8 -*-
+# Generated by Django 1.11.23 on 2019-12-15 06:16
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+import django.db.models.deletion
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('website', '0004_add_lhs'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='workload',
+            name='project',
+            field=models.ForeignKey(default=1, on_delete=django.db.models.deletion.CASCADE, to='website.Project'),
+            preserve_default=False,
+        ),
+        migrations.AlterUniqueTogether(
+            name='workload',
+            unique_together=set([('dbms', 'hardware', 'name', 'project')]),
+        ),
+    ]
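
Note: default=1 assigns every pre-existing workload to the project with primary key 1 while the schema migration runs. A deployment that wants each old workload attached to the project that actually produced it could follow up with a data migration along these lines. This sketch is illustrative only and not part of the commit; the dependency name is hypothetical, and it assumes each workload's results reach the right project through result.session.project.

# Illustrative follow-up data migration (not part of this commit). Assumes
# Result rows link a workload to a session whose project is the one the
# workload should belong to; the dependency name below is hypothetical.
from __future__ import unicode_literals

from django.db import migrations


def backfill_workload_projects(apps, schema_editor):
    Result = apps.get_model('website', 'Result')
    Workload = apps.get_model('website', 'Workload')
    for workload in Workload.objects.all():
        result = Result.objects.filter(workload=workload).select_related('session').first()
        if result is not None:
            workload.project = result.session.project
            workload.save(update_fields=['project'])


class Migration(migrations.Migration):

    dependencies = [
        ('website', '0005_add_workload_project'),  # hypothetical name for the migration above
    ]

    operations = [
        migrations.RunPython(backfill_workload_projects, migrations.RunPython.noop),
    ]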

View File

@@ -317,14 +317,15 @@ class MetricData(DataModel):
 class WorkloadManager(models.Manager):
 
-    def create_workload(self, dbms, hardware, name):
+    def create_workload(self, dbms, hardware, name, project):
         # (dbms,hardware,name) should be unique for each workload
         try:
-            return Workload.objects.get(dbms=dbms, hardware=hardware, name=name)
+            return Workload.objects.get(dbms=dbms, hardware=hardware, name=name, project=project)
         except Workload.DoesNotExist:
             return self.create(dbms=dbms,
                                hardware=hardware,
-                               name=name)
+                               name=name,
+                               project=project)
 
 
 class Workload(BaseModel):
@@ -336,6 +337,7 @@ class Workload(BaseModel):
     dbms = models.ForeignKey(DBMSCatalog)
     hardware = models.ForeignKey(Hardware)
     name = models.CharField(max_length=128, verbose_name='workload name')
+    project = models.ForeignKey(Project)
     status = models.IntegerField(choices=WorkloadStatusType.choices(),
                                  default=WorkloadStatusType.MODIFIED,
                                  editable=False)
@@ -353,7 +355,7 @@ class Workload(BaseModel):
         super(Workload, self).delete(using, keep_parents)
 
     class Meta:  # pylint: disable=no-init
-        unique_together = ("dbms", "hardware", "name")
+        unique_together = ("dbms", "hardware", "name", "project")
 
     # @property
     # def isdefault(self):
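
With project added to both the manager method and the unique_together constraint, workloads that share a dbms, hardware, and name no longer collide across projects, while repeated uploads within one project still reuse the same row. A minimal usage sketch; dbms, hw, project_a, and project_b are placeholders for existing DBMSCatalog, Hardware, and Project rows, and 'tpcc_100w' is a made-up workload name:

# Illustrative only: the placeholders stand in for saved catalog/hardware/project rows.
wl_a = Workload.objects.create_workload(dbms, hw, 'tpcc_100w', project_a)
wl_b = Workload.objects.create_workload(dbms, hw, 'tpcc_100w', project_b)
assert wl_a.pk != wl_b.pk  # same name and hardware, different projects: separate workloads

# Re-registering within the same project returns the existing workload.
assert Workload.objects.create_workload(dbms, hw, 'tpcc_100w', project_a).pk == wl_a.pk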

View File

@@ -213,8 +213,8 @@ def aggregate_target_results(result_id, algorithm):
     agg_data['X_matrix'] = np.array(cleaned_agg_data[0])
     agg_data['X_columnlabels'] = np.array(cleaned_agg_data[1])
-    LOG.debug('%s: Finished aggregating target results.\n\ndata=%s\n',
-              AlgorithmType.name(algorithm), JSONUtil.dumps(agg_data, pprint=True))
+    LOG.debug('%s: Finished aggregating target results.\n\n',
+              AlgorithmType.name(algorithm))
 
     return agg_data, algorithm
@@ -418,22 +418,7 @@ def configuration_recommendation_ddpg(result_info):  # pylint: disable=invalid-n
     return conf_map_res
 
 
-@task(base=ConfigurationRecommendation, name='configuration_recommendation')
-def configuration_recommendation(recommendation_input):
-    target_data, algorithm = recommendation_input
-    LOG.info('configuration_recommendation called')
-
-    if target_data['bad'] is True:
-        target_data_res = dict(
-            status='bad',
-            result_id=target_data['newest_result_id'],
-            info='WARNING: no training data, the config is generated randomly',
-            recommendation=target_data['config_recommend'],
-            pipeline_run=target_data['pipeline_run'])
-        LOG.debug('%s: Skipping configuration recommendation.\n\ndata=%s\n',
-                  AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
-        return target_data_res
-
+def combine_workload(target_data):
     # Load mapped workload data
     mapped_workload_id = target_data['mapped_workload'][0]
@@ -498,10 +483,6 @@ def configuration_recommendation(recommendation_input):
                          'metrics (target_obj={})').format(len(target_obj_idx),
                                                            target_objective))
 
-    metric_meta = db.target_objectives.get_metric_metadata(
-        newest_result.session.dbms.pk, newest_result.session.target_objective)
-    lessisbetter = metric_meta[target_objective].improvement == db.target_objectives.LESS_IS_BETTER
-
     y_workload = y_workload[:, target_obj_idx]
     y_target = y_target[:, target_obj_idx]
     y_columnlabels = y_columnlabels[target_obj_idx]
@@ -570,6 +551,14 @@ def configuration_recommendation(recommendation_input):
     y_workload_scaler = StandardScaler()
     y_scaled = y_workload_scaler.fit_transform(y_target)
 
+    metric_meta = db.target_objectives.get_metric_metadata(
+        newest_result.session.dbms.pk, newest_result.session.target_objective)
+    lessisbetter = metric_meta[target_objective].improvement == db.target_objectives.LESS_IS_BETTER
+    # Maximize the throughput, moreisbetter
+    # Use gradient descent to minimize -throughput
+    if not lessisbetter:
+        y_scaled = -y_scaled
+
     # Set up constraint helper
     constraint_helper = ParamConstraintHelper(scaler=X_scaler,
                                               encoder=dummy_encoder,
@@ -582,10 +571,6 @@ def configuration_recommendation(recommendation_input):
     # ridge[:X_target.shape[0]] = 0.01
     # ridge[X_target.shape[0]:] = 0.1
 
-    # FIXME: we should generate more samples and use a smarter sampling
-    # technique
-    num_samples = NUM_SAMPLES
-    X_samples = np.empty((num_samples, X_scaled.shape[1]))
     X_min = np.empty(X_scaled.shape[1])
     X_max = np.empty(X_scaled.shape[1])
     X_scaler_matrix = np.zeros([1, X_scaled.shape[1]])
@@ -608,12 +593,37 @@ def configuration_recommendation(recommendation_input):
         col_max = X_scaler.transform(X_scaler_matrix)[0][i]
         X_min[i] = col_min
         X_max[i] = col_max
-        X_samples[:, i] = np.random.rand(num_samples) * (col_max - col_min) + col_min
-
-    # Maximize the throughput, moreisbetter
-    # Use gradient descent to minimize -throughput
-    if not lessisbetter:
-        y_scaled = -y_scaled
+    return X_columnlabels, X_scaler, X_scaled, y_scaled, X_max, X_min
+
+
+@task(base=ConfigurationRecommendation, name='configuration_recommendation')
+def configuration_recommendation(recommendation_input):
+    target_data, algorithm = recommendation_input
+    LOG.info('configuration_recommendation called')
+
+    if target_data['bad'] is True:
+        target_data_res = dict(
+            status='bad',
+            result_id=target_data['newest_result_id'],
+            info='WARNING: no training data, the config is generated randomly',
+            recommendation=target_data['config_recommend'],
+            pipeline_run=target_data['pipeline_run'])
+        LOG.debug('%s: Skipping configuration recommendation.\n\ndata=%s\n',
+                  AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
+        return target_data_res
+
+    latest_pipeline_run = PipelineRun.objects.get(pk=target_data['pipeline_run'])
+    newest_result = Result.objects.get(pk=target_data['newest_result_id'])
+    X_columnlabels, X_scaler, X_scaled, y_scaled, X_max, X_min = combine_workload(target_data)
+
+    # FIXME: we should generate more samples and use a smarter sampling
+    # technique
+    num_samples = NUM_SAMPLES
+    X_samples = np.empty((num_samples, X_scaled.shape[1]))
+    for i in range(X_scaled.shape[1]):
+        X_samples[:, i] = np.random.rand(num_samples) * (X_max[i] - X_min[i]) + X_min[i]
 
     q = queue.PriorityQueue()
     for x in range(0, y_scaled.shape[0]):
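
The refactoring above moves the assembly of the GPR inputs (the scaled knob matrix, the scaled objective values, and the per-knob bounds) into combine_workload(), while configuration_recommendation() keeps the random generation of starting points. A self-contained sketch of that sampling step; num_samples and the bounds are made-up stand-ins for NUM_SAMPLES and the scaler-derived X_min/X_max:

import numpy as np

# Stand-ins for NUM_SAMPLES and the bounds combine_workload() derives from the
# knob scaler; only the sampling pattern mirrors the task code.
num_samples = 30
X_min = np.array([0.0, -1.0, 0.5])   # per-knob lower bounds in scaled space
X_max = np.array([1.0, 1.0, 2.0])    # per-knob upper bounds in scaled space

X_samples = np.empty((num_samples, X_min.shape[0]))
for i in range(X_min.shape[0]):
    # Draw num_samples values uniformly from [X_min[i], X_max[i]] for knob i
    X_samples[:, i] = np.random.rand(num_samples) * (X_max[i] - X_min[i]) + X_min[i]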
@@ -755,7 +765,8 @@ def map_workload(map_workload_input):
         pipeline_data = PipelineData.objects.filter(
             pipeline_run=latest_pipeline_run,
             workload__dbms=target_workload.dbms,
-            workload__hardware=target_workload.hardware)
+            workload__hardware=target_workload.hardware,
+            workload__project=target_workload.project)
 
         # FIXME (dva): we should also compute the global (i.e., overall) ranked_knobs
         # and pruned metrics but we just use those from the first workload for now

View File

@@ -566,6 +566,7 @@ def handle_result_files(session, files):
     knob_data = KnobData.objects.create_knob_data(
         session, JSONUtil.dumps(knob_dict, pprint=True, sort=True),
         JSONUtil.dumps(tunable_knob_dict, pprint=True, sort=True), dbms)
+    LOG.debug(knob_data.data)
 
     # Load, process, and store the runtime metrics exposed by the DBMS
     initial_metric_dict, initial_metric_diffs = parser.parse_dbms_metrics(
@@ -583,7 +584,7 @@ def handle_result_files(session, files):
     # Create a new workload if this one does not already exist
     workload = Workload.objects.create_workload(
-        dbms, session.hardware, workload_name)
+        dbms, session.hardware, workload_name, session.project)
 
     # Save this result
     result = Result.objects.create_result(
@@ -591,7 +592,7 @@ def handle_result_files(session, files):
         start_time, end_time, observation_time)
     result.save()
 
-    # Workload is now modified so backgroundTasks can make calculationw
+    # Workload is now modified so backgroundTasks can make calculation
     workload.status = WorkloadStatusType.MODIFIED
     workload.save()
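
Taken together, these changes keep uploads from sessions in different projects from being merged into one workload, and map_workload() now only considers pipeline data from the caller's own project. A minimal regression-test sketch of the first property, not part of this commit; create_test_dbms, create_test_hardware, and create_test_project are hypothetical fixture helpers that return saved rows:

# Illustrative Django test; the create_test_* helpers are hypothetical.
from django.test import TestCase

from website.models import Workload


class WorkloadProjectIsolationTest(TestCase):

    def test_same_name_different_projects(self):
        dbms = create_test_dbms()
        hw = create_test_hardware()
        proj_a = create_test_project('a')
        proj_b = create_test_project('b')

        wl_a = Workload.objects.create_workload(dbms, hw, 'workload-1', proj_a)
        wl_b = Workload.objects.create_workload(dbms, hw, 'workload-1', proj_b)

        # Same (dbms, hardware, name) under different projects must stay separate.
        self.assertNotEqual(wl_a.pk, wl_b.pk)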