Scheduling thousands of one-off (non-reoccuring) tasks for near-simultaneous execution via Django-celery

Some context: I’m building a Django App that allows a user to pre-save an action, and schedule the exact date/time in the future they want said action to execute. E.g, scheduling a post to be programmatically pushed to ones Facebook wall next week at 5:30am.

I’m looking for a task scheduling system that could handle a thousand instances of a one-off task, all set to execute near-simultaneously (error margin plus or minus a minute).

I’m considering Django-celery/Rabbitmq for this, but I noticed the Celery docs do not address tasks meant for one-time use. Is Django-celery the right choice here (perhaps by subclassing CrontabSchedule) or is my energy better spent researching some other approach? Perhaps hacking together something with the Sched Module and Cron.

Best answer

Edit 2:

For some reason, my head was originally stuck in the realm of recurring tasks. Here is a simpler solution.

All you really need is to define one task for each user action. You can skip storing tasks to be executed in your database–that’s what celery is here for!

Reusing your facebook post example again, and again assuming you have a function post_to_facebook somewhere, which takes a user and some text, does some magic, and posts the text to that user’s facebook, you can just define it to be a task like this:

# Task to send one update.
@celery.task(ignore_result=True)
def post_to_facebook(user, text):
    # perform magic
    return whatever_you_want

When a user is ready to enqueue such a post, you just tell celery when to run the task:

post_to_facebook.apply_async(
    (user, text),   # args
    eta=datetime.datetime(2012, 9, 15, 11, 45, 4, 126440)  # pass execution options as kwargs
)

This is all detailed here, among a whole bunch of available call options: http://docs.celeryproject.org/en/latest/userguide/calling.html#eta-and-countdown

If you need the result of the call, you can skip the ignore_result param in the task definition and get an AsyncResult object back, and then check it for the results of the call. More here: http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#keeping-results

Some of the answer below is still relevant. You still want a task for each user action, you still want to think about task design, etc., but this is a much simpler road to doing what you asked about.

Original answer using recurring tasks follows:

Dannyroa has the right idea. I’ll build upon that a bit here.

Edit / TLDR:
The answer is Yes, celery is suited to your needs. You just may need to rethink your task definition.

I assume you aren’t allowing your users to write arbitrary Python code to define their tasks. Short of that, you will have to predefine some actions users can schedule, and then allow them to schedule those actions as they like. Then, you can just run one scheduled task for each user action, checking for entries and performing the action for each entry.

One user action:

Using your Facebook example, you would store users’ updates in a table:

class ScheduledPost(Model):
    user = ForeignKey('auth.User')
    text = TextField()
    time = DateTimeField()
    sent = BooleanField(default=False)

Then you would run a task every minute, checking for entries in that table scheduled to be posted in the last minute (based on the error margin you mentioned). If it is very important that you hit your one minute window, you might schedule the task more often, say, every 30 seconds. The task might look like this (in myapp/tasks.py):

@celery.task
def post_scheduled_updates():
    from celery import current_task
    scheduled_posts = ScheduledPost.objects.filter(
        sent=False,
        time__gt=current_task.last_run_at, #with the 'sent' flag, you may or may not want this
        time__lte=timezone.now()
    )
    for post in scheduled_posts:
        if post_to_facebook(post.text):
            post.sent = True
            post.save()

The config might look like this:

CELERYBEAT_SCHEDULE = {
    'fb-every-30-seconds': {
        'task': 'tasks.post_scheduled_updates',
        'schedule': timedelta(seconds=30),
    },
}

Additional user actions:

For each user action in addition to posting to Facebook, you can define a new table and a new task:

class EmailToMom(Model):
    user = ForeignKey('auth.User')
    text = TextField()
    subject = CharField(max_length=255)
    sent = BooleanField(default=False)
    time = DateTimeField()

@celery.task
def send_emails_to_mom():
    scheduled_emails = EmailToMom.objects.filter(
        sent=False,
        time__lt=timezone.now()
    )
    for email in scheduled_emails:
        sent = send_mail(
            email.subject,
            email.text,
            email.user.email,
            [email.user.mom.email],
        )
        if sent:
            email.sent = True
            email.save()

    CELERYBEAT_SCHEDULE = {
        'fb-every-30-seconds': {
            'task': 'tasks.post_scheduled_updates',
            'schedule': timedelta(seconds=30),
        },
        'mom-every-30-seconds': {
            'task': 'tasks.send_emails_to_mom',
            'schedule': timedelta(seconds=30),
        },
    }

Speed and optimization:

To get more throughput, instead of iterating over the updates to post and sending them serially during a post_scheduled_updates call, you could spawn up a bunch of subtasks and do them in parallel (given enough workers). Then the call to post_scheduled_updates runs very quickly and schedules a whole bunch of tasks–one for each fb update–to run asap. That would look something like this:

# Task to send one update. This will be called by post_scheduled_updates.
@celery.task
def post_one_update(update_id):
    try:
        update = ScheduledPost.objects.get(id=update_id)
    except ScheduledPost.DoesNotExist:
        raise
    else:
        sent = post_to_facebook(update.text)
        if sent:
            update.sent = True
            update.save()
        return sent

@celery.task
def post_scheduled_updates():
    from celery import current_task
    scheduled_posts = ScheduledPost.objects.filter(
        sent=False,
        time__gt=current_task.last_run_at, #with the 'sent' flag, you may or may not want this
        time__lte=timezone.now()
    )
    for post in scheduled_posts:
        post_one_update.delay(post.id)

The code I have posted is not tested and certainly not optimized, but it should get you on the right track. In your question you implied some concern about throughput, so you’ll want to look closely at places to optimize. One obvious one is bulk updates instead of iteratively calling post.sent=True;post.save().

More info:

More info on periodic tasks: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html.

A section on task design strategies: http://docs.celeryproject.org/en/latest/userguide/tasks.html#performance-and-strategies

There is a whole page about optimizing celery here: http://docs.celeryproject.org/en/latest/userguide/optimizing.html.

This page about subtasks may also be interesting: http://docs.celeryproject.org/en/latest/userguide/canvas.html.

In fact, I recommend reading all the celery docs.