#
# OtterTune - fabfile.py
#
# Copyright (c) 2017-18, Carnegie Mellon University Database Group
#
'''
Created on Mar 23, 2018

@author: bohan
'''
import sys
import json
import logging
import time
import os.path
import re
import glob
from multiprocessing import Process

from fabric.api import (env, local, task, lcd)
from fabric.state import output as fabric_output

LOG = logging.getLogger()
LOG.setLevel(logging.DEBUG)
Formatter = logging.Formatter(  # pylint: disable=invalid-name
    "%(asctime)s [%(levelname)s] %(message)s")

# print the log to stdout
ConsoleHandler = logging.StreamHandler(sys.stdout)  # pylint: disable=invalid-name
ConsoleHandler.setFormatter(Formatter)
LOG.addHandler(ConsoleHandler)

# Fabric environment settings
env.hosts = ['localhost']
fabric_output.update({
    'running': True,
    'stdout': True,
})

# number of iterations between restores of the database
RELOAD_INTERVAL = 10
# maximum disk usage (percent)
MAX_DISK_USAGE = 90

with open('driver_config.json', 'r') as f:
    CONF = json.load(f)

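# The CONF lookups below imply a driver_config.json with roughly the following
# keys (a sketch only; every value here is illustrative, not a shipped default):
#
#   {
#       "database_type": "postgres",
#       "database_name": "tpcc",
#       "database_disk": "/dev/sda1",
#       "database_conf": "/etc/postgresql/9.6/main/postgresql.conf",
#       "database_save_path": "~/dumps",
#       "username": "dbuser",
#       "password": "dbpassword",
#       "oltpbench_home": "~/oltpbench",
#       "oltpbench_config": "~/oltpbench/config/tpcc_config.xml",
#       "oltpbench_workload": "tpcc",
#       "oltpbench_log": "~/oltpbench.log",
#       "controller_config": "../controller/config/sample_config.json",
#       "controller_log": "~/controller.log",
#       "save_path": "~/results",
#       "upload_url": "http://127.0.0.1:8000",
#       "upload_code": "<code generated by the OtterTune website>",
#       "lhs_knob_path": "~/knobs.json",
#       "lhs_save_path": "~/lhs_configs",
#       "oracle_awr_enabled": false
#   }
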
@task
def check_disk_usage():
    partition = CONF['database_disk']
    disk_use = 0
    cmd = "df -h {}".format(partition)
    out = local(cmd, capture=True).splitlines()[1]
    m = re.search(r'\d+(?=%)', out)
    if m:
        disk_use = int(m.group(0))
    LOG.info("Current Disk Usage: %s%s", disk_use, '%')
    return disk_use

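# Like every @task below, check_disk_usage can be run directly from the shell
# via Fabric 1.x (which this fabfile targets), e.g.:
#
#   fab check_disk_usage
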
@task
def check_memory_usage():
    cmd = 'free -m -h'
    local(cmd)

@task
def restart_database():
    if CONF['database_type'] == 'postgres':
        cmd = 'sudo service postgresql restart'
    elif CONF['database_type'] == 'oracle':
        cmd = 'sh oracleScripts/shutdownOracle.sh && sh oracleScripts/startupOracle.sh'
    else:
        raise Exception("Database type {} is not implemented!".format(CONF['database_type']))
    local(cmd)

@task
def drop_database():
    if CONF['database_type'] == 'postgres':
        cmd = "PGPASSWORD={} dropdb -e --if-exists {} -U {}".\
              format(CONF['password'], CONF['database_name'], CONF['username'])
    else:
        raise Exception("Database type {} is not implemented!".format(CONF['database_type']))
    local(cmd)

@task
def create_database():
    if CONF['database_type'] == 'postgres':
        cmd = "PGPASSWORD={} createdb -e {} -U {}".\
              format(CONF['password'], CONF['database_name'], CONF['username'])
    else:
        raise Exception("Database type {} is not implemented!".format(CONF['database_type']))
    local(cmd)

@task
def change_conf():
    next_conf = 'next_config'
    cmd = "sudo python3 ConfParser.py {} {} {}".\
          format(CONF['database_type'], next_conf, CONF['database_conf'])
    local(cmd)

@task
def load_oltpbench():
    cmd = "./oltpbenchmark -b {} -c {} --create=true --load=true".\
          format(CONF['oltpbench_workload'], CONF['oltpbench_config'])
    with lcd(CONF['oltpbench_home']):  # pylint: disable=not-context-manager
        local(cmd)

@task
def run_oltpbench():
    cmd = "./oltpbenchmark -b {} -c {} --execute=true -s 5 -o outputfile".\
          format(CONF['oltpbench_workload'], CONF['oltpbench_config'])
    with lcd(CONF['oltpbench_home']):  # pylint: disable=not-context-manager
        local(cmd)

@task
def run_oltpbench_bg():
    cmd = "./oltpbenchmark -b {} -c {} --execute=true -s 5 -o outputfile > {} 2>&1 &".\
          format(CONF['oltpbench_workload'], CONF['oltpbench_config'], CONF['oltpbench_log'])
    with lcd(CONF['oltpbench_home']):  # pylint: disable=not-context-manager
        local(cmd)

@task
def run_controller():
    cmd = 'sudo gradle run -PappArgs="-c {} -d output/" --no-daemon > {}'.\
          format(CONF['controller_config'], CONF['controller_log'])
    with lcd("../controller"):  # pylint: disable=not-context-manager
        local(cmd)

@task
def signal_controller():
    with open('../controller/pid.txt') as pid_file:
        pid = int(pid_file.read())
    cmd = 'sudo kill -2 {}'.format(pid)
    with lcd("../controller"):  # pylint: disable=not-context-manager
        local(cmd)

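# Note on signal_controller: 'kill -2' sends SIGINT to the controller process.
# As the log messages in loop() below suggest, the first signal makes the
# controller take its first metrics snapshot and the second signal triggers
# the final snapshot and shuts the controller down.
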
@task
def save_dbms_result():
    t = int(time.time())
    files = ['knobs.json', 'metrics_after.json', 'metrics_before.json', 'summary.json']
    for f_ in files:
        f_prefix = f_.split('.')[0]
        cmd = 'cp ../controller/output/{} {}/{}__{}.json'.\
              format(f_, CONF['save_path'], t, f_prefix)
        local(cmd)

@task
def free_cache():
    cmd = 'sync; sudo bash -c "echo 1 > /proc/sys/vm/drop_caches"'
    local(cmd)

@task
def upload_result():
    cmd = 'python3 ../../server/website/script/upload/upload.py \
           ../controller/output/ {} {}/new_result/'.format(CONF['upload_code'],
                                                           CONF['upload_url'])
    local(cmd)

@task
def get_result():
    cmd = 'python3 ../../script/query_and_get.py {} {} 5'.\
          format(CONF['upload_url'], CONF['upload_code'])
    local(cmd)

@task
def add_udf():
    cmd = 'sudo python3 ./LatencyUDF.py ../controller/output/'
    local(cmd)

@task
def upload_batch():
    cmd = 'python3 ./upload_batch.py {} {} {}/new_result/'.format(CONF['save_path'],
                                                                  CONF['upload_code'],
                                                                  CONF['upload_url'])
    local(cmd)

@task
def dump_database():
    db_file_path = '{}/{}.dump'.format(CONF['database_save_path'], CONF['database_name'])
    if os.path.exists(db_file_path):
        LOG.info('%s already exists!', db_file_path)
        return False
    else:
        LOG.info('Dump database %s to %s', CONF['database_name'], db_file_path)
        # You must create a directory named dpdata through sqlplus in your Oracle database
        if CONF['database_type'] == 'oracle':
            cmd = 'expdp {}/{}@{} schemas={} dumpfile={}.dump DIRECTORY=dpdata'.format(
                'c##tpcc', 'oracle', 'orcldb', 'c##tpcc', 'orcldb')
        elif CONF['database_type'] == 'postgres':
            cmd = 'PGPASSWORD={} pg_dump -U {} -F c -d {} > {}'.format(CONF['password'],
                                                                       CONF['username'],
                                                                       CONF['database_name'],
                                                                       db_file_path)
        else:
            raise Exception("Database type {} is not implemented!".format(CONF['database_type']))
        local(cmd)
        return True

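# Caveat: the expdp branch in dump_database hard-codes the Oracle schema,
# password, and SID ('c##tpcc' / 'oracle' / 'orcldb') instead of reading them
# from CONF; adjust these values to match your Oracle setup.
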
@task
def restore_database():
    if CONF['database_type'] == 'oracle':
        # You must create a directory named dpdata through sqlplus in your Oracle database.
        # The following script assumes that directory exists.
        # You may want to modify the username, password, and dump file name in the script.
        cmd = 'sh oracleScripts/restoreOracle.sh'
    elif CONF['database_type'] == 'postgres':
        db_file_path = '{}/{}.dump'.format(CONF['database_save_path'], CONF['database_name'])
        drop_database()
        create_database()
        cmd = 'PGPASSWORD={} pg_restore -U {} -n public -j 8 -F c -d {} {}'.\
              format(CONF['password'], CONF['username'], CONF['database_name'], db_file_path)
    else:
        raise Exception("Database type {} is not implemented!".format(CONF['database_type']))
    LOG.info('Start restoring database')
    local(cmd)
    LOG.info('Finished restoring database')

def _ready_to_start_oltpbench():
    return (os.path.exists(CONF['controller_log']) and
            'Output the process pid to'
            in open(CONF['controller_log']).read())


def _ready_to_start_controller():
    return (os.path.exists(CONF['oltpbench_log']) and
            'Warmup complete, starting measurements'
            in open(CONF['oltpbench_log']).read())


def _ready_to_shut_down_controller():
    pid_file_path = '../controller/pid.txt'
    return (os.path.exists(pid_file_path) and os.path.exists(CONF['oltpbench_log']) and
            'Output throughput samples into file' in open(CONF['oltpbench_log']).read())

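# The three _ready_* predicates above are polled by loop() and run_lhs() to
# sequence OLTP-Bench and the controller via their log files. A small helper
# along these lines (hypothetical, not part of the original driver) would
# factor out the polling loop:
#
#   def _wait_for(predicate, interval=1):
#       while not predicate():
#           time.sleep(interval)
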
def clean_logs():
    # remove oltpbench log
    cmd = 'rm -f {}'.format(CONF['oltpbench_log'])
    local(cmd)

    # remove controller log
    cmd = 'rm -f {}'.format(CONF['controller_log'])
    local(cmd)

@task
def lhs_samples(count=10):
    cmd = 'python3 lhs.py {} {} {}'.format(count, CONF['lhs_knob_path'], CONF['lhs_save_path'])
    local(cmd)

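# lhs.py is expected to generate 'count' knob configurations (named config_*,
# as consumed by run_lhs below), presumably via Latin Hypercube Sampling over
# the knob definitions at CONF['lhs_knob_path']. With Fabric 1.x the argument
# uses the colon syntax, e.g.:
#
#   fab lhs_samples:count=20
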
@task
def loop():
    # free cache
    free_cache()

    # remove oltpbench log and controller log
    clean_logs()

    # restart database
    restart_database()

    # check disk usage
    if check_disk_usage() > MAX_DISK_USAGE:
        LOG.warning('Exceeds max disk usage %s', MAX_DISK_USAGE)

    # run controller from another process
    p = Process(target=run_controller, args=())
    p.start()
    LOG.info('Run the controller')

    # run oltpbench as a background job
    while not _ready_to_start_oltpbench():
        time.sleep(1)  # poll once per second instead of busy-waiting
    run_oltpbench_bg()
    LOG.info('Run OLTP-Bench')

    # the controller starts the first collection
    while not _ready_to_start_controller():
        time.sleep(1)
    signal_controller()
    LOG.info('Start the first collection')

    # stop the experiment
    while not _ready_to_shut_down_controller():
        time.sleep(1)
    signal_controller()
    LOG.info('Start the second collection, shut down the controller')

    p.join()

    # add user defined target objective
    # add_udf()

    # save result
    save_dbms_result()

    # upload result
    upload_result()

    # get result
    get_result()

    # change config
    change_conf()

@task
def run_lhs():
    datadir = CONF['lhs_save_path']
    samples = glob.glob(os.path.join(datadir, 'config_*'))

    # dump the database if that has not been done before
    dump = dump_database()

    for i, sample in enumerate(samples):
        # reload the database periodically
        if RELOAD_INTERVAL > 0:
            if i % RELOAD_INTERVAL == 0:
                if i == 0 and dump is False:
                    restore_database()
                elif i > 0:
                    restore_database()
        # free cache
        free_cache()

        LOG.info('\n\n Start %s-th sample %s \n\n', i, sample)
        # check memory usage
        # check_memory_usage()

        # check disk usage
        if check_disk_usage() > MAX_DISK_USAGE:
            LOG.warning('Exceeds max disk usage %s', MAX_DISK_USAGE)

        # copy the lhs-sampled config to the to-be-used config
        cmd = 'cp {} next_config'.format(sample)
        local(cmd)

        # remove oltpbench log and controller log
        clean_logs()

        # change config
        change_conf()

        # restart database
        restart_database()

        if CONF.get('oracle_awr_enabled', False):
            # create a snapshot for the Oracle AWR report
            if CONF['database_type'] == 'oracle':
                local('sh snapshotOracle.sh')

        # run controller from another process
        p = Process(target=run_controller, args=())
        p.start()

        # run oltpbench as a background job
        while not _ready_to_start_oltpbench():
            time.sleep(1)
        run_oltpbench_bg()
        LOG.info('Run OLTP-Bench')

        while not _ready_to_start_controller():
            time.sleep(1)
        signal_controller()
        LOG.info('Start the first collection')

        while not _ready_to_shut_down_controller():
            time.sleep(1)
        # stop the experiment
        signal_controller()
        LOG.info('Start the second collection, shut down the controller')

        p.join()

        # save result
        save_dbms_result()

        # upload result
        upload_result()

        if CONF.get('oracle_awr_enabled', False):
            # create an Oracle AWR report for performance analysis
            if CONF['database_type'] == 'oracle':
                local('sh oracleScripts/snapshotOracle.sh && sh oracleScripts/awrOracle.sh')

@task
def run_loops(max_iter=1):
    # dump the database if that has not been done before
    dump = dump_database()

    for i in range(int(max_iter)):
        # reload the database periodically
        if RELOAD_INTERVAL > 0:
            if i % RELOAD_INTERVAL == 0:
                if i == 0 and dump is False:
                    restore_database()
                elif i > 0:
                    restore_database()

        LOG.info('The %s-th Loop Starts / Total Loops %s', i + 1, max_iter)
        loop()
        LOG.info('The %s-th Loop Ends / Total Loops %s', i + 1, max_iter)

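# Typical entry points with Fabric 1.x, e.g.:
#
#   fab run_loops:max_iter=10   # tuning loops driven by the OtterTune server
#   fab run_lhs                 # offline runs over pre-generated LHS configs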