diff --git a/server/website/website/__init__.py b/server/website/website/__init__.py
index 4e851cb..0cadfa7 100644
--- a/server/website/website/__init__.py
+++ b/server/website/website/__init__.py
@@ -3,3 +3,6 @@
 #
 # Copyright (c) 2017-18, Carnegie Mellon University Database Group
 #
+from __future__ import absolute_import
+
+from .celery import app as celery_app  # noqa
diff --git a/server/website/website/admin.py b/server/website/website/admin.py
index de5ddf5..6624ca2 100644
--- a/server/website/website/admin.py
+++ b/server/website/website/admin.py
@@ -204,19 +204,3 @@ admin.site.register(ExecutionTime, ExecutionTimeAdmin)
 admin.site.unregister(StatusLog)
 admin.site.register(StatusLog, CustomStatusLogAdmin)
 admin.site.register(djcelery_models.TaskMeta, TaskMetaAdmin)
-
-# Unregister empty djcelery models
-UNUSED_DJCELERY_MODELS = (
-    djcelery_models.CrontabSchedule,
-    djcelery_models.IntervalSchedule,
-    djcelery_models.PeriodicTask,
-    djcelery_models.TaskState,
-    djcelery_models.WorkerState,
-)
-
-try:
-    for model in UNUSED_DJCELERY_MODELS:
-        if model.objects.count() == 0:
-            admin.site.unregister(model)
-except ProgrammingError:
-    pass
diff --git a/server/website/website/celery.py b/server/website/website/celery.py
new file mode 100644
index 0000000..bdf4728
--- /dev/null
+++ b/server/website/website/celery.py
@@ -0,0 +1,22 @@
+from __future__ import absolute_import
+
+import os
+
+from celery import Celery
+
+# set the default Django settings module for the 'celery' program.
+os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'website.settings')
+
+from django.conf import settings  # noqa, pylint: disable=wrong-import-position
+
+app = Celery('website')  # pylint: disable=invalid-name
+
+# Using a string here means the worker will not have to
+# pickle the object when using Windows.
+app.config_from_object('django.conf:settings')
+app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
+
+
+@app.task(bind=True)
+def debug_task(self):
+    print('Request: {0!r}'.format(self.request))
diff --git a/server/website/website/settings/common.py b/server/website/website/settings/common.py
index 4f53f3b..43e7628 100644
--- a/server/website/website/settings/common.py
+++ b/server/website/website/settings/common.py
@@ -9,8 +9,9 @@ Common Django settings for the OtterTune project.
 """
 
 import os
-from os.path import abspath, dirname, exists, join
 import sys
+from datetime import timedelta
+from os.path import abspath, dirname, exists, join
 
 import djcelery
 
@@ -211,14 +212,33 @@ BROKER_URL = 'amqp://guest:guest@localhost:5672//'
 # task is executed by a worker.
 CELERY_TRACK_STARTED = True
 
+# Do not let celery take over the root logger
+CELERYD_HIJACK_ROOT_LOGGER = False
+
+# Store celery results in the database
+CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
+
+# The celerybeat scheduler class
+CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
+
+# Defines the periodic task schedule for celerybeat
+CELERYBEAT_SCHEDULE = {
+    'run-every-5m': {
+        'task': 'run_background_tasks',
+        'schedule': timedelta(minutes=5),
+    }
+}
+
+# The Celery documentation recommends disabling the rate limits
+# if no tasks are using them
+CELERY_DISABLE_RATE_LIMITS = True
+
 # Worker will execute at most this many tasks before it's killed
 # and replaced with a new worker. This helps with memory leaks.
-CELERYD_MAX_TASKS_PER_CHILD = 50
+CELERYD_MAX_TASKS_PER_CHILD = 20
 
-# Number of concurrent workers.
-CELERYD_CONCURRENCY = 8
-
-CELERYD_HIJACK_ROOT_LOGGER = False
+# Number of concurrent workers. Defaults to the number of CPUs.
+# CELERYD_CONCURRENCY = 8
 
 djcelery.setup_loader()
 
@@ -306,6 +326,11 @@ LOGGING = {
             'level': 'DEBUG',
             'propogate': True,
         },
+        'celery.task': {
+            'handlers': ['celery', 'dblog'],
+            'level': 'DEBUG',
+            'propogate': True,
+        },
         # Uncomment to email admins after encountering an error (and debug=False)
         # 'django.request': {
         #     'handlers': ['mail_admins'],
diff --git a/server/website/website/settings/constants.py b/server/website/website/settings/constants.py
index 717c7b6..8a73d0c 100644
--- a/server/website/website/settings/constants.py
+++ b/server/website/website/settings/constants.py
@@ -8,7 +8,3 @@
 
 # address categorical knobs (enum, boolean)
 ENABLE_DUMMY_ENCODER = False
-
-# ---PIPELINE CONSTANTS---
-# how often to run the background tests, in seconds
-RUN_EVERY = 300
diff --git a/server/website/website/tasks/async_tasks.py b/server/website/website/tasks/async_tasks.py
index 85c35d6..8277a22 100644
--- a/server/website/website/tasks/async_tasks.py
+++ b/server/website/website/tasks/async_tasks.py
@@ -11,7 +11,7 @@ import gpflow
 from pyDOE import lhs
 from scipy.stats import uniform
 
-from celery.task import task, Task
+from celery import shared_task, Task
 from celery.utils.log import get_task_logger
 from djcelery.models import TaskMeta
 from sklearn.preprocessing import StandardScaler, MinMaxScaler
@@ -36,41 +36,33 @@ from website.settings import ENABLE_DUMMY_ENCODER
 LOG = get_task_logger(__name__)
 
 
-class UpdateTask(Task):  # pylint: disable=abstract-method
+class BaseTask(Task):  # pylint: disable=abstract-method
+    abstract = True
 
     def __init__(self):
-        self.rate_limit = '50/m'
-        self.max_retries = 3
-        self.default_retry_delay = 60
+        self.max_retries = 0
 
 
-class TrainDDPG(UpdateTask):  # pylint: disable=abstract-method
+class IgnoreResultTask(BaseTask):  # pylint: disable=abstract-method
+    abstract = True
+
     def on_success(self, retval, task_id, args, kwargs):
-        super(TrainDDPG, self).on_success(retval, task_id, args, kwargs)
+        super().on_success(retval, task_id, args, kwargs)
 
-        # Completely delete this result because it's huge and not
-        # interesting
+        # Completely delete this result because it's huge and not interesting
         task_meta = TaskMeta.objects.get(task_id=task_id)
         task_meta.result = None
         task_meta.save()
 
 
-class AggregateTargetResults(UpdateTask):  # pylint: disable=abstract-method
+class MapWorkloadTask(BaseTask):  # pylint: disable=abstract-method
+    abstract = True
+
     def on_success(self, retval, task_id, args, kwargs):
-        super(AggregateTargetResults, self).on_success(retval, task_id, args, kwargs)
+        super().on_success(retval, task_id, args, kwargs)
 
-        # Completely delete this result because it's huge and not
-        # interesting
         task_meta = TaskMeta.objects.get(task_id=task_id)
-        task_meta.result = None
-        task_meta.save()
-
-
-class MapWorkload(UpdateTask):  # pylint: disable=abstract-method
-
-    def on_success(self, retval, task_id, args, kwargs):
-        super(MapWorkload, self).on_success(retval, task_id, args, kwargs)
+        new_res = None
 
         # Replace result with formatted result
         if not args[0][0]['bad']:
@@ -78,22 +70,8 @@ class MapWorkload(UpdateTask):  # pylint: disable=abstract-method
                 'scores': sorted(args[0][0]['scores'].items()),
                 'mapped_workload_id': args[0][0]['mapped_workload'],
             }
-            task_meta = TaskMeta.objects.get(task_id=task_id)
-            task_meta.result = new_res  # Only store scores
-            task_meta.save()
-        else:
-            task_meta = TaskMeta.objects.get(task_id=task_id)
-            task_meta.result = None
-            task_meta.save()
-
-
-class ConfigurationRecommendation(UpdateTask):  # pylint: disable=abstract-method
-
-    def on_success(self, retval, task_id, args, kwargs):
-        super(ConfigurationRecommendation, self).on_success(retval, task_id, args, kwargs)
-
-        task_meta = TaskMeta.objects.get(task_id=task_id)
-        task_meta.result = retval
+
+        task_meta.result = new_res
         task_meta.save()
 
 
@@ -183,7 +161,7 @@ def clean_metric_data(metric_matrix, metric_labels, session):
     return matrix, metric_labels
 
 
-@task(base=AggregateTargetResults, name='aggregate_target_results')
+@shared_task(base=IgnoreResultTask, name='aggregate_target_results')
 def aggregate_target_results(result_id, algorithm):
     # Check that we've completed the background tasks at least once. We need
     # this data in order to make a configuration recommendation (until we
@@ -318,7 +296,7 @@ def gen_lhs_samples(knobs, nsamples):
     return lhs_samples
 
 
-@task(base=TrainDDPG, name='train_ddpg')
+@shared_task(base=IgnoreResultTask, name='train_ddpg')
 def train_ddpg(result_id):
     LOG.info('Add training data to ddpg and train ddpg')
     result = Result.objects.get(pk=result_id)
@@ -439,7 +417,7 @@ def create_and_save_recommendation(recommended_knobs, result, status, **kwargs):
     return retval
 
 
-@task(base=ConfigurationRecommendation, name='configuration_recommendation_ddpg')
+@shared_task(base=BaseTask, name='configuration_recommendation_ddpg')
 def configuration_recommendation_ddpg(result_info):  # pylint: disable=invalid-name
     LOG.info('Use ddpg to recommend configuration')
     result_id = result_info['newest_result_id']
@@ -659,7 +637,7 @@ def combine_workload(target_data):
         dummy_encoder, constraint_helper
 
 
-@task(base=ConfigurationRecommendation, name='configuration_recommendation')
+@shared_task(base=BaseTask, name='configuration_recommendation')
 def configuration_recommendation(recommendation_input):
     target_data, algorithm = recommendation_input
     LOG.info('configuration_recommendation called')
@@ -672,15 +650,14 @@ def configuration_recommendation(recommendation_input):
             recommended_knobs=target_data['config_recommend'], result=newest_result,
             status='bad', info='WARNING: no training data, the config is generated randomly',
             pipeline_run=target_data['pipeline_run'])
-        LOG.debug('%s: Skipping configuration recommendation.\n\ndata=%s\n',
-                  AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
+        LOG.debug('%s: Skipping configuration recommendation.\nData:\n%s\n\n',
+                  AlgorithmType.name(algorithm), target_data)
         return target_data_res
 
     X_columnlabels, X_scaler, X_scaled, y_scaled, X_max, X_min,\
         dummy_encoder, constraint_helper = combine_workload(target_data)
 
-    # FIXME: we should generate more samples and use a smarter sampling
-    # technique
+    # FIXME: we should generate more samples and use a smarter sampling technique
     num_samples = params['NUM_SAMPLES']
     X_samples = np.empty((num_samples, X_scaled.shape[1]))
     for i in range(X_scaled.shape[1]):
@@ -799,7 +776,7 @@ def load_data_helper(filtered_pipeline_data, workload, task_type):
     return JSONUtil.loads(pipeline_data.data)
 
 
-@task(base=MapWorkload, name='map_workload')
+@shared_task(base=MapWorkloadTask, name='map_workload')
 def map_workload(map_workload_input):
     target_data, algorithm = map_workload_input
 
diff --git a/server/website/website/tasks/periodic_tasks.py b/server/website/website/tasks/periodic_tasks.py
index bb789fd..87cad2d 100644
--- a/server/website/website/tasks/periodic_tasks.py
+++ b/server/website/website/tasks/periodic_tasks.py
@@ -6,7 +6,7 @@
 import copy
 import numpy as np
 
-from celery.task import periodic_task
+from celery import shared_task
 from celery.utils.log import get_task_logger
 from django.utils.timezone import now
 from sklearn.preprocessing import StandardScaler
@@ -18,7 +18,7 @@ from analysis.preprocessing import (Bin, get_shuffle_indices,
                                     DummyEncoder,
                                     consolidate_columnlabels)
 from website.models import PipelineData, PipelineRun, Result, Workload
-from website.settings import RUN_EVERY, ENABLE_DUMMY_ENCODER
+from website.settings import ENABLE_DUMMY_ENCODER
 from website.types import PipelineTaskType, WorkloadStatusType
 from website.utils import DataUtil, JSONUtil
 
@@ -28,8 +28,7 @@ LOG = get_task_logger(__name__)
 MIN_WORKLOAD_RESULTS_COUNT = 5
 
 
-# Run the background tasks every 'RUN_EVERY' seconds
-@periodic_task(run_every=RUN_EVERY, name="run_background_tasks")
+@shared_task(name="run_background_tasks")
 def run_background_tasks():
     LOG.debug("Starting background tasks")
     # Find modified and not modified workloads, we only have to calculate for the