ottertune/server/website/tests/test_tasks.py

192 lines
8.4 KiB
Python
Raw Normal View History

2019-08-23 08:47:19 -07:00
#
# OtterTune - test_tasks.py
#
# Copyright (c) 2017-18, Carnegie Mellon University Database Group
#
import copy

import numpy as np
from django.db import transaction
from django.test import TestCase, override_settings

from website.models import (DBMSCatalog, Hardware, PipelineData, PipelineRun,
                            Project, Result, Session, Workload)
from website.tasks.periodic_tasks import (aggregate_data,
                                          run_background_tasks,
                                          run_knob_identification,
                                          run_workload_characterization)
from website.types import PipelineTaskType, WorkloadStatusType
CELERY_TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'


@override_settings(CELERY_ALWAYS_EAGER=True, TEST_RUNNER=CELERY_TEST_RUNNER)
class BackgroundTestCase(TestCase):
    """Tests for the periodic ``run_background_tasks`` Celery task.

    CELERY_ALWAYS_EAGER is switched on so that ``.delay()`` executes the task
    synchronously inside the test process instead of queueing it.
    """

    fixtures = ['test_website.json']
    serialized_rollback = True

    # The background task finishes successfully on the fixture data
    def testNoError(self):
        result = run_background_tasks.delay()
        self.assertTrue(result.successful())

    # Every workload that was MODIFIED before the run is PROCESSED afterwards
    def testProcessedWorkloadStatus(self):
        # QuerySets are lazy: snapshot the primary keys *before* running the
        # task. Iterating the status=MODIFIED filter afterwards would re-run
        # the query and only ever yield workloads that are still MODIFIED.
        modified_pks = list(
            Workload.objects.filter(status=WorkloadStatusType.MODIFIED)
            .values_list('pk', flat=True))
        run_background_tasks.delay()
        # Re-fetch so the status reflects what the task wrote to the database
        for workload in Workload.objects.filter(pk__in=modified_pks):
            self.assertEqual(workload.status, WorkloadStatusType.PROCESSED)

    # A second run with no modified workloads must not create a new pipeline run
    def testNoModifiedWorkload(self):
        # First execution: processes the modified workloads
        run_background_tasks.delay()
        first_pipeline_run = PipelineRun.objects.get_latest()
        # Second execution: no modified workloads are left
        run_background_tasks.delay()
        second_pipeline_run = PipelineRun.objects.get_latest()
        # The latest pipeline run is unchanged, i.e. the BG task has not run
        self.assertEqual(first_pipeline_run.start_time, second_pipeline_run.start_time)

    # Test that an empty workload is ignored by the BG task
    def testEmptyWorkload(self):
        # NOTE(review): indentation was lost in the reviewed copy of this file;
        # the whole test body is kept inside the atomic block -- confirm.
        with transaction.atomic():
            # Create a workload that has no results attached to it
            empty_workload = Workload.objects.create_workload(
                dbms=DBMSCatalog.objects.get(pk=1),
                hardware=Hardware.objects.get(pk=1),
                name="empty_workload",
                project=Project.objects.get(pk=1))

            result = run_background_tasks.delay()
            # Check that BG task successfully finished
            self.assertTrue(result.successful())
            # Check that the empty workload is still in MODIFIED Status
            # NOTE(review): this inspects the in-memory instance, which is not
            # reloaded after the task ran, and compares against the literal 1
            # (presumably WorkloadStatusType.MODIFIED) -- confirm whether
            # refresh_from_db() and the named constant should be used.
            self.assertEqual(empty_workload.status, 1)
            pipeline_data = PipelineData.objects.filter(
                pipeline_run=PipelineRun.objects.get_latest())
            # Check that the empty workload is not in the pipeline datas
            self.assertNotIn(empty_workload.pk,
                             pipeline_data.values_list('workload_id', flat=True))

    # Test that a workload that contain only one knob configuration will be ignored by the BG task
    def testUniqueKnobConfigurationWorkload(self):
        # Get workload to copy data from
        origin_workload = Workload.objects.get(pk=1)
        origin_session = Session.objects.get(pk=1)
        # Create the new workload that will only hold one knob configuration
        fix_workload = Workload.objects.create_workload(
            dbms=origin_workload.dbms,
            hardware=origin_workload.hardware,
            name="fixed_knob_workload",
            project=origin_workload.project)

        origin_results = Result.objects.filter(workload=origin_workload,
                                               session=origin_session)
        fix_knob_data = origin_results[0].knob_data
        # Copy up to 4 results (the slice is [:4]), all with the same knob
        # configuration but each keeping its own metric data and timestamps
        for res in origin_results[:4]:
            Result.objects.create_result(res.session, res.dbms, fix_workload,
                                         fix_knob_data, res.metric_data,
                                         res.observation_start_time,
                                         res.observation_end_time,
                                         res.observation_time)

        result = run_background_tasks.delay()
        # Check that BG task successfully finished
        self.assertTrue(result.successful())
        # Check that the fixed-knob workload is still in MODIFIED Status
        # NOTE(review): same caveat as in testEmptyWorkload -- the instance is
        # not reloaded from the database and 1 is a magic number; confirm.
        self.assertEqual(fix_workload.status, 1)
        pipeline_data = PipelineData.objects.filter(
            pipeline_run=PipelineRun.objects.get_latest())
        # Check that the fixed-knob workload is not in the pipeline datas
        self.assertNotIn(fix_workload.pk,
                         pipeline_data.values_list('workload_id', flat=True))

    # The background task must cope with an empty Workload table
    def testNoWorkloads(self):
        # delete any existing workloads
        Workload.objects.all().delete()
        # background task should not fail
        result = run_background_tasks.delay()
        self.assertTrue(result.successful())

    # One run of the background task adds exactly one PipelineRun
    def testNewPipelineRun(self):
        # this test relies on the fixture data; report it as skipped rather
        # than silently passing when there is nothing to test
        if not Workload.objects.all().exists():
            self.skipTest("fixture contains no workloads")
        runs_before = PipelineRun.objects.count()
        run_background_tasks.delay()
        runs_after = PipelineRun.objects.count()
        self.assertEqual(runs_before + 1, runs_after)

    # Helper: one run of the background task must add exactly one PipelineData
    # entry of the given task_type for every workload
    def checkNewTask(self, task_type):
        # Materialise once so the before/after counts are taken over the same
        # workloads in the same order (a lazy QuerySet would be re-queried).
        workloads = list(Workload.objects.all())

        def count_entries():
            # Number of PipelineData rows of this task type, per workload
            return [PipelineData.objects.filter(workload=workload,
                                                task_type=task_type).count()
                    for workload in workloads]

        counts_before = count_entries()
        run_background_tasks.delay()
        counts_after = count_entries()
        for before, after in zip(counts_before, counts_after):
            self.assertEqual(before + 1, after)

    def testNewPrunedMetrics(self):
        self.checkNewTask(PipelineTaskType.PRUNED_METRICS)

    def testNewRankedKnobs(self):
        self.checkNewTask(PipelineTaskType.RANKED_KNOBS)
class AggregateTestCase(TestCase):
    """Checks the structure of the dictionaries returned by ``aggregate_data``."""

    fixtures = ['test_website.json']

    # Each aggregated dictionary must expose the data matrix and both label lists
    def testValidWorkload(self):
        first_workload = Workload.objects.all()[0]
        aggregated = aggregate_data(Result.objects.filter(workload=first_workload))
        for entry in aggregated:
            for expected_key in ('data', 'rowlabels', 'columnlabels'):
                self.assertIn(expected_key, entry)
class PrunedMetricTestCase(TestCase):
    """Checks the output of ``run_workload_characterization``."""

    fixtures = ['test_website.json']

    # Every pruned metric must be one of the metric column labels it was given
    def testValidPrunedMetrics(self):
        first_workload = Workload.objects.all()[0]
        aggregated = aggregate_data(Result.objects.filter(workload=first_workload))
        # The second element of the aggregate is the metric data
        metric_data = aggregated[1]
        pruned_metrics = run_workload_characterization(metric_data)
        for metric_name in pruned_metrics:
            self.assertIn(metric_name, metric_data['columnlabels'])
class RankedKnobTestCase(TestCase):
    """Checks the output of ``run_knob_identification``."""

    fixtures = ['test_website.json']

    # Every ranked knob must be one of the knob column labels it was given
    def testValidImportantKnobs(self):
        # Fetch the workload once: indexing an unevaluated QuerySet issues a
        # new query each time, so the results and the dbms passed below would
        # not be guaranteed to come from the same workload.
        workload = Workload.objects.all()[0]
        wkld_results = Result.objects.filter(workload=workload)
        knob_data, metric_data = aggregate_data(wkld_results)

        # instead of doing actual metric pruning by factor analysis / clustering,
        # just randomly select 5 nonconstant metrics
        nonconst_metric_columnlabels = [
            label
            for column, label in zip(metric_data['data'].T, metric_data['columnlabels'])
            if np.any(column != column[0])  # keep columns with >1 distinct value
        ]
        num_metrics = min(5, len(nonconst_metric_columnlabels))
        # Convert to a set of plain strings for straightforward membership tests
        selected_columnlabels = set(
            np.random.choice(nonconst_metric_columnlabels,
                             num_metrics, replace=False).tolist())
        pruned_metric_idxs = [i for i, metric_name in
                              enumerate(metric_data['columnlabels'])
                              if metric_name in selected_columnlabels]
        pruned_metric_data = {
            'data': metric_data['data'][:, pruned_metric_idxs],
            'rowlabels': copy.deepcopy(metric_data['rowlabels']),
            'columnlabels': [metric_data['columnlabels'][i] for i in pruned_metric_idxs]
        }

        # run knob_identification using knob_data and fake pruned metrics
        ranked_knobs = run_knob_identification(knob_data, pruned_metric_data,
                                               workload.dbms)
        for knob_name in ranked_knobs:
            self.assertIn(knob_name, knob_data['columnlabels'])