diff --git a/client/driver/fabfile.py b/client/driver/fabfile.py index 0272ff5..2a1895a 100644 --- a/client/driver/fabfile.py +++ b/client/driver/fabfile.py @@ -338,7 +338,6 @@ def get_result(max_time_sec=180, interval_sec=5, upload_code=None): url = dconf.WEBSITE_URL + '/query_and_get/' + upload_code elapsed = 0 response_dict = None - response = '' rout = '' while elapsed <= max_time_sec: @@ -347,8 +346,9 @@ def get_result(max_time_sec=180, interval_sec=5, upload_code=None): assert response != 'null' rout = json.dumps(response, indent=4) if isinstance(response, dict) else response - LOG.debug('%s [status code: %d, content_type: %s, elapsed: %ds]', rout, - rsp.status_code, rsp.headers.get('Content-Type', ''), elapsed) + LOG.debug('%s\n\n[status code: %d, type(response): %s, elapsed: %ds, %s]', rout, + rsp.status_code, type(response), elapsed, + ', '.join(['{}: {}'.format(k, v) for k, v in rsp.headers.items()])) if rsp.status_code == 200: # Success @@ -368,12 +368,12 @@ def get_result(max_time_sec=180, interval_sec=5, upload_code=None): elif rsp.status_code == 500: # Failure - if len(response) > 5000: - with open('error.html', 'w') as f: + msg = rout + if isinstance(response, str): + savepath = os.path.join(dconf.LOG_DIR, 'error.html') + with open(savepath, 'w') as f: f.write(response) - msg = "Saved HTML error to 'error.html'." 
- else: - msg = rout + msg = "Saved HTML error to '{}'.".format(os.path.relpath(savepath)) raise Exception( "Failed to download the next config.\nStatus code: {}\nMessage: {}\n".format( rsp.status_code, msg)) @@ -385,12 +385,12 @@ def get_result(max_time_sec=180, interval_sec=5, upload_code=None): if not response_dict: assert elapsed > max_time_sec, \ 'response={} but elapsed={}s <= max_time={}s'.format( - response, elapsed, max_time_sec) + rout, elapsed, max_time_sec) raise Exception( 'Failed to download the next config in {}s: {} (elapsed: {}s)'.format( max_time_sec, rout, elapsed)) - LOG.info('Downloaded the next config in %ds: %s', elapsed, rout) + LOG.info('Downloaded the next config in %ds', elapsed) return response_dict @@ -792,3 +792,5 @@ def integration_tests(): upload_code='ottertuneTestTuningGPR') response = get_result(upload_code='ottertuneTestTuningGPR') assert response['status'] == 'good' + + LOG.info("\n\nIntegration Tests: PASSED!!\n") diff --git a/server/website/tests/test_utils.py b/server/website/tests/test_utils.py index 9cb88b7..0a44ade 100644 --- a/server/website/tests/test_utils.py +++ b/server/website/tests/test_utils.py @@ -83,58 +83,56 @@ class TaskUtilTest(TestCase): # FIXME: Actually setup celery tasks instead of a dummy class? 
            test_tasks = [] -        (status, num_complete) = TaskUtil.get_task_status(test_tasks) -        self.assertTrue(status is None and num_complete == 0) +        (status, num_complete) = TaskUtil.get_task_status(test_tasks, 1) +        self.assertTrue(status == 'UNAVAILABLE' and num_complete == 0) + +        (status, num_complete) = TaskUtil.get_task_status(test_tasks, 0) +        self.assertTrue(status == 'UNAVAILABLE' and num_complete == 0)          test_tasks2 = [VarType() for i in range(5)]         for task in test_tasks2:             task.status = "SUCCESS" -        (status, num_complete) = TaskUtil.get_task_status(test_tasks2) +        (status, num_complete) = TaskUtil.get_task_status(test_tasks2, 5)         self.assertTrue(status == "SUCCESS" and num_complete == 5)          test_tasks3 = test_tasks2         test_tasks3[3].status = "FAILURE" -        (status, num_complete) = TaskUtil.get_task_status(test_tasks3) +        (status, num_complete) = TaskUtil.get_task_status(test_tasks3, 5)         self.assertTrue(status == "FAILURE" and num_complete == 3)          test_tasks4 = test_tasks3         test_tasks4[2].status = "REVOKED" -        (status, num_complete) = TaskUtil.get_task_status(test_tasks4) +        (status, num_complete) = TaskUtil.get_task_status(test_tasks4, 5)         self.assertTrue(status == "REVOKED" and num_complete == 2)          test_tasks5 = test_tasks4         test_tasks5[1].status = "RETRY" -        (status, num_complete) = TaskUtil.get_task_status(test_tasks5) +        (status, num_complete) = TaskUtil.get_task_status(test_tasks5, 5)         self.assertTrue(status == "RETRY" and num_complete == 1)          test_tasks6 = [VarType() for i in range(10)]         for i, task in enumerate(test_tasks6):             task.status = "PENDING" if i % 2 == 0 else "SUCCESS" -        (status, num_complete) = TaskUtil.get_task_status(test_tasks6) +        (status, num_complete) = TaskUtil.get_task_status(test_tasks6, 10)         self.assertTrue(status == "PENDING" and num_complete == 5)          test_tasks7 = test_tasks6         test_tasks7[9].status = "STARTED" -        (status, num_complete) = TaskUtil.get_task_status(test_tasks7) +        (status, num_complete) = TaskUtil.get_task_status(test_tasks7, 10)         self.assertTrue(status == "STARTED" 
and num_complete == 4) test_tasks8 = test_tasks7 test_tasks8[9].status = "RECEIVED" - (status, num_complete) = TaskUtil.get_task_status(test_tasks8) + (status, num_complete) = TaskUtil.get_task_status(test_tasks8, 10) self.assertTrue(status == "RECEIVED" and num_complete == 4) - with self.assertRaises(Exception): - test_tasks9 = [VarType() for i in range(1)] - test_tasks9[0].status = "attemped" - TaskUtil.get_task_status(test_tasks9) - class DataUtilTest(TestCase): diff --git a/server/website/website/tasks/async_tasks.py b/server/website/website/tasks/async_tasks.py index 2aa76ab..68ab9b8 100644 --- a/server/website/website/tasks/async_tasks.py +++ b/server/website/website/tasks/async_tasks.py @@ -28,7 +28,7 @@ from analysis.constraints import ParamConstraintHelper from website.models import (PipelineData, PipelineRun, Result, Workload, KnobCatalog, SessionKnob, MetricCatalog) from website import db -from website.types import PipelineTaskType, AlgorithmType +from website.types import PipelineTaskType, AlgorithmType, VarType from website.utils import DataUtil, JSONUtil from website.settings import IMPORTANT_KNOB_NUMBER, NUM_SAMPLES, TOP_NUM_CONFIG # pylint: disable=no-name-in-module from website.settings import (USE_GPFLOW, DEFAULT_LENGTH_SCALE, DEFAULT_MAGNITUDE, @@ -47,7 +47,6 @@ from website.settings import (USE_GPFLOW, DEFAULT_LENGTH_SCALE, DEFAULT_MAGNITUD GPR_MODEL_NAME, ENABLE_DUMMY_ENCODER, DNN_GD_ITER) from website.settings import INIT_FLIP_PROB, FLIP_PROB_DECAY -from website.types import VarType LOG = get_task_logger(__name__) @@ -116,32 +115,59 @@ class ConfigurationRecommendation(UpdateTask): # pylint: disable=abstract-metho def clean_knob_data(knob_matrix, knob_labels, session): # Makes sure that all knobs in the dbms are included in the knob_matrix and knob_labels - knob_cat = SessionKnob.objects.get_knobs_for_session(session) - knob_cat = [knob["name"] for knob in knob_cat if knob["tunable"]] - matrix = np.array(knob_matrix) - missing_columns = 
set(knob_cat) - set(knob_labels) - unused_columns = set(knob_labels) - set(knob_cat) - LOG.debug("clean_knob_data added %d knobs and removed %d knobs.", len(missing_columns), - len(unused_columns)) - # If columns are missing from the matrix - if missing_columns: - for knob in missing_columns: - knob_object = KnobCatalog.objects.get(dbms=session.dbms, name=knob, tunable=True) - index = knob_cat.index(knob) + knob_matrix = np.array(knob_matrix) + session_knobs = SessionKnob.objects.get_knobs_for_session(session) + knob_cat = [k['name'] for k in session_knobs] + + if knob_cat == knob_labels: + # Nothing to do! + return knob_matrix, knob_labels + + LOG.info("session_knobs: %s, knob_labels: %s, missing: %s, extra: %s", len(knob_cat), + len(knob_labels), len(set(knob_cat) - set(knob_labels)), + len(set(knob_labels) - set(knob_cat))) + + nrows = knob_matrix.shape[0] # pylint: disable=unsubscriptable-object + new_labels = [] + new_columns = [] + + for knob in session_knobs: + knob_name = knob['name'] + if knob_name not in knob_labels: + # Add missing column initialized to knob's default value + default_val = knob['default'] try: - default_val = float(knob_object.default) + if knob['vartype'] == VarType.ENUM: + default_val = knob['enumvals'].split(',').index(default_val) + elif knob['vartype'] == VarType.BOOL: + default_val = str(default_val).lower() in ("on", "true", "yes", "0") + else: + default_val = float(default_val) except ValueError: default_val = 0 - matrix = np.insert(matrix, index, default_val, axis=1) - knob_labels.insert(index, knob) - # If they are useless columns in the matrix - if unused_columns: - indexes = [i for i, n in enumerate(knob_labels) if n in unused_columns] - # Delete unused columns - matrix = np.delete(matrix, indexes, 1) - for i in sorted(indexes, reverse=True): - del knob_labels[i] - return matrix, knob_labels + LOG.exception("Error parsing knob default: %s=%s", knob_name, default_val) + new_col = np.ones((nrows, 1), dtype=float) * default_val 
+ new_lab = knob_name + else: + index = knob_labels.index(knob_name) + new_col = knob_matrix[:, index] + new_lab = knob_labels[index] + + new_labels.append(new_lab) + new_columns.append(new_col) + + new_matrix = np.hstack(new_columns).reshape(nrows, -1) + LOG.debug("Cleaned matrix: %s, knobs (%s): %s", new_matrix.shape, + len(new_labels), new_labels) + + assert new_labels == knob_cat, \ + "Expected knobs: {}\nActual knobs: {}\n".format( + knob_cat, new_labels) + assert new_matrix.shape == (nrows, len(knob_cat)), \ + "Expected shape: {}, Actual shape: {}".format( + (nrows, len(knob_cat)), new_matrix.shape) + + return new_matrix, new_labels def clean_metric_data(metric_matrix, metric_labels, session): @@ -438,8 +464,7 @@ def configuration_recommendation_ddpg(result_info): # pylint: disable=invalid-n metric_data = metric_data.flatten() metric_scalar = MinMaxScaler().fit(metric_data.reshape(1, -1)) normalized_metric_data = metric_scalar.transform(metric_data.reshape(1, -1))[0] - cleaned_knob_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'], - session) + cleaned_knob_data = clean_knob_data(agg_data['X_matrix'], agg_data['X_columnlabels'], session) knob_labels = np.array(cleaned_knob_data[1]).flatten() knob_num = len(knob_labels) metric_num = len(metric_data) @@ -834,7 +859,6 @@ def map_workload(map_workload_input): knob_data["data"], knob_data["columnlabels"] = clean_knob_data(knob_data["data"], knob_data["columnlabels"], newest_result.session) - metric_data = load_data_helper(pipeline_data, unique_workload, PipelineTaskType.METRIC_DATA) X_matrix = np.array(knob_data["data"]) y_matrix = np.array(metric_data["data"]) diff --git a/server/website/website/templates/result.html b/server/website/website/templates/result.html index ff494a7..9fb7b5d 100644 --- a/server/website/website/templates/result.html +++ b/server/website/website/templates/result.html @@ -12,9 +12,13 @@
{{ labels.id }}
{{ result.pk }} + +
{{ labels.creation_time }}
+ {{ result.creation_time }} +
{{ labels.session }}
- {{ result.session.name }} + {{ result.session.name }}
{{ labels.workload }}
@@ -28,10 +32,6 @@
{{ labels.metric_data }}
{{ result.metric_data.name }} - -
{{ labels.creation_time }}
- {{ result.creation_time }} - {% for key, value in default_metrics.items %}
{{ metric_meta|get_item:key|get_attr:"pprint" }}
@@ -47,8 +47,16 @@ {% endif %} {% if next_conf_available %} -
{{ labels.next_conf_available }}
- Download + +
+ {{ labels.next_conf }}
+ +
+ +
{{ next_conf }}
{% endif %} diff --git a/server/website/website/utils.py b/server/website/website/utils.py index eb29860..0f2d0b5 100644 --- a/server/website/website/utils.py +++ b/server/website/website/utils.py @@ -82,11 +82,8 @@ class TaskUtil(object): return TaskMeta.objects.filter(task_id__in=task_ids).order_by(preserved) @staticmethod - def get_task_status(tasks): - if not tasks: - return None, 0 - - overall_status = 'SUCCESS' + def get_task_status(tasks, num_tasks): + overall_status = 'UNAVAILABLE' num_completed = 0 for task in tasks: status = task.status @@ -101,6 +98,9 @@ class TaskUtil(object): task.id, status, task.task_id) overall_status = status + if num_tasks > 0 and num_tasks == num_completed: + overall_status = 'SUCCESS' + return overall_status, num_completed @@ -484,3 +484,13 @@ def delete_user(username): deleted = False return delete_info, deleted + + +def model_to_dict2(m, exclude=None): + exclude = exclude or [] + d = {} + for f in m._meta.fields: # pylint: disable=protected-access + fname = f.name + if fname not in exclude: + d[fname] = getattr(m, fname, None) + return d diff --git a/server/website/website/views.py b/server/website/website/views.py index 3026671..8a5748e 100644 --- a/server/website/website/views.py +++ b/server/website/website/views.py @@ -398,18 +398,33 @@ def result_view(request, project_id, session_id, result_id): # default_metrics = {mname: metric_data[mname] * metric_meta[mname].scale # for mname in default_metrics} - status = None - if target.task_ids is not None: - tasks = TaskUtil.get_tasks(target.task_ids) - status, _ = TaskUtil.get_task_status(tasks) - if status is None: - status = 'UNAVAILABLE' + task_ids = [t for t in (target.task_ids or '').split(',') if t.strip() != ''] + tasks = TaskUtil.get_tasks(task_ids) + status, _ = TaskUtil.get_task_status(tasks, len(task_ids)) next_conf_available = True if status == 'SUCCESS' else False + next_conf = '' + cfg = target.next_configuration + LOG.debug("status: %s, next_conf_available: %s, 
next_conf: %s, type: %s", + status, next_conf_available, cfg, type(cfg)) + + if next_conf_available: + try: + cfg = JSONUtil.loads(cfg)['recommendation'] + kwidth = max(len(k) for k in cfg.keys()) + vwidth = max(len(str(v)) for v in cfg.values()) + next_conf = '' + for k, v in cfg.items(): + next_conf += '{: <{kwidth}} = {: >{vwidth}}\n'.format( + k, v, kwidth=kwidth, vwidth=vwidth) + except Exception as e: # pylint: disable=broad-except + LOG.exception("Failed to format the next config (type=%s): %s.\n\n%s\n", + type(cfg), cfg, e) + form_labels = Result.get_labels() form_labels.update(LabelUtil.style_labels({ 'status': 'status', - 'next_conf_available': 'next configuration' + 'next_conf': 'next configuration', })) form_labels['title'] = 'Result Info' context = { @@ -417,6 +432,7 @@ def result_view(request, project_id, session_id, result_id): 'metric_meta': metric_meta, 'status': status, 'next_conf_available': next_conf_available, + 'next_conf': next_conf, 'labels': form_labels, 'project_id': project_id, 'session_id': session_id @@ -843,14 +859,15 @@ def download_debug_info(request, project_id, session_id): # pylint: disable=unu def tuner_status_view(request, project_id, session_id, result_id): # pylint: disable=unused-argument res = Result.objects.get(pk=result_id) - tasks = TaskUtil.get_tasks(res.task_ids) + task_ids = [t for t in (res.task_ids or '').split(',') if t.strip() != ''] + tasks = TaskUtil.get_tasks(task_ids) - overall_status, num_completed = TaskUtil.get_task_status(tasks) - if overall_status in ['PENDING', 'RECEIVED', 'STARTED', None]: + overall_status, num_completed = TaskUtil.get_task_status(tasks, len(task_ids)) + if overall_status in ['PENDING', 'RECEIVED', 'STARTED', 'UNAVAILABLE']: completion_time = 'N/A' total_runtime = 'N/A' else: - completion_time = tasks[-1].date_done + completion_time = tasks.reverse()[0].date_done total_runtime = (completion_time - res.creation_time).total_seconds() total_runtime = '{0:.2f} seconds'.format(total_runtime) 
@@ -1066,7 +1083,6 @@ def get_timeline_data(request): # get the lastest result def give_result(request, upload_code): # pylint: disable=unused-argument - try: session = Session.objects.get(upload_code=upload_code) except Session.DoesNotExist: @@ -1074,17 +1090,19 @@ def give_result(request, upload_code): # pylint: disable=unused-argument return HttpResponse("Invalid upload code: " + upload_code, status=400) latest_result = Result.objects.filter(session=session).latest('creation_time') - tasks = TaskUtil.get_tasks(latest_result.task_ids) - overall_status, num_completed = TaskUtil.get_task_status(tasks) - response = { - 'celery_status': overall_status, - 'result_id': latest_result.pk, - 'message': '', - 'errors': [], - } + task_ids = [t for t in (latest_result.task_ids or '').split(',') if t.strip() != ''] + tasks = TaskUtil.get_tasks(task_ids) + overall_status, num_completed = TaskUtil.get_task_status(tasks, len(task_ids)) + next_config = latest_result.next_configuration + LOG.debug("result_id: %s, overall_status: %s, tasks_completed: %s/%s, " + "next_config: %s\n", latest_result.pk, overall_status, num_completed, + len(task_ids), next_config) + + response = dict(celery_status=overall_status, result_id=latest_result.pk, message='', errors=[]) if overall_status == 'SUCCESS': - response.update(JSONUtil.loads(latest_result.next_configuration), + next_config = JSONUtil.loads(next_config) + response.update(next_config, message='Celery successfully recommended the next configuration') status_code = 200