본문 바로가기
Python

Flask에서 celery+redis 이용한 비동기 처리

by shulk 2025. 1. 10.

일단 원본 영상을 설정한 단위로 분리하고 저장하는 API가 있는데 영상의 크기가 클수록 오래 걸리니 생각나서 해보는거다.

Celery는 Python으로 작성된 비동기 작업 큐 이기 때문에 Flask와 같은 Python Web Framework에 붙여서 사용하기 수월하다.

Celery의 구성

 

Celery 예제

Celery Workers인 celery를 설치하고 Message Broker역활인 redis를 설치

pip install celery
pip install redis

 

이제 Celery는 flask와 별도의 프로세스 생성후 그거로 백그라운드 작업해야해서,  Celery worker 프로세스를 따로 생성하도록하는 설정 코드 작성한다.

# Celery 설정
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379/0',  # Redis 서버 주소 설정 (메시지 브로커)
    CELERY_RESULT_BACKEND='redis://localhost:6379/0' # Redis 서버 주소 설정 (결과 저장소)
)

# Celery 애플리케이션 인스턴스 생성
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
# Celery 설정 업데이트
celery.conf.update(app.config)

#위에 처럼 하거나 아니면 다 주석후 밑에

celery = Celery(app.name, broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

 

근데 이렇게 설정 코드만 작성후 Flask 실행 한다고 되는게 아니다. 

실제로 Celery worker 프로세스를 생성하려면 별도의 명령어를 실행 해야한다!

--pool=solo 옵션은 Celery worker가 단일 프로세스 내에서 task를 순차적으로 실행하도록 지정하는 설정이다.

(pool의 종류는 (prefork, eventlet, gevent, solo) 이렇게들 있으나 지금은 예시라 순차적으로 처리하도록 단일로 설정)

celery -A Celery설정 코드 작성한 파일이름.celery worker --loglevel=debug --pool=solo


# 현재 예시 코드는 apptest.py 파일에 작성한거다
celery -A apptest.celery worker --loglevel=debug --pool=solo

 

이거는 이제 백그라운드 작업하는 코드이다. 말하자면 영상을 분리하고 작업하는 코드를 말하는거다.

@celery는 아까 위에  Celery worker 설정에 Celery 객체 생성한 변수 이름이랑 같아야한다.

@celery.task()
def long_task():
  """오래 걸리는 작업을 시뮬레이션하는 백그라운드 작업. (60초 소요)"""
  sleep(60)  # 60초 동안 대기
  return {'status': '작업 완료!', 'result': 42}  # 작업 완료 메시지와 결과 반환

 

 

이거는 일반 ApI인데 요청 들어오면 위에 @celery.task()로 등록한 long_task 메소드를 Celery 작업에 등록하고, 바로 클라한테 응답한다.

자세히 알아보면

delay() 메소드는 Celery task를 비동기적으로 실행하도록 큐에 추가하는 역할을 한다. 즉, long_task 함수를 즉시 실행하지 않고, Celery worker에게 나중에 실행하도록 요청하는 것이다.

@app.route('/start_task')
def start_task():
  """
  '/start_task' 경로로 요청이 오면 long_task를 비동기적으로 실행하고,
  task_id를 반환하는 API 엔드포인트.
  """
  task = long_task.delay()  # long_task를 비동기적으로 실행 (Celery에 작업 등록)
  return jsonify({'task_id': task.id}), 202  # task_id와 HTTP 상태 코드 202 (Accepted) 반환

 

delay() 메소드 알아보기

delay() 작동 방식:

  1. 메시지 생성: long_task.delay()를 호출하면, Celery는 long_task를 실행하는 데 필요한 정보(함수 이름, 인자 등)를 담은 메시지를 생성한다.
  2. 메시지 브로커에 전송: 생성된 메시지는 Celery설정된 broker (예: Redis)로 전송된다.
  3. Worker가 메시지 수신 및 처리: 실행 중인 Celery worker는 메시지 브로커에서 메시지를 수신하고, 메시지에 포함된 정보를 바탕으로 long_task 함수를 실행한다.
  4. 결과 저장 (선택적): long_task 함수의 실행 결과는 backend에 지정된 저장소(예: Redis)에 저장된다.

delay()의 반환값:

delay()는 AsyncResult 객체를 반환한다. 이 객체를 사용하여 task의 상태를 확인하거나, 결과를 가져올 수 있다.

  • task.id: task의 고유 ID
  • task.state: task의 현재 상태 ('PENDING', 'STARTED', 'SUCCESS', 'FAILURE', 'RETRY', 'REVOKED' 등)
  • task.get(): task가 완료될 때까지 기다린 후, 결과를 반환한다. (결과가 준비되지 않았으면, 기본적으로 계속 기다린다)
  • task.info: task의 메타데이터 (예: update_state()를 통해 설정한 정보)

그래서 @app.route('/task_status/<task_id>') 이 api 보면 저렇게 task.state에 따라 클라한테 응답하는데, 폴링방식이다.

 

예시코드 실행&결과

먼저 레디스 서버를 실행시키고, 그다음은 아까 말했던 Celery worker도 실행 시키고,이제 마지막으로 flask 애플리케이션 실행하면 된다.

 

요청시 이렇게 task_id를 응답으로 받고

 

폴링방식인 방금 받은 task_id를 포함시켜서 서버에 요청해서 작업이 완료 됬나 확인하는거다.

(지금은 서버 백그라운드에서 작업중이라 못 끝냈으니 대기중) 

 

1분 기다리고 다시 요청해보니 성공!

 

예시코드 전체

from time import sleep
from flask import Flask, jsonify
from celery import Celery

# Flask 애플리케이션 인스턴스 생성
app = Flask(__name__)

# Celery 설정
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',  # Redis 서버 주소 설정 (메시지 브로커)
    CELERY_RESULT_BACKEND='redis://localhost:6379' # Redis 서버 주소 설정 (결과 저장소)
)

# Celery 애플리케이션 인스턴스 생성
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
# Celery 설정 업데이트
celery.conf.update(app.config)

@celery.task()
def long_task():
  """오래 걸리는 작업을 시뮬레이션하는 백그라운드 작업. (60초 소요)"""
  sleep(60)  # 60초 동안 대기
  return {'status': '작업 완료!', 'result': 42}  # 작업 완료 메시지와 결과 반환

@app.route('/start_task')
def start_task():
  """
  '/start_task' 경로로 요청이 오면 long_task를 비동기적으로 실행하고,
  task_id를 반환하는 API 엔드포인트.
  """
  task = long_task.delay()  # long_task를 비동기적으로 실행 (Celery에 작업 등록)
  return jsonify({'task_id': task.id}), 202  # task_id와 HTTP 상태 코드 202 (Accepted) 반환


@app.route('/task_status/<task_id>')
def task_status(task_id):
  """
  '/task_status/<task_id>' 경로로 요청이 오면 해당 task_id의 상태를 조회하여 반환하는 API 엔드포인트.
  """
  task = celery.AsyncResult(task_id)  # task_id를 사용하여 Celery 작업 객체 가져오기
  if task.state == 'PENDING':  # 작업 상태가 PENDING (대기 중)인 경우
    response = {
      'state': task.state,  # 작업 상태
      'status': '대기 중...'  # 상태 메시지
    }
  elif task.state != 'FAILURE':  # 작업 상태가 FAILURE (실패)가 아닌 경우 (SUCCESS 등)
    response = {
      'state': task.state,  # 작업 상태
      'status': task.info.get('status', ''),  # 작업 정보(status) 가져오기, 없으면 빈 문자열
    }
    if 'result' in task.info:  # 작업 정보에 result가 있으면
      response['result'] = task.info['result']  # 결과(result) 추가
  else:  # 작업 상태가 FAILURE (실패)인 경우
    response = {
      'state': task.state,  
      'status': f'오류 발생: {task.info}', 
    }
  return jsonify(response)  


if __name__ == '__main__':
  app.run(debug=True)

'Python' 카테고리의 다른 글

Flask에서 celery이용한 비동기 처리 실습  (0) 2025.01.13
파이썬 비동기 프로그래밍 asyncio  (0) 2025.01.08
Python Ray 라이브러리 (병렬처리)  (0) 2025.01.07
파이썬 복습  (0) 2023.11.11