From 346a2470b5877d7a7d1ec923956c03caf8ad95db Mon Sep 17 00:00:00 2001
From: bohanjason
Date: Mon, 27 Apr 2020 20:14:18 -0400
Subject: [PATCH] support user defined metrics

---
 client/driver/driver_config.py                |  6 ++
 client/driver/fabfile.py                      | 27 ++++++--
 .../user_defined_metrics.py                   | 66 +++++++++++++++++++
 server/website/website/db/base/parser.py      |  2 +-
 .../website/db/base/target_objective.py       | 60 ++++++++++++++++-
 server/website/website/views.py               | 60 ++++++++++++++++-
 6 files changed, 208 insertions(+), 13 deletions(-)
 create mode 100644 client/driver/userDefinedMetrics/user_defined_metrics.py

diff --git a/client/driver/driver_config.py b/client/driver/driver_config.py
index e45c443..017c944 100644
--- a/client/driver/driver_config.py
+++ b/client/driver/driver_config.py
@@ -92,6 +92,12 @@ DRIVER_HOME = os.path.dirname(os.path.realpath(__file__))
 # Path to the directory for storing results
 RESULT_DIR = os.path.join(DRIVER_HOME, 'results')
 
+# Set this to add user defined metrics
+ENABLE_UDM = True
+
+# Path to the User Defined Metrics (UDM), only required when ENABLE_UDM is True
+UDM_DIR = os.path.join(DRIVER_HOME, 'userDefinedMetrics')
+
 # Path to temp directory
 TEMP_DIR = '/tmp/driver'
 
diff --git a/client/driver/fabfile.py b/client/driver/fabfile.py
index c9f8971..97e181c 100644
--- a/client/driver/fabfile.py
+++ b/client/driver/fabfile.py
@@ -318,6 +318,8 @@ def signal_controller():
 def save_dbms_result():
     t = int(time.time())
     files = ['knobs.json', 'metrics_after.json', 'metrics_before.json', 'summary.json']
+    if dconf.ENABLE_UDM:
+        files.append('user_defined_metrics.json')
     for f_ in files:
         srcfile = os.path.join(dconf.CONTROLLER_HOME, 'output', f_)
         dstfile = os.path.join(dconf.RESULT_DIR, '{}__{}'.format(t, f_))
@@ -349,7 +351,10 @@ def upload_result(result_dir=None, prefix=None, upload_code=None):
     prefix = prefix or ''
     upload_code = upload_code or dconf.UPLOAD_CODE
     files = {}
-    for base in ('summary', 'knobs', 'metrics_before', 'metrics_after'):
+    bases = ['summary', 'knobs', 'metrics_before', 'metrics_after']
+    if dconf.ENABLE_UDM:
+        bases.append('user_defined_metrics')
+    for base in bases:
         fpath = os.path.join(result_dir, prefix + base + '.json')
 
         # Replaces the true db version with the specified version to allow for
@@ -466,8 +471,10 @@ def download_debug_info(pprint=False):
 
 
 @task
-def add_udf():
-    local('python3 ./LatencyUDF.py ../controller/output/')
+def add_udm(result_dir=None):
+    result_dir = result_dir or os.path.join(dconf.CONTROLLER_HOME, 'output')
+    with lcd(dconf.UDM_DIR):  # pylint: disable=not-context-manager
+        local('python3 user_defined_metrics.py {}'.format(result_dir))
 
 
 @task
@@ -632,8 +639,9 @@ def loop(i):
 
     p.join()
 
-    # add user defined target objective
-    # add_udf()
+    # add user defined metrics
+    if dconf.ENABLE_UDM is True:
+        add_udm()
 
     # save result
     result_timestamp = save_dbms_result()
@@ -667,6 +675,8 @@ def run_loops(max_iter=10):
                  'knobs': b'{}',
                  'metrics_before': b'{}',
                  'metrics_after': b'{}'}
+        if dconf.ENABLE_UDM:
+            files['user_defined_metrics'] = b'{}'
         response = requests.post(dconf.WEBSITE_URL + '/new_result/', files=files,
                                  data={'upload_code': dconf.UPLOAD_CODE})
         response = get_result()
@@ -678,7 +688,7 @@ def run_loops(max_iter=10):
         # reload database periodically
         if dconf.RELOAD_INTERVAL > 0:
             # wait 5 secs after restarting databases
-            time.sleep(5)
+            time.sleep(15)
             if i % dconf.RELOAD_INTERVAL == 0:
                 if i == 0 and dump is False:
                     restore_database()
@@ -701,7 +711,10 @@ def rename_batch(result_dir=None):
         prefix_len = os.path.basename(result).find('_') + 2
         prefix = prefix[:prefix_len]
         new_prefix = str(i) + '__'
-        for base in ('summary', 'knobs', 'metrics_before', 'metrics_after'):
+        bases = ['summary', 'knobs', 'metrics_before', 'metrics_after']
+        if dconf.ENABLE_UDM:
+            bases.append('user_defined_metrics')
+        for base in bases:
             fpath = os.path.join(result_dir, prefix + base + '.json')
             rename_path = os.path.join(result_dir, new_prefix + base + '.json')
             os.rename(fpath, rename_path)
diff --git a/client/driver/userDefinedMetrics/user_defined_metrics.py b/client/driver/userDefinedMetrics/user_defined_metrics.py
new file mode 100644
index 0000000..7f2eacc
--- /dev/null
+++ b/client/driver/userDefinedMetrics/user_defined_metrics.py
@@ -0,0 +1,66 @@
+import json
+import sys
+import copy
+import argparse
+import os
+sys.path.append("../../../")
+from server.website.website.types import VarType  # pylint: disable=import-error,wrong-import-position,line-too-long  # noqa: E402
+
+
+parser = argparse.ArgumentParser()  # pylint: disable=invalid-name
+parser.add_argument("result_dir")
+args = parser.parse_args()  # pylint: disable=invalid-name
+
+HAS_TARGET_OBJECTIVE = True
+USER_DEFINED_METRICS = {
+    "target_objective": {
+        "throughput": {
+            "more_is_better": True,
+            "unit": "transaction / second",
+            "short_unit": "txn/s",
+            "type": VarType.INTEGER
+        }
+    },
+    "metrics": {
+        "latency_99": {
+            "unit": "microseconds",
+            "short_unit": "us",
+            "type": VarType.INTEGER
+        },
+        "latency_95": {
+            "unit": "microseconds",
+            "short_unit": "us",
+            "type": VarType.INTEGER
+        }
+    }
+}
+
+
+def get_udm():
+    with open('../oltpbench.summary', 'r') as f:
+        info = json.load(f)
+    metrics = copy.deepcopy(USER_DEFINED_METRICS)
+    if HAS_TARGET_OBJECTIVE is False:
+        metrics["target_objective"] = None
+    else:
+        assert len(metrics["target_objective"]) == 1, "It should have only one target objective"
+        metrics["target_objective"]["throughput"]["value"] =\
+            info["Throughput (requests/second)"]
+
+    metrics["metrics"]["latency_99"]["value"] =\
+        info["Latency Distribution"]["99th Percentile Latency (microseconds)"]
+    metrics["metrics"]["latency_95"]["value"] =\
+        info["Latency Distribution"]["95th Percentile Latency (microseconds)"]
+    return metrics
+
+
+def write_udm():
+    metrics = get_udm()
+    result_dir = args.result_dir
+    path = os.path.join(result_dir, 'user_defined_metrics.json')
+    with open(path, 'w') as f:
+        json.dump(metrics, f, indent=4)
+
+
+if __name__ == "__main__":
+    write_udm()
diff --git a/server/website/website/db/base/parser.py b/server/website/website/db/base/parser.py
index 4660600..bb95d52 100644
--- a/server/website/website/db/base/parser.py
+++ b/server/website/website/db/base/parser.py
@@ -357,7 +357,7 @@ class BaseParser:
                     LOG.warning("Changing metric %s from COUNTER to STATISTICS", met_name)
                     met_info.metric_type = MetricType.STATISTICS
                     met_info.save()
-                if allow_negative:
+                if allow_negative and adj_val < 0:
                     LOG.warning('%s metric type %s value is negative (start=%s, end=%s, diff=%s)',
                                 met_name, MetricType.name(met_info.metric_type), start_val,
                                 end_val, end_val - start_val)
diff --git a/server/website/website/db/base/target_objective.py b/server/website/website/db/base/target_objective.py
index 0f76a66..169e499 100644
--- a/server/website/website/db/base/target_objective.py
+++ b/server/website/website/db/base/target_objective.py
@@ -16,7 +16,7 @@ LOG = logging.getLogger(__name__)
 LESS_IS_BETTER = '(less is better)'
 MORE_IS_BETTER = '(more is better)'
 THROUGHPUT = 'throughput_txn_per_sec'
-
+USER_DEFINED_TARGET = 'user_defined_metric'
 
 
 class BaseMetric:
@@ -73,6 +73,30 @@ class BaseThroughput(BaseTargetObjective):
         return float(num_txns) / observation_time
 
 
+class UserDefinedTargetObjective(BaseTargetObjective):
+    _improvement_choices = (LESS_IS_BETTER, MORE_IS_BETTER, '')
+
+    def __init__(self):
+        super().__init__(name=USER_DEFINED_TARGET, pprint='User Defined Metric', unit='unknown',
+                         short_unit='unknown', improvement='')
+
+    def is_registered(self):
+        return USER_DEFINED_TARGET != self.name
+
+    def register_target(self, name, more_is_better, unit, short_unit, pprint='User Defined Metric'):
+        self.name = name
+        assert isinstance(more_is_better, bool), 'more_is_better should be bool type'
+        if more_is_better:
+            self.improvement = MORE_IS_BETTER
+        else:
+            self.improvement = LESS_IS_BETTER
+        self.unit = unit
+        self.short_unit = short_unit
+        self.pprint = pprint
+
+    def compute(self, metrics, observation_time):
+        return metrics[self.name]
+
 class TargetObjectives:
     LESS_IS_BETTER = LESS_IS_BETTER
     MORE_IS_BETTER = MORE_IS_BETTER
@@ -81,6 +105,7 @@ class TargetObjectives:
     def __init__(self):
         self._registry = {}
         self._metric_metadatas = {}
+        self._udm_metadatas = {}  # user defined metrics
         self._default_target_objective = THROUGHPUT
 
     def register(self):
@@ -109,9 +134,33 @@ class TargetObjectives:
             self._metric_metadatas[dbms_id] = [(mname, BaseMetric(mname))
                                                for mname in sorted(numeric_metrics)]
 
+        LOG.info('Registering user defined target objectives...')
+        dbmss = models.DBMSCatalog.objects.all()
+        for dbms in dbmss:
+            dbms_id = int(dbms.pk)
+            if dbms_id not in self._registry:
+                self._registry[dbms_id] = {}
+            self._registry[dbms_id][USER_DEFINED_TARGET] = UserDefinedTargetObjective()
+
+
     def registered(self):
         return len(self._registry) > 0
 
+    def udm_registered(self, dbms_id):
+        return dbms_id in self._udm_metadatas
+
+    def register_udm(self, dbms_id, metrics):
+        if dbms_id in self._udm_metadatas:
+            LOG.warning('User Defined Metrics have already been registered, append to existing one')
+            metadatas = self._udm_metadatas[dbms_id]
+        else:
+            metadatas = []
+        for name, info in metrics.items():
+            name = 'udm.' + name
+            metadatas.append((name,
+                              BaseMetric(name, unit=info['unit'], short_unit=info['short_unit'])))
+        self._udm_metadatas[dbms_id] = metadatas
+
     def get_metric_metadata(self, dbms_id, target_objective):
         if not self.registered():
             self.register()
@@ -121,8 +170,13 @@ class TargetObjectives:
             if target_name == target_objective:
                 targets_list.insert(0, (target_name, target_instance))
             else:
-                targets_list.append((target_name, target_instance))
-        metadata = targets_list + list(self._metric_metadatas[dbms_id])
+                if target_name != USER_DEFINED_TARGET:
+                    targets_list.append((target_name, target_instance))
+        if dbms_id in self._udm_metadatas:
+            metadata = targets_list + list(self._udm_metadatas[dbms_id]) +\
+                list(self._metric_metadatas[dbms_id])
+        else:
+            metadata = targets_list + list(self._metric_metadatas[dbms_id])
         return OrderedDict(metadata)
 
     def default(self):
diff --git a/server/website/website/views.py b/server/website/website/views.py
index c2b4f38..1a850a8 100644
--- a/server/website/website/views.py
+++ b/server/website/website/views.py
@@ -12,6 +12,7 @@ import re
 import shutil
 import socket
 import time
+import copy
 from collections import OrderedDict
 from io import StringIO
 
@@ -51,6 +52,7 @@ from .utils import (JSONUtil, LabelUtil, MediaUtil,
                     TaskUtil)
 from .settings import LOG_DIR, TIME_ZONE, CHECK_CELERY
 from .set_default_knobs import set_default_knobs
+from .db.base.target_objective import USER_DEFINED_TARGET
 
 LOG = logging.getLogger(__name__)
 
@@ -486,6 +488,54 @@ def handle_result_files(session, files, execution_times=None):
     # Load the contents of the controller's summary file
     summary = JSONUtil.loads(files['summary'])
 
+    dbms_id = session.dbms.pk
+    udm_before = {}
+    udm_after = {}
+    if ('user_defined_metrics' not in files) and (USER_DEFINED_TARGET == session.target_objective):
+        return HttpResponse('ERROR: user defined target objective is not uploaded!')
+    # User defined metrics
+    udm = {}
+    if 'user_defined_metrics' in files:
+        udm = JSONUtil.loads(files['user_defined_metrics'])
+    if len(udm) > 0:
+        udm_target = udm['target_objective']
+        udm_not_target = udm['metrics']
+        udm_all = copy.deepcopy(udm_not_target)
+        if (udm_target is None) and (USER_DEFINED_TARGET == session.target_objective):
+            return HttpResponse('ERROR: user defined target objective is not uploaded!')
+        if udm_target is not None:
+            udm_all.update(udm_target)
+        if not target_objectives.udm_registered(dbms_id):
+            target_objectives.register_udm(dbms_id, udm_all)
+        for name, info in udm_all.items():
+            udm_name = 'udm.' + name
+            udm_before[name] = 0
+            udm_after[name] = info['value']
+            if MetricCatalog.objects.filter(dbms=session.dbms, name=udm_name).exists():
+                continue
+            udm_catalog = MetricCatalog.objects.create(dbms=session.dbms,
+                                                       name=udm_name,
+                                                       vartype=info['type'],
+                                                       scope='global',
+                                                       metric_type=MetricType.STATISTICS)
+            udm_catalog.summary = 'user defined metric, not target objective'
+            udm_catalog.save()
+        if udm_target is not None:
+            target_name = 'udm.' + list(udm_target.keys())[0]
+            pprint_name = 'udf.' + list(udm_target.keys())[0]
+            info = list(udm_target.values())[0]
+            if USER_DEFINED_TARGET != session.target_objective:
+                LOG.warning('the target objective is not user defined metric (UDM),\
+                    please disable UDM target objective in driver')
+            else:
+                udm_instance = target_objectives.get_instance(dbms_id, USER_DEFINED_TARGET)
+                if not udm_instance.is_registered():
+                    udm_instance.register_target(name=target_name,
+                                                 more_is_better=info['more_is_better'],
+                                                 unit=info['unit'],
+                                                 short_unit=info['short_unit'],
+                                                 pprint=pprint_name)
+
     # Find worst throughput
     past_metrics = MetricData.objects.filter(session=session)
     metric_meta = target_objectives.get_instance(session.dbms.pk, session.target_objective)
@@ -643,10 +693,16 @@ def handle_result_files(session, files, execution_times=None):
         JSONUtil.dumps(converted_knob_dict, pprint=True, sort=True), dbms)
 
     # Load, process, and store the runtime metrics exposed by the DBMS
+    metrics_before = JSONUtil.loads(files['metrics_before'])
+    metrics_after = JSONUtil.loads(files['metrics_after'])
+    # Add user defined metrics
+    if len(udm_before) > 0:
+        metrics_before['global']['udm'] = udm_before
+        metrics_after['global']['udm'] = udm_after
     initial_metric_dict, initial_metric_diffs = parser.parse_dbms_metrics(
-        dbms.pk, JSONUtil.loads(files['metrics_before']))
+        dbms.pk, metrics_before)
     final_metric_dict, final_metric_diffs = parser.parse_dbms_metrics(
-        dbms.pk, JSONUtil.loads(files['metrics_after']))
+        dbms.pk, metrics_after)
     metric_dict = parser.calculate_change_in_metrics(
         dbms.pk, initial_metric_dict, final_metric_dict)
     metric_diffs = OrderedDict([