Consolidated driver scripts and added status codes to the query_and_get and new_result responses

Dana Van Aken 2019-10-01 01:17:49 -04:00
parent 8743c0c2b1
commit d1c13de42f
6 changed files with 256 additions and 225 deletions
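With this change, the server reports progress through HTTP status codes instead of free-form message bodies: /new_result/ and /query_and_get/ now return 200 on success, 202 while the tuning task is still running, and 400 on invalid input or a failed task. Below is a minimal sketch of the client-side polling loop these codes enable; it simply mirrors the driver's new get_result task, and the base URL and upload code are the sample values from driver_config.json, not required settings:

import json
import time
from collections import OrderedDict

import requests

BASE_URL = 'http://127.0.0.1:8000'        # sample value from driver_config.json
UPLOAD_CODE = 'I5I10PXK3PK27FM86YYS'      # sample value from driver_config.json

def poll_next_config(max_time_sec=180, interval_sec=1):
    # Poll /query_and_get/ until the next configuration is ready.
    url = '{}/query_and_get/{}'.format(BASE_URL, UPLOAD_CODE)
    start = time.time()
    while time.time() - start <= max_time_sec:
        rsp = requests.get(url)
        if rsp.status_code == 200:
            # Success: the body holds the recommended configuration.
            return json.loads(rsp.json(), object_pairs_hook=OrderedDict)
        if rsp.status_code == 202:
            # Tuning task not finished yet: wait and retry.
            time.sleep(interval_sec)
            continue
        # 400 (or anything unexpected): surface the server's error message.
        raise Exception('query_and_get failed ({}): {}'.format(
            rsp.status_code, rsp.content))
    raise Exception('No configuration received within {}s'.format(max_time_sec))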

client/driver/.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
dumpfiles
log
results
next_config

View File

@@ -1,69 +0,0 @@
#
# OtterTune - ConfParser.py
#
# Copyright (c) 2017-18, Carnegie Mellon University Database Group
#
'''
Created on Mar 23, 2018
@author: Jacky, bohan, Dongsheng
'''
import sys
import json
from collections import OrderedDict


def change_postgres_conf(recommendation, postgresqlconf):
    lines = postgresqlconf.readlines()
    settings_idx = lines.index("# Add settings for extensions here\n")
    postgresqlconf.seek(0)
    postgresqlconf.truncate(0)
    lines = lines[0:(settings_idx + 1)]
    for line in lines:
        postgresqlconf.write(line)
    for (knob_name, knob_value) in list(recommendation.items()):
        postgresqlconf.write(str(knob_name) + " = " + str(knob_value) + "\n")


def change_oracle_conf(recommendation, oracle_conf):
    lines = oracle_conf.readlines()
    signal = "# configurations recommended by ottertune:\n"
    if signal not in lines:
        oracle_conf.write('\n' + signal)
        oracle_conf.flush()
        oracle_conf.seek(0)
        lines = oracle_conf.readlines()
    settings_idx = lines.index(signal)
    oracle_conf.seek(0)
    oracle_conf.truncate(0)
    lines = lines[0:(settings_idx + 1)]
    for line in lines:
        oracle_conf.write(line)
    for (knob_name, knob_value) in list(recommendation.items()):
        oracle_conf.write(str(knob_name) + " = " + str(knob_value).strip('B') + "\n")


def main():
    if len(sys.argv) != 4:
        raise Exception("Usage: python [DB type] ConfParser.py [Next Config] [Current Config]")
    database_type = sys.argv[1]
    next_config_name = sys.argv[2]
    cur_config_name = sys.argv[3]
    with open(next_config_name, 'r') as next_config, open(cur_config_name, 'r+') as cur_config:
        config = json.load(next_config, encoding="UTF-8", object_pairs_hook=OrderedDict)
        recommendation = config['recommendation']
        if database_type == 'postgres':
            change_postgres_conf(recommendation, cur_config)
        elif database_type == 'oracle':
            change_oracle_conf(recommendation, cur_config)
        else:
            raise Exception("Database Type {} Not Implemented !".format(database_type))


if __name__ == "__main__":
    main()

View File

@@ -3,19 +3,18 @@
"database_name" : "tpcc",
"database_disk": "/dev/xvda1",
"database_conf": "/etc/postgresql/9.6/main/postgresql.conf",
"database_save_path": "/home/ubuntu/ottertune",
"username" : "bohan",
"password" : "bohan",
"oltpbench_home": "/home/ubuntu/oltpbench",
"oltpbench_config": "/home/ubuntu/oltpbench/config/tpcc_config_postgres.xml",
"database_save_path": "~/ottertune/client/driver/dumpfiles",
"username" : "dbuser",
"password" : "dbpassword",
"oltpbench_home": "~/oltpbench",
"oltpbench_config": "~/oltpbench/config/tpcc_config_postgres.xml",
"oltpbench_workload": "tpcc",
"oltpbench_log" : "/home/ubuntu/ottertune/client/driver/oltp.log",
"controller_config": "/home/ubuntu/ottertune/client/controller/config/sample_postgres_config.json",
"controller_log" : "/home/ubuntu/ottertune/client/driver/controller.log",
"save_path": "/home/ubuntu/results",
"controller_home": "~/ottertune/client/controller",
"log_path": "~/ottertune/client/driver/log",
"save_path": "~/ottertune/client/driver/results",
"upload_url" : "http://127.0.0.1:8000",
"upload_code" : "I5I10PXK3PK27FM86YYS",
"lhs_knob_path" : "/home/ubuntu/ottertune/client/driver/knobs/postgres-96.json",
"lhs_save_path" : "/home/ubuntu/ottertune/client/driver/configs",
"lhs_knob_path" : "~/ottertune/client/driver/knobs/postgres-96.json",
"lhs_save_path" : "~/ottertune/client/driver/configs",
"oracle_awr_enabled": false
}

View File

@@ -8,27 +8,21 @@ Created on Mar 23, 2018
@author: bohan
'''
import sys
import glob
import json
import logging
import time
import os
import re
import glob
import time
from collections import OrderedDict
from multiprocessing import Process
from fabric.api import (env, local, task, lcd)
import logging
from logging.handlers import RotatingFileHandler
import requests
from fabric.api import env, local, task, lcd
from fabric.state import output as fabric_output
LOG = logging.getLogger()
LOG.setLevel(logging.DEBUG)
Formatter = logging.Formatter( # pylint: disable=invalid-name
"%(asctime)s [%(levelname)s] %(message)s")
# print the log
ConsoleHandler = logging.StreamHandler(sys.stdout) # pylint: disable=invalid-name
ConsoleHandler.setFormatter(Formatter)
LOG.addHandler(ConsoleHandler)
# Fabric environment settings
env.hosts = ['localhost']
fabric_output.update({
@@ -41,20 +35,55 @@ RELOAD_INTERVAL = 10
# maximum disk usage
MAX_DISK_USAGE = 90
with open('driver_config.json', 'r') as f:
CONF = json.load(f)
# Load config
with open('driver_config.json', 'r') as _f:
CONF = {k: os.path.expanduser(v) if isinstance(v, str) and v.startswith('~') else v
for k, v in json.load(_f).items()}
# Create output directories
for _dir in (CONF['database_save_path'], CONF['log_path'], CONF['save_path'],
CONF['lhs_save_path']):
os.makedirs(_dir, exist_ok=True)
# Define paths
CONF['driver_log'] = os.path.join(CONF['log_path'], 'driver.log')
CONF['oltpbench_log'] = os.path.join(CONF['log_path'], 'oltpbench.log')
CONF['controller_log'] = os.path.join(CONF['log_path'], 'controller.log')
CONF['controller_config'] = os.path.join(CONF['controller_home'], 'config',
'{}_config.json'.format(CONF['database_type']))
# Configure logging
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)
Formatter = logging.Formatter( # pylint: disable=invalid-name
fmt='%(asctime)s [%(funcName)s:%(lineno)03d] %(levelname)-5s: %(message)s',
datefmt='%m-%d-%Y %H:%M:%S')
ConsoleHandler = logging.StreamHandler() # pylint: disable=invalid-name
ConsoleHandler.setFormatter(Formatter)
LOG.addHandler(ConsoleHandler)
FileHandler = RotatingFileHandler( # pylint: disable=invalid-name
CONF['driver_log'], maxBytes=50000, backupCount=2)
FileHandler.setFormatter(Formatter)
LOG.addHandler(FileHandler)
def _parse_bool(value):
if not isinstance(value, bool):
value = str(value).lower() == 'true'
return value
@task
def check_disk_usage():
partition = CONF['database_disk']
disk_use = 0
cmd = "df -h {}".format(partition)
out = local(cmd, capture=True).splitlines()[1]
m = re.search('\d+(?=%)', out) # pylint: disable=anomalous-backslash-in-string
if m:
disk_use = int(m.group(0))
LOG.info("Current Disk Usage: %s%s", disk_use, '%')
if partition:
cmd = "df -h {}".format(partition)
out = local(cmd, capture=True).splitlines()[1]
m = re.search(r'\d+(?=%)', out)
if m:
disk_use = int(m.group(0))
LOG.info("Current Disk Usage: %s%s", disk_use, '%')
return disk_use
@@ -64,6 +93,29 @@ def check_memory_usage():
local(cmd)
@task
def create_controller_config():
if CONF['database_type'] == 'postgres':
dburl_fmt = 'jdbc:postgresql://localhost:5432/{db}'.format
elif CONF['database_type'] == 'oracle':
dburl_fmt = 'jdbc:oracle:thin:@localhost:1521:{db}'.format
else:
raise Exception("Database Type {} Not Implemented !".format(CONF['database_type']))
config = dict(
database_type=CONF['database_type'],
database_url=dburl_fmt(db=CONF['database_name']),
username=CONF['username'],
password=CONF['password'],
upload_code='DEPRECATED',
upload_url='DEPRECATED',
workload_name=CONF['oltpbench_workload']
)
with open(CONF['controller_config'], 'w') as f:
json.dump(config, f, indent=2)
@task
def restart_database():
if CONF['database_type'] == 'postgres':
@@ -96,11 +148,45 @@ def create_database():
@task
def change_conf():
next_conf = 'next_config'
cmd = "sudo python3 ConfParser.py {} {} {}".\
format(CONF['database_type'], next_conf, CONF['database_conf'])
local(cmd)
def reset_conf():
change_conf(next_conf='')
@task
def change_conf(next_conf='next_config'):
signal = "# configurations recommended by ottertune:\n"
next_conf = next_conf or {}
with open(CONF['database_conf'], 'r') as f:
lines = f.readlines()
if signal not in lines:
lines += ['\n', signal]
signal_idx = lines.index(signal)
lines = lines[0:signal_idx + 1]
if isinstance(next_conf, str):
with open(next_conf, 'r') as f:
recommendation = json.load(
f, encoding="UTF-8", object_pairs_hook=OrderedDict)['recommendation']
else:
recommendation = next_conf
assert isinstance(recommendation, dict)
for name, value in recommendation.items():
if CONF['database_type'] == 'oracle' and isinstance(value, str):
value = value.strip('B')
lines.append('{} = {}\n'.format(name, value))
lines.append('\n')
tmpconf = 'tmp_' + os.path.basename(CONF['database_conf'])
with open(tmpconf, 'w') as f:
f.write(''.join(lines))
local('sudo cp {0} {0}.ottertune.bak'.format(CONF['database_conf']))
local('sudo mv {} {}'.format(tmpconf, CONF['database_conf']))
@task
@@ -129,31 +215,31 @@ def run_oltpbench_bg():
@task
def run_controller():
cmd = 'sudo gradle run -PappArgs="-c {} -d output/" --no-daemon > {}'.\
if not os.path.exists(CONF['controller_config']):
create_controller_config()
cmd = 'gradle run -PappArgs="-c {} -d output/" --no-daemon > {}'.\
format(CONF['controller_config'], CONF['controller_log'])
with lcd("../controller"): # pylint: disable=not-context-manager
with lcd(CONF['controller_home']): # pylint: disable=not-context-manager
local(cmd)
@task
def signal_controller():
pid = int(open('../controller/pid.txt').read())
pidfile = os.path.join(CONF['controller_home'], 'pid.txt')
pid = int(open(pidfile).read())
cmd = 'sudo kill -2 {}'.format(pid)
with lcd("../controller"): # pylint: disable=not-context-manager
with lcd(CONF['controller_home']): # pylint: disable=not-context-manager
local(cmd)
@task
def save_dbms_result():
if not os.path.exists(CONF['save_path']):
os.makedirs(CONF['save_path'])
t = int(time.time())
files = ['knobs.json', 'metrics_after.json', 'metrics_before.json', 'summary.json']
for f_ in files:
f_prefix = f_.split('.')[0]
cmd = 'cp ../controller/output/{} {}/{}__{}.json'.\
format(f_, CONF['save_path'], t, f_prefix)
local(cmd)
srcfile = os.path.join(CONF['controller_home'], 'output', f_)
dstfile = os.path.join(CONF['save_path'], '{}__{}'.format(t, f_))
local('cp {} {}'.format(srcfile, dstfile))
@task
@@ -163,18 +249,83 @@ def free_cache():
@task
def upload_result():
cmd = 'python3 ../../server/website/script/upload/upload.py \
../controller/output/ {} {}/new_result/'.format(CONF['upload_code'],
CONF['upload_url'])
local(cmd)
def upload_result(result_dir=None, prefix=None):
result_dir = result_dir or os.path.join(CONF['controller_home'], 'output')
prefix = prefix or ''
files = {}
for base in ('summary', 'knobs', 'metrics_before', 'metrics_after'):
fpath = os.path.join(result_dir, prefix + base + '.json')
# Replaces the true db version with the specified version to allow for
# testing versions not officially supported by OtterTune
if base == 'summary' and 'override_database_version' in CONF and \
CONF['override_database_version']:
with open(fpath, 'r') as f:
summary = json.load(f)
summary['real_database_version'] = summary['database_version']
summary['database_version'] = CONF['override_database_version']
with open(fpath, 'w') as f:
json.dump(summary, f, indent=1)
files[base] = open(fpath, 'rb')
response = requests.post(CONF['upload_url'] + '/new_result/', files=files,
data={'upload_code': CONF['upload_code']})
if response.status_code != 200:
raise Exception('Error uploading result.\nStatus: {}\nMessage: {}\n'.format(
response.status_code, response.content))
LOG.info(response.content)
return response
@task
def get_result():
cmd = 'python3 ../../script/query_and_get.py {} {} 5'.\
format(CONF['upload_url'], CONF['upload_code'])
local(cmd)
def get_result(max_time_sec=180, interval_sec=1):
max_time_sec = int(max_time_sec)
interval_sec = int(interval_sec)
url = CONF['upload_url'] + '/query_and_get/' + CONF['upload_code']
elapsed = 0.0
response_dict = None
response = ''
start_time = time.time()
while elapsed <= max_time_sec:
rsp = requests.get(url)
response = rsp.content.decode()
LOG.debug('Response:\n\n%s\n', response)
if rsp.status_code == 200:
# Success
response_dict = json.loads(rsp.json(), object_pairs_hook=OrderedDict)
break
elif rsp.status_code == 202:
# Not ready
time.sleep(interval_sec)
elif rsp.status_code == 400:
# Failure
raise Exception(
"Failed to download the next config.\nStatus code: {}\nMessage: {}\n".format(
rsp.status_code, response))
else:
raise NotImplementedError(
"Unhandled status code: '{}'.\nMessage: {}".format(rsp.status_code, response))
elapsed = time.time() - start_time
if not response_dict:
assert elapsed > max_time_sec, \
'response={} but elapsed={:.1f}s <= max_time={:.1f}s'.format(
response, elapsed, max_time_sec)
raise Exception(
'Failed to download the next config in {}s: {} (elapsed: {:.1f}s)'.format(
max_time_sec, response, elapsed))
LOG.info('Downloaded the next config in %.0fs: %s', elapsed,
json.dumps(response_dict, indent=4))
return response_dict
@task
@@ -184,16 +335,23 @@ def add_udf():
@task
def upload_batch():
cmd = 'python3 ./upload_batch.py {} {} {}/new_result/'.format(CONF['save_path'],
CONF['upload_code'],
CONF['upload_url'])
local(cmd)
def upload_batch(result_dir, sort=True):
sort = _parse_bool(sort)
results = glob.glob(os.path.join(result_dir, '*__summary.json'))
if sort:
results = sorted(results)
count = len(results)
LOG.info('Uploading %d samples from %s...', count, result_dir)
for i, result in enumerate(results):
prefix = os.path.basename(result).split('__')[0]
upload_result(result_dir=result_dir, prefix=prefix)
LOG.info('Uploaded result %d/%d: %s__*.json', i + 1, count, prefix)
@task
def dump_database():
db_file_path = '{}/{}.dump'.format(CONF['database_save_path'], CONF['database_name'])
db_file_path = os.path.join(CONF['database_save_path'], CONF['database_name'] + '.dump')
if os.path.exists(db_file_path):
LOG.info('%s already exists ! ', db_file_path)
return False
@@ -235,21 +393,19 @@ def restore_database():
def _ready_to_start_oltpbench():
return (os.path.exists(CONF['controller_log']) and
'Output the process pid to'
in open(CONF['controller_log']).read())
return os.path.exists(CONF['controller_log']) and \
'Output the process pid to' in open(CONF['controller_log']).read()
def _ready_to_start_controller():
return (os.path.exists(CONF['oltpbench_log']) and
'Warmup complete, starting measurements'
in open(CONF['oltpbench_log']).read())
return os.path.exists(CONF['oltpbench_log']) and \
'Warmup complete, starting measurements' in open(CONF['oltpbench_log']).read()
def _ready_to_shut_down_controller():
pid_file_path = '../controller/pid.txt'
return (os.path.exists(pid_file_path) and os.path.exists(CONF['oltpbench_log']) and
'Output throughput samples into file' in open(CONF['oltpbench_log']).read())
pid_file_path = os.path.join(CONF['controller_home'], 'pid.txt')
return os.path.exists(pid_file_path) and os.path.exists(CONF['oltpbench_log']) and \
'Output throughput samples into file' in open(CONF['oltpbench_log']).read()
def clean_logs():
@@ -264,8 +420,6 @@ def clean_logs():
@task
def lhs_samples(count=10):
if not os.path.exists(CONF['lhs_save_path']):
os.makedirs(CONF['lhs_save_path'])
cmd = 'python3 lhs.py {} {} {}'.format(count, CONF['lhs_knob_path'], CONF['lhs_save_path'])
local(cmd)
@@ -284,7 +438,7 @@ def loop():
# check disk usage
if check_disk_usage() > MAX_DISK_USAGE:
LOG.WARN('Exceeds max disk usage %s', MAX_DISK_USAGE)
LOG.warning('Exceeds max disk usage %s', MAX_DISK_USAGE)
# run controller from another process
p = Process(target=run_controller, args=())
@@ -293,19 +447,19 @@ def loop():
# run oltpbench as a background job
while not _ready_to_start_oltpbench():
pass
time.sleep(1)
run_oltpbench_bg()
LOG.info('Run OLTP-Bench')
# the controller starts the first collection
while not _ready_to_start_controller():
pass
time.sleep(1)
signal_controller()
LOG.info('Start the first collection')
# stop the experiment
while not _ready_to_shut_down_controller():
pass
time.sleep(1)
signal_controller()
LOG.info('Start the second collection, shut down the controller')
@@ -352,7 +506,7 @@ def run_lhs():
# check disk usage
if check_disk_usage() > MAX_DISK_USAGE:
LOG.WARN('Exceeds max disk usage %s', MAX_DISK_USAGE)
LOG.warning('Exceeds max disk usage %s', MAX_DISK_USAGE)
# copy lhs-sampled config to the to-be-used config
cmd = 'cp {} next_config'.format(sample)

View File

@@ -1,63 +0,0 @@
#
# OtterTune - upload_batch.py
#
# Copyright (c) 2017-18, Carnegie Mellon University Database Group
#
import argparse
import glob
import logging
import os

import requests

# Logging
LOG = logging.getLogger(__name__)
LOG.addHandler(logging.StreamHandler())
LOG.setLevel(logging.INFO)


# Upload all files in the datadir to OtterTune's server side.
# You may want to upload your training data to the non-tuning session.
def upload_batch(datadir, upload_code, url):
    samples = glob.glob(os.path.join(datadir, '*__summary.json'))
    count = len(samples)
    samples_prefix = []
    LOG.info('Uploading %d samples in %s...', count, datadir)
    for sample in samples:
        prefix = sample.split('/')[-1].split('__')[0]
        samples_prefix.append(prefix)

    for i in range(count):
        prefix = samples_prefix[i]
        params = {
            'summary': open(os.path.join(datadir, '{}__summary.json'.format(prefix)), 'rb'),
            'knobs': open(os.path.join(datadir, '{}__knobs.json'.format(prefix)), 'rb'),
            'metrics_before': open(os.path.join(datadir,
                                                '{}__metrics_before.json'.format(prefix)), 'rb'),
            'metrics_after': open(os.path.join(datadir,
                                               '{}__metrics_after.json'.format(prefix)), 'rb'),
        }
        LOG.info('Upload %d-th sample %s__*.json', i + 1, prefix)
        response = requests.post(url,
                                 files=params,
                                 data={'upload_code': upload_code})
        LOG.info(response.content)


def main():
    parser = argparse.ArgumentParser(description="Upload generated data to the website")
    parser.add_argument('datadir', type=str, nargs=1,
                        help='Directory containing the generated data')
    parser.add_argument('upload_code', type=str, nargs=1,
                        help='The website\'s upload code')
    parser.add_argument('url', type=str, default='http://0.0.0.0:8000/new_result/',
                        nargs='?', help='The upload url: server_ip/new_result/')
    args = parser.parse_args()
    upload_batch(args.datadir[0], args.upload_code[0], args.url)


if __name__ == "__main__":
    main()

View File

@@ -414,17 +414,19 @@ def new_result(request):
if not form.is_valid():
LOG.warning("New result form is not valid: %s", str(form.errors))
return HttpResponse("New result form is not valid: " + str(form.errors))
return HttpResponse("New result form is not valid: " + str(form.errors), status=400)
upload_code = form.cleaned_data['upload_code']
try:
session = Session.objects.get(upload_code=upload_code)
except Session.DoesNotExist:
LOG.warning("Invalid upload code: %s", upload_code)
return HttpResponse("Invalid upload code: " + upload_code)
return HttpResponse("Invalid upload code: " + upload_code, status=400)
return handle_result_files(session, request.FILES)
LOG.warning("Request type was not POST")
return HttpResponse("Request type was not POST")
return HttpResponse("Request type was not POST", status=400)
def handle_result_files(session, files):
@@ -958,22 +960,26 @@ def give_result(request, upload_code):  # pylint: disable=unused-argument
session = Session.objects.get(upload_code=upload_code)
except Session.DoesNotExist:
LOG.warning("Invalid upload code: %s", upload_code)
return HttpResponse("Invalid upload code: " + upload_code)
return HttpResponse("Invalid upload code: " + upload_code, status=400)
results = Result.objects.filter(session=session)
lastest_result = results[len(results) - 1]
tasks = TaskUtil.get_tasks(lastest_result.task_ids)
overall_status, _ = TaskUtil.get_task_status(tasks)
overall_status, num_completed = TaskUtil.get_task_status(tasks)
if overall_status in ['PENDING', 'RECEIVED', 'STARTED']:
return HttpResponse("Result not ready")
# unclear behaviors for REVOKED and RETRY, treat as failure
elif overall_status in ['FAILURE', 'REVOKED', 'RETRY']:
return HttpResponse("Fail")
if overall_status == 'SUCCESS':
res = Result.objects.get(pk=lastest_result.pk)
response = HttpResponse(JSONUtil.dumps(res.next_configuration),
content_type='application/json')
elif overall_status in ('PENDING', 'RECEIVED', 'STARTED'):
response = HttpResponse("{}: Result not ready".format(overall_status), status=202)
else: # overall_status in ('FAILURE', 'REVOKED', 'RETRY'):
failed_task_idx = min(len(tasks) - 1, num_completed + 1)
failed_task = tasks[failed_task_idx]
response = HttpResponse(
"{}: {}".format(overall_status, failed_task.traceback), status=400)
# success
res = Result.objects.get(pk=lastest_result.pk)
return HttpResponse(JSONUtil.dumps(res.next_configuration), content_type='application/json')
return response
def train_ddpg_loops(request, session_id): # pylint: disable=unused-argument