From b215b156a4649cfe858e54e1a4d5ecd8c7130795 Mon Sep 17 00:00:00 2001
From: arifiorino
Date: Mon, 21 Oct 2019 20:58:13 +0000
Subject: [PATCH] Moved LHS to Server

---
 client/driver/fabfile.py                    |  2 +-
 server/website/website/models.py            |  4 +-
 server/website/website/tasks/async_tasks.py | 55 ++++++++++++++++++++-
 3 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/client/driver/fabfile.py b/client/driver/fabfile.py
index d69d4b3..b537d77 100644
--- a/client/driver/fabfile.py
+++ b/client/driver/fabfile.py
@@ -35,7 +35,7 @@ RELOAD_INTERVAL = 10
 # maximum disk usage
 MAX_DISK_USAGE = 90
 # Postgres datadir
-PG_DATADIR = '/var/lib/postgresql/11/main'
+PG_DATADIR = '/var/lib/postgresql/9.6/main'
 
 # Load config
 with open('driver_config.json', 'r') as _f:
diff --git a/server/website/website/models.py b/server/website/website/models.py
index 61503ce..b75be27 100644
--- a/server/website/website/models.py
+++ b/server/website/website/models.py
@@ -134,7 +134,8 @@ class Session(BaseModel):
     TUNING_OPTIONS = OrderedDict([
         ("tuning_session", "Tuning Session"),
         ("no_tuning_session", "No Tuning"),
-        ("randomly_generate", "Randomly Generate")
+        ("randomly_generate", "Randomly Generate"),
+        ("lhs", "Run LHS")
     ])
 
     user = models.ForeignKey(User)
@@ -144,6 +145,7 @@ class Session(BaseModel):
     hardware = models.ForeignKey(Hardware)
     algorithm = models.IntegerField(choices=AlgorithmType.choices(),
                                     default=AlgorithmType.GPR)
+    lhs_samples = models.TextField(default="[]")
     ddpg_actor_model = models.BinaryField(null=True, blank=True)
     ddpg_critic_model = models.BinaryField(null=True, blank=True)
     ddpg_reply_memory = models.BinaryField(null=True, blank=True)
diff --git a/server/website/website/tasks/async_tasks.py b/server/website/website/tasks/async_tasks.py
index 483bcce..24fd993 100644
--- a/server/website/website/tasks/async_tasks.py
+++ b/server/website/website/tasks/async_tasks.py
@@ -6,6 +6,8 @@ import random
 import queue
 
 import numpy as np
+from pyDOE import lhs
+from scipy.stats import uniform
 from celery.task import task, Task
 from celery.utils.log import get_task_logger
 
@@ -147,7 +149,25 @@ def aggregate_target_results(result_id, algorithm):
     # implement a sampling technique to generate new training data).
     newest_result = Result.objects.get(pk=result_id)
     has_pipeline_data = PipelineData.objects.filter(workload=newest_result.workload).exists()
-    if not has_pipeline_data or newest_result.session.tuning_session == 'randomly_generate':
+    if newest_result.session.tuning_session == 'lhs':
+        all_samples = JSONUtil.loads(newest_result.session.lhs_samples)
+        if len(all_samples) == 0:
+            knobs = SessionKnob.objects.get_knobs_for_session(newest_result.session)
+            all_samples = gen_lhs_samples(knobs, 100)
+            LOG.debug('%s: Generated LHS.\n\ndata=%s\n',
+                      AlgorithmType.name(algorithm), JSONUtil.dumps(all_samples[:5], pprint=True))
+        samples = all_samples.pop()
+        result = Result.objects.filter(pk=result_id)
+        agg_data = DataUtil.aggregate_data(result)
+        agg_data['newest_result_id'] = result_id
+        agg_data['bad'] = True
+        agg_data['config_recommend'] = samples
+        newest_result.session.lhs_samples = JSONUtil.dumps(all_samples)
+        newest_result.session.save()
+        LOG.debug('%s: Got LHS config.\n\ndata=%s\n',
+                  AlgorithmType.name(algorithm), JSONUtil.dumps(agg_data, pprint=True))
+
+    elif not has_pipeline_data or newest_result.session.tuning_session == 'randomly_generate':
         if not has_pipeline_data and newest_result.session.tuning_session == 'tuning_session':
             LOG.debug("Background tasks haven't ran for this workload yet, picking random data.")
 
@@ -219,6 +239,39 @@ def gen_random_data(knobs):
     return random_knob_result
 
 
+def gen_lhs_samples(knobs, nsamples):
+    names = []
+    maxvals = []
+    minvals = []
+    types = []
+
+    for knob in knobs:
+        names.append(knob['name'])
+        maxvals.append(float(knob['maxval']))
+        minvals.append(float(knob['minval']))
+        types.append(knob['vartype'])
+
+    nfeats = len(knobs)
+    samples = lhs(nfeats, samples=nsamples, criterion='maximin')
+    maxvals = np.array(maxvals)
+    minvals = np.array(minvals)
+    scales = maxvals - minvals
+    for fidx in range(nfeats):
+        samples[:, fidx] = uniform(loc=minvals[fidx], scale=scales[fidx]).ppf(samples[:, fidx])
+    lhs_samples = []
+    for sidx in range(nsamples):
+        lhs_samples.append(dict())
+        for fidx in range(nfeats):
+            if types[fidx] == VarType.INTEGER:
+                lhs_samples[-1][names[fidx]] = int(round(samples[sidx][fidx]))
+            elif types[fidx] == VarType.REAL:
+                lhs_samples[-1][names[fidx]] = float(samples[sidx][fidx])
+            else:
+                LOG.debug("LHS type not supported: %s", types[fidx])
+
+    return lhs_samples
+
+
 @task(base=TrainDDPG, name='train_ddpg')
 def train_ddpg(result_id):
     LOG.info('Add training data to ddpg and train ddpg')
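
Note on the sampling recipe: gen_lhs_samples() draws a maximin Latin Hypercube
design on the unit hypercube with pyDOE's lhs(), maps each column onto its
knob's [minval, maxval] range through scipy's uniform(...).ppf() (the inverse
CDF), and rounds integer knobs. Below is a minimal standalone sketch of that
recipe for local experimentation; the two example knobs and the numeric
stand-ins for VarType.INTEGER / VarType.REAL are made up for illustration and
are not taken from this patch.

    # Standalone sketch of the LHS recipe in gen_lhs_samples().
    # The knob dicts are hypothetical examples; only the 'name', 'minval',
    # 'maxval', and 'vartype' fields used by the patch are assumed.
    import numpy as np
    from pyDOE import lhs
    from scipy.stats import uniform

    INTEGER, REAL = 2, 3  # stand-ins for VarType.INTEGER / VarType.REAL

    knobs = [
        {'name': 'shared_buffers', 'minval': 128, 'maxval': 4096, 'vartype': INTEGER},
        {'name': 'checkpoint_completion_target', 'minval': 0.1, 'maxval': 0.9, 'vartype': REAL},
    ]

    nsamples = 5
    nfeats = len(knobs)

    # Maximin LHS design: nsamples points in [0, 1)^nfeats, spread apart.
    design = lhs(nfeats, samples=nsamples, criterion='maximin')

    # Map each unit-interval column onto its knob's range via the uniform
    # distribution's inverse CDF (ppf), as the patch does.
    for fidx, knob in enumerate(knobs):
        lo, hi = float(knob['minval']), float(knob['maxval'])
        design[:, fidx] = uniform(loc=lo, scale=hi - lo).ppf(design[:, fidx])

    # Round integer knobs; keep real knobs as floats.
    samples = []
    for row in design:
        config = {}
        for fidx, knob in enumerate(knobs):
            if knob['vartype'] == INTEGER:
                config[knob['name']] = int(round(row[fidx]))
            else:
                config[knob['name']] = float(row[fidx])
        samples.append(config)

    print(samples)  # five configs, each mapping knob name -> sampled value

Each run yields one dict per sample. On the server, the full list is stored as
JSON in Session.lhs_samples and one config is popped per incoming result, so a
session works through the design without regenerating it.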