
Celery tasks not picking up the changes in the local git repo.

Hi everyone.

I am using Celery tasks to run code that is fetched from a git repo. The task clones the repo (master branch) onto the machine where Celery is running and executes the code. If a local copy of the code is already present on that machine, the task executes the local copy, first pulling any new changes from the master branch.
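A minimal sketch of that clone-or-pull step might look like the following. It is not the actual implementation: `sync_command` and `sync_repo` are hypothetical names, the branch name `master` comes from the description above, and the sketch assumes the `git` executable is on the PATH.

```python
import subprocess
from pathlib import Path
from typing import List


def sync_command(repo_url: str, checkout: Path, branch: str = "master") -> List[str]:
    """Build the git command that brings `checkout` up to date (hypothetical helper)."""
    if (checkout / ".git").is_dir():
        # A local copy already exists: fast-forward it to the remote branch.
        return ["git", "-C", str(checkout), "pull", "--ff-only", "origin", branch]
    # No local copy yet: clone the requested branch into `checkout`.
    return ["git", "clone", "--branch", branch, repo_url, str(checkout)]


def sync_repo(repo_url: str, checkout: Path, branch: str = "master") -> None:
    """Run the clone or pull; raises CalledProcessError if git exits non-zero."""
    subprocess.run(sync_command(repo_url, checkout, branch), check=True)
```

Building the command separately from running it keeps the decision (clone or pull) easy to inspect without touching the network.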

The problem I am currently facing is that if I change the code in the local repo and then execute the task, the local changes are not picked up. Only after I restart Celery does the task execute the locally modified code.
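One possible explanation, offered as an assumption rather than a confirmed diagnosis: if the fetched code is loaded with a normal Python import, the long-running worker process keeps the first imported version in `sys.modules`, and later imports return that cached module even after the file on disk changes. The sketch below reproduces that behaviour outside Celery. The module name `picked_up_code` and the helpers `load_fresh` and `demo` are made up for illustration.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def load_fresh(module_name: str):
    """Import a module, re-executing its source if it was already imported."""
    importlib.invalidate_caches()  # forget cached directory listings
    if module_name in sys.modules:
        return importlib.reload(sys.modules[module_name])
    return importlib.import_module(module_name)


def demo():
    """Return (first, stale, fresh) results around an on-disk code change."""
    name = "picked_up_code"  # hypothetical module name
    with tempfile.TemporaryDirectory() as tmp:
        source = Path(tmp) / (name + ".py")
        source.write_text("def answer():\n    return 'old'\n")
        sys.path.insert(0, tmp)
        try:
            first = load_fresh(name).answer()
            # Simulate a local edit made while the process is still running.
            source.write_text("def answer():\n    return 'changed'\n")
            # A plain import returns the cached module, so the edit is invisible.
            stale = importlib.import_module(name).answer()
            # Reloading re-executes the source and sees the edit.
            fresh = load_fresh(name).answer()
        finally:
            sys.path.remove(tmp)
            sys.modules.pop(name, None)
    return first, stale, fresh
```

Calling `demo()` shows the plain import still returning the old result after the edit, while the reload sees the new one. Note that `importlib.reload` only re-executes the one module it is given; modules that the reloaded code imports stay cached, so running the fetched code in a fresh subprocess may be a more robust alternative.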

My Celery settings:

celery.py

"""Celery entrypoint"""

from celery import Celery
from waves.wavequeue import celery_config

app = Celery()
app.config_from_object(celery_config)

celery_config.py

"""Celery configuration"""

broker_url = 'redis://localhost'
result_backend = 'redis://localhost'

include = ['waves.core.execute', 'waves.waveengine.tasks']
worker_redirect_stdouts = False
result_expires = 3600
worker_hijack_root_logger = True

# Fix Hard time limit (300.0s) exceeded for task
task_time_limit = 86400  # 1 day
task_soft_time_limit = 86400  # 1 day
task_serializer = 'json'

I have defined a custom task as below:

import logging
from abc import abstractmethod

import celery

# Assumption: the original snippet does not show how `logger` is created.
logger = logging.getLogger(__name__)


class WaveTask(celery.Task):
    """Base task that logs success, retry, and failure events."""

    @abstractmethod
    def run(self, *args, **kwargs):
        pass

    def on_success(self, retval, task_id, args, kwargs):
        logger.info('on_success task_id=%s retval=%s args=%s kwargs=%s', task_id, retval, args, kwargs)

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.error('on_retry task_id=%s exc=%s args=%s kwargs=%s einfo=%s', task_id, exc, args, kwargs, einfo)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error('on_failure task_id=%s exc=%s args=%s kwargs=%s einfo=%s', task_id, exc, args, kwargs, einfo)

waveengine/tasks.py

from waves.waveengine.wave_engine import run_wavelet
from waves.wavequeue.celery import app
from waves.wavequeue.wave_task import WaveTask


@app.task(base=WaveTask)
def run_wavelet_async(instance_id, wavelet_id, last_wavelet_id, wavelet: dict) -> bool:
    return run_wavelet(instance_id, wavelet_id, last_wavelet_id, wavelet)


@app.task(base=WaveTask)
def run_wavelet_sync(instance_id, wavelet_id, last_wavelet_id, wavelet: dict) -> bool:
    return run_wavelet(instance_id, wavelet_id, last_wavelet_id, wavelet)

Both tasks above call the run_wavelet() function internally, which executes the picked-up code.

Any help would be appreciated in fixing the above issue.

How are you running this on PythonAnywhere?