Integrate Flask with Celery

This page explains how to integrate Flask with Celery. Celery is a distributed task queue, and pairing it with Flask is a common way to run slow or long-running work in background workers instead of inside the request handler.

Integrate Flask with Celery - Tutorial provided by AppSeed

In this example, I'll guide you through setting up Celery with Flask and executing simple bash or Python tasks, and then collecting the output. Here are the steps:

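✅ Install Celery

Install Flask, Celery, and the Redis client library that Celery needs for the Redis broker and result backend used in this tutorial:

pip install flask "celery[redis]"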

✅ Create a Flask App​

Start by creating a Flask application if you don't already have one. Here's a simple example:

from flask import Flask, request

app = Flask(__name__)

@app.route('/')
def index():
    return 'Hello, Flask!'

if __name__ == '__main__':
    app.run()

✅ Configure Celery​

Create a new Python file for Celery configuration (e.g., celery_config.py):

from celery import Celery

celery = Celery(
    'myapp',
    broker='redis://localhost:6379/0',  # Use your own broker URL
    backend='redis://localhost:6379/0'  # Use your own backend URL
)

Make sure you have Redis installed and running since we're using it as a message broker and result backend.
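
Before starting a worker, it can help to confirm that the broker URL points at a reachable Redis server. The following is a minimal sketch using only the standard library. The names parse_redis_url and redis_is_reachable are hypothetical helpers, not part of Celery or Redis, and the check assumes a Redis server without authentication, since it only sends a raw PING over a socket.

```python
import socket
from urllib.parse import urlparse


def parse_redis_url(url):
    """Split a redis:// URL into (host, port, db). Illustrative helper."""
    parsed = urlparse(url)
    if parsed.scheme != "redis":
        raise ValueError(f"not a redis:// URL: {url!r}")
    host = parsed.hostname or "localhost"
    port = parsed.port or 6379  # 6379 is the default Redis port
    db = int(parsed.path.lstrip("/") or 0)
    return host, port, db


def redis_is_reachable(url, timeout=1.0):
    """Return True if a server at the URL answers an inline PING with +PONG."""
    host, port, _db = parse_redis_url(url)
    try:
        with socket.create_connection((host, port), timeout=timeout) as conn:
            conn.sendall(b"PING\r\n")
            return conn.recv(16).startswith(b"+PONG")
    except OSError:
        # Connection refused, timeout, DNS failure, and similar errors.
        return False


if __name__ == "__main__":
    url = "redis://localhost:6379/0"
    print(parse_redis_url(url))
    print("reachable:", redis_is_reachable(url))
```

If the check prints reachable: False, the worker and the Flask app will not be able to exchange tasks until Redis is started or the URL is corrected.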

✅ Initialize Celery in Flask​

In your Flask app, import the Celery instance and configure it with your Flask app:

from flask import Flask
from celery_config import celery

app = Flask(__name__)

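app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'  # Use your broker URL
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'  # Use your backend URL

# Copy only the Celery-related values, using Celery's lowercase setting names,
# rather than passing the entire Flask config to Celery.
celery.conf.update(
    broker_url=app.config['CELERY_BROKER_URL'],
    result_backend=app.config['CELERY_RESULT_BACKEND'],
)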

✅ Define a Celery Task​

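Create a Celery task that runs a shell command and returns its output. Save it in a file named tasks.py, since the Flask routes import the task from that module. Here's an example:

import subprocess

from celery_config import celery

@celery.task
def execute_command(command):
    try:
        # shell=True hands the string to the system shell, so only pass trusted input
        result = subprocess.check_output(command, shell=True, text=True)
        return result
    except Exception as e:
        # Return the error text so the caller can see why the command failed
        return str(e)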
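
Because the task above passes the command string to a shell, any client that can reach the endpoint can run arbitrary commands on the worker host. One possible way to narrow this, sketched here under assumptions, is to split the command without a shell and accept only programs on an allowlist. ALLOWED_PROGRAMS and run_allowed_command are hypothetical names, and the allowlist contents are placeholders. The same body could be placed inside a function decorated with @celery.task.

```python
import shlex
import subprocess

# Hypothetical allowlist: adjust to the programs your application really needs.
ALLOWED_PROGRAMS = {"ls", "echo", "date"}


def run_allowed_command(command, timeout=30):
    """Run an allowlisted command without a shell and return a result dict."""
    if not isinstance(command, str):
        return {"ok": False, "output": "command must be a string"}
    try:
        args = shlex.split(command)
    except ValueError as exc:  # for example, an unbalanced quote
        return {"ok": False, "output": f"could not parse command: {exc}"}
    if not args or args[0] not in ALLOWED_PROGRAMS:
        return {"ok": False, "output": f"command not allowed: {command!r}"}
    try:
        completed = subprocess.run(
            args,  # a list of arguments, so no shell interprets the string
            capture_output=True,
            text=True,
            timeout=timeout,
            check=True,
        )
        return {"ok": True, "output": completed.stdout}
    except subprocess.CalledProcessError as exc:
        return {"ok": False, "output": exc.stderr or str(exc)}
    except (subprocess.TimeoutExpired, OSError) as exc:
        return {"ok": False, "output": str(exc)}


if __name__ == "__main__":
    print(run_allowed_command("echo hello"))
    print(run_allowed_command("rm -rf /tmp/example"))
```

Returning a dict with an explicit ok flag also lets the caller tell a failed command apart from one that succeeded, which the plain string return value cannot do.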

✅ Run Celery Task in Flask​

Now, you can use the execute_command task in your Flask routes to run commands asynchronously:

from flask import Flask, request, jsonify
from celery_config import celery
from tasks import execute_command

app = Flask(__name__)

@app.route('/run-command', methods=['POST'])
def run_command():
    command = request.json.get('command')
    task = execute_command.apply_async(args=[command])
    return jsonify({'task_id': task.id}), 202

@app.route('/get-result/<task_id>', methods=['GET'])
def get_result(task_id):
    task = execute_command.AsyncResult(task_id)
    if task.ready():
        result = task.result
    else:
        result = 'Task still in progress'
    return jsonify({'result': result})

if __name__ == '__main__':
    app.run()
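
The /get-result route above reports only a result string, so a client cannot tell a pending task from a finished command whose output happens to read "Task still in progress". A possible refinement is to return the task state alongside the result. The sketch below is a plain function with a hypothetical name, build_status_payload. It assumes it would be called with the state and result attributes of Celery's AsyncResult, and it treats SUCCESS, FAILURE, and REVOKED as the finished states.

```python
# States in which a Celery task has finished and will not change again.
READY_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def build_status_payload(state, result):
    """Map a task state and result to a JSON-serializable dict."""
    if state not in READY_STATES:
        # PENDING, STARTED, RETRY, or a custom in-progress state.
        return {"state": state, "ready": False, "result": None}
    if state == "SUCCESS":
        return {"state": state, "ready": True, "result": result}
    # FAILURE or REVOKED: the result may be an exception object,
    # so convert it to text before serializing.
    return {"state": state, "ready": True, "result": str(result)}


if __name__ == "__main__":
    print(build_status_payload("PENDING", None))
    print(build_status_payload("SUCCESS", "total 0\n"))
    print(build_status_payload("FAILURE", RuntimeError("boom")))
```

Inside the route, this might be called as build_status_payload(task.state, task.result), with the returned dict passed to jsonify.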

✅ Run Flask Application​

Start your Flask app:

python app.py

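The Celery worker must also be running, in a separate terminal. Point it at the tasks module so that the execute_command task is registered with the worker:

celery -A tasks worker --loglevel=info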

✅ Trigger a Task​

You can trigger a task by sending a POST request to /run-command with a JSON payload containing the command to execute. For example:

{
    "command": "ls -l"
}

The response will include a task_id.
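
As one way to send that request from Python, the sketch below builds the POST with the standard library's urllib. The base URL http://127.0.0.1:5000 is an assumption (the default address of Flask's development server), and build_run_request and submit_command are hypothetical helper names. The Content-Type header matters because the route reads the body through request.json.

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:5000"  # assumed: Flask's default dev server address


def build_run_request(command, base_url=BASE_URL):
    """Build the POST request for /run-command without sending it."""
    body = json.dumps({"command": command}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/run-command",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit_command(command, base_url=BASE_URL):
    """Send the request and return the task_id from the JSON response."""
    with urllib.request.urlopen(build_run_request(command, base_url)) as resp:
        return json.loads(resp.read().decode("utf-8"))["task_id"]


if __name__ == "__main__":
    # Only builds the request; call submit_command("ls -l") with the app running.
    req = build_run_request("ls -l")
    print(req.get_method(), req.full_url, req.data)
```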

✅ Collect the Output​

To collect the output, you can make a GET request to /get-result/<task_id> with the task_id received earlier. This will return the result of the command execution.
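
Since the task runs in the background, a client usually has to poll until the output is available. The following sketch assumes the /get-result route from this page, which returns the placeholder string "Task still in progress" until the task finishes, and a server at http://127.0.0.1:5000. The names fetch_result and wait_for_result are hypothetical helpers. The fetch argument is injectable so the loop can be exercised without a running server.

```python
import json
import time
import urllib.request

PENDING_TEXT = "Task still in progress"  # placeholder returned by /get-result


def fetch_result(task_id, base_url="http://127.0.0.1:5000"):
    """GET /get-result/<task_id> and return the 'result' field."""
    with urllib.request.urlopen(f"{base_url}/get-result/{task_id}") as resp:
        return json.loads(resp.read().decode("utf-8"))["result"]


def wait_for_result(task_id, fetch=fetch_result, interval=1.0, timeout=30.0):
    """Poll until the result is no longer the placeholder, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while True:
        result = fetch(task_id)
        if result != PENDING_TEXT:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} did not finish in {timeout}s")
        time.sleep(interval)


if __name__ == "__main__":
    # Stand-in for the HTTP call: pending twice, then finished.
    replies = iter([PENDING_TEXT, PENDING_TEXT, "total 0\n"])
    output = wait_for_result("demo-id", fetch=lambda _id: next(replies), interval=0.01)
    print(repr(output))
```

A fixed polling interval keeps the sketch short; a real client might back off between attempts to reduce load on the server.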

✅ In Summary​

At this point we should have a Flask app that can execute simple bash or Python tasks asynchronously using Celery and collect the output when they are done.

Make sure to adapt the code and configurations to your specific needs and security requirements.
