check celery status before running tasks
This commit is contained in:
		
							parent
							
								
									42fc5ebe3e
								
							
						
					
					
						commit
						d0cf8b597d
					
				|  | @ -6,5 +6,8 @@ | |||
| 
 | ||||
| # These parameters are not specified for any session, so they can only be set here | ||||
| 
 | ||||
| # If this flag is set, we check if celery is running, and restart celery if it is not. | ||||
| CHECK_CELERY = True | ||||
| 
 | ||||
| # address categorical knobs (enum, boolean) | ||||
| ENABLE_DUMMY_ENCODER = False | ||||
|  |  | |||
|  | @ -17,6 +17,7 @@ from random import choice | |||
| 
 | ||||
| import numpy as np | ||||
| from django.contrib.auth.models import User | ||||
| from django.core.management import call_command | ||||
| from django.db.models import Case, When | ||||
| from django.utils.text import capfirst | ||||
| from django_db_logger.models import StatusLog | ||||
|  | @ -494,3 +495,23 @@ def model_to_dict2(m, exclude=None): | |||
|         if fname not in exclude: | ||||
|             d[fname] = getattr(m, fname, None) | ||||
|     return d | ||||
| 
 | ||||
| 
 | ||||
| def check_and_run_celery(): | ||||
|     celery_status = os.popen('python manage.py celery inspect ping').read() | ||||
|     if 'OK' in celery_status: | ||||
|         return 'celery is running' | ||||
| 
 | ||||
|     retries = 0 | ||||
|     while retries < 5: | ||||
|         LOG.warning('Celery is not running.') | ||||
|         retries += 1 | ||||
|         call_command('stopcelery') | ||||
|         os.popen('python manage.py startcelery &') | ||||
|         time.sleep(5 * retries) | ||||
|         celery_status = os.popen('python manage.py celery inspect ping').read() | ||||
|         if 'OK' in celery_status: | ||||
|             LOG.info('Successfully start celery.') | ||||
|             return 'celery stopped but is restarted successfully' | ||||
|     LOG.warning('Cannot restart celery.') | ||||
|     return 'celery stopped and cannot be restarted' | ||||
|  |  | |||
|  | @ -45,7 +45,7 @@ from .tasks import (aggregate_target_results, map_workload, train_ddpg, | |||
| from .types import (DBMSType, KnobUnitType, MetricType, | ||||
|                     TaskType, VarType, WorkloadStatusType, AlgorithmType) | ||||
| from .utils import (JSONUtil, LabelUtil, MediaUtil, TaskUtil) | ||||
| from .settings import LOG_DIR, TIME_ZONE | ||||
| from .settings import LOG_DIR, TIME_ZONE, CHECK_CELERY | ||||
| 
 | ||||
| from .set_default_knobs import set_default_knobs | ||||
| 
 | ||||
|  | @ -635,6 +635,9 @@ def handle_result_files(session, files, execution_times=None): | |||
|     if session.tuning_session == 'no_tuning_session': | ||||
|         return HttpResponse("Result stored successfully!") | ||||
| 
 | ||||
|     celery_status = 'celery status is unknown' | ||||
|     if CHECK_CELERY: | ||||
|         celery_status = utils.check_and_run_celery() | ||||
|     result_id = result.pk | ||||
|     response = None | ||||
|     if session.algorithm == AlgorithmType.GPR: | ||||
|  | @ -683,8 +686,8 @@ def handle_result_files(session, files, execution_times=None): | |||
|         except Exception:  # pylint: disable=broad-except | ||||
|             LOG.warning("Error parsing execution times:\n%s", execution_times, exc_info=True) | ||||
| 
 | ||||
|     return HttpResponse("Result stored successfully! Running tuner...(status={})  Result ID:{} " | ||||
|                         .format(response.status, result_id)) | ||||
|     return HttpResponse("Result stored successfully! Running tuner...({}, status={}) Result ID:{}" | ||||
|                         .format(celery_status, response.status, result_id)) | ||||
| 
 | ||||
| 
 | ||||
| @login_required(login_url=reverse_lazy('login')) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue