- website.celery: create celery app
- website.admin: do not unregister the djcelery models
- website.settings: added some new celery settings and updated others. Added a 'celery.task' logger to our logging config, which resolved the missing celery log messages issue
- website.async_tasks: removed redundant abstract task classes. Removed the settings that configured retries since we do not handle retries in the tasks
dvanaken 2020-02-06 14:06:30 -05:00 committed by Dana Van Aken
parent 7339d07a98
commit d9e2806b9e
7 changed files with 81 additions and 75 deletions

View File

@@ -3,3 +3,6 @@
#
# Copyright (c) 2017-18, Carnegie Mellon University Database Group
#
from __future__ import absolute_import
from .celery import app as celery_app # noqa
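This is the standard Celery-with-Django bootstrap: re-exporting the app from the package __init__ guarantees the Celery app is created whenever Django loads the website package, so tasks declared with @shared_task elsewhere in the project bind to it automatically. A minimal sketch of the pattern (the task below is hypothetical, purely for illustration):

    from celery import shared_task

    @shared_task(name='noop_example')  # hypothetical task, not part of this commit
    def noop_example(value):
        # Because website/__init__.py imports celery_app, this task is
        # registered against that app as soon as this module is imported.
        return value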

View File

@@ -204,19 +204,3 @@ admin.site.register(ExecutionTime, ExecutionTimeAdmin)
admin.site.unregister(StatusLog)
admin.site.register(StatusLog, CustomStatusLogAdmin)
admin.site.register(djcelery_models.TaskMeta, TaskMetaAdmin)
# Unregister empty djcelery models
UNUSED_DJCELERY_MODELS = (
    djcelery_models.CrontabSchedule,
    djcelery_models.IntervalSchedule,
    djcelery_models.PeriodicTask,
    djcelery_models.TaskState,
    djcelery_models.WorkerState,
)
try:
    for model in UNUSED_DJCELERY_MODELS:
        if model.objects.count() == 0:
            admin.site.unregister(model)
except ProgrammingError:
    pass

View File

@@ -0,0 +1,22 @@
from __future__ import absolute_import
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'website.settings')
from django.conf import settings # noqa, pylint: disable=wrong-import-position
app = Celery('website') # pylint: disable=invalid-name
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
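For reference (not part of this diff): with this module in place, a worker and the celerybeat scheduler are typically started from the project root, and the debug task above can be used to verify the wiring. The exact command line depends on whether the celery binary or djcelery's manage.py integration is used, so treat this as a sketch:

    # Typically run from the directory containing manage.py, for example:
    #   celery -A website worker --loglevel=info
    #   celery -A website beat --loglevel=info
    # Once a worker is up, the sample task can be exercised like this:
    from website.celery import debug_task

    result = debug_task.delay()   # enqueue the task on the broker
    result.get(timeout=10)        # block until a worker has executed it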

View File

@@ -9,8 +9,9 @@ Common Django settings for the OtterTune project.
"""
import os
from os.path import abspath, dirname, exists, join
import sys
from datetime import timedelta
from os.path import abspath, dirname, exists, join
import djcelery
@@ -211,14 +212,33 @@ BROKER_URL = 'amqp://guest:guest@localhost:5672//'
# task is executed by a worker.
CELERY_TRACK_STARTED = True
# Do not let celery take over the root logger
CELERYD_HIJACK_ROOT_LOGGER = False
# Store celery results in the database
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
# The celerybeat scheduler class
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
# Defines the periodic task schedule for celerybeat
CELERYBEAT_SCHEDULE = {
    'run-every-5m': {
        'task': 'run_background_tasks',
        'schedule': timedelta(minutes=5),
    }
}
# The Celery documentation recommends disabling the rate limits
# if no tasks are using them
CELERY_DISABLE_RATE_LIMITS = True
# Worker will execute at most this many tasks before it's killed
# and replaced with a new worker. This helps with memory leaks.
CELERYD_MAX_TASKS_PER_CHILD = 50
CELERYD_MAX_TASKS_PER_CHILD = 20
# Number of concurrent workers.
CELERYD_CONCURRENCY = 8
CELERYD_HIJACK_ROOT_LOGGER = False
# Number of concurrent workers. Defaults to the number of CPUs.
# CELERYD_CONCURRENCY = 8
djcelery.setup_loader()
@@ -306,6 +326,11 @@ LOGGING = {
            'level': 'DEBUG',
            'propogate': True,
        },
        'celery.task': {
            'handlers': ['celery', 'dblog'],
            'level': 'DEBUG',
            'propogate': True,
        },
        # Uncomment to email admins after encountering an error (and debug=False)
        # 'django.request': {
        #     'handlers': ['mail_admins'],
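One detail worth calling out: the beat entry added above refers to the task purely by its registered name, so the 'task' string must match the name= given to the task decorator (the last file in this commit registers exactly that name). A minimal sketch of the pairing:

    from datetime import timedelta
    from celery import shared_task

    CELERYBEAT_SCHEDULE = {
        'run-every-5m': {
            'task': 'run_background_tasks',    # must match the registered task name
            'schedule': timedelta(minutes=5),
        }
    }

    @shared_task(name='run_background_tasks')  # the name the schedule refers to
    def run_background_tasks():
        pass  # body elided; the real task is defined in the last file of this commit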

View File

@@ -8,7 +8,3 @@
# address categorical knobs (enum, boolean)
ENABLE_DUMMY_ENCODER = False
# ---PIPELINE CONSTANTS---
# how often to run the background tests, in seconds
RUN_EVERY = 300

View File

@@ -11,7 +11,7 @@ import gpflow
from pyDOE import lhs
from scipy.stats import uniform
from celery.task import task, Task
from celery import shared_task, Task
from celery.utils.log import get_task_logger
from djcelery.models import TaskMeta
from sklearn.preprocessing import StandardScaler, MinMaxScaler
@@ -36,41 +36,33 @@ from website.settings import ENABLE_DUMMY_ENCODER
LOG = get_task_logger(__name__)
class UpdateTask(Task): # pylint: disable=abstract-method
class BaseTask(Task): # pylint: disable=abstract-method
    abstract = True
    def __init__(self):
        self.rate_limit = '50/m'
        self.max_retries = 3
        self.default_retry_delay = 60
        self.max_retries = 0
class TrainDDPG(UpdateTask): # pylint: disable=abstract-method
class IgnoreResultTask(BaseTask): # pylint: disable=abstract-method
    abstract = True
    def on_success(self, retval, task_id, args, kwargs):
        super(TrainDDPG, self).on_success(retval, task_id, args, kwargs)
        super().on_success(retval, task_id, args, kwargs)
        # Completely delete this result because it's huge and not
        # interesting
        # Completely delete this result because it's huge and not interesting
        task_meta = TaskMeta.objects.get(task_id=task_id)
        task_meta.result = None
        task_meta.save()
class AggregateTargetResults(UpdateTask): # pylint: disable=abstract-method
class MapWorkloadTask(BaseTask): # pylint: disable=abstract-method
    abstract = True
    def on_success(self, retval, task_id, args, kwargs):
        super(AggregateTargetResults, self).on_success(retval, task_id, args, kwargs)
        super().on_success(retval, task_id, args, kwargs)
        # Completely delete this result because it's huge and not
        # interesting
        task_meta = TaskMeta.objects.get(task_id=task_id)
        task_meta.result = None
        task_meta.save()
class MapWorkload(UpdateTask): # pylint: disable=abstract-method
    def on_success(self, retval, task_id, args, kwargs):
        super(MapWorkload, self).on_success(retval, task_id, args, kwargs)
        new_res = None
        # Replace result with formatted result
        if not args[0][0]['bad']:
@@ -78,22 +70,8 @@ class MapWorkload(UpdateTask): # pylint: disable=abstract-method
                'scores': sorted(args[0][0]['scores'].items()),
                'mapped_workload_id': args[0][0]['mapped_workload'],
            }
            task_meta = TaskMeta.objects.get(task_id=task_id)
            task_meta.result = new_res # Only store scores
            task_meta.save()
        else:
            task_meta = TaskMeta.objects.get(task_id=task_id)
            task_meta.result = None
            task_meta.save()
class ConfigurationRecommendation(UpdateTask): # pylint: disable=abstract-method
    def on_success(self, retval, task_id, args, kwargs):
        super(ConfigurationRecommendation, self).on_success(retval, task_id, args, kwargs)
        task_meta = TaskMeta.objects.get(task_id=task_id)
        task_meta.result = retval
        task_meta.result = new_res
        task_meta.save()
@@ -183,7 +161,7 @@ def clean_metric_data(metric_matrix, metric_labels, session):
    return matrix, metric_labels
@task(base=AggregateTargetResults, name='aggregate_target_results')
@shared_task(base=IgnoreResultTask, name='aggregate_target_results')
def aggregate_target_results(result_id, algorithm):
    # Check that we've completed the background tasks at least once. We need
    # this data in order to make a configuration recommendation (until we
@@ -318,7 +296,7 @@ def gen_lhs_samples(knobs, nsamples):
    return lhs_samples
@task(base=TrainDDPG, name='train_ddpg')
@shared_task(base=IgnoreResultTask, name='train_ddpg')
def train_ddpg(result_id):
    LOG.info('Add training data to ddpg and train ddpg')
    result = Result.objects.get(pk=result_id)
@@ -439,7 +417,7 @@ def create_and_save_recommendation(recommended_knobs, result, status, **kwargs):
    return retval
@task(base=ConfigurationRecommendation, name='configuration_recommendation_ddpg')
@shared_task(base=BaseTask, name='configuration_recommendation_ddpg')
def configuration_recommendation_ddpg(result_info): # pylint: disable=invalid-name
    LOG.info('Use ddpg to recommend configuration')
    result_id = result_info['newest_result_id']
@@ -659,7 +637,7 @@ def combine_workload(target_data):
        dummy_encoder, constraint_helper
@task(base=ConfigurationRecommendation, name='configuration_recommendation')
@shared_task(base=BaseTask, name='configuration_recommendation')
def configuration_recommendation(recommendation_input):
    target_data, algorithm = recommendation_input
    LOG.info('configuration_recommendation called')
@@ -672,15 +650,14 @@ def configuration_recommendation(recommendation_input):
            recommended_knobs=target_data['config_recommend'], result=newest_result,
            status='bad', info='WARNING: no training data, the config is generated randomly',
            pipeline_run=target_data['pipeline_run'])
        LOG.debug('%s: Skipping configuration recommendation.\n\ndata=%s\n',
                  AlgorithmType.name(algorithm), JSONUtil.dumps(target_data, pprint=True))
        LOG.debug('%s: Skipping configuration recommendation.\nData:\n%s\n\n',
                  AlgorithmType.name(algorithm), target_data)
        return target_data_res
    X_columnlabels, X_scaler, X_scaled, y_scaled, X_max, X_min,\
        dummy_encoder, constraint_helper = combine_workload(target_data)
    # FIXME: we should generate more samples and use a smarter sampling
    # technique
    # FIXME: we should generate more samples and use a smarter sampling technique
    num_samples = params['NUM_SAMPLES']
    X_samples = np.empty((num_samples, X_scaled.shape[1]))
    for i in range(X_scaled.shape[1]):
@@ -799,7 +776,7 @@ def load_data_helper(filtered_pipeline_data, workload, task_type):
    return JSONUtil.loads(pipeline_data.data)
@task(base=MapWorkload, name='map_workload')
@shared_task(base=MapWorkloadTask, name='map_workload')
def map_workload(map_workload_input):
    target_data, algorithm = map_workload_input
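Switching from celery.task's @task to @shared_task does not change how callers invoke these tasks; they are still called by signature, for example chained so that each task's return value feeds the next. The sketch below is illustrative only (the placeholder arguments are assumptions, not taken from this commit; the module path follows the commit message above):

    from celery import chain
    from website.async_tasks import (aggregate_target_results, map_workload,
                                     configuration_recommendation)

    result_id, algorithm = 42, 1   # placeholder values for the sketch

    workflow = chain(aggregate_target_results.s(result_id, algorithm),
                     map_workload.s(),
                     configuration_recommendation.s())
    workflow.apply_async()         # each task's result is passed to the next task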

View File

@@ -6,7 +6,7 @@
import copy
import numpy as np
from celery.task import periodic_task
from celery import shared_task
from celery.utils.log import get_task_logger
from django.utils.timezone import now
from sklearn.preprocessing import StandardScaler
@@ -18,7 +18,7 @@ from analysis.preprocessing import (Bin, get_shuffle_indices,
                                    DummyEncoder,
                                    consolidate_columnlabels)
from website.models import PipelineData, PipelineRun, Result, Workload
from website.settings import RUN_EVERY, ENABLE_DUMMY_ENCODER
from website.settings import ENABLE_DUMMY_ENCODER
from website.types import PipelineTaskType, WorkloadStatusType
from website.utils import DataUtil, JSONUtil
@@ -28,8 +28,7 @@ LOG = get_task_logger(__name__)
MIN_WORKLOAD_RESULTS_COUNT = 5
# Run the background tasks every 'RUN_EVERY' seconds
@periodic_task(run_every=RUN_EVERY, name="run_background_tasks")
@shared_task(name="run_background_tasks")
def run_background_tasks():
LOG.debug("Starting background tasks")
# Find modified and not modified workloads, we only have to calculate for the