diff --git a/server/analysis/ddpg/ddpg.py b/server/analysis/ddpg/ddpg.py
index 0f2ac45..0cebf88 100644
--- a/server/analysis/ddpg/ddpg.py
+++ b/server/analysis/ddpg/ddpg.py
@@ -32,6 +32,7 @@ class Actor(nn.Module):
             nn.Linear(128, 128),
             nn.Tanh(),
             nn.Dropout(0.3),
+            nn.BatchNorm1d(128),
             nn.Linear(128, 64),
             nn.Tanh(),
@@ -99,7 +100,7 @@ class Critic(nn.Module):
 class DDPG(object):

     def __init__(self, n_states, n_actions, model_name='', alr=0.001, clr=0.001,
-                 gamma=0.9, batch_size=32, tau=0.002, memory_size=100000):
+                 gamma=0.9, batch_size=32, tau=0.002, shift=0, memory_size=100000):
         self.n_states = n_states
         self.n_actions = n_actions
         self.alr = alr
@@ -108,6 +109,7 @@ class DDPG(object):
         self.batch_size = batch_size
         self.gamma = gamma
         self.tau = tau
+        self.shift = shift

         self._build_network()
@@ -184,7 +186,7 @@ class DDPG(object):
         target_next_actions = self.target_actor(batch_next_states).detach()
         target_next_value = self.target_critic(batch_next_states, target_next_actions).detach()
         current_value = self.critic(batch_states, batch_actions)
-        next_value = batch_rewards + target_next_value * self.gamma
+        next_value = batch_rewards + target_next_value * self.gamma + self.shift

         # update prioritized memory
         if isinstance(self.replay_memory, PrioritizedReplayMemory):
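For reference, a minimal sketch of the Bellman target that the new `shift` parameter enters in DDPG.update(); the helper name `td_target` is illustrative and not part of the patch, but the formula matches the changed line above (r + gamma * Q_target(s', a') + shift).

import torch

def td_target(batch_rewards, target_next_value, gamma=0.9, shift=0.0):
    # Critic regression target used in DDPG.update() above; `shift` is the
    # new constant offset added to every target value by this patch.
    return batch_rewards + gamma * target_next_value + shift

# With gamma = 0 the critic regresses on the (shifted) immediate reward alone.
rewards = torch.tensor([[0.5], [1.0]])
next_q = torch.tensor([[0.2], [0.4]])
print(td_target(rewards, next_q, gamma=0.0, shift=0.0))  # tensor([[0.5], [1.0]])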
diff --git a/server/analysis/simulation.py b/server/analysis/simulation.py
index 9d7192f..90fc285 100644
--- a/server/analysis/simulation.py
+++ b/server/analysis/simulation.py
@@ -18,6 +18,7 @@ import torch
 sys.path.append("../")
 from analysis.util import get_analysis_logger  # noqa
 from analysis.ddpg.ddpg import DDPG  # noqa
+from analysis.ddpg.ou_process import OUProcess  # noqa
 from analysis.gp_tf import GPRGD  # noqa
 from analysis.nn_tf import NeuralNet  # noqa
@@ -25,10 +26,15 @@ LOG = get_analysis_logger(__name__)

 class Environment(object):

-    def __init__(self, n_knob, n_metric, mode=0):
-        self.knob_dim = n_knob
-        self.metric_dim = n_metric
-        self.mode = mode
+    def __init__(self, knob_dim, metric_dim, modes=[0], reward_variance=0,
+                 metrics_variance=0.2):
+        self.knob_dim = knob_dim
+        self.metric_dim = metric_dim
+        self.modes = modes
+        self.mode = np.random.choice(self.modes)
+        self.counter = 0
+        self.reward_variance = reward_variance
+        self.metrics_variance = metrics_variance

     def identity_sqrt(self, knob_data):
         n1 = self.knob_dim // 4
@@ -36,8 +42,15 @@ class Environment(object):
         part1 = np.sum(knob_data[0: n1])
         part2 = np.sum(np.sqrt(knob_data[n1: n1 + n2]))
         reward = np.array([part1 + part2]) / (self.knob_dim // 2)
-        metric_data = np.zeros(self.metric_dim)
-        return reward, metric_data
+        return reward
+
+    def threshold(self, knob_data):
+        n1 = self.knob_dim // 4
+        n2 = self.knob_dim // 4
+        part1 = np.sum(knob_data[0: n1] > 0.9)
+        part2 = np.sum(knob_data[n1: n1 + n2] < 0.1)
+        reward = np.array([part1 + part2]) / (self.knob_dim // 2)
+        return reward

     def borehole(self, knob_data):
         # ref: http://www.sfu.ca/~ssurjano/borehole.html
@@ -52,48 +65,64 @@ class Environment(object):
         Kw = knob_data[7] * (12045 - 9855) + 9855

         frac = 2 * L * Tu / (np.log(r / rw) * rw ** 2 * Kw)
-        reward = 2 * np.pi * Tu * (Hu - Hl) / (np.log(r / rw) * (1 + frac + Tu / Tl))
-        return np.array([reward]), np.zeros(self.metric_dim)
+        reward = 2 * np.pi * Tu * (Hu - Hl) / (np.log(r / rw) * (1 + frac + Tu / Tl)) / 310
+        return np.array([reward])

-    def threshold(self, knob_data):
-        n1 = self.knob_dim // 4
-        n2 = self.knob_dim // 4
-        part1 = np.sum(knob_data[0: n1] > 0.9)
-        part2 = np.sum(knob_data[n1: n1 + n2] < 0.1)
-        reward = np.array([part1 + part2])
-        metric_data = np.zeros(self.metric_dim)
-        return reward, metric_data
+    def get_metrics(self, mode):
+        metrics = np.ones(self.metric_dim) * mode
+        metrics += np.random.rand(self.metric_dim) * self.metrics_variance
+        return metrics
+
+    def simulate_mode(self, knob_data, mode):
+        if mode == 0:
+            reward = self.identity_sqrt(knob_data)
+        elif mode == 1:
+            reward = self.threshold(knob_data)
+        elif mode == 2:
+            reward = np.zeros(1)
+            for i in range(0, len(knob_data), 8):
+                reward += self.borehole(knob_data[i: i+8])[0] / len(knob_data) * 8
+        reward = reward * (1.0 + self.reward_variance * np.random.rand(1)[0])
+        return reward, self.get_metrics(mode)

     def simulate(self, knob_data):
-        if self.mode == 0:
-            return self.identity_sqrt(knob_data)
-        elif self.mode == 1:
-            return self.threshold(knob_data)
-        elif self.mode == 2:
-            return self.borehole(knob_data)
+        self.counter += 1
+        k = 1
+        # every k runs, sample a new workload
+        if self.counter >= k:
+            self.counter = 0
+            self.mode = np.random.choice(self.modes)
+        return self.simulate_mode(knob_data, self.mode)


-def ddpg(env, config, n_loops=1000):
+def ddpg(env, config, n_loops=100):
     results = []
     x_axis = []
     gamma = config['gamma']
     tau = config['tau']
-    lr = config['lr']
-    batch_size = config['batch_size']
+    a_lr = config['a_lr']
+    c_lr = config['c_lr']
     n_epochs = config['n_epochs']
     model_ddpg = DDPG(n_actions=env.knob_dim, n_states=env.metric_dim, gamma=gamma, tau=tau,
-                      clr=lr, alr=lr, batch_size=batch_size)
+                      clr=c_lr, alr=a_lr)
     knob_data = np.random.rand(env.knob_dim)
     prev_metric_data = np.zeros(env.metric_dim)
+
     for i in range(n_loops):
         reward, metric_data = env.simulate(knob_data)
-        model_ddpg.add_sample(prev_metric_data, knob_data, reward, metric_data)
+        if i > 0:
+            model_ddpg.add_sample(prev_metric_data, prev_knob_data, prev_reward, metric_data)
+        prev_metric_data = metric_data
+        prev_knob_data = knob_data
+        prev_reward = reward
+        if i == 0:
+            continue
         for _ in range(n_epochs):
             model_ddpg.update()
         results.append(reward)
-        x_axis.append(i)
-        prev_metric_data = metric_data
-        knob_data = model_ddpg.choose_action(prev_metric_data)
+        x_axis.append(i+1)
+        LOG.info('loop: %d reward: %f', i, reward[0])
+        knob_data = model_ddpg.choose_action(metric_data)
     return np.array(results), np.array(x_axis)
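A short usage sketch of the reworked Environment above, assuming the server directory is on sys.path so that `analysis.simulation` is importable; the dimensions and variance values here are illustrative, not from the patch.

import numpy as np
from analysis.simulation import Environment

# Two synthetic workloads: mode 0 (identity_sqrt) and mode 1 (threshold).
# With k = 1 in simulate(), the mode is resampled on every call, and the
# returned metric vector's level (ones * mode plus small noise) encodes it.
env = Environment(knob_dim=8, metric_dim=4, modes=[0, 1], reward_variance=0.05)
knobs = np.random.rand(env.knob_dim)
reward, metrics = env.simulate(knobs)
print(reward.shape, metrics.shape)  # (1,) (4,)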
@@ -116,8 +145,10 @@ def dnn(env, config, n_loops=100):
     x_axis = []
     memory = ReplayMemory()
     num_samples = config['num_samples']
+    ou_process = config['ou_process']
     Xmin = np.zeros(env.knob_dim)
     Xmax = np.ones(env.knob_dim)
+    noise = OUProcess(env.knob_dim)
     for i in range(n_loops):
         X_samples = np.random.rand(num_samples, env.knob_dim)
         if i >= 10:
@@ -128,23 +159,26 @@ def dnn(env, config, n_loops=100):
                 X_samples = np.vstack((X_samples, np.array(entry[0])))
         model_nn = NeuralNet(n_input=X_samples.shape[1],
                              batch_size=X_samples.shape[0],
-                             explore_iters=500,
+                             learning_rate=0.01,
+                             explore_iters=100,
                              noise_scale_begin=0.1,
                              noise_scale_end=0.0,
                              debug=False,
                              debug_interval=100)
         if i >= 5:
             actions, rewards = memory.get_all()
-            model_nn.fit(np.array(actions), -np.array(rewards), fit_epochs=500)
-        res = model_nn.recommend(X_samples, Xmin, Xmax,
-                                 explore=500, recommend_epochs=500)
+            model_nn.fit(np.array(actions), -np.array(rewards), fit_epochs=50)
+        res = model_nn.recommend(X_samples, Xmin, Xmax, recommend_epochs=10, explore=False)
         best_config_idx = np.argmin(res.minl.ravel())
         best_config = res.minl_conf[best_config_idx, :]
+        if ou_process:
+            best_config += noise.noise()
+            best_config = best_config.clip(0, 1)
         reward, _ = env.simulate(best_config)
         memory.push(best_config, reward)
         LOG.info('loop: %d reward: %f', i, reward[0])
         results.append(reward)
-        x_axis.append(i)
+        x_axis.append(i+1)
     return np.array(results), np.array(x_axis)
@@ -152,16 +186,17 @@ def gprgd(env, config, n_loops=100):
     results = []
     x_axis = []
     memory = ReplayMemory()
+    num_collections = config['num_collections']
     num_samples = config['num_samples']
     X_min = np.zeros(env.knob_dim)
     X_max = np.ones(env.knob_dim)
-    for _ in range(5):
+    for _ in range(num_collections):
         action = np.random.rand(env.knob_dim)
         reward, _ = env.simulate(action)
         memory.push(action, reward)
     for i in range(n_loops):
         X_samples = np.random.rand(num_samples, env.knob_dim)
-        if i >= 5:
+        if i >= 10:
             actions, rewards = memory.get_all()
             tuples = tuple(zip(actions, rewards))
             top10 = heapq.nlargest(10, tuples, key=lambda e: e[1])
@@ -171,13 +206,13 @@ def gprgd(env, config, n_loops=100):
                 X_samples = np.vstack((X_samples, np.array(entry[0]) * 0.97 + 0.01))
         model = GPRGD(length_scale=1.0,
                       magnitude=1.0,
-                      max_train_size=7000,
-                      batch_size=3000,
+                      max_train_size=100,
+                      batch_size=100,
                       num_threads=4,
                       learning_rate=0.01,
                       epsilon=1e-6,
                       max_iter=500,
-                      sigma_multiplier=3.0,
+                      sigma_multiplier=30.0,
                       mu_multiplier=1.0)

         actions, rewards = memory.get_all()
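The dnn() change above optionally perturbs the recommended configuration with Ornstein-Uhlenbeck noise (hence the new OUProcess import). Below is a minimal, self-contained sketch of that kind of mean-reverting noise; it is a stand-in for the project's analysis.ddpg.ou_process.OUProcess, and the theta/sigma values are illustrative only.

import numpy as np

class SimpleOUNoise(object):
    # Mean-reverting (Ornstein-Uhlenbeck) noise sketch; not the project's OUProcess.
    def __init__(self, n_actions, mu=0.0, theta=0.15, sigma=0.2):
        self.mu, self.theta, self.sigma = mu, theta, sigma
        self.state = np.ones(n_actions) * mu

    def noise(self):
        dx = self.theta * (self.mu - self.state) + self.sigma * np.random.randn(len(self.state))
        self.state = self.state + dx
        return self.state

# Mirrors the usage in dnn(): perturb the recommended config, then clip to [0, 1].
ou = SimpleOUNoise(8)
best_config = np.random.rand(8)
best_config = (best_config + ou.noise()).clip(0, 1)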
@@ -193,62 +228,69 @@ def gprgd(env, config, n_loops=100):
     return np.array(results), np.array(x_axis)


-def plotlines(x_axis, results, labels, title, path):
+def plotlines(xs, results, labels, title, path):
     if plt:
-        for result, label in zip(results, labels):
-            plt.plot(x_axis, result, label=label)
-        plt.legend()
-        plt.xlabel("loops")
-        plt.ylabel("rewards")
-        plt.title(title)
+        figsize = 13, 10
+        figure, ax = plt.subplots(figsize=figsize)
+        lines = []
+        N = 20
+        weights = np.ones(N)
+        for x_axis, result, label in zip(xs, results, labels):
+            result = np.convolve(weights/weights.sum(), result.flatten())[N-1:-N+1]
+            lines.append(plt.plot(x_axis[:-N+1], result, label=label, lw=4)[0])
+        plt.legend(handles=lines, fontsize=30)
+        plt.title(title, fontsize=25)
+        plt.xticks(fontsize=25)
+        plt.yticks(fontsize=25)
+        ax.set_xlabel("loops", fontsize=30)
+        ax.set_ylabel("rewards", fontsize=30)
         plt.savefig(path)
         plt.clf()


-def run(tuners, configs, labels, knob_dim, metric_dim, mode, n_loops, n_repeats):
+def run(tuners, configs, labels, title, env, n_loops, n_repeats):
     if not plt:
         LOG.info("Cannot import matplotlib. Will write results to files instead of figures.")

     random.seed(0)
-    np.random.seed(0)
+    np.random.seed(1)
     torch.manual_seed(0)
-    env = Environment(knob_dim, metric_dim, mode=mode)
     results = []
-    for i in range(n_repeats):
-        for j, _ in enumerate(tuners):
+    xs = []
+    for j, _ in enumerate(tuners):
+        for i in range(n_repeats[j]):
             result, x_axis = tuners[j](env, configs[j], n_loops=n_loops)
             if i is 0:
-                results.append(result / n_repeats)
+                results.append(result / n_repeats[j])
+                xs.append(x_axis)
             else:
-                results[j] += result / n_repeats
-
-    title = "mode_{}_knob_{}".format(mode, knob_dim)
+                results[j] += result / n_repeats[j]
     if plt:
-        if not os.path.exists("figures"):
-            os.mkdir("figures")
-        filename = "figures/{}.pdf".format(title)
-        plotlines(x_axis, results, labels, title, filename)
+        if not os.path.exists("simulation_figures"):
+            os.mkdir("simulation_figures")
+        filename = "simulation_figures/{}.pdf".format(title)
+        plotlines(xs, results, labels, title, filename)
+    if not os.path.exists("simulation_results"):
+        os.mkdir("simulation_results")
     for j in range(len(tuners)):
-        with open(title + '_' + labels[j] + '.csv', 'w') as f:
-            for i, result in zip(x_axis, results[j]):
+        with open("simulation_results/" + title + '_' + labels[j] + '.csv', 'w') as f:
+            for i, result in zip(xs[j], results[j]):
                 f.write(str(i) + ',' + str(result[0]) + '\n')


 def main():
-    knob_dim = 192
-    metric_dim = 60
-    mode = 0
-    n_loops = 2
-    n_repeats = 1
-    configs = [{'gamma': 0., 'tau': 0.002, 'lr': 0.001, 'batch_size': 32, 'n_epochs': 30},
-               {'gamma': 0.99, 'tau': 0.002, 'lr': 0.001, 'batch_size': 32, 'n_epochs': 30},
-               {'num_samples': 30},
-               {'num_samples': 30}]
-    tuners = [ddpg, ddpg, dnn, gprgd]
-    labels = [tuner.__name__ for tuner in tuners]
-    labels[0] += '_gamma_0'
-    labels[1] += '_gamma_99'
-    run(tuners, configs, labels, knob_dim, metric_dim, mode, n_loops, n_repeats)
+    env = Environment(knob_dim=192, metric_dim=60, modes=[0, 1], reward_variance=0.05)
+    n_loops = 2000
+    configs = [{'gamma': 0, 'tau': 0.002, 'a_lr': 0.01, 'c_lr': 0.01, 'n_epochs': 1},
+               {'gamma': 0, 'tau': 0.002, 'a_lr': 0.01, 'c_lr': 0.001, 'n_epochs': 1},
+               {'gamma': 0., 'tau': 0.002, 'a_lr': 0.001, 'c_lr': 0.001, 'n_epochs': 1},
+               # {'num_samples': 100, 'ou_process': False},
+               ]
+    tuners = [ddpg, ddpg, ddpg]
+    labels = ['1', '2', '3']
+    title = 'varing_workloads'
+    n_repeats = [3, 3, 3]
+    run(tuners, configs, labels, title, env, n_loops, n_repeats)


 if __name__ == '__main__':
diff --git a/server/website/website/settings/constants.py b/server/website/website/settings/constants.py
index fd040e8..c52796f 100644
--- a/server/website/website/settings/constants.py
+++ b/server/website/website/settings/constants.py
@@ -80,4 +80,4 @@ DDPG_BATCH_SIZE = 32
 ACTOR_LEARNING_RATE = 0.01

 # Learning rate of critic network
-CRITIC_LEARNING_RATE = 0.01
+CRITIC_LEARNING_RATE = 0.001
diff --git a/server/website/website/tasks/async_tasks.py b/server/website/website/tasks/async_tasks.py
index 72feb44..322348d 100644
--- a/server/website/website/tasks/async_tasks.py
+++ b/server/website/website/tasks/async_tasks.py
@@ -338,7 +338,8 @@ def train_ddpg(result_id):
     if session.ddpg_reply_memory:
         ddpg.replay_memory.set(session.ddpg_reply_memory)
     ddpg.add_sample(normalized_metric_data, knob_data, reward, normalized_metric_data)
-    ddpg.update()
+    for _ in range(25):
+        ddpg.update()
     session.ddpg_actor_model, session.ddpg_critic_model = ddpg.get_model()
     session.ddpg_reply_memory = ddpg.replay_memory.get()
     session.save()
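For reference, the smoothing applied in plotlines() in the simulation.py diff above is a length-N moving average: np.convolve with uniform weights, trimmed so that only fully-overlapped windows remain. A small self-contained check of that slicing; the array contents are illustrative.

import numpy as np

N = 20
rewards = np.random.rand(100)  # one tuner's reward curve
weights = np.ones(N)
# The full convolution has len(rewards) + N - 1 points; slicing [N-1:-N+1] keeps the
# len(rewards) - N + 1 averages computed from complete windows, matching the
# x_axis[:-N+1] trim used when plotting.
smoothed = np.convolve(weights / weights.sum(), rewards)[N - 1:-N + 1]
print(len(smoothed))  # 81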