ottertune/client/driver/fabfile.py

1220 lines
44 KiB
Python
Raw Permalink Normal View History

2019-08-23 08:47:19 -07:00
#
# OtterTune - fabfile.py
#
# Copyright (c) 2017-18, Carnegie Mellon University Database Group
#
'''
Created on Mar 23, 2018
@author: bohan
'''
import glob
2019-08-23 08:47:19 -07:00
import json
2019-09-13 14:55:55 -07:00
import os
2019-08-23 08:47:19 -07:00
import re
import time
from collections import OrderedDict
2019-08-23 08:47:19 -07:00
from multiprocessing import Process
import logging
from logging.handlers import RotatingFileHandler
2019-08-23 08:47:19 -07:00
import requests
from fabric.api import env, lcd, local, settings, show, task, hide
from fabric.state import output as fabric_output
2019-08-23 08:47:19 -07:00
from utils import (file_exists, get, get_content, load_driver_conf, parse_bool,
put, run, run_sql_script, sudo, FabricException)
# Loads the driver config file (defaults to driver_config.py)
2019-11-08 03:42:32 -08:00
dconf = load_driver_conf() # pylint: disable=invalid-name
# Fabric settings
fabric_output.update({
'running': True,
'stdout': True,
})
env.abort_exception = FabricException
env.hosts = [dconf.LOGIN]
2020-04-23 00:46:58 -07:00
env.password = dconf.LOGIN_PASSWORD
# Create local directories
for _d in (dconf.RESULT_DIR, dconf.LOG_DIR, dconf.TEMP_DIR):
os.makedirs(_d, exist_ok=True)
2019-08-23 08:47:19 -07:00
# Configure logging
LOG = logging.getLogger(__name__)
LOG.setLevel(getattr(logging, dconf.LOG_LEVEL, logging.DEBUG))
Formatter = logging.Formatter( # pylint: disable=invalid-name
fmt='%(asctime)s [%(funcName)s:%(lineno)03d] %(levelname)-5s: %(message)s',
datefmt='%m-%d-%Y %H:%M:%S')
ConsoleHandler = logging.StreamHandler() # pylint: disable=invalid-name
ConsoleHandler.setFormatter(Formatter)
LOG.addHandler(ConsoleHandler)
FileHandler = RotatingFileHandler( # pylint: disable=invalid-name
dconf.DRIVER_LOG, maxBytes=50000, backupCount=2)
FileHandler.setFormatter(Formatter)
LOG.addHandler(FileHandler)
2019-08-23 08:47:19 -07:00
@task
def check_disk_usage():
partition = dconf.DATABASE_DISK
2019-08-23 08:47:19 -07:00
disk_use = 0
if partition:
cmd = "df -h {}".format(partition)
out = run(cmd).splitlines()[1]
m = re.search(r'\d+(?=%)', out)
if m:
disk_use = int(m.group(0))
LOG.info("Current Disk Usage: %s%s", disk_use, '%')
2019-08-23 08:47:19 -07:00
return disk_use
@task
def check_memory_usage():
run('free -m -h')
2019-08-23 08:47:19 -07:00
@task
def create_controller_config():
if dconf.DB_TYPE == 'postgres':
dburl_fmt = 'jdbc:postgresql://{host}:{port}/{db}'.format
elif dconf.DB_TYPE == 'oracle':
dburl_fmt = 'jdbc:oracle:thin:@{host}:{port}:{db}'.format
2020-04-23 00:46:58 -07:00
elif dconf.DB_TYPE == 'mysql':
if dconf.DB_VERSION in ['5.6', '5.7']:
dburl_fmt = 'jdbc:mysql://{host}:{port}/{db}?useSSL=false'.format
elif dconf.DB_VERSION == '8.0':
2020-05-21 22:07:22 -07:00
dburl_fmt = ('jdbc:mysql://{host}:{port}/{db}?'
'allowPublicKeyRetrieval=true&useSSL=false').format
else:
raise Exception("MySQL Database Version {} Not Implemented !".format(dconf.DB_VERSION))
else:
raise Exception("Database Type {} Not Implemented !".format(dconf.DB_TYPE))
config = dict(
database_type=dconf.DB_TYPE,
database_url=dburl_fmt(host=dconf.DB_HOST, port=dconf.DB_PORT, db=dconf.DB_NAME),
username=dconf.DB_USER,
password=dconf.DB_PASSWORD,
upload_code='DEPRECATED',
upload_url='DEPRECATED',
workload_name=dconf.OLTPBENCH_BENCH
)
with open(dconf.CONTROLLER_CONFIG, 'w') as f:
json.dump(config, f, indent=2)
2019-08-23 08:47:19 -07:00
@task
def restart_database():
if dconf.DB_TYPE == 'postgres':
if dconf.HOST_CONN == 'docker':
# Restarting the docker container here is the cleanest way to do it
# becaues there's no init system running and the only process running
# in the container is postgres itself
local('docker restart {}'.format(dconf.CONTAINER_NAME))
2020-04-23 00:46:58 -07:00
elif dconf.HOST_CONN == 'remote_docker':
run('docker restart {}'.format(dconf.CONTAINER_NAME), remote_only=True)
else:
2019-11-08 03:42:32 -08:00
sudo('pg_ctl -D {} -w -t 600 restart -m fast'.format(
2019-11-18 08:23:15 -08:00
dconf.PG_DATADIR), user=dconf.ADMIN_USER, capture=False)
2020-04-23 00:46:58 -07:00
elif dconf.DB_TYPE == 'mysql':
if dconf.HOST_CONN == 'docker':
local('docker restart {}'.format(dconf.CONTAINER_NAME))
elif dconf.HOST_CONN == 'remote_docker':
run('docker restart {}'.format(dconf.CONTAINER_NAME), remote_only=True)
else:
sudo('service mysql restart')
elif dconf.DB_TYPE == 'oracle':
db_log_path = os.path.join(os.path.split(dconf.DB_CONF)[0], 'startup.log')
local_log_path = os.path.join(dconf.LOG_DIR, 'startup.log')
local_logs_path = os.path.join(dconf.LOG_DIR, 'startups.log')
run_sql_script('restartOracle.sh', db_log_path)
get(db_log_path, local_log_path)
with open(local_log_path, 'r') as fin, open(local_logs_path, 'a') as fout:
lines = fin.readlines()
for line in lines:
if line.startswith('ORACLE instance started.'):
return True
if not line.startswith('SQL>'):
fout.write(line)
fout.write('\n')
return False
2019-08-23 08:47:19 -07:00
else:
raise Exception("Database Type {} Not Implemented !".format(dconf.DB_TYPE))
2019-12-03 13:25:32 -08:00
return True
2019-08-23 08:47:19 -07:00
@task
def drop_database():
if dconf.DB_TYPE == 'postgres':
run("PGPASSWORD={} dropdb -e --if-exists {} -U {} -h {}".format(
dconf.DB_PASSWORD, dconf.DB_NAME, dconf.DB_USER, dconf.DB_HOST))
2020-04-23 00:46:58 -07:00
elif dconf.DB_TYPE == 'mysql':
run("mysql --user={} --password={} -e 'drop database if exists {}'".format(
dconf.DB_USER, dconf.DB_PASSWORD, dconf.DB_NAME))
2019-08-23 08:47:19 -07:00
else:
raise Exception("Database Type {} Not Implemented !".format(dconf.DB_TYPE))
2019-08-23 08:47:19 -07:00
@task
def create_database():
if dconf.DB_TYPE == 'postgres':
run("PGPASSWORD={} createdb -e {} -U {} -h {}".format(
dconf.DB_PASSWORD, dconf.DB_NAME, dconf.DB_USER, dconf.DB_HOST))
2020-04-23 00:46:58 -07:00
elif dconf.DB_TYPE == 'mysql':
run("mysql --user={} --password={} -e 'create database {}'".format(
dconf.DB_USER, dconf.DB_PASSWORD, dconf.DB_NAME))
else:
raise Exception("Database Type {} Not Implemented !".format(dconf.DB_TYPE))
@task
def create_user():
if dconf.DB_TYPE == 'postgres':
sql = "CREATE USER {} SUPERUSER PASSWORD '{}';".format(dconf.DB_USER, dconf.DB_PASSWORD)
run("PGPASSWORD={} psql -c \\\"{}\\\" -U postgres -h {}".format(
2019-11-08 03:42:32 -08:00
dconf.DB_PASSWORD, sql, dconf.DB_HOST))
elif dconf.DB_TYPE == 'oracle':
run_sql_script('createUser.sh', dconf.DB_USER, dconf.DB_PASSWORD)
2019-08-23 08:47:19 -07:00
else:
raise Exception("Database Type {} Not Implemented !".format(dconf.DB_TYPE))
2019-08-23 08:47:19 -07:00
@task
def drop_user():
if dconf.DB_TYPE == 'postgres':
sql = "DROP USER IF EXISTS {};".format(dconf.DB_USER)
run("PGPASSWORD={} psql -c \\\"{}\\\" -U postgres -h {}".format(
dconf.DB_PASSWORD, sql, dconf.DB_HOST))
elif dconf.DB_TYPE == 'oracle':
run_sql_script('dropUser.sh', dconf.DB_USER)
else:
raise Exception("Database Type {} Not Implemented !".format(dconf.DB_TYPE))
2019-08-23 08:47:19 -07:00
@task
2020-04-23 00:46:58 -07:00
def reset_conf(always=True):
if always:
change_conf()
return
2020-04-23 00:46:58 -07:00
# reset the config only if it has not been changed by Ottertune,
# i.e. OtterTune signal line is not in the config file.
signal = "# configurations recommended by ottertune:\n"
tmp_conf_in = os.path.join(dconf.TEMP_DIR, os.path.basename(dconf.DB_CONF) + '.in')
get(dconf.DB_CONF, tmp_conf_in)
with open(tmp_conf_in, 'r') as f:
lines = f.readlines()
if signal not in lines:
change_conf()
@task
def change_conf(next_conf=None):
signal = "# configurations recommended by ottertune:\n"
next_conf = next_conf or {}
tmp_conf_in = os.path.join(dconf.TEMP_DIR, os.path.basename(dconf.DB_CONF) + '.in')
get(dconf.DB_CONF, tmp_conf_in)
with open(tmp_conf_in, 'r') as f:
lines = f.readlines()
if signal not in lines:
lines += ['\n', signal]
signal_idx = lines.index(signal)
lines = lines[0:signal_idx + 1]
2020-04-23 00:46:58 -07:00
if dconf.DB_TYPE == 'mysql':
lines.append('[mysqld]\n')
if dconf.BASE_DB_CONF:
assert isinstance(dconf.BASE_DB_CONF, dict), \
(type(dconf.BASE_DB_CONF), dconf.BASE_DB_CONF)
2020-04-23 00:46:58 -07:00
for name, value in sorted(dconf.BASE_DB_CONF.items()):
2020-05-21 12:21:46 -07:00
if value is None:
lines.append('{}\n'.format(name))
else:
lines.append('{} = {}\n'.format(name, value))
if isinstance(next_conf, str):
with open(next_conf, 'r') as f:
recommendation = json.load(
f, encoding="UTF-8", object_pairs_hook=OrderedDict)['recommendation']
else:
recommendation = next_conf
assert isinstance(recommendation, dict)
for name, value in recommendation.items():
if dconf.DB_TYPE == 'oracle' and isinstance(value, str):
value = value.strip('B')
2020-04-23 00:46:58 -07:00
# If innodb_flush_method is set to NULL on a Unix-like system,
# the fsync option is used by default.
if name == 'innodb_flush_method' and value == '':
value = "fsync"
lines.append('{} = {}\n'.format(name, value))
lines.append('\n')
tmp_conf_out = os.path.join(dconf.TEMP_DIR, os.path.basename(dconf.DB_CONF) + '.out')
with open(tmp_conf_out, 'w') as f:
f.write(''.join(lines))
2020-05-03 23:24:33 -07:00
sudo('cp {0} {0}.ottertune.bak'.format(dconf.DB_CONF), remote_only=True)
2020-05-21 12:21:46 -07:00
put(tmp_conf_out, dconf.DB_CONF, use_sudo=True)
local('rm -f {} {}'.format(tmp_conf_in, tmp_conf_out))
2019-08-23 08:47:19 -07:00
@task
def load_oltpbench():
2020-04-23 00:46:58 -07:00
if os.path.exists(dconf.OLTPBENCH_CONFIG) is False:
msg = 'oltpbench config {} does not exist, '.format(dconf.OLTPBENCH_CONFIG)
msg += 'please double check the option in driver_config.py'
raise Exception(msg)
2020-05-23 16:24:14 -07:00
set_oltpbench_config()
2019-08-23 08:47:19 -07:00
cmd = "./oltpbenchmark -b {} -c {} --create=true --load=true".\
format(dconf.OLTPBENCH_BENCH, dconf.OLTPBENCH_CONFIG)
with lcd(dconf.OLTPBENCH_HOME): # pylint: disable=not-context-manager
2019-08-23 08:47:19 -07:00
local(cmd)
@task
def run_oltpbench():
2020-04-23 00:46:58 -07:00
if os.path.exists(dconf.OLTPBENCH_CONFIG) is False:
msg = 'oltpbench config {} does not exist, '.format(dconf.OLTPBENCH_CONFIG)
msg += 'please double check the option in driver_config.py'
raise Exception(msg)
2020-05-23 16:24:14 -07:00
set_oltpbench_config()
2019-08-23 08:47:19 -07:00
cmd = "./oltpbenchmark -b {} -c {} --execute=true -s 5 -o outputfile".\
format(dconf.OLTPBENCH_BENCH, dconf.OLTPBENCH_CONFIG)
with lcd(dconf.OLTPBENCH_HOME): # pylint: disable=not-context-manager
2019-08-23 08:47:19 -07:00
local(cmd)
@task
def run_oltpbench_bg():
2020-04-23 00:46:58 -07:00
if os.path.exists(dconf.OLTPBENCH_CONFIG) is False:
msg = 'oltpbench config {} does not exist, '.format(dconf.OLTPBENCH_CONFIG)
msg += 'please double check the option in driver_config.py'
raise Exception(msg)
2020-05-23 16:24:14 -07:00
# set oltpbench config, including db username, password, url
set_oltpbench_config()
2019-08-23 08:47:19 -07:00
cmd = "./oltpbenchmark -b {} -c {} --execute=true -s 5 -o outputfile > {} 2>&1 &".\
format(dconf.OLTPBENCH_BENCH, dconf.OLTPBENCH_CONFIG, dconf.OLTPBENCH_LOG)
with lcd(dconf.OLTPBENCH_HOME): # pylint: disable=not-context-manager
2019-08-23 08:47:19 -07:00
local(cmd)
@task
2020-05-28 21:17:35 -07:00
def run_controller(interval_sec=-1):
2020-04-23 00:46:58 -07:00
LOG.info('Controller config path: %s', dconf.CONTROLLER_CONFIG)
create_controller_config()
2020-05-28 21:17:35 -07:00
cmd = 'gradle run -PappArgs="-c {} -t {} -d output/" --no-daemon > {}'.\
format(dconf.CONTROLLER_CONFIG, interval_sec, dconf.CONTROLLER_LOG)
with lcd(dconf.CONTROLLER_HOME): # pylint: disable=not-context-manager
2019-08-23 08:47:19 -07:00
local(cmd)
@task
def signal_controller():
pidfile = os.path.join(dconf.CONTROLLER_HOME, 'pid.txt')
with open(pidfile, 'r') as f:
pid = int(f.read())
cmd = 'kill -2 {}'.format(pid)
with lcd(dconf.CONTROLLER_HOME): # pylint: disable=not-context-manager
2019-08-23 08:47:19 -07:00
local(cmd)
@task
def save_dbms_result():
t = int(time.time())
files = ['knobs.json', 'metrics_after.json', 'metrics_before.json', 'summary.json']
2020-04-27 17:14:18 -07:00
if dconf.ENABLE_UDM:
files.append('user_defined_metrics.json')
2019-08-23 08:47:19 -07:00
for f_ in files:
srcfile = os.path.join(dconf.CONTROLLER_HOME, 'output', f_)
dstfile = os.path.join(dconf.RESULT_DIR, '{}__{}'.format(t, f_))
local('cp {} {}'.format(srcfile, dstfile))
return t
@task
def save_next_config(next_config, t=None):
if not t:
t = int(time.time())
with open(os.path.join(dconf.RESULT_DIR, '{}__next_config.json'.format(t)), 'w') as f:
json.dump(next_config, f, indent=2)
return t
2019-08-23 08:47:19 -07:00
@task
def free_cache():
2020-04-23 00:46:58 -07:00
if dconf.HOST_CONN not in ['docker', 'remote_docker']:
2019-11-08 08:08:06 -08:00
with show('everything'), settings(warn_only=True): # pylint: disable=not-context-manager
2020-04-29 00:34:29 -07:00
res = sudo("sh -c \"echo 3 > /proc/sys/vm/drop_caches\"")
if res.failed:
LOG.error('%s (return code %s)', res.stderr.strip(), res.return_code)
2020-04-29 00:34:29 -07:00
else:
res = sudo("sh -c \"echo 3 > /proc/sys/vm/drop_caches\"", remote_only=True)
2019-08-23 08:47:19 -07:00
@task
2019-10-23 19:46:33 -07:00
def upload_result(result_dir=None, prefix=None, upload_code=None):
result_dir = result_dir or os.path.join(dconf.CONTROLLER_HOME, 'output')
prefix = prefix or ''
upload_code = upload_code or dconf.UPLOAD_CODE
files = {}
2020-04-27 17:14:18 -07:00
bases = ['summary', 'knobs', 'metrics_before', 'metrics_after']
if dconf.ENABLE_UDM:
bases.append('user_defined_metrics')
for base in bases:
fpath = os.path.join(result_dir, prefix + base + '.json')
# Replaces the true db version with the specified version to allow for
# testing versions not officially supported by OtterTune
if base == 'summary' and dconf.OVERRIDE_DB_VERSION:
with open(fpath, 'r') as f:
summary = json.load(f)
summary['real_database_version'] = summary['database_version']
summary['database_version'] = dconf.OVERRIDE_DB_VERSION
with open(fpath, 'w') as f:
json.dump(summary, f, indent=1)
files[base] = open(fpath, 'rb')
response = requests.post(dconf.WEBSITE_URL + '/new_result/', files=files,
2019-10-23 19:46:33 -07:00
data={'upload_code': upload_code})
if response.status_code != 200:
raise Exception('Error uploading result.\nStatus: {}\nMessage: {}\n'.format(
response.status_code, get_content(response)))
for f in files.values(): # pylint: disable=not-an-iterable
f.close()
LOG.info(get_content(response))
return response
2019-08-23 08:47:19 -07:00
@task
2019-10-23 19:46:33 -07:00
def get_result(max_time_sec=180, interval_sec=5, upload_code=None):
max_time_sec = int(max_time_sec)
interval_sec = int(interval_sec)
upload_code = upload_code or dconf.UPLOAD_CODE
url = dconf.WEBSITE_URL + '/query_and_get/' + upload_code
elapsed = 0
response_dict = None
rout = ''
while elapsed <= max_time_sec:
rsp = requests.get(url)
response = get_content(rsp)
assert response != 'null'
rout = json.dumps(response, indent=4) if isinstance(response, dict) else response
LOG.debug('%s\n\n[status code: %d, type(response): %s, elapsed: %ds, %s]', rout,
rsp.status_code, type(response), elapsed,
', '.join(['{}: {}'.format(k, v) for k, v in rsp.headers.items()]))
if rsp.status_code == 200:
# Success
response_dict = response
break
elif rsp.status_code == 202:
# Not ready
time.sleep(interval_sec)
elapsed += interval_sec
elif rsp.status_code == 400:
# Failure
raise Exception(
"Failed to download the next config.\nStatus code: {}\nMessage: {}\n".format(
rsp.status_code, rout))
elif rsp.status_code == 500:
# Failure
msg = rout
if isinstance(response, str):
savepath = os.path.join(dconf.LOG_DIR, 'error.html')
with open(savepath, 'w') as f:
f.write(response)
msg = "Saved HTML error to '{}'.".format(os.path.relpath(savepath))
raise Exception(
"Failed to download the next config.\nStatus code: {}\nMessage: {}\n".format(
rsp.status_code, msg))
else:
raise NotImplementedError(
"Unhandled status code: '{}'.\nMessage: {}".format(rsp.status_code, rout))
if not response_dict:
assert elapsed > max_time_sec, \
'response={} but elapsed={}s <= max_time={}s'.format(
rout, elapsed, max_time_sec)
raise Exception(
'Failed to download the next config in {}s: {} (elapsed: {}s)'.format(
max_time_sec, rout, elapsed))
LOG.info('Downloaded the next config in %ds', elapsed)
return response_dict
2019-08-23 08:47:19 -07:00
@task
def download_debug_info(pprint=False):
pprint = parse_bool(pprint)
url = '{}/dump/{}'.format(dconf.WEBSITE_URL, dconf.UPLOAD_CODE)
params = {'pp': int(True)} if pprint else {}
rsp = requests.get(url, params=params)
if rsp.status_code != 200:
raise Exception('Error downloading debug info.')
filename = rsp.headers.get('Content-Disposition').split('=')[-1]
file_len, exp_len = len(rsp.content), int(rsp.headers.get('Content-Length'))
assert file_len == exp_len, 'File {}: content length != expected length: {} != {}'.format(
filename, file_len, exp_len)
with open(filename, 'wb') as f:
f.write(rsp.content)
LOG.info('Downloaded debug info to %s', filename)
return filename
2019-08-23 08:47:19 -07:00
@task
2020-04-27 17:14:18 -07:00
def add_udm(result_dir=None):
result_dir = result_dir or os.path.join(dconf.CONTROLLER_HOME, 'output')
with lcd(dconf.UDM_DIR): # pylint: disable=not-context-manager
local('python3 user_defined_metrics.py {}'.format(result_dir))
2019-08-23 08:47:19 -07:00
@task
2019-10-23 19:46:33 -07:00
def upload_batch(result_dir=None, sort=True, upload_code=None):
result_dir = result_dir or dconf.RESULT_DIR
sort = parse_bool(sort)
results = glob.glob(os.path.join(result_dir, '*__summary.json'))
if sort:
results = sorted(results)
count = len(results)
LOG.info('Uploading %d samples from %s...', count, result_dir)
for i, result in enumerate(results):
prefix = os.path.basename(result)
prefix_len = os.path.basename(result).find('_') + 2
prefix = prefix[:prefix_len]
2019-10-23 19:46:33 -07:00
upload_result(result_dir=result_dir, prefix=prefix, upload_code=upload_code)
LOG.info('Uploaded result %d/%d: %s__*.json', i + 1, count, prefix)
2019-08-23 08:47:19 -07:00
@task
def dump_database():
dumpfile = os.path.join(dconf.DB_DUMP_DIR, dconf.DB_NAME + '.dump')
2019-12-10 20:03:50 -08:00
if dconf.DB_TYPE == 'oracle':
if not dconf.ORACLE_FLASH_BACK and file_exists(dumpfile):
LOG.info('%s already exists ! ', dumpfile)
return False
else:
if file_exists(dumpfile):
LOG.info('%s already exists ! ', dumpfile)
return False
2020-04-23 00:46:58 -07:00
if dconf.DB_TYPE == 'oracle' and dconf.ORACLE_FLASH_BACK:
LOG.info('create restore point %s for database %s in %s', dconf.RESTORE_POINT,
dconf.DB_NAME, dconf.RECOVERY_FILE_DEST)
else:
LOG.info('Dump database %s to %s', dconf.DB_NAME, dumpfile)
2019-11-08 03:42:32 -08:00
if dconf.DB_TYPE == 'oracle':
if dconf.ORACLE_FLASH_BACK:
run_sql_script('createRestore.sh', dconf.RESTORE_POINT,
dconf.RECOVERY_FILE_DEST_SIZE, dconf.RECOVERY_FILE_DEST)
else:
run_sql_script('dumpOracle.sh', dconf.DB_USER, dconf.DB_PASSWORD,
dconf.DB_NAME, dconf.DB_DUMP_DIR)
2019-11-08 03:42:32 -08:00
elif dconf.DB_TYPE == 'postgres':
run('PGPASSWORD={} pg_dump -U {} -h {} -F c -d {} > {}'.format(
dconf.DB_PASSWORD, dconf.DB_USER, dconf.DB_HOST, dconf.DB_NAME,
dumpfile))
2020-04-23 00:46:58 -07:00
elif dconf.DB_TYPE == 'mysql':
sudo('mysqldump --user={} --password={} --databases {} > {}'.format(
dconf.DB_USER, dconf.DB_PASSWORD, dconf.DB_NAME, dumpfile))
2019-11-08 03:42:32 -08:00
else:
raise Exception("Database Type {} Not Implemented !".format(dconf.DB_TYPE))
return True
2019-08-23 08:47:19 -07:00
@task
def clean_recovery():
run_sql_script('removeRestore.sh', dconf.RESTORE_POINT)
cmds = ("""rman TARGET / <<EOF\nDELETE ARCHIVELOG ALL;\nexit\nEOF""")
run(cmds)
2019-08-23 08:47:19 -07:00
@task
def restore_database():
dumpfile = os.path.join(dconf.DB_DUMP_DIR, dconf.DB_NAME + '.dump')
if not dconf.ORACLE_FLASH_BACK and not file_exists(dumpfile):
raise FileNotFoundError("Database dumpfile '{}' does not exist!".format(dumpfile))
LOG.info('Start restoring database')
if dconf.DB_TYPE == 'oracle':
if dconf.ORACLE_FLASH_BACK:
run_sql_script('flashBack.sh', dconf.RESTORE_POINT)
clean_recovery()
else:
drop_user()
create_user()
run_sql_script('restoreOracle.sh', dconf.DB_USER, dconf.DB_NAME)
elif dconf.DB_TYPE == 'postgres':
2019-08-23 08:47:19 -07:00
drop_database()
create_database()
run('PGPASSWORD={} pg_restore -U {} -h {} -n public -j 8 -F c -d {} {}'.format(
dconf.DB_PASSWORD, dconf.DB_USER, dconf.DB_HOST, dconf.DB_NAME, dumpfile))
2020-04-23 00:46:58 -07:00
elif dconf.DB_TYPE == 'mysql':
run('mysql --user={} --password={} < {}'.format(dconf.DB_USER, dconf.DB_PASSWORD, dumpfile))
2019-08-23 08:47:19 -07:00
else:
raise Exception("Database Type {} Not Implemented !".format(dconf.DB_TYPE))
2019-08-23 08:47:19 -07:00
LOG.info('Finish restoring database')
@task
def is_ready_db(interval_sec=10):
if dconf.DB_TYPE == 'mysql':
cmd_fmt = "mysql --user={} --password={} -e 'exit'".format
else:
LOG.info('database %s connecting function is not implemented, sleep %s seconds and return',
dconf.DB_TYPE, dconf.RESTART_SLEEP_SEC)
return
with hide('everything'), settings(warn_only=True): # pylint: disable=not-context-manager
while True:
res = run(cmd_fmt(dconf.DB_USER, dconf.DB_PASSWORD))
if res.failed:
LOG.info('Database %s is not ready, wait for %s seconds',
dconf.DB_TYPE, interval_sec)
time.sleep(interval_sec)
else:
LOG.info('Database %s is ready.', dconf.DB_TYPE)
return
2019-08-23 08:47:19 -07:00
def _ready_to_start_oltpbench():
ready = False
if os.path.exists(dconf.CONTROLLER_LOG):
with open(dconf.CONTROLLER_LOG, 'r') as f:
content = f.read()
ready = 'Output the process pid to' in content
return ready
2019-08-23 08:47:19 -07:00
def _ready_to_start_controller():
ready = False
if os.path.exists(dconf.OLTPBENCH_LOG):
with open(dconf.OLTPBENCH_LOG, 'r') as f:
content = f.read()
ready = 'Warmup complete, starting measurements' in content
return ready
2019-08-23 08:47:19 -07:00
def _ready_to_shut_down_controller():
pidfile = os.path.join(dconf.CONTROLLER_HOME, 'pid.txt')
ready = False
if os.path.exists(pidfile) and os.path.exists(dconf.OLTPBENCH_LOG):
with open(dconf.OLTPBENCH_LOG, 'r') as f:
content = f.read()
2020-05-23 15:29:02 -07:00
if 'Failed' in content:
m = re.search('\n.*Failed.*\n', content)
error_msg = m.group(0)
2020-05-23 15:58:30 -07:00
LOG.error('OLTPBench Failed!')
2020-05-23 15:29:02 -07:00
return True, error_msg
ready = 'Output throughput samples into file' in content
2020-05-23 15:29:02 -07:00
return ready, None
2019-08-23 08:47:19 -07:00
def clean_logs():
# remove oltpbench and controller log files
local('rm -f {} {}'.format(dconf.OLTPBENCH_LOG, dconf.CONTROLLER_LOG))
2019-08-23 08:47:19 -07:00
2020-04-27 21:21:04 -07:00
@task
def clean_oltpbench_results():
# remove oltpbench result files
2020-05-28 21:17:35 -07:00
local('rm -f {}/results/outputfile*'.format(dconf.OLTPBENCH_HOME))
@task
def clean_controller_results():
# remove oltpbench result files
local('rm -f {}/output/*.json'.format(dconf.CONTROLLER_HOME))
2020-04-27 21:21:04 -07:00
2020-05-21 22:07:22 -07:00
def _set_oltpbench_property(name, line):
if name == 'username':
ss = line.split('username')
new_line = ss[0] + 'username>{}</username'.format(dconf.DB_USER) + ss[-1]
elif name == 'password':
ss = line.split('password')
new_line = ss[0] + 'password>{}</password'.format(dconf.DB_PASSWORD) + ss[-1]
elif name == 'DBUrl':
ss = line.split('DBUrl')
if dconf.DB_TYPE == 'postgres':
dburl_fmt = 'jdbc:postgresql://{host}:{port}/{db}'.format
elif dconf.DB_TYPE == 'oracle':
dburl_fmt = 'jdbc:oracle:thin:@{host}:{port}:{db}'.format
elif dconf.DB_TYPE == 'mysql':
if dconf.DB_VERSION in ['5.6', '5.7']:
dburl_fmt = 'jdbc:mysql://{host}:{port}/{db}?useSSL=false'.format
elif dconf.DB_VERSION == '8.0':
dburl_fmt = ('jdbc:mysql://{host}:{port}/{db}?'
'allowPublicKeyRetrieval=true&amp;useSSL=false').format
else:
raise Exception("MySQL Database Version {} "
"Not Implemented !".format(dconf.DB_VERSION))
else:
raise Exception("Database Type {} Not Implemented !".format(dconf.DB_TYPE))
database_url = dburl_fmt(host=dconf.DB_HOST, port=dconf.DB_PORT, db=dconf.DB_NAME)
new_line = ss[0] + 'DBUrl>{}</DBUrl'.format(database_url) + ss[-1]
else:
raise Exception("OLTPBench Config Property {} Not Implemented !".format(name))
return new_line
@task
def set_oltpbench_config():
# set database user, password, and connection url in oltpbench config
lines = None
text = None
with open(dconf.OLTPBENCH_CONFIG, 'r') as f:
lines = f.readlines()
text = ''.join(lines)
lid = 10
for i, line in enumerate(lines):
if 'dbtype' in line:
dbtype = line.split('dbtype')[1][1:-2].strip()
if dbtype != dconf.DB_TYPE:
raise Exception("dbtype {} in OLTPBench config != DB_TYPE {}"
"in driver config !".format(dbtype, dconf.DB_TYPE))
if 'username' in line:
lines[i] = _set_oltpbench_property('username', line)
elif 'password' in line:
lines[i] = _set_oltpbench_property('password', line)
lid = i + 1
elif 'DBUrl' in line:
lines[i] = _set_oltpbench_property('DBUrl', line)
if dconf.ENABLE_UDM:
# add the empty uploadCode and uploadUrl so that OLTPBench will output the summary file,
# which contains throughput, 99latency, 95latency, etc.
if 'uploadUrl' not in text:
line = ' <uploadUrl></uploadUrl>\n'
lines.insert(lid, line)
if 'uploadCode' not in text:
line = ' <uploadCode></uploadCode>\n'
lines.insert(lid, line)
text = ''.join(lines)
with open(dconf.OLTPBENCH_CONFIG, 'w') as f:
f.write(text)
LOG.info('oltpbench config is set: %s', dconf.OLTPBENCH_CONFIG)
2019-08-23 08:47:19 -07:00
@task
2019-10-28 17:07:34 -07:00
def loop(i):
i = int(i)
2019-08-23 08:47:19 -07:00
# free cache
free_cache()
# remove oltpbench log and controller log
clean_logs()
2020-05-05 00:26:06 -07:00
if dconf.ENABLE_UDM is True:
clean_oltpbench_results()
2019-08-23 08:47:19 -07:00
# check disk usage
if check_disk_usage() > dconf.MAX_DISK_USAGE:
LOG.warning('Exceeds max disk usage %s', dconf.MAX_DISK_USAGE)
2019-08-23 08:47:19 -07:00
# run controller from another process
p = Process(target=run_controller, args=())
p.start()
LOG.info('Run the controller')
# run oltpbench as a background job
while not _ready_to_start_oltpbench():
time.sleep(1)
2019-08-23 08:47:19 -07:00
run_oltpbench_bg()
LOG.info('Run OLTP-Bench')
# the controller starts the first collection
while not _ready_to_start_controller():
time.sleep(1)
2019-08-23 08:47:19 -07:00
signal_controller()
LOG.info('Start the first collection')
# stop the experiment
2020-05-23 15:29:02 -07:00
ready_to_shut_down = False
error_msg = None
while not ready_to_shut_down:
ready_to_shut_down, error_msg = _ready_to_shut_down_controller()
time.sleep(1)
2019-10-28 20:32:34 -07:00
2019-08-23 08:47:19 -07:00
signal_controller()
LOG.info('Start the second collection, shut down the controller')
p.join()
2020-05-23 15:29:02 -07:00
if error_msg:
raise Exception('OLTPBench Failed: ' + error_msg)
2020-04-27 17:14:18 -07:00
# add user defined metrics
if dconf.ENABLE_UDM is True:
add_udm()
2019-08-23 08:47:19 -07:00
# save result
result_timestamp = save_dbms_result()
2019-08-23 08:47:19 -07:00
if i >= dconf.WARMUP_ITERATIONS:
2019-10-28 17:07:34 -07:00
# upload result
upload_result()
2019-08-23 08:47:19 -07:00
2019-10-28 17:07:34 -07:00
# get result
response = get_result()
2019-10-28 20:32:34 -07:00
2019-10-28 17:07:34 -07:00
# save next config
save_next_config(response, t=result_timestamp)
2019-10-28 20:32:34 -07:00
2019-10-28 17:07:34 -07:00
# change config
change_conf(response['recommendation'])
2019-08-23 08:47:19 -07:00
2020-05-29 18:59:30 -07:00
2020-05-29 18:27:34 -07:00
@task
def set_dynamic_knobs(recommendation, context):
DYNAMIC = 'dynamic' # pylint: disable=invalid-name
RESTART = 'restart' # pylint: disable=invalid-name
UNKNOWN = 'unknown' # pylint: disable=invalid-name
if dconf.DB_TYPE == 'mysql':
cmd_fmt = "mysql --user={} --password={} -e 'SET GLOBAL {}={}'".format
else:
raise Exception('database {} set dynamic knob function is not implemented.'.format(
dconf.DB_TYPE))
LOG.info('Start setting knobs dynamically.')
with hide('everything'), settings(warn_only=True): # pylint: disable=not-context-manager
for knob, value in recommendation.items():
2020-05-30 00:49:54 -07:00
if value is None:
LOG.warning('Cannot set knob %s dynamically, skip this.', knob)
continue
2020-05-29 18:27:34 -07:00
mode = context.get(knob, UNKNOWN)
if mode == DYNAMIC:
res = run(cmd_fmt(dconf.DB_USER, dconf.DB_PASSWORD, knob, value))
elif mode == RESTART:
LOG.error('Knob %s cannot be set dynamically, restarting database is required, '
2020-05-30 00:49:54 -07:00
'skip this knob.', knob)
2020-05-29 18:27:34 -07:00
continue
elif mode == UNKNOWN:
LOG.warning('It is unclear whether knob %s can be set dynamically or not, '
2020-05-30 00:49:54 -07:00
'still set it to value %s', knob, value)
2020-05-29 18:27:34 -07:00
res = run(cmd_fmt(dconf.DB_USER, dconf.DB_PASSWORD, knob, value))
if res.failed:
LOG.error('Failed to set knob %s to value %s', knob, value)
LOG.error(res)
LOG.info('Finish setting knobs dynamically.')
2019-08-23 08:47:19 -07:00
@task
2019-12-10 20:03:50 -08:00
def run_loops(max_iter=10):
2019-08-23 08:47:19 -07:00
# dump database if it's not done before.
dump = dump_database()
2020-04-23 00:46:58 -07:00
# put the BASE_DB_CONF in the config file
# e.g., mysql needs to set innodb_monitor_enable to track innodb metrics
reset_conf(False)
2019-08-23 08:47:19 -07:00
for i in range(int(max_iter)):
# restart database
restart_succeeded = restart_database()
if not restart_succeeded:
files = {'summary': b'{"error": "DB_RESTART_ERROR"}',
'knobs': b'{}',
'metrics_before': b'{}',
'metrics_after': b'{}'}
2020-04-27 17:14:18 -07:00
if dconf.ENABLE_UDM:
files['user_defined_metrics'] = b'{}'
response = requests.post(dconf.WEBSITE_URL + '/new_result/', files=files,
data={'upload_code': dconf.UPLOAD_CODE})
response = get_result()
result_timestamp = int(time.time())
save_next_config(response, t=result_timestamp)
change_conf(response['recommendation'])
continue
2019-12-03 18:08:55 -08:00
# reload database periodically
if dconf.RELOAD_INTERVAL > 0:
if i % dconf.RELOAD_INTERVAL == 0:
2020-05-05 00:26:06 -07:00
is_ready_db(interval_sec=10)
2019-08-23 08:47:19 -07:00
if i == 0 and dump is False:
restore_database()
elif i > 0:
restore_database()
2020-04-23 00:46:58 -07:00
LOG.info('Wait %s seconds after restarting database', dconf.RESTART_SLEEP_SEC)
is_ready_db(interval_sec=10)
2019-08-23 08:47:19 -07:00
LOG.info('The %s-th Loop Starts / Total Loops %s', i + 1, max_iter)
loop(i % dconf.RELOAD_INTERVAL if dconf.RELOAD_INTERVAL > 0 else i)
2019-08-23 08:47:19 -07:00
LOG.info('The %s-th Loop Ends / Total Loops %s', i + 1, max_iter)
2019-10-23 19:46:33 -07:00
2020-05-28 21:17:35 -07:00
@task
def monitor(max_iter=1):
2020-05-29 18:27:34 -07:00
# Monitor the database, without tuning. OLTPBench is also disabled
2020-05-28 21:17:35 -07:00
for i in range(int(max_iter)):
LOG.info('The %s-th Monitor Loop Starts / Total Loops %s', i + 1, max_iter)
clean_controller_results()
run_controller(interval_sec=dconf.CONTROLLER_OBSERVE_SEC)
upload_result()
LOG.info('The %s-th Monitor Loop Ends / Total Loops %s', i + 1, max_iter)
2020-05-29 18:27:34 -07:00
@task
def monitor_tune(max_iter=1):
# Monitor the database, with tuning. OLTPBench is also disabled
2020-05-30 00:49:54 -07:00
# Set base config
set_dynamic_knobs(dconf.BASE_DB_CONF, {})
2020-05-29 18:27:34 -07:00
for i in range(int(max_iter)):
LOG.info('The %s-th Monitor Loop (with Tuning) Starts / Total Loops %s', i + 1, max_iter)
clean_controller_results()
run_controller(interval_sec=dconf.CONTROLLER_OBSERVE_SEC)
upload_result()
response = get_result()
set_dynamic_knobs(response['recommendation'], response['context'])
LOG.info('The %s-th Monitor Loop (with Tuning) Ends / Total Loops %s', i + 1, max_iter)
2019-10-24 08:59:36 -07:00
@task
2019-10-23 19:46:33 -07:00
def rename_batch(result_dir=None):
result_dir = result_dir or dconf.RESULT_DIR
2019-10-23 19:46:33 -07:00
results = glob.glob(os.path.join(result_dir, '*__summary.json'))
results = sorted(results)
for i, result in enumerate(results):
prefix = os.path.basename(result)
prefix_len = os.path.basename(result).find('_') + 2
prefix = prefix[:prefix_len]
new_prefix = str(i) + '__'
2020-04-27 17:14:18 -07:00
bases = ['summary', 'knobs', 'metrics_before', 'metrics_after']
if dconf.ENABLE_UDM:
bases.append('user_defined_metrics')
for base in bases:
2019-10-23 19:46:33 -07:00
fpath = os.path.join(result_dir, prefix + base + '.json')
rename_path = os.path.join(result_dir, new_prefix + base + '.json')
os.rename(fpath, rename_path)
def _http_content_to_json(content):
if isinstance(content, bytes):
content = content.decode('utf-8')
try:
json_content = json.loads(content)
decoded = True
except (TypeError, json.decoder.JSONDecodeError):
json_content = None
decoded = False
return json_content, decoded
def _modify_website_object(obj_name, action, verbose=False, **kwargs):
2020-02-08 15:05:05 -08:00
verbose = parse_bool(verbose)
if obj_name == 'project':
valid_actions = ('create', 'edit')
elif obj_name == 'session':
valid_actions = ('create', 'edit')
elif obj_name == 'user':
valid_actions = ('create', 'delete')
else:
raise ValueError('Invalid object: {}. Valid objects: project, session'.format(obj_name))
if action not in valid_actions:
raise ValueError('Invalid action: {}. Valid actions: {}'.format(
action, ', '.join(valid_actions)))
data = {}
for k, v in kwargs.items():
if isinstance(v, (dict, list, tuple)):
v = json.dumps(v)
data[k] = v
url_path = '/{}/{}/'.format(action, obj_name)
response = requests.post(dconf.WEBSITE_URL + url_path, data=data)
content = response.content.decode('utf-8')
if response.status_code != 200:
raise Exception("Failed to {} {}.\nStatus: {}\nMessage: {}\n".format(
action, obj_name, response.status_code, content))
json_content, decoded = _http_content_to_json(content)
if verbose:
if decoded:
LOG.info('\n%s_%s = %s', action.upper(), obj_name.upper(),
json.dumps(json_content, indent=4))
else:
LOG.warning("Content could not be decoded.\n\n%s\n", content)
return response, json_content, decoded
@task
def create_website_user(**kwargs):
return _modify_website_object('user', 'create', **kwargs)
@task
def delete_website_user(**kwargs):
return _modify_website_object('user', 'delete', **kwargs)
@task
def create_website_project(**kwargs):
return _modify_website_object('project', 'create', **kwargs)
@task
def edit_website_project(**kwargs):
return _modify_website_object('project', 'edit', **kwargs)
@task
def create_website_session(**kwargs):
return _modify_website_object('session', 'create', **kwargs)
@task
def edit_website_session(**kwargs):
return _modify_website_object('session', 'edit', **kwargs)
2019-10-24 08:59:36 -07:00
def wait_pipeline_data_ready(max_time_sec=800, interval_sec=10):
max_time_sec = int(max_time_sec)
interval_sec = int(interval_sec)
elapsed = 0
ready = False
2019-10-24 08:59:36 -07:00
while elapsed <= max_time_sec:
response = requests.get(dconf.WEBSITE_URL + '/test/pipeline/')
content = get_content(response)
2019-11-26 01:47:42 -08:00
LOG.info("%s (elapsed: %ss)", content, elapsed)
if 'False' in content:
2019-10-24 08:59:36 -07:00
time.sleep(interval_sec)
elapsed += interval_sec
else:
ready = True
break
return ready
2019-10-24 08:59:36 -07:00
2019-10-23 19:46:33 -07:00
@task
def integration_tests_simple():
2019-10-23 19:46:33 -07:00
# Create test website
response = requests.get(dconf.WEBSITE_URL + '/test/create/')
LOG.info(get_content(response))
2019-10-23 19:46:33 -07:00
# Upload training data
LOG.info('Upload training data to no tuning session')
upload_batch(result_dir='./integrationTests/data/', upload_code='ottertuneTestNoTuning')
2019-10-23 19:46:33 -07:00
2020-05-23 15:58:30 -07:00
# periodic tasks haven't ran, lhs result returns.
LOG.info('Test no pipeline data, LHS returned')
upload_result(result_dir='./integrationTests/data/', prefix='0__',
upload_code='ottertuneTestTuningGPR')
response = get_result(upload_code='ottertuneTestTuningGPR')
assert response['status'] == 'lhs'
2019-10-24 09:28:36 -07:00
# wait celery periodic task finishes
assert wait_pipeline_data_ready(), "Pipeline data failed"
2019-10-24 08:59:36 -07:00
2019-10-23 19:46:33 -07:00
# Test DNN
LOG.info('Test DNN (deep neural network)')
upload_result(result_dir='./integrationTests/data/', prefix='0__',
2019-10-24 08:59:36 -07:00
upload_code='ottertuneTestTuningDNN')
2019-10-23 19:46:33 -07:00
response = get_result(upload_code='ottertuneTestTuningDNN')
assert response['status'] == 'good'
# Test GPR
LOG.info('Test GPR (gaussian process regression)')
upload_result(result_dir='./integrationTests/data/', prefix='0__',
2019-10-24 08:59:36 -07:00
upload_code='ottertuneTestTuningGPR')
2019-10-23 19:46:33 -07:00
response = get_result(upload_code='ottertuneTestTuningGPR')
2019-10-24 08:59:36 -07:00
assert response['status'] == 'good'
2019-12-16 19:27:14 -08:00
2020-02-23 23:51:11 -08:00
# Test DDPG
LOG.info('Test DDPG (deep deterministic policy gradient)')
upload_result(result_dir='./integrationTests/data/', prefix='0__',
upload_code='ottertuneTestTuningDDPG')
response = get_result(upload_code='ottertuneTestTuningDDPG')
assert response['status'] == 'good'
2019-12-16 21:14:26 -08:00
# Test DNN: 2rd iteration
upload_result(result_dir='./integrationTests/data/', prefix='1__',
upload_code='ottertuneTestTuningDNN')
response = get_result(upload_code='ottertuneTestTuningDNN')
assert response['status'] == 'good'
# Test GPR: 2rd iteration
2019-12-16 19:27:14 -08:00
upload_result(result_dir='./integrationTests/data/', prefix='1__',
upload_code='ottertuneTestTuningGPR')
response = get_result(upload_code='ottertuneTestTuningGPR')
assert response['status'] == 'good'
2020-02-23 23:51:11 -08:00
# Test DDPG: 2rd iteration
upload_result(result_dir='./integrationTests/data/', prefix='1__',
upload_code='ottertuneTestTuningDDPG')
response = get_result(upload_code='ottertuneTestTuningDDPG')
assert response['status'] == 'good'
LOG.info("\n\nIntegration Tests: PASSED!!\n")
2020-03-29 21:40:03 -07:00
2020-03-29 22:07:39 -07:00
# Test task status UI
task_status_ui_test()
2020-03-29 21:40:03 -07:00
2020-03-29 22:33:09 -07:00
2020-03-29 21:40:03 -07:00
@task
def task_status_ui_test():
# Test GPR
upload_code = 'ottertuneTestTuningGPR'
response = requests.get(dconf.WEBSITE_URL + '/test/task_status/' + upload_code)
2020-03-29 22:07:39 -07:00
LOG.info(get_content(response))
2020-03-29 21:40:03 -07:00
assert 'Success:' in get_content(response)
# Test DNN:
upload_code = 'ottertuneTestTuningDNN'
response = requests.get(dconf.WEBSITE_URL + '/test/task_status/' + upload_code)
2020-03-29 22:07:39 -07:00
LOG.info(get_content(response))
2020-03-29 21:40:03 -07:00
assert 'Success:' in get_content(response)
# Test DDPG:
upload_code = 'ottertuneTestTuningDDPG'
response = requests.get(dconf.WEBSITE_URL + '/test/task_status/' + upload_code)
2020-03-29 22:07:39 -07:00
LOG.info(get_content(response))
2020-03-29 21:40:03 -07:00
assert 'Success:' in get_content(response)
LOG.info("\n\nTask Status UI Tests: PASSED!!\n")
def simulate_db_run(i, next_conf):
# Using 1 knob to decide performance; simple but effective
2020-05-25 09:28:20 -07:00
gain = int(next_conf['effective_cache_size'].replace('MB', '000').replace('kB', ''))
with open('./integrationTests/data/x__metrics_after.json', 'r') as fin:
metrics_after = json.load(fin)
metrics_after['local']['database']['pg_stat_database']['tpcc']['xact_commit'] = gain
with open('./integrationTests/data/x__metrics_after.json', 'w') as fout:
json.dump(metrics_after, fout)
with open('./integrationTests/data/x__knobs.json', 'r') as fin:
knobs = json.load(fin)
for knob in next_conf:
knobs['global']['global'][knob] = next_conf[knob]
with open('./integrationTests/data/x__knobs.json', 'w') as fout:
json.dump(knobs, fout)
with open('./integrationTests/data/x__summary.json', 'r') as fin:
summary = json.load(fin)
summary['start_time'] = i * 20000000
summary['observation_time'] = 100
summary['end_time'] = (i + 1) * 20000000
with open('./integrationTests/data/x__summary.json', 'w') as fout:
json.dump(summary, fout)
return gain
@task
def integration_tests():
# Create test website
response = requests.get(dconf.WEBSITE_URL + '/test/create/')
LOG.info(get_content(response))
# Upload training data
LOG.info('Upload training data to no tuning session')
upload_batch(result_dir='./integrationTests/data/', upload_code='ottertuneTestNoTuning')
# periodic tasks haven't ran, lhs result returns.
LOG.info('Test no pipeline data, LHS returned')
2020-05-25 09:02:43 -07:00
upload_result(result_dir='./integrationTests/data/', prefix='1__',
upload_code='ottertuneTestTuningGPR')
response = get_result(upload_code='ottertuneTestTuningGPR')
assert response['status'] == 'lhs'
# wait celery periodic task finishes
assert wait_pipeline_data_ready(), "Pipeline data failed"
2020-05-25 11:12:08 -07:00
total_n = 20
2020-05-24 22:36:50 -07:00
last_n = 10
2020-05-25 11:12:08 -07:00
max_gain = 0
simulate_db_run(1, {'effective_cache_size': '0kB'})
for i in range(2, total_n + 2):
LOG.info('Test GPR (gaussian process regression)')
upload_result(result_dir='./integrationTests/data/', prefix='x__',
upload_code='ottertuneTestTuningGPR')
response = get_result(upload_code='ottertuneTestTuningGPR')
assert response['status'] == 'good'
gain = simulate_db_run(i, response['recommendation'])
2020-05-25 11:12:08 -07:00
if max_gain < gain:
max_gain = gain
elif i > total_n - last_n + 2:
2020-05-25 11:12:08 -07:00
assert gain > max_gain / 2.0
2020-05-25 11:12:08 -07:00
max_gain = 0
simulate_db_run(1, {'effective_cache_size': '0kB'})
for i in range(2, total_n + 2):
LOG.info('Test DNN (deep neural network)')
upload_result(result_dir='./integrationTests/data/', prefix='x__',
upload_code='ottertuneTestTuningDNN')
response = get_result(upload_code='ottertuneTestTuningDNN')
assert response['status'] == 'good'
gain = simulate_db_run(i, response['recommendation'])
2020-05-25 11:12:08 -07:00
if max_gain < gain:
max_gain = gain
elif i > total_n - last_n + 2:
2020-05-25 11:12:08 -07:00
assert gain > max_gain / 2.0
2020-05-25 11:12:08 -07:00
max_gain = 0
simulate_db_run(1, {'effective_cache_size': '0kB'})
for i in range(2, total_n + 2):
upload_result(result_dir='./integrationTests/data/', prefix='x__',
upload_code='ottertuneTestTuningDDPG')
response = get_result(upload_code='ottertuneTestTuningDDPG')
assert response['status'] == 'good'
gain = simulate_db_run(i, response['recommendation'])
2020-05-25 11:12:08 -07:00
if max_gain < gain:
max_gain = gain
elif i > total_n - last_n + 2:
2020-05-25 11:12:08 -07:00
assert gain > max_gain / 2.0
2020-05-25 13:33:36 -07:00
2020-05-25 12:54:32 -07:00
# ------------------- concurrency test ----------------------
upload_result(result_dir='./integrationTests/data/', prefix='2__',
upload_code='ottertuneTestTuningDNN')
upload_result(result_dir='./integrationTests/data/', prefix='3__',
upload_code='ottertuneTestTuningDNN')
upload_result(result_dir='./integrationTests/data/', prefix='2__',
upload_code='ottertuneTestTuningGPR')
upload_result(result_dir='./integrationTests/data/', prefix='3__',
upload_code='ottertuneTestTuningGPR')
upload_result(result_dir='./integrationTests/data/', prefix='2__',
upload_code='ottertuneTestTuningDDPG')
upload_result(result_dir='./integrationTests/data/', prefix='3__',
upload_code='ottertuneTestTuningDDPG')
response = get_result(upload_code='ottertuneTestTuningDNN')
assert response['status'] == 'good'
response = get_result(upload_code='ottertuneTestTuningGPR')
assert response['status'] == 'good'
response = get_result(upload_code='ottertuneTestTuningDDPG')
assert response['status'] == 'good'
2020-05-25 09:02:43 -07:00
LOG.info("\n\nIntegration Tests: PASSED!!\n")
# Test task status UI
task_status_ui_test()