Moved LHS to Server

This commit is contained in:
arifiorino 2019-10-21 20:58:13 +00:00 committed by Dana Van Aken
parent a2234d08cc
commit b215b156a4
3 changed files with 58 additions and 3 deletions

View File

@ -35,7 +35,7 @@ RELOAD_INTERVAL = 10
# maximum disk usage # maximum disk usage
MAX_DISK_USAGE = 90 MAX_DISK_USAGE = 90
# Postgres datadir # Postgres datadir
PG_DATADIR = '/var/lib/postgresql/11/main' PG_DATADIR = '/var/lib/postgresql/9.6/main'
# Load config # Load config
with open('driver_config.json', 'r') as _f: with open('driver_config.json', 'r') as _f:

View File

@ -134,7 +134,8 @@ class Session(BaseModel):
TUNING_OPTIONS = OrderedDict([ TUNING_OPTIONS = OrderedDict([
("tuning_session", "Tuning Session"), ("tuning_session", "Tuning Session"),
("no_tuning_session", "No Tuning"), ("no_tuning_session", "No Tuning"),
("randomly_generate", "Randomly Generate") ("randomly_generate", "Randomly Generate"),
("lhs", "Run LHS")
]) ])
user = models.ForeignKey(User) user = models.ForeignKey(User)
@ -144,6 +145,7 @@ class Session(BaseModel):
hardware = models.ForeignKey(Hardware) hardware = models.ForeignKey(Hardware)
algorithm = models.IntegerField(choices=AlgorithmType.choices(), algorithm = models.IntegerField(choices=AlgorithmType.choices(),
default=AlgorithmType.GPR) default=AlgorithmType.GPR)
lhs_samples = models.TextField(default="[]")
ddpg_actor_model = models.BinaryField(null=True, blank=True) ddpg_actor_model = models.BinaryField(null=True, blank=True)
ddpg_critic_model = models.BinaryField(null=True, blank=True) ddpg_critic_model = models.BinaryField(null=True, blank=True)
ddpg_reply_memory = models.BinaryField(null=True, blank=True) ddpg_reply_memory = models.BinaryField(null=True, blank=True)

View File

@ -6,6 +6,8 @@
import random import random
import queue import queue
import numpy as np import numpy as np
from pyDOE import lhs
from scipy.stats import uniform
from celery.task import task, Task from celery.task import task, Task
from celery.utils.log import get_task_logger from celery.utils.log import get_task_logger
@ -147,7 +149,25 @@ def aggregate_target_results(result_id, algorithm):
# implement a sampling technique to generate new training data). # implement a sampling technique to generate new training data).
newest_result = Result.objects.get(pk=result_id) newest_result = Result.objects.get(pk=result_id)
has_pipeline_data = PipelineData.objects.filter(workload=newest_result.workload).exists() has_pipeline_data = PipelineData.objects.filter(workload=newest_result.workload).exists()
if not has_pipeline_data or newest_result.session.tuning_session == 'randomly_generate': if newest_result.session.tuning_session == 'lhs':
all_samples = JSONUtil.loads(newest_result.session.lhs_samples)
if len(all_samples) == 0:
knobs = SessionKnob.objects.get_knobs_for_session(newest_result.session)
all_samples = gen_lhs_samples(knobs, 100)
LOG.debug('%s: Generated LHS.\n\ndata=%s\n',
AlgorithmType.name(algorithm), JSONUtil.dumps(all_samples[:5], pprint=True))
samples = all_samples.pop()
result = Result.objects.filter(pk=result_id)
agg_data = DataUtil.aggregate_data(result)
agg_data['newest_result_id'] = result_id
agg_data['bad'] = True
agg_data['config_recommend'] = samples
newest_result.session.lhs_samples = JSONUtil.dumps(all_samples)
newest_result.session.save()
LOG.debug('%s: Got LHS config.\n\ndata=%s\n',
AlgorithmType.name(algorithm), JSONUtil.dumps(agg_data, pprint=True))
elif not has_pipeline_data or newest_result.session.tuning_session == 'randomly_generate':
if not has_pipeline_data and newest_result.session.tuning_session == 'tuning_session': if not has_pipeline_data and newest_result.session.tuning_session == 'tuning_session':
LOG.debug("Background tasks haven't ran for this workload yet, picking random data.") LOG.debug("Background tasks haven't ran for this workload yet, picking random data.")
@ -219,6 +239,39 @@ def gen_random_data(knobs):
return random_knob_result return random_knob_result
def gen_lhs_samples(knobs, nsamples):
    """Generate `nsamples` Latin Hypercube samples over the given session knobs.

    Each returned element is a dict mapping knob name -> sampled value, cast to
    int or float according to the knob's vartype. Knobs with an unsupported
    vartype are logged and omitted from the sample dicts.
    """
    nfeats = len(knobs)
    names = [knob['name'] for knob in knobs]
    types = [knob['vartype'] for knob in knobs]
    minvals = np.array([float(knob['minval']) for knob in knobs])
    maxvals = np.array([float(knob['maxval']) for knob in knobs])

    # Unit-hypercube LHS design (one column per knob), then map each column
    # onto [minval, maxval] via the uniform distribution's inverse CDF.
    design = lhs(nfeats, samples=nsamples, criterion='maximin')
    scales = maxvals - minvals
    for col in range(nfeats):
        design[:, col] = uniform(loc=minvals[col], scale=scales[col]).ppf(design[:, col])

    lhs_samples = []
    for row in range(nsamples):
        sample = dict()
        for col in range(nfeats):
            vtype = types[col]
            if vtype == VarType.INTEGER:
                sample[names[col]] = int(round(design[row][col]))
            elif vtype == VarType.REAL:
                sample[names[col]] = float(design[row][col])
            else:
                LOG.debug("LHS type not supported: %s", vtype)
        lhs_samples.append(sample)
    return lhs_samples
@task(base=TrainDDPG, name='train_ddpg') @task(base=TrainDDPG, name='train_ddpg')
def train_ddpg(result_id): def train_ddpg(result_id):
LOG.info('Add training data to ddpg and train ddpg') LOG.info('Add training data to ddpg and train ddpg')