From d1c13de42f21d231c789c799c7ecfb207ff47935 Mon Sep 17 00:00:00 2001 From: Dana Van Aken Date: Tue, 1 Oct 2019 01:17:49 -0400 Subject: [PATCH] Consolidated driver scripts and added status codes to the query_and_get and new_result responses --- client/driver/.gitignore | 4 + client/driver/ConfParser.py | 69 -------- client/driver/driver_config.json | 21 ++- client/driver/fabfile.py | 292 +++++++++++++++++++++++-------- client/driver/upload_batch.py | 63 ------- server/website/website/views.py | 32 ++-- 6 files changed, 256 insertions(+), 225 deletions(-) create mode 100644 client/driver/.gitignore delete mode 100644 client/driver/ConfParser.py delete mode 100644 client/driver/upload_batch.py diff --git a/client/driver/.gitignore b/client/driver/.gitignore new file mode 100644 index 0000000..42124e7 --- /dev/null +++ b/client/driver/.gitignore @@ -0,0 +1,4 @@ +dumpfiles +log +results +next_config diff --git a/client/driver/ConfParser.py b/client/driver/ConfParser.py deleted file mode 100644 index 48c43a6..0000000 --- a/client/driver/ConfParser.py +++ /dev/null @@ -1,69 +0,0 @@ -# -# OtterTune - ConfParser.py -# -# Copyright (c) 2017-18, Carnegie Mellon University Database Group -# -''' -Created on Mar 23, 2018 -@author: Jacky, bohan, Dongsheng -''' - -import sys -import json -from collections import OrderedDict - - -def change_postgres_conf(recommendation, postgresqlconf): - lines = postgresqlconf.readlines() - settings_idx = lines.index("# Add settings for extensions here\n") - postgresqlconf.seek(0) - postgresqlconf.truncate(0) - - lines = lines[0:(settings_idx + 1)] - for line in lines: - postgresqlconf.write(line) - - for (knob_name, knob_value) in list(recommendation.items()): - postgresqlconf.write(str(knob_name) + " = " + str(knob_value) + "\n") - - -def change_oracle_conf(recommendation, oracle_conf): - lines = oracle_conf.readlines() - signal = "# configurations recommended by ottertune:\n" - if signal not in lines: - oracle_conf.write('\n' + signal) - oracle_conf.flush() - oracle_conf.seek(0) - lines = oracle_conf.readlines() - settings_idx = lines.index(signal) - - oracle_conf.seek(0) - oracle_conf.truncate(0) - - lines = lines[0:(settings_idx + 1)] - for line in lines: - oracle_conf.write(line) - - for (knob_name, knob_value) in list(recommendation.items()): - oracle_conf.write(str(knob_name) + " = " + str(knob_value).strip('B') + "\n") - - -def main(): - if len(sys.argv) != 4: - raise Exception("Usage: python [DB type] ConfParser.py [Next Config] [Current Config]") - database_type = sys.argv[1] - next_config_name = sys.argv[2] - cur_config_name = sys.argv[3] - with open(next_config_name, 'r') as next_config, open(cur_config_name, 'r+') as cur_config: - config = json.load(next_config, encoding="UTF-8", object_pairs_hook=OrderedDict) - recommendation = config['recommendation'] - if database_type == 'postgres': - change_postgres_conf(recommendation, cur_config) - elif database_type == 'oracle': - change_oracle_conf(recommendation, cur_config) - else: - raise Exception("Database Type {} Not Implemented !".format(database_type)) - - -if __name__ == "__main__": - main() diff --git a/client/driver/driver_config.json b/client/driver/driver_config.json index 838341f..f7ed573 100644 --- a/client/driver/driver_config.json +++ b/client/driver/driver_config.json @@ -3,19 +3,18 @@ "database_name" : "tpcc", "database_disk": "/dev/xvda1", "database_conf": "/etc/postgresql/9.6/main/postgresql.conf", - "database_save_path": "/home/ubuntu/ottertune", - "username" : "bohan", - "password" : 
"bohan", - "oltpbench_home": "/home/ubuntu/oltpbench", - "oltpbench_config": "/home/ubuntu/oltpbench/config/tpcc_config_postgres.xml", + "database_save_path": "~/ottertune/client/driver/dumpfiles", + "username" : "dbuser", + "password" : "dbpassword", + "oltpbench_home": "~/oltpbench", + "oltpbench_config": "~/oltpbench/config/tpcc_config_postgres.xml", "oltpbench_workload": "tpcc", - "oltpbench_log" : "/home/ubuntu/ottertune/client/driver/oltp.log", - "controller_config": "/home/ubuntu/ottertune/client/controller/config/sample_postgres_config.json", - "controller_log" : "/home/ubuntu/ottertune/client/driver/controller.log", - "save_path": "/home/ubuntu/results", + "controller_home": "~/ottertune/client/controller", + "log_path": "~/ottertune/client/driver/log", + "save_path": "~/ottertune/client/driver/results", "upload_url" : "http://127.0.0.1:8000", "upload_code" : "I5I10PXK3PK27FM86YYS", - "lhs_knob_path" : "/home/ubuntu/ottertune/client/driver/knobs/postgres-96.json", - "lhs_save_path" : "/home/ubuntu/ottertune/client/driver/configs", + "lhs_knob_path" : "~/ottertune/client/driver/knobs/postgres-96.json", + "lhs_save_path" : "~/ottertune/client/driver/configs", "oracle_awr_enabled": false } diff --git a/client/driver/fabfile.py b/client/driver/fabfile.py index 6ddb231..f095d63 100644 --- a/client/driver/fabfile.py +++ b/client/driver/fabfile.py @@ -8,27 +8,21 @@ Created on Mar 23, 2018 @author: bohan ''' -import sys +import glob import json -import logging -import time import os import re -import glob +import time +from collections import OrderedDict from multiprocessing import Process -from fabric.api import (env, local, task, lcd) + +import logging +from logging.handlers import RotatingFileHandler + +import requests +from fabric.api import env, local, task, lcd from fabric.state import output as fabric_output -LOG = logging.getLogger() -LOG.setLevel(logging.DEBUG) -Formatter = logging.Formatter( # pylint: disable=invalid-name - "%(asctime)s [%(levelname)s] %(message)s") - -# print the log -ConsoleHandler = logging.StreamHandler(sys.stdout) # pylint: disable=invalid-name -ConsoleHandler.setFormatter(Formatter) -LOG.addHandler(ConsoleHandler) - # Fabric environment settings env.hosts = ['localhost'] fabric_output.update({ @@ -41,20 +35,55 @@ RELOAD_INTERVAL = 10 # maximum disk usage MAX_DISK_USAGE = 90 -with open('driver_config.json', 'r') as f: - CONF = json.load(f) +# Load config +with open('driver_config.json', 'r') as _f: + CONF = {k: os.path.expanduser(v) if isinstance(v, str) and v.startswith('~') else v + for k, v in json.load(_f).items()} + +# Create output directories +for _dir in (CONF['database_save_path'], CONF['log_path'], CONF['save_path'], + CONF['lhs_save_path']): + os.makedirs(_dir, exist_ok=True) + +# Define paths +CONF['driver_log'] = os.path.join(CONF['log_path'], 'driver.log') +CONF['oltpbench_log'] = os.path.join(CONF['log_path'], 'oltpbench.log') +CONF['controller_log'] = os.path.join(CONF['log_path'], 'controller.log') +CONF['controller_config'] = os.path.join(CONF['controller_home'], 'config', + '{}_config.json'.format(CONF['database_type'])) + +# Configure logging +LOG = logging.getLogger(__name__) +LOG.setLevel(logging.DEBUG) +Formatter = logging.Formatter( # pylint: disable=invalid-name + fmt='%(asctime)s [%(funcName)s:%(lineno)03d] %(levelname)-5s: %(message)s', + datefmt='%m-%d-%Y %H:%M:%S') +ConsoleHandler = logging.StreamHandler() # pylint: disable=invalid-name +ConsoleHandler.setFormatter(Formatter) +LOG.addHandler(ConsoleHandler) +FileHandler = 
RotatingFileHandler( # pylint: disable=invalid-name + CONF['driver_log'], maxBytes=50000, backupCount=2) +FileHandler.setFormatter(Formatter) +LOG.addHandler(FileHandler) + + +def _parse_bool(value): + if not isinstance(value, bool): + value = str(value).lower() == 'true' + return value @task def check_disk_usage(): partition = CONF['database_disk'] disk_use = 0 - cmd = "df -h {}".format(partition) - out = local(cmd, capture=True).splitlines()[1] - m = re.search('\d+(?=%)', out) # pylint: disable=anomalous-backslash-in-string - if m: - disk_use = int(m.group(0)) - LOG.info("Current Disk Usage: %s%s", disk_use, '%') + if partition: + cmd = "df -h {}".format(partition) + out = local(cmd, capture=True).splitlines()[1] + m = re.search(r'\d+(?=%)', out) + if m: + disk_use = int(m.group(0)) + LOG.info("Current Disk Usage: %s%s", disk_use, '%') return disk_use @@ -64,6 +93,29 @@ def check_memory_usage(): local(cmd) +@task +def create_controller_config(): + if CONF['database_type'] == 'postgres': + dburl_fmt = 'jdbc:postgresql://localhost:5432/{db}'.format + elif CONF['database_type'] == 'oracle': + dburl_fmt = 'jdbc:oracle:thin:@localhost:1521:{db}'.format + else: + raise Exception("Database Type {} Not Implemented !".format(CONF['database_type'])) + + config = dict( + database_type=CONF['database_type'], + database_url=dburl_fmt(db=CONF['database_name']), + username=CONF['username'], + password=CONF['password'], + upload_code='DEPRECATED', + upload_url='DEPRECATED', + workload_name=CONF['oltpbench_workload'] + ) + + with open(CONF['controller_config'], 'w') as f: + json.dump(config, f, indent=2) + + @task def restart_database(): if CONF['database_type'] == 'postgres': @@ -96,11 +148,45 @@ def create_database(): @task -def change_conf(): - next_conf = 'next_config' - cmd = "sudo python3 ConfParser.py {} {} {}".\ - format(CONF['database_type'], next_conf, CONF['database_conf']) - local(cmd) +def reset_conf(): + change_conf(next_conf='') + + +@task +def change_conf(next_conf='next_config'): + signal = "# configurations recommended by ottertune:\n" + next_conf = next_conf or {} + + with open(CONF['database_conf'], 'r') as f: + lines = f.readlines() + + if signal not in lines: + lines += ['\n', signal] + + signal_idx = lines.index(signal) + lines = lines[0:signal_idx + 1] + + if isinstance(next_conf, str): + with open(next_conf, 'r') as f: + recommendation = json.load( + f, encoding="UTF-8", object_pairs_hook=OrderedDict)['recommendation'] + else: + recommendation = next_conf + + assert isinstance(recommendation, dict) + + for name, value in recommendation.items(): + if CONF['database_type'] == 'oracle' and isinstance(value, str): + value = value.strip('B') + lines.append('{} = {}\n'.format(name, value)) + lines.append('\n') + + tmpconf = 'tmp_' + os.path.basename(CONF['database_conf']) + with open(tmpconf, 'w') as f: + f.write(''.join(lines)) + + local('sudo cp {0} {0}.ottertune.bak'.format(CONF['database_conf'])) + local('sudo mv {} {}'.format(tmpconf, CONF['database_conf'])) @task @@ -129,31 +215,31 @@ def run_oltpbench_bg(): @task def run_controller(): - cmd = 'sudo gradle run -PappArgs="-c {} -d output/" --no-daemon > {}'.\ + if not os.path.exists(CONF['controller_config']): + create_controller_config() + cmd = 'gradle run -PappArgs="-c {} -d output/" --no-daemon > {}'.\ format(CONF['controller_config'], CONF['controller_log']) - with lcd("../controller"): # pylint: disable=not-context-manager + with lcd(CONF['controller_home']): # pylint: disable=not-context-manager local(cmd) @task def 
signal_controller(): - pid = int(open('../controller/pid.txt').read()) + pidfile = os.path.join(CONF['controller_home'], 'pid.txt') + pid = int(open(pidfile).read()) cmd = 'sudo kill -2 {}'.format(pid) - with lcd("../controller"): # pylint: disable=not-context-manager + with lcd(CONF['controller_home']): # pylint: disable=not-context-manager local(cmd) @task def save_dbms_result(): - if not os.path.exists(CONF['save_path']): - os.makedirs(CONF['save_path']) t = int(time.time()) files = ['knobs.json', 'metrics_after.json', 'metrics_before.json', 'summary.json'] for f_ in files: - f_prefix = f_.split('.')[0] - cmd = 'cp ../controller/output/{} {}/{}__{}.json'.\ - format(f_, CONF['save_path'], t, f_prefix) - local(cmd) + srcfile = os.path.join(CONF['controller_home'], 'output', f_) + dstfile = os.path.join(CONF['save_path'], '{}__{}'.format(t, f_)) + local('cp {} {}'.format(srcfile, dstfile)) @task @@ -163,18 +249,83 @@ def free_cache(): @task -def upload_result(): - cmd = 'python3 ../../server/website/script/upload/upload.py \ - ../controller/output/ {} {}/new_result/'.format(CONF['upload_code'], - CONF['upload_url']) - local(cmd) +def upload_result(result_dir=None, prefix=None): + result_dir = result_dir or os.path.join(CONF['controller_home'], 'output') + prefix = prefix or '' + + files = {} + for base in ('summary', 'knobs', 'metrics_before', 'metrics_after'): + fpath = os.path.join(result_dir, prefix + base + '.json') + + # Replaces the true db version with the specified version to allow for + # testing versions not officially supported by OtterTune + if base == 'summary' and 'override_database_version' in CONF and \ + CONF['override_database_version']: + with open(fpath, 'r') as f: + summary = json.load(f) + summary['real_database_version'] = summary['database_version'] + summary['database_version'] = CONF['override_database_version'] + with open(fpath, 'w') as f: + json.dump(summary, f, indent=1) + + files[base] = open(fpath, 'rb') + + response = requests.post(CONF['upload_url'] + '/new_result/', files=files, + data={'upload_code': CONF['upload_code']}) + if response.status_code != 200: + raise Exception('Error uploading result.\nStatus: {}\nMessage: {}\n'.format( + response.status_code, response.content)) + + LOG.info(response.content) + + return response @task -def get_result(): - cmd = 'python3 ../../script/query_and_get.py {} {} 5'.\ - format(CONF['upload_url'], CONF['upload_code']) - local(cmd) +def get_result(max_time_sec=180, interval_sec=1): + max_time_sec = int(max_time_sec) + interval_sec = int(interval_sec) + url = CONF['upload_url'] + '/query_and_get/' + CONF['upload_code'] + elapsed = 0.0 + response_dict = None + response = '' + start_time = time.time() + + while elapsed <= max_time_sec: + rsp = requests.get(url) + response = rsp.content.decode() + LOG.debug('Response:\n\n%s\n', response) + + if rsp.status_code == 200: + # Success + response_dict = json.loads(rsp.json(), object_pairs_hook=OrderedDict) + break + elif rsp.status_code == 202: + # Not ready + time.sleep(interval_sec) + elif rsp.status_code == 400: + # Failure + raise Exception( + "Failed to download the next config.\nStatus code: {}\nMessage: {}\n".format( + rsp.status_code, response)) + else: + raise NotImplementedError( + "Unhandled status code: '{}'.\nMessage: {}".format(rsp.status_code, response)) + + elapsed = time.time() - start_time + + if not response_dict: + assert elapsed > max_time_sec, \ + 'response={} but elapsed={:.1f}s <= max_time={:.1f}s'.format( + response, elapsed, max_time_sec) + raise 
Exception( + 'Failed to download the next config in {}s: {} (elapsed: {:.1f}s)'.format( + max_time_sec, response, elapsed)) + + LOG.info('Downloaded the next config in %.0fs: %s', elapsed, + json.dumps(response_dict, indent=4)) + + return response_dict @task @@ -184,16 +335,23 @@ def add_udf(): @task -def upload_batch(): - cmd = 'python3 ./upload_batch.py {} {} {}/new_result/'.format(CONF['save_path'], - CONF['upload_code'], - CONF['upload_url']) - local(cmd) +def upload_batch(result_dir, sort=True): + sort = _parse_bool(sort) + results = glob.glob(os.path.join(result_dir, '*__summary.json')) + if sort: + results = sorted(results) + count = len(results) + + LOG.info('Uploading %d samples from %s...', count, result_dir) + for i, result in enumerate(results): + prefix = os.path.basename(result).split('__')[0] + upload_result(result_dir=result_dir, prefix=prefix) + LOG.info('Uploaded result %d/%d: %s__*.json', i + 1, count, prefix) @task def dump_database(): - db_file_path = '{}/{}.dump'.format(CONF['database_save_path'], CONF['database_name']) + db_file_path = os.path.join(CONF['database_save_path'], CONF['database_name'] + '.dump') if os.path.exists(db_file_path): LOG.info('%s already exists ! ', db_file_path) return False @@ -235,21 +393,19 @@ def restore_database(): def _ready_to_start_oltpbench(): - return (os.path.exists(CONF['controller_log']) and - 'Output the process pid to' - in open(CONF['controller_log']).read()) + return os.path.exists(CONF['controller_log']) and \ + 'Output the process pid to' in open(CONF['controller_log']).read() def _ready_to_start_controller(): - return (os.path.exists(CONF['oltpbench_log']) and - 'Warmup complete, starting measurements' - in open(CONF['oltpbench_log']).read()) + return os.path.exists(CONF['oltpbench_log']) and \ + 'Warmup complete, starting measurements' in open(CONF['oltpbench_log']).read() def _ready_to_shut_down_controller(): - pid_file_path = '../controller/pid.txt' - return (os.path.exists(pid_file_path) and os.path.exists(CONF['oltpbench_log']) and - 'Output throughput samples into file' in open(CONF['oltpbench_log']).read()) + pid_file_path = os.path.join(CONF['controller_home'], 'pid.txt') + return os.path.exists(pid_file_path) and os.path.exists(CONF['oltpbench_log']) and \ + 'Output throughput samples into file' in open(CONF['oltpbench_log']).read() def clean_logs(): @@ -264,8 +420,6 @@ def clean_logs(): @task def lhs_samples(count=10): - if not os.path.exists(CONF['lhs_save_path']): - os.makedirs(CONF['lhs_save_path']) cmd = 'python3 lhs.py {} {} {}'.format(count, CONF['lhs_knob_path'], CONF['lhs_save_path']) local(cmd) @@ -284,7 +438,7 @@ def loop(): # check disk usage if check_disk_usage() > MAX_DISK_USAGE: - LOG.WARN('Exceeds max disk usage %s', MAX_DISK_USAGE) + LOG.warning('Exceeds max disk usage %s', MAX_DISK_USAGE) # run controller from another process p = Process(target=run_controller, args=()) @@ -293,19 +447,19 @@ def loop(): # run oltpbench as a background job while not _ready_to_start_oltpbench(): - pass + time.sleep(1) run_oltpbench_bg() LOG.info('Run OLTP-Bench') # the controller starts the first collection while not _ready_to_start_controller(): - pass + time.sleep(1) signal_controller() LOG.info('Start the first collection') # stop the experiment while not _ready_to_shut_down_controller(): - pass + time.sleep(1) signal_controller() LOG.info('Start the second collection, shut down the controller') @@ -352,7 +506,7 @@ def run_lhs(): # check disk usage if check_disk_usage() > MAX_DISK_USAGE: - LOG.WARN('Exceeds max 
disk usage %s', MAX_DISK_USAGE) + LOG.warning('Exceeds max disk usage %s', MAX_DISK_USAGE) # copy lhs-sampled config to the to-be-used config cmd = 'cp {} next_config'.format(sample) diff --git a/client/driver/upload_batch.py b/client/driver/upload_batch.py deleted file mode 100644 index 972ceff..0000000 --- a/client/driver/upload_batch.py +++ /dev/null @@ -1,63 +0,0 @@ -# -# OtterTune - upload_batch.py -# -# Copyright (c) 2017-18, Carnegie Mellon University Database Group -# -import argparse -import glob -import logging -import os -import requests - - -# Logging -LOG = logging.getLogger(__name__) -LOG.addHandler(logging.StreamHandler()) -LOG.setLevel(logging.INFO) - - -# Upload all files in the datadir to OtterTune's server side. -# You may want to upload your training data to the non-tuning session. -def upload_batch(datadir, upload_code, url): - - samples = glob.glob(os.path.join(datadir, '*__summary.json')) - count = len(samples) - samples_prefix = [] - - LOG.info('Uploading %d samples in %s...', count, datadir) - for sample in samples: - prefix = sample.split('/')[-1].split('__')[0] - samples_prefix.append(prefix) - - for i in range(count): - prefix = samples_prefix[i] - params = { - 'summary': open(os.path.join(datadir, '{}__summary.json'.format(prefix)), 'rb'), - 'knobs': open(os.path.join(datadir, '{}__knobs.json'.format(prefix)), 'rb'), - 'metrics_before': open(os.path.join(datadir, - '{}__metrics_before.json'.format(prefix)), 'rb'), - 'metrics_after': open(os.path.join(datadir, - '{}__metrics_after.json'.format(prefix)), 'rb'), - } - - LOG.info('Upload %d-th sample %s__*.json', i + 1, prefix) - response = requests.post(url, - files=params, - data={'upload_code': upload_code}) - LOG.info(response.content) - - -def main(): - parser = argparse.ArgumentParser(description="Upload generated data to the website") - parser.add_argument('datadir', type=str, nargs=1, - help='Directory containing the generated data') - parser.add_argument('upload_code', type=str, nargs=1, - help='The website\'s upload code') - parser.add_argument('url', type=str, default='http://0.0.0.0:8000/new_result/', - nargs='?', help='The upload url: server_ip/new_result/') - args = parser.parse_args() - upload_batch(args.datadir[0], args.upload_code[0], args.url) - - -if __name__ == "__main__": - main() diff --git a/server/website/website/views.py b/server/website/website/views.py index a29fb4a..104a5bf 100644 --- a/server/website/website/views.py +++ b/server/website/website/views.py @@ -414,17 +414,19 @@ def new_result(request): if not form.is_valid(): LOG.warning("New result form is not valid: %s", str(form.errors)) - return HttpResponse("New result form is not valid: " + str(form.errors)) + return HttpResponse("New result form is not valid: " + str(form.errors), status=400) + upload_code = form.cleaned_data['upload_code'] try: session = Session.objects.get(upload_code=upload_code) except Session.DoesNotExist: LOG.warning("Invalid upload code: %s", upload_code) - return HttpResponse("Invalid upload code: " + upload_code) + return HttpResponse("Invalid upload code: " + upload_code, status=400) return handle_result_files(session, request.FILES) + LOG.warning("Request type was not POST") - return HttpResponse("Request type was not POST") + return HttpResponse("Request type was not POST", status=400) def handle_result_files(session, files): @@ -958,22 +960,26 @@ def give_result(request, upload_code): # pylint: disable=unused-argument session = Session.objects.get(upload_code=upload_code) except Session.DoesNotExist: 
LOG.warning("Invalid upload code: %s", upload_code) - return HttpResponse("Invalid upload code: " + upload_code) + return HttpResponse("Invalid upload code: " + upload_code, status=400) results = Result.objects.filter(session=session) lastest_result = results[len(results) - 1] tasks = TaskUtil.get_tasks(lastest_result.task_ids) - overall_status, _ = TaskUtil.get_task_status(tasks) + overall_status, num_completed = TaskUtil.get_task_status(tasks) - if overall_status in ['PENDING', 'RECEIVED', 'STARTED']: - return HttpResponse("Result not ready") - # unclear behaviors for REVOKED and RETRY, treat as failure - elif overall_status in ['FAILURE', 'REVOKED', 'RETRY']: - return HttpResponse("Fail") + if overall_status == 'SUCCESS': + res = Result.objects.get(pk=lastest_result.pk) + response = HttpResponse(JSONUtil.dumps(res.next_configuration), + content_type='application/json') + elif overall_status in ('PENDING', 'RECEIVED', 'STARTED'): + response = HttpResponse("{}: Result not ready".format(overall_status), status=202) + else: # overall_status in ('FAILURE', 'REVOKED', 'RETRY'): + failed_task_idx = min(len(tasks) - 1, num_completed + 1) + failed_task = tasks[failed_task_idx] + response = HttpResponse( + "{}: {}".format(overall_status, failed_task.traceback), status=400) - # success - res = Result.objects.get(pk=lastest_result.pk) - return HttpResponse(JSONUtil.dumps(res.next_configuration), content_type='application/json') + return response def train_ddpg_loops(request, session_id): # pylint: disable=unused-argument
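
With this change, GET /query_and_get/<upload_code> signals progress through HTTP status
codes rather than free-form text: 200 means the next configuration is ready (the body is
the JSON config), 202 means the tuning task is still running, and 400 means failure or an
invalid upload code. POST /new_result/ likewise returns 400 for invalid forms, bad upload
codes, and non-POST requests. A minimal standalone polling client for the new protocol
could look like the sketch below; the server address and upload code are example values
copied from driver_config.json, and the json.loads(rsp.json()) step assumes, as
get_result() in the fabfile does, that the server returns the configuration as a
JSON-encoded string.

import json
import time

import requests

# Example values taken from driver_config.json above; substitute your own.
UPLOAD_URL = 'http://127.0.0.1:8000'
UPLOAD_CODE = 'I5I10PXK3PK27FM86YYS'


def poll_next_config(max_time_sec=180, interval_sec=1):
    """Poll /query_and_get/ until the next config is ready, mirroring get_result()."""
    url = '{}/query_and_get/{}'.format(UPLOAD_URL, UPLOAD_CODE)
    start = time.time()
    while time.time() - start <= max_time_sec:
        rsp = requests.get(url)
        if rsp.status_code == 200:
            # Ready: like get_result(), assume the body is a JSON-encoded string
            # of the recommended configuration.
            return json.loads(rsp.json())
        if rsp.status_code == 202:
            # Tuning task still running; wait and retry.
            time.sleep(interval_sec)
            continue
        # 400 or anything unexpected: surface the server's message and stop.
        raise RuntimeError('HTTP {}: {}'.format(rsp.status_code, rsp.content.decode()))
    raise RuntimeError('No config after {}s'.format(max_time_sec))


if __name__ == '__main__':
    print(json.dumps(poll_next_config(), indent=4))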