Integrate Flask with Celery
Learn how to integrate Flask with Celery
This page explains how to integrate Flask with Celery, a common way to handle asynchronous tasks in a Flask web application and a pattern used in many production-ready projects.
In this example, I'll guide you through setting up Celery with Flask, executing simple bash or Python commands as background tasks, and collecting their output. Here are the steps:
✅ Install Celery
You can install Celery using pip. Since this guide uses Redis as the message broker and result backend, install the `redis` extra as well so the Redis client library is pulled in:

```bash
pip install "celery[redis]"
```
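If you want to confirm the installation, a quick optional check from Python:

```python
# Optional sanity check: print the installed Celery version.
import celery

print(celery.__version__)
```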
✅ Create a Flask App
Start by creating a Flask application if you don't already have one. Here's a simple example:

```python
from flask import Flask

app = Flask(__name__)


@app.route('/')
def index():
    return 'Hello, Flask!'


if __name__ == '__main__':
    app.run()
```
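Before wiring in Celery, you can optionally confirm the route responds by using Flask's built-in test client. This assumes the file above is saved as app.py:

```python
# Optional: exercise the route without starting a server,
# assuming the Flask app above lives in app.py.
from app import app

with app.test_client() as client:
    response = client.get('/')
    print(response.status_code, response.get_data(as_text=True))  # 200 Hello, Flask!
```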
✅ Configure Celery
Create a new Python file for the Celery configuration (e.g., celery_config.py):

```python
# celery_config.py
from celery import Celery

celery = Celery(
    'myapp',
    broker='redis://localhost:6379/0',   # Use your own broker URL
    backend='redis://localhost:6379/0',  # Use your own backend URL
    include=['tasks'],  # Modules the worker imports so it registers the tasks defined below
)
```

Make sure you have Redis installed and running, since we're using it as the message broker and result backend.
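If you're not sure Redis is up, a small check with the redis-py client (installed above as part of celery[redis]) can save some debugging. This assumes Redis listens on the default localhost:6379:

```python
# Optional: verify the Redis connection before starting Celery.
import redis

client = redis.Redis(host='localhost', port=6379, db=0)
print(client.ping())  # prints True if Redis is reachable
```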
✅ Initialize Celery in Flask
In your Flask app, import the Celery instance and configure it with your Flask app:

```python
from flask import Flask

from celery_config import celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'      # Use your broker URL
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'  # Use your backend URL

celery.conf.update(app.config)
```
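If your tasks ever need the Flask application context (for example, to use Flask extensions such as a database layer), you can optionally bind the task base class to the app. This is a minimal sketch of that pattern, assuming the app and celery objects defined above; the simple subprocess tasks in this guide don't strictly need it:

```python
# Optional: make every task run inside the Flask application context.
class ContextTask(celery.Task):
    def __call__(self, *args, **kwargs):
        with app.app_context():
            return self.run(*args, **kwargs)

celery.Task = ContextTask  # tasks defined after this point use the context-aware base class
```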
✅ Define a Celery Task
Create a Celery task that executes the bash or Python command and collects the output. Put it in a new file, e.g. tasks.py (the module listed in `include=['tasks']` above):

```python
# tasks.py
import subprocess

from celery_config import celery


@celery.task
def execute_command(command):
    # Run a shell command and return its output, or the error message on failure.
    try:
        result = subprocess.check_output(command, shell=True, text=True)
        return result
    except Exception as e:
        return str(e)
```
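If you also want the exit code and stderr, or a safety timeout for long-running commands, a variant along these lines could be used instead (the execute_command_detailed name and the 60-second timeout are just illustrative choices):

```python
import subprocess

from celery_config import celery


@celery.task
def execute_command_detailed(command):
    # Return stdout, stderr and the exit code, and guard against hanging commands.
    try:
        completed = subprocess.run(
            command, shell=True, capture_output=True, text=True, timeout=60
        )
        return {
            'returncode': completed.returncode,
            'stdout': completed.stdout,
            'stderr': completed.stderr,
        }
    except subprocess.TimeoutExpired:
        return {'error': 'command timed out'}
```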
✅ Run Celery Task in Flask
Now you can use the `execute_command` task in your Flask routes to run commands asynchronously. Putting it all together in app.py (including the Celery configuration from the previous step):

```python
# app.py
from flask import Flask, request, jsonify

from celery_config import celery
from tasks import execute_command

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'      # Use your broker URL
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'  # Use your backend URL
celery.conf.update(app.config)


@app.route('/run-command', methods=['POST'])
def run_command():
    # Queue the command and return the task id right away (202 Accepted).
    command = request.json.get('command')
    task = execute_command.apply_async(args=[command])
    return jsonify({'task_id': task.id}), 202


@app.route('/get-result/<task_id>', methods=['GET'])
def get_result(task_id):
    # Look up the task by id and return its result once it has finished.
    task = execute_command.AsyncResult(task_id)
    if task.ready():
        result = task.result
    else:
        result = 'Task still in progress'
    return jsonify({'result': result})


if __name__ == '__main__':
    app.run()
```
✅ Run Flask Application
Start your Flask app:

```bash
python app.py
```

Make sure your Celery worker is also running (in a separate terminal):

```bash
celery -A celery_config worker --loglevel=info
```
✅ Trigger a Task
You can trigger a task by sending a POST request to `/run-command` with a JSON payload containing the command to execute. For example:

```json
{
  "command": "ls -l"
}
```

The response will include a `task_id`.
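For instance, with the requests library (assuming the app is running on Flask's default http://127.0.0.1:5000):

```python
import requests

# Queue an "ls -l" command and grab the task id from the response.
response = requests.post(
    'http://127.0.0.1:5000/run-command',
    json={'command': 'ls -l'},
)
task_id = response.json()['task_id']
print(task_id)
```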
✅ Collect the Output
To collect the output, make a GET request to `/get-result/<task_id>` with the `task_id` received earlier. This returns the result of the command execution once the task has finished, or a message saying the task is still in progress.
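Again with requests, a simple polling loop might look like this (reusing the task_id obtained in the previous step):

```python
import time

import requests

# Poll the result endpoint once per second until the task has finished.
while True:
    data = requests.get(f'http://127.0.0.1:5000/get-result/{task_id}').json()
    if data['result'] != 'Task still in progress':
        print(data['result'])
        break
    time.sleep(1)
```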
✅ In Summary
At this point, we should have a Flask app that can execute simple bash or Python commands asynchronously using Celery and collect their output once they finish.
Make sure to adapt the code and configurations to your specific needs and security requirements.
✅ Resources
- 👉 Access AppSeed for more starters and support
- 👉 Deploy Projects on AWS, Azure and DO via DeployPRO
- 👉 Create landing pages with Simpllo, an open-source site builder
- 👉 Build apps with Django App Generator (free service)