support user defined metrics

This commit is contained in:
bohanjason 2020-04-27 20:14:18 -04:00 committed by Dana Van Aken
parent d4b3364819
commit 346a2470b5
6 changed files with 208 additions and 13 deletions

View File

@ -92,6 +92,12 @@ DRIVER_HOME = os.path.dirname(os.path.realpath(__file__))
# Path to the directory for storing results # Path to the directory for storing results
RESULT_DIR = os.path.join(DRIVER_HOME, 'results') RESULT_DIR = os.path.join(DRIVER_HOME, 'results')
# Set this to add user defined metrics
ENABLE_UDM = True
# Path to the User Defined Metrics (UDM), only required when ENABLE_UDM is True
UDM_DIR = os.path.join(DRIVER_HOME, 'userDefinedMetrics')
# Path to temp directory # Path to temp directory
TEMP_DIR = '/tmp/driver' TEMP_DIR = '/tmp/driver'

View File

@ -318,6 +318,8 @@ def signal_controller():
def save_dbms_result(): def save_dbms_result():
t = int(time.time()) t = int(time.time())
files = ['knobs.json', 'metrics_after.json', 'metrics_before.json', 'summary.json'] files = ['knobs.json', 'metrics_after.json', 'metrics_before.json', 'summary.json']
if dconf.ENABLE_UDM:
files.append('user_defined_metrics.json')
for f_ in files: for f_ in files:
srcfile = os.path.join(dconf.CONTROLLER_HOME, 'output', f_) srcfile = os.path.join(dconf.CONTROLLER_HOME, 'output', f_)
dstfile = os.path.join(dconf.RESULT_DIR, '{}__{}'.format(t, f_)) dstfile = os.path.join(dconf.RESULT_DIR, '{}__{}'.format(t, f_))
@ -349,7 +351,10 @@ def upload_result(result_dir=None, prefix=None, upload_code=None):
prefix = prefix or '' prefix = prefix or ''
upload_code = upload_code or dconf.UPLOAD_CODE upload_code = upload_code or dconf.UPLOAD_CODE
files = {} files = {}
for base in ('summary', 'knobs', 'metrics_before', 'metrics_after'): bases = ['summary', 'knobs', 'metrics_before', 'metrics_after']
if dconf.ENABLE_UDM:
bases.append('user_defined_metrics')
for base in bases:
fpath = os.path.join(result_dir, prefix + base + '.json') fpath = os.path.join(result_dir, prefix + base + '.json')
# Replaces the true db version with the specified version to allow for # Replaces the true db version with the specified version to allow for
@ -466,8 +471,10 @@ def download_debug_info(pprint=False):
@task @task
def add_udf(): def add_udm(result_dir=None):
local('python3 ./LatencyUDF.py ../controller/output/') result_dir = result_dir or os.path.join(dconf.CONTROLLER_HOME, 'output')
with lcd(dconf.UDM_DIR): # pylint: disable=not-context-manager
local('python3 user_defined_metrics.py {}'.format(result_dir))
@task @task
@ -632,8 +639,9 @@ def loop(i):
p.join() p.join()
# add user defined target objective # add user defined metrics
# add_udf() if dconf.ENABLE_UDM is True:
add_udm()
# save result # save result
result_timestamp = save_dbms_result() result_timestamp = save_dbms_result()
@ -667,6 +675,8 @@ def run_loops(max_iter=10):
'knobs': b'{}', 'knobs': b'{}',
'metrics_before': b'{}', 'metrics_before': b'{}',
'metrics_after': b'{}'} 'metrics_after': b'{}'}
if dconf.ENABLE_UDM:
files['user_defined_metrics'] = b'{}'
response = requests.post(dconf.WEBSITE_URL + '/new_result/', files=files, response = requests.post(dconf.WEBSITE_URL + '/new_result/', files=files,
data={'upload_code': dconf.UPLOAD_CODE}) data={'upload_code': dconf.UPLOAD_CODE})
response = get_result() response = get_result()
@ -678,7 +688,7 @@ def run_loops(max_iter=10):
# reload database periodically # reload database periodically
if dconf.RELOAD_INTERVAL > 0: if dconf.RELOAD_INTERVAL > 0:
# wait 5 secs after restarting databases # wait 5 secs after restarting databases
time.sleep(5) time.sleep(15)
if i % dconf.RELOAD_INTERVAL == 0: if i % dconf.RELOAD_INTERVAL == 0:
if i == 0 and dump is False: if i == 0 and dump is False:
restore_database() restore_database()
@ -701,7 +711,10 @@ def rename_batch(result_dir=None):
prefix_len = os.path.basename(result).find('_') + 2 prefix_len = os.path.basename(result).find('_') + 2
prefix = prefix[:prefix_len] prefix = prefix[:prefix_len]
new_prefix = str(i) + '__' new_prefix = str(i) + '__'
for base in ('summary', 'knobs', 'metrics_before', 'metrics_after'): bases = ['summary', 'knobs', 'metrics_before', 'metrics_after']
if dconf.ENABLE_UDM:
bases.append('user_defined_metrics')
for base in bases:
fpath = os.path.join(result_dir, prefix + base + '.json') fpath = os.path.join(result_dir, prefix + base + '.json')
rename_path = os.path.join(result_dir, new_prefix + base + '.json') rename_path = os.path.join(result_dir, new_prefix + base + '.json')
os.rename(fpath, rename_path) os.rename(fpath, rename_path)

View File

@ -0,0 +1,66 @@
import json
import sys
import copy
import argparse
import os
sys.path.append("../../../")
from server.website.website.types import VarType # pylint: disable=import-error,wrong-import-position,line-too-long # noqa: E402
parser = argparse.ArgumentParser() # pylint: disable=invalid-name
parser.add_argument("result_dir")
args = parser.parse_args() # pylint: disable=invalid-name
HAS_TARGET_OBJECTIVE = True
USER_DEINFED_METRICS = {
"target_objective": {
"throughput": {
"more_is_better": True,
"unit": "transaction / second",
"short_unit": "txn/s",
"type": VarType.INTEGER
}
},
"metrics": {
"latency_99": {
"unit": "microseconds",
"short_unit": "us",
"type": VarType.INTEGER
},
"latency_95": {
"unit": "microseconds",
"short_unit": "us",
"type": VarType.INTEGER
}
}
}
def get_udm():
with open('../oltpbench.summary', 'r') as f:
info = json.load(f)
metrics = copy.deepcopy(USER_DEINFED_METRICS)
if HAS_TARGET_OBJECTIVE is False:
metrics["target_objective"] = None
else:
assert len(metrics["target_objective"]) == 1, "It should have only one target objective"
metrics["target_objective"]["throughput"]["value"] =\
info["Throughput (requests/second)"]
metrics["metrics"]["latency_99"]["value"] =\
info["Latency Distribution"]["99th Percentile Latency (microseconds)"]
metrics["metrics"]["latency_95"]["value"] =\
info["Latency Distribution"]["95th Percentile Latency (microseconds)"]
return metrics
def write_udm():
metrics = get_udm()
result_dir = args.result_dir
path = os.path.join(result_dir, 'user_defined_metrics.json')
with open(path, 'w') as f:
json.dump(metrics, f, indent=4)
if __name__ == "__main__":
write_udm()

View File

@ -357,7 +357,7 @@ class BaseParser:
LOG.warning("Changing metric %s from COUNTER to STATISTICS", met_name) LOG.warning("Changing metric %s from COUNTER to STATISTICS", met_name)
met_info.metric_type = MetricType.STATISTICS met_info.metric_type = MetricType.STATISTICS
met_info.save() met_info.save()
if allow_negative: if allow_negative and adj_val < 0:
LOG.warning('%s metric type %s value is negative (start=%s, end=%s, diff=%s)', LOG.warning('%s metric type %s value is negative (start=%s, end=%s, diff=%s)',
met_name, MetricType.name(met_info.metric_type), start_val, end_val, met_name, MetricType.name(met_info.metric_type), start_val, end_val,
end_val - start_val) end_val - start_val)

View File

@ -16,7 +16,7 @@ LOG = logging.getLogger(__name__)
LESS_IS_BETTER = '(less is better)' LESS_IS_BETTER = '(less is better)'
MORE_IS_BETTER = '(more is better)' MORE_IS_BETTER = '(more is better)'
THROUGHPUT = 'throughput_txn_per_sec' THROUGHPUT = 'throughput_txn_per_sec'
USER_DEFINED_TARGET = 'user_defined_metric'
class BaseMetric: class BaseMetric:
@ -73,6 +73,30 @@ class BaseThroughput(BaseTargetObjective):
return float(num_txns) / observation_time return float(num_txns) / observation_time
class UserDefinedTargetObjective(BaseTargetObjective):
_improvement_choices = (LESS_IS_BETTER, MORE_IS_BETTER, '')
def __init__(self):
super().__init__(name=USER_DEFINED_TARGET, pprint='User Defined Metric', unit='unknown',
short_unit='unknown', improvement='')
def is_registered(self):
return USER_DEFINED_TARGET != self.name
def register_target(self, name, more_is_better, unit, short_unit, pprint='User Defined Metric'):
self.name = name
assert isinstance(more_is_better, bool), 'more_is_better should be bool type'
if more_is_better:
self.improvement = MORE_IS_BETTER
else:
self.improvement = LESS_IS_BETTER
self.unit = unit
self.short_unit = short_unit
self.pprint = pprint
def compute(self, metrics, observation_time):
return metrics[self.name]
class TargetObjectives: class TargetObjectives:
LESS_IS_BETTER = LESS_IS_BETTER LESS_IS_BETTER = LESS_IS_BETTER
MORE_IS_BETTER = MORE_IS_BETTER MORE_IS_BETTER = MORE_IS_BETTER
@ -81,6 +105,7 @@ class TargetObjectives:
def __init__(self): def __init__(self):
self._registry = {} self._registry = {}
self._metric_metadatas = {} self._metric_metadatas = {}
self._udm_metadatas = {} # user defined metrics
self._default_target_objective = THROUGHPUT self._default_target_objective = THROUGHPUT
def register(self): def register(self):
@ -109,9 +134,33 @@ class TargetObjectives:
self._metric_metadatas[dbms_id] = [(mname, BaseMetric(mname)) for mname self._metric_metadatas[dbms_id] = [(mname, BaseMetric(mname)) for mname
in sorted(numeric_metrics)] in sorted(numeric_metrics)]
LOG.info('Registering user defined target objectives...')
dbmss = models.DBMSCatalog.objects.all()
for dbms in dbmss:
dbms_id = int(dbms.pk)
if dbms_id not in self._registry:
self._registry[dbms_id] = {}
self._registry[dbms_id][USER_DEFINED_TARGET] = UserDefinedTargetObjective()
def registered(self): def registered(self):
return len(self._registry) > 0 return len(self._registry) > 0
def udm_registered(self, dbms_id):
return dbms_id in self._udm_metadatas
def register_udm(self, dbms_id, metrics):
if dbms_id in self._udm_metadatas:
LOG.warning('User Defined Metrics have already been registered, append to existing one')
metadatas = self._udm_metadatas[dbms_id]
else:
metadatas = []
for name, info in metrics.items():
name = 'udm.' + name
metadatas.append((name,
BaseMetric(name, unit=info['unit'], short_unit=info['short_unit'])))
self._udm_metadatas[dbms_id] = metadatas
def get_metric_metadata(self, dbms_id, target_objective): def get_metric_metadata(self, dbms_id, target_objective):
if not self.registered(): if not self.registered():
self.register() self.register()
@ -121,8 +170,13 @@ class TargetObjectives:
if target_name == target_objective: if target_name == target_objective:
targets_list.insert(0, (target_name, target_instance)) targets_list.insert(0, (target_name, target_instance))
else: else:
targets_list.append((target_name, target_instance)) if target_name != USER_DEFINED_TARGET:
metadata = targets_list + list(self._metric_metadatas[dbms_id]) targets_list.append((target_name, target_instance))
if dbms_id in self._udm_metadatas:
metadata = targets_list + list(self._udm_metadatas[dbms_id]) +\
list(self._metric_metadatas[dbms_id])
else:
metadata = targets_list + list(self._metric_metadatas[dbms_id])
return OrderedDict(metadata) return OrderedDict(metadata)
def default(self): def default(self):

View File

@ -12,6 +12,7 @@ import re
import shutil import shutil
import socket import socket
import time import time
import copy
from collections import OrderedDict from collections import OrderedDict
from io import StringIO from io import StringIO
@ -51,6 +52,7 @@ from .utils import (JSONUtil, LabelUtil, MediaUtil, TaskUtil)
from .settings import LOG_DIR, TIME_ZONE, CHECK_CELERY from .settings import LOG_DIR, TIME_ZONE, CHECK_CELERY
from .set_default_knobs import set_default_knobs from .set_default_knobs import set_default_knobs
from .db.base.target_objective import USER_DEFINED_TARGET
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -486,6 +488,54 @@ def handle_result_files(session, files, execution_times=None):
# Load the contents of the controller's summary file # Load the contents of the controller's summary file
summary = JSONUtil.loads(files['summary']) summary = JSONUtil.loads(files['summary'])
dbms_id = session.dbms.pk
udm_before = {}
udm_after = {}
if ('user_defined_metrics' not in files) and (USER_DEFINED_TARGET == session.target_objective):
return HttpResponse('ERROR: user defined target objective is not uploaded!')
# User defined metrics
udm = {}
if 'user_defined_metrics' in files:
udm = JSONUtil.loads(files['user_defined_metrics'])
if len(udm) > 0:
udm_target = udm['target_objective']
udm_not_target = udm['metrics']
udm_all = copy.deepcopy(udm_not_target)
if (udm_target is None) and (USER_DEFINED_TARGET == session.target_objective):
return HttpResponse('ERROR: user defined target objective is not uploaded!')
if udm_target is not None:
udm_all.update(udm_target)
if not target_objectives.udm_registered(dbms_id):
target_objectives.register_udm(dbms_id, udm_all)
for name, info in udm_all.items():
udm_name = 'udm.' + name
udm_before[name] = 0
udm_after[name] = info['value']
if MetricCatalog.objects.filter(dbms=session.dbms, name=udm_name).exists():
continue
udm_catalog = MetricCatalog.objects.create(dbms=session.dbms,
name=udm_name,
vartype=info['type'],
scope='global',
metric_type=MetricType.STATISTICS)
udm_catalog.summary = 'user defined metric, not target objective'
udm_catalog.save()
if udm_target is not None:
target_name = 'udm.' + list(udm_target.keys())[0]
pprint_name = 'udf.' + list(udm_target.keys())[0]
info = list(udm_target.values())[0]
if USER_DEFINED_TARGET != session.target_objective:
LOG.warning('the target objective is not user defined metric (UDM),\
please disable UDM target objective in driver')
else:
udm_instance = target_objectives.get_instance(dbms_id, USER_DEFINED_TARGET)
if not udm_instance.is_registered():
udm_instance.register_target(name=target_name,
more_is_better=info['more_is_better'],
unit=info['unit'],
short_unit=info['short_unit'],
pprint=pprint_name)
# Find worst throughput # Find worst throughput
past_metrics = MetricData.objects.filter(session=session) past_metrics = MetricData.objects.filter(session=session)
metric_meta = target_objectives.get_instance(session.dbms.pk, session.target_objective) metric_meta = target_objectives.get_instance(session.dbms.pk, session.target_objective)
@ -643,10 +693,16 @@ def handle_result_files(session, files, execution_times=None):
JSONUtil.dumps(converted_knob_dict, pprint=True, sort=True), dbms) JSONUtil.dumps(converted_knob_dict, pprint=True, sort=True), dbms)
# Load, process, and store the runtime metrics exposed by the DBMS # Load, process, and store the runtime metrics exposed by the DBMS
metrics_before = JSONUtil.loads(files['metrics_before'])
metrics_after = JSONUtil.loads(files['metrics_after'])
# Add user defined metrics
if len(udm_before) > 0:
metrics_before['global']['udm'] = udm_before
metrics_after['global']['udm'] = udm_after
initial_metric_dict, initial_metric_diffs = parser.parse_dbms_metrics( initial_metric_dict, initial_metric_diffs = parser.parse_dbms_metrics(
dbms.pk, JSONUtil.loads(files['metrics_before'])) dbms.pk, metrics_before)
final_metric_dict, final_metric_diffs = parser.parse_dbms_metrics( final_metric_dict, final_metric_diffs = parser.parse_dbms_metrics(
dbms.pk, JSONUtil.loads(files['metrics_after'])) dbms.pk, metrics_after)
metric_dict = parser.calculate_change_in_metrics( metric_dict = parser.calculate_change_in_metrics(
dbms.pk, initial_metric_dict, final_metric_dict) dbms.pk, initial_metric_dict, final_metric_dict)
metric_diffs = OrderedDict([ metric_diffs = OrderedDict([