Fixed bugs and improved logging in config recommendation tasks/views

dvanaken 2020-01-08 03:37:10 -05:00 committed by Dana Van Aken
parent 3786714093
commit 9393fb7aca
6 changed files with 145 additions and 85 deletions

View File

@@ -338,7 +338,6 @@ def get_result(max_time_sec=180, interval_sec=5, upload_code=None):
url = dconf.WEBSITE_URL + '/query_and_get/' + upload_code
elapsed = 0
response_dict = None
response = ''
rout = ''
while elapsed <= max_time_sec:
@@ -347,8 +346,9 @@ def get_result(max_time_sec=180, interval_sec=5, upload_code=None):
assert response != 'null'
rout = json.dumps(response, indent=4) if isinstance(response, dict) else response
LOG.debug('%s [status code: %d, content_type: %s, elapsed: %ds]', rout,
rsp.status_code, rsp.headers.get('Content-Type', ''), elapsed)
LOG.debug('%s\n\n[status code: %d, type(response): %s, elapsed: %ds, %s]', rout,
rsp.status_code, type(response), elapsed,
', '.join(['{}: {}'.format(k, v) for k, v in rsp.headers.items()]))
if rsp.status_code == 200:
# Success
@@ -368,12 +368,12 @@ def get_result(max_time_sec=180, interval_sec=5, upload_code=None):
elif rsp.status_code == 500:
# Failure
if len(response) > 5000:
with open('error.html', 'w') as f:
f.write(response)
msg = "Saved HTML error to 'error.html'."
else:
msg = rout
if isinstance(response, str):
savepath = os.path.join(dconf.LOG_DIR, 'error.html')
with open(savepath, 'w') as f:
f.write(response)
msg = "Saved HTML error to '{}'.".format(os.path.relpath(savepath))
raise Exception(
"Failed to download the next config.\nStatus code: {}\nMessage: {}\n".format(
rsp.status_code, msg))
@@ -385,12 +385,12 @@ def get_result(max_time_sec=180, interval_sec=5, upload_code=None):
if not response_dict:
assert elapsed > max_time_sec, \
'response={} but elapsed={}s <= max_time={}s'.format(
response, elapsed, max_time_sec)
rout, elapsed, max_time_sec)
raise Exception(
'Failed to download the next config in {}s: {} (elapsed: {}s)'.format(
max_time_sec, rout, elapsed))
LOG.info('Downloaded the next config in %ds: %s', elapsed, rout)
LOG.info('Downloaded the next config in %ds', elapsed)
return response_dict
@@ -792,3 +792,5 @@ def integration_tests():
upload_code='ottertuneTestTuningGPR')
response = get_result(upload_code='ottertuneTestTuningGPR')
assert response['status'] == 'good'
LOG.info("\n\nIntegration Tests: PASSED!!\n")
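For orientation, a minimal hedged sketch of the polling pattern the get_result hunks above modify: the driver repeatedly queries /query_and_get/<upload_code>, saves an HTML error page under LOG_DIR on a 500, and gives up after max_time_sec. The requests calls and the poll_next_config name are illustrative; how the committed code issues the HTTP request is not shown in these hunks.

import json
import os
import time

import requests


def poll_next_config(website_url, upload_code, log_dir,
                     max_time_sec=180, interval_sec=5):
    # Illustrative polling loop (not the committed code): ask the server for
    # the next configuration until it is ready or the time budget runs out.
    url = website_url + '/query_and_get/' + upload_code
    elapsed = 0
    while elapsed <= max_time_sec:
        rsp = requests.get(url)
        is_json = 'json' in rsp.headers.get('Content-Type', '')
        response = rsp.json() if is_json else rsp.text
        if rsp.status_code == 200:
            return response  # success: the body is the recommended config
        if rsp.status_code == 500:
            # Keep large HTML error pages on disk instead of dumping them
            # into the log, mirroring the change above.
            if isinstance(response, str):
                savepath = os.path.join(log_dir, 'error.html')
                with open(savepath, 'w') as f:
                    f.write(response)
                msg = "Saved HTML error to '{}'.".format(os.path.relpath(savepath))
            else:
                msg = json.dumps(response, indent=4)
            raise Exception('Failed to download the next config: {}'.format(msg))
        time.sleep(interval_sec)
        elapsed += interval_sec
    raise Exception('Timed out after {}s waiting for the next config'.format(max_time_sec))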

View File

@@ -83,58 +83,56 @@ class TaskUtilTest(TestCase):
# FIXME: Actually setup celery tasks instead of a dummy class?
test_tasks = []
(status, num_complete) = TaskUtil.get_task_status(test_tasks)
self.assertTrue(status is None and num_complete == 0)
(status, num_complete) = TaskUtil.get_task_status(test_tasks, 1)
self.assertTrue(status is 'UNAVAILABLE' and num_complete == 0)
(status, num_complete) = TaskUtil.get_task_status(test_tasks, 0)
self.assertTrue(status is 'UNAVAILABLE' and num_complete == 0)
test_tasks2 = [VarType() for i in range(5)]
for task in test_tasks2:
task.status = "SUCCESS"
(status, num_complete) = TaskUtil.get_task_status(test_tasks2)
(status, num_complete) = TaskUtil.get_task_status(test_tasks2, 5)
self.assertTrue(status == "SUCCESS" and num_complete == 5)
test_tasks3 = test_tasks2
test_tasks3[3].status = "FAILURE"
(status, num_complete) = TaskUtil.get_task_status(test_tasks3)
(status, num_complete) = TaskUtil.get_task_status(test_tasks3, 5)
self.assertTrue(status == "FAILURE" and num_complete == 3)
test_tasks4 = test_tasks3
test_tasks4[2].status = "REVOKED"
(status, num_complete) = TaskUtil.get_task_status(test_tasks4)
(status, num_complete) = TaskUtil.get_task_status(test_tasks4, 5)
self.assertTrue(status == "REVOKED" and num_complete == 2)
test_tasks5 = test_tasks4
test_tasks5[1].status = "RETRY"
(status, num_complete) = TaskUtil.get_task_status(test_tasks5)
(status, num_complete) = TaskUtil.get_task_status(test_tasks5, 5)
self.assertTrue(status == "RETRY" and num_complete == 1)
test_tasks6 = [VarType() for i in range(10)]
for i, task in enumerate(test_tasks6):
task.status = "PENDING" if i % 2 == 0 else "SUCCESS"
(status, num_complete) = TaskUtil.get_task_status(test_tasks6)
(status, num_complete) = TaskUtil.get_task_status(test_tasks6, 10)
self.assertTrue(status == "PENDING" and num_complete == 5)
test_tasks7 = test_tasks6
test_tasks7[9].status = "STARTED"
(status, num_complete) = TaskUtil.get_task_status(test_tasks7)
(status, num_complete) = TaskUtil.get_task_status(test_tasks7, 10)
self.assertTrue(status == "STARTED" and num_complete == 4)
test_tasks8 = test_tasks7
test_tasks8[9].status = "RECEIVED"
(status, num_complete) = TaskUtil.get_task_status(test_tasks8)
(status, num_complete) = TaskUtil.get_task_status(test_tasks8, 10)
self.assertTrue(status == "RECEIVED" and num_complete == 4)
with self.assertRaises(Exception):
test_tasks9 = [VarType() for i in range(1)]
test_tasks9[0].status = "attemped"
TaskUtil.get_task_status(test_tasks9)
class DataUtilTest(TestCase):

View File

@@ -28,7 +28,7 @@ from analysis.constraints import ParamConstraintHelper
from website.models import (PipelineData, PipelineRun, Result, Workload, KnobCatalog, SessionKnob,
MetricCatalog)
from website import db
from website.types import PipelineTaskType, AlgorithmType
from website.types import PipelineTaskType, AlgorithmType, VarType
from website.utils import DataUtil, JSONUtil
from website.settings import IMPORTANT_KNOB_NUMBER, NUM_SAMPLES, TOP_NUM_CONFIG # pylint: disable=no-name-in-module
from website.settings import (USE_GPFLOW, DEFAULT_LENGTH_SCALE, DEFAULT_MAGNITUDE,
@@ -47,7 +47,6 @@ from website.settings import (USE_GPFLOW, DEFAULT_LENGTH_SCALE, DEFAULT_MAGNITUD
GPR_MODEL_NAME, ENABLE_DUMMY_ENCODER, DNN_GD_ITER)
from website.settings import INIT_FLIP_PROB, FLIP_PROB_DECAY
from website.types import VarType
LOG = get_task_logger(__name__)
@@ -116,32 +115,59 @@ class ConfigurationRecommendation(UpdateTask): # pylint: disable=abstract-metho
def clean_knob_data(knob_matrix, knob_labels, session):
# Makes sure that all knobs in the dbms are included in the knob_matrix and knob_labels
knob_cat = SessionKnob.objects.get_knobs_for_session(session)
knob_cat = [knob["name"] for knob in knob_cat if knob["tunable"]]
matrix = np.array(knob_matrix)
missing_columns = set(knob_cat) - set(knob_labels)
unused_columns = set(knob_labels) - set(knob_cat)
LOG.debug("clean_knob_data added %d knobs and removed %d knobs.", len(missing_columns),
len(unused_columns))
# If columns are missing from the matrix
if missing_columns:
for knob in missing_columns:
knob_object = KnobCatalog.objects.get(dbms=session.dbms, name=knob, tunable=True)
index = knob_cat.index(knob)
knob_matrix = np.array(knob_matrix)
session_knobs = SessionKnob.objects.get_knobs_for_session(session)
knob_cat = [k['name'] for k in session_knobs]
if knob_cat == knob_labels:
# Nothing to do!
return knob_matrix, knob_labels
LOG.info("session_knobs: %s, knob_labels: %s, missing: %s, extra: %s", len(knob_cat),
len(knob_labels), len(set(knob_cat) - set(knob_labels)),
len(set(knob_labels) - set(knob_cat)))
nrows = knob_matrix.shape[0] # pylint: disable=unsubscriptable-object
new_labels = []
new_columns = []
for knob in session_knobs:
knob_name = knob['name']
if knob_name not in knob_labels:
# Add missing column initialized to knob's default value
default_val = knob['default']
try:
default_val = float(knob_object.default)
if knob['vartype'] == VarType.ENUM:
default_val = knob['enumvals'].split(',').index(default_val)
elif knob['vartype'] == VarType.BOOL:
default_val = str(default_val).lower() in ("on", "true", "yes", "0")
else:
default_val = float(default_val)
except ValueError:
default_val = 0
matrix = np.insert(matrix, index, default_val, axis=1)
knob_labels.insert(index, knob)
# If they are useless columns in the matrix
if unused_columns:
indexes = [i for i, n in enumerate(knob_labels) if n in unused_columns]
# Delete unused columns
matrix = np.delete(matrix, indexes, 1)
for i in sorted(indexes, reverse=True):
del knob_labels[i]
return matrix, knob_labels
LOG.exception("Error parsing knob default: %s=%s", knob_name, default_val)
new_col = np.ones((nrows, 1), dtype=float) * default_val
new_lab = knob_name
else:
index = knob_labels.index(knob_name)
new_col = knob_matrix[:, index]
new_lab = knob_labels[index]
new_labels.append(new_lab)
new_columns.append(new_col)
new_matrix = np.hstack(new_columns).reshape(nrows, -1)
LOG.debug("Cleaned matrix: %s, knobs (%s): %s", new_matrix.shape,
len(new_labels), new_labels)
assert new_labels == knob_cat, \
"Expected knobs: {}\nActual knobs: {}\n".format(
knob_cat, new_labels)
assert new_matrix.shape == (nrows, len(knob_cat)), \
"Expected shape: {}, Actual shape: {}".format(
(nrows, len(knob_cat)), new_matrix.shape)
return new_matrix, new_labels
def clean_metric_data(metric_matrix, metric_labels, session):
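The rewritten clean_knob_data above replaces the insert/delete bookkeeping with a single pass that rebuilds the matrix in session-knob order: existing columns are reused, missing knobs get a default-valued column, and extra columns are dropped. A toy, self-contained sketch of that realignment idea (made-up knob names and defaults; np.column_stack is used here for brevity in place of the committed hstack/reshape):

import numpy as np

# Required column order for the session, with default values for knobs the
# uploaded results never reported.
session_knobs = [('knob_a', 1.0), ('knob_b', 0.0), ('knob_c', 64.0)]

# Columns as uploaded: 'knob_x' is extra and gets dropped; 'knob_a'/'knob_b'
# are missing and get default-valued columns.
knob_labels = ['knob_c', 'knob_x']
knob_matrix = np.array([[128.0, 7.0],
                        [256.0, 9.0]])

nrows = knob_matrix.shape[0]
new_labels, new_columns = [], []
for name, default_val in session_knobs:
    if name in knob_labels:
        new_columns.append(knob_matrix[:, knob_labels.index(name)])
    else:
        new_columns.append(np.full(nrows, default_val))
    new_labels.append(name)

new_matrix = np.column_stack(new_columns)
assert new_matrix.shape == (nrows, len(session_knobs))
# new_matrix ~ [[1., 0., 128.],
#               [1., 0., 256.]]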
@@ -438,8 +464,7 @@ def configuration_recommendation_ddpg(result_info): # pylint: disable=invalid-n
metric_data = metric_data.flatten()
metric_scalar = MinMaxScaler().fit(metric_data.reshape(1, -1))
normalized_metric_data = metric_scalar.transform(metric_data.reshape(1, -1))[0]
cleaned_knob_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'],
session)
cleaned_knob_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'], session)
knob_labels = np.array(cleaned_knob_data[1]).flatten()
knob_num = len(knob_labels)
metric_num = len(metric_data)
@@ -834,7 +859,6 @@ def map_workload(map_workload_input):
knob_data["data"], knob_data["columnlabels"] = clean_knob_data(knob_data["data"],
knob_data["columnlabels"],
newest_result.session)
metric_data = load_data_helper(pipeline_data, unique_workload, PipelineTaskType.METRIC_DATA)
X_matrix = np.array(knob_data["data"])
y_matrix = np.array(metric_data["data"])

View File

@@ -12,9 +12,13 @@
<td style="width: 50%"><div class="text-right">{{ labels.id }}</div></td>
<td style="width: 50%">{{ result.pk }}</td>
</tr>
<tr>
<td><div class="text-right">{{ labels.creation_time }}</div></td>
<td>{{ result.creation_time }}</td>
</tr>
<tr>
<td><div class="text-right">{{ labels.session }}</div></td>
<td>{{ result.session.name }}</td>
<td><a href="{% url 'session' project_id session_id %}">{{ result.session.name }}</a></td>
</tr>
<tr>
<td><div class="text-right">{{ labels.workload }}</div></td>
@@ -28,10 +32,6 @@
<td><div class="text-top text-right">{{ labels.metric_data }}</div></td>
<td><a href="{% url 'metric_data' project_id session_id result.metric_data.pk %}">{{ result.metric_data.name }}</a></td>
</tr>
<tr>
<td><div class="text-right">{{ labels.creation_time }}</div></td>
<td>{{ result.creation_time }}</td>
</tr>
{% for key, value in default_metrics.items %}
<tr>
<td><div class="text-right">{{ metric_meta|get_item:key|get_attr:"pprint" }}</div></td>
@@ -47,8 +47,16 @@
{% endif %}
{% if next_conf_available %}
<tr>
<td><div class="text-right">{{ labels.next_conf_available }}</div></td>
<td><a href="/get_result_data_file/?id={{ result.pk }}&type=next_conf">Download</a></td>
<td>
<div class="text-right">
{{ labels.next_conf }}<br>
<button type="submit" style="margin-top:15px;" class="btn btn-default btn-lg alert alert-info"
onclick="location.href='/get_result_data_file/?id={{ result.pk }}&type=next_conf'">
<span class="glyphicon glyphicon-download-alt" aria-hidden="true"></span>
</button>
</div>
</td>
<td><pre style="display:inline-block">{{ next_conf }}</pre></td>
</tr>
{% endif %}
</tbody>

View File

@@ -82,11 +82,8 @@ class TaskUtil(object):
return TaskMeta.objects.filter(task_id__in=task_ids).order_by(preserved)
@staticmethod
def get_task_status(tasks):
if not tasks:
return None, 0
overall_status = 'SUCCESS'
def get_task_status(tasks, num_tasks):
overall_status = 'UNAVAILABLE'
num_completed = 0
for task in tasks:
status = task.status
@@ -101,6 +98,9 @@ class TaskUtil(object):
task.id, status, task.task_id)
overall_status = status
if num_tasks > 0 and num_tasks == num_completed:
overall_status = 'SUCCESS'
return overall_status, num_completed
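The two hunks above change get_task_status to start from 'UNAVAILABLE' instead of returning None for an empty task list, and to report 'SUCCESS' only once all num_tasks expected tasks have completed, presumably because TaskMeta rows for still-pending tasks may not exist yet. A hedged sketch of the calling pattern the views in this commit adopt (the result variable is illustrative):

# task_ids is stored as a comma-separated string on the Result row; an empty
# or missing string yields zero expected tasks and an 'UNAVAILABLE' status.
task_ids = [t for t in (result.task_ids or '').split(',') if t.strip() != '']
tasks = TaskUtil.get_tasks(task_ids)
status, num_completed = TaskUtil.get_task_status(tasks, len(task_ids))
if status == 'SUCCESS':
    pass  # all len(task_ids) tasks finished; the next config should be ready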
@@ -484,3 +484,13 @@ def delete_user(username):
deleted = False
return delete_info, deleted
def model_to_dict2(m, exclude=None):
exclude = exclude or []
d = {}
for f in m._meta.fields: # pylint: disable=protected-access
fname = f.name
if fname not in exclude:
d[fname] = getattr(m, fname, None)
return d
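model_to_dict2 walks _meta.fields directly, so unlike django.forms.models.model_to_dict it also picks up non-editable fields (auto-added primary keys, auto_now_add timestamps, and so on). A hedged usage example; the call site and the exclude choice are illustrative, not part of the diff:

# Illustrative only: serialize every concrete field of a Result row except
# the potentially large next_configuration blob.
result = Result.objects.get(pk=result_id)
result_info = model_to_dict2(result, exclude=['next_configuration'])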

View File

@@ -398,18 +398,33 @@ def result_view(request, project_id, session_id, result_id):
# default_metrics = {mname: metric_data[mname] * metric_meta[mname].scale
# for mname in default_metrics}
status = None
if target.task_ids is not None:
tasks = TaskUtil.get_tasks(target.task_ids)
status, _ = TaskUtil.get_task_status(tasks)
if status is None:
status = 'UNAVAILABLE'
task_ids = [t for t in (target.task_ids or '').split(',') if t.strip() != '']
tasks = TaskUtil.get_tasks(task_ids)
status, _ = TaskUtil.get_task_status(tasks, len(task_ids))
next_conf_available = True if status == 'SUCCESS' else False
next_conf = ''
cfg = target.next_configuration
LOG.debug("status: %s, next_conf_available: %s, next_conf: %s, type: %s",
status, next_conf_available, cfg, type(cfg))
if next_conf_available:
try:
cfg = JSONUtil.loads(cfg)['recommendation']
kwidth = max(len(k) for k in cfg.keys())
vwidth = max(len(str(v)) for v in cfg.values())
next_conf = ''
for k, v in cfg.items():
next_conf += '{: <{kwidth}} = {: >{vwidth}}\n'.format(
k, v, kwidth=kwidth, vwidth=vwidth)
except Exception as e: # pylint: disable=broad-except
LOG.exception("Failed to format the next config (type=%s): %s.\n\n%s\n",
type(cfg), cfg, e)
form_labels = Result.get_labels()
form_labels.update(LabelUtil.style_labels({
'status': 'status',
'next_conf_available': 'next configuration'
'next_conf': 'next configuration',
}))
form_labels['title'] = 'Result Info'
context = {
@@ -417,6 +432,7 @@ def result_view(request, project_id, session_id, result_id):
'metric_meta': metric_meta,
'status': status,
'next_conf_available': next_conf_available,
'next_conf': next_conf,
'labels': form_labels,
'project_id': project_id,
'session_id': session_id
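The aligned next_conf rendering in result_view above uses nested format specifications, where the computed column widths are themselves substituted into the format string. A minimal standalone illustration with made-up knob values:

# Nested format specs: kwidth/vwidth are filled into the spec itself, so
# keys are left-padded and values right-aligned into tidy columns.
cfg = {'work_mem': '64MB', 'shared_buffers': '4GB'}
kwidth = max(len(k) for k in cfg.keys())
vwidth = max(len(str(v)) for v in cfg.values())
lines = ['{: <{kwidth}} = {: >{vwidth}}'.format(k, v, kwidth=kwidth, vwidth=vwidth)
         for k, v in cfg.items()]
print('\n'.join(lines))
# work_mem       = 64MB
# shared_buffers =  4GB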
@@ -843,14 +859,15 @@ def download_debug_info(request, project_id, session_id): # pylint: disable=unu
def tuner_status_view(request, project_id, session_id, result_id): # pylint: disable=unused-argument
res = Result.objects.get(pk=result_id)
tasks = TaskUtil.get_tasks(res.task_ids)
task_ids = [t for t in (res.task_ids or '').split(',') if t.strip() != '']
tasks = TaskUtil.get_tasks(task_ids)
overall_status, num_completed = TaskUtil.get_task_status(tasks)
if overall_status in ['PENDING', 'RECEIVED', 'STARTED', None]:
overall_status, num_completed = TaskUtil.get_task_status(tasks, len(task_ids))
if overall_status in ['PENDING', 'RECEIVED', 'STARTED', 'UNAVAILABLE']:
completion_time = 'N/A'
total_runtime = 'N/A'
else:
completion_time = tasks[-1].date_done
completion_time = tasks.reverse()[0].date_done
total_runtime = (completion_time - res.creation_time).total_seconds()
total_runtime = '{0:.2f} seconds'.format(total_runtime)
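One note on tasks.reverse()[0] above: get_tasks() now returns an ordered QuerySet (see the TaskMeta.objects.filter(...).order_by(...) line in the utils hunk), and Django querysets reject negative indexing, which is presumably why tasks[-1] was replaced. A minimal sketch of the idiom:

# tasks[-1] is not supported on a QuerySet (negative indexing raises);
# reverse() flips the declared ordering, so [0] is the last task.
last_task = tasks.reverse()[0]
completion_time = last_task.date_done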
@@ -1066,7 +1083,6 @@ def get_timeline_data(request):
# get the lastest result
def give_result(request, upload_code): # pylint: disable=unused-argument
try:
session = Session.objects.get(upload_code=upload_code)
except Session.DoesNotExist:
@@ -1074,17 +1090,19 @@ def give_result(request, upload_code): # pylint: disable=unused-argument
return HttpResponse("Invalid upload code: " + upload_code, status=400)
latest_result = Result.objects.filter(session=session).latest('creation_time')
tasks = TaskUtil.get_tasks(latest_result.task_ids)
overall_status, num_completed = TaskUtil.get_task_status(tasks)
response = {
'celery_status': overall_status,
'result_id': latest_result.pk,
'message': '',
'errors': [],
}
task_ids = [t for t in (latest_result.task_ids or '').split(',') if t.strip() != '']
tasks = TaskUtil.get_tasks(task_ids)
overall_status, num_completed = TaskUtil.get_task_status(tasks, len(task_ids))
next_config = latest_result.next_configuration
LOG.debug("result_id: %s, overall_status: %s, tasks_completed: %s/%s, "
"next_config: %s\n", latest_result.pk, overall_status, num_completed,
len(task_ids), next_config)
response = dict(celery_status=overall_status, result_id=latest_result.pk, message='', errors=[])
if overall_status == 'SUCCESS':
response.update(JSONUtil.loads(latest_result.next_configuration),
next_config = JSONUtil.loads(next_config)
response.update(next_config,
message='Celery successfully recommended the next configuration')
status_code = 200