본문 바로가기
Python

Flask에서 celery이용한 비동기 처리 실습

by shulk 2025. 1. 13.

이전에는 간단한 예시로해서 백그라운드 작업중 DB 사용 이런게 없어서 컨텍스트 개념 사용 안하니 문제없었다.

그러나 이제 실제 API에 적용해보려면 컨텍스트에 설정된 DB 객체등 가져다 사용해야해서 문제들이 일어났다.

1. Flask vs Celery 컨텍스트

일단 컨텍스트는 애플리케이션이 요청을 처리하는 동안 현재 작업과 관련된 데이터(예: 설정, 데이터베이스 연결, 사용자 정보 등)를 저장하고 관리하는 공간이다.

 

Flask는 요청 들어오면 자체 컨텍스트를 생성하고 응답하면 컨텍스트가 사라지니, Celery는 Flask와는 별도로 Celery 전용 프로세스에서 백그라운드 작업을 처리하니  Celery 작업에서도 Celery 전용 컨텍스트가 필요하다.

 

즉,별도의 프로세스에서 celery가 작업하니 Flask의 컨텍스트를 완전히 공유해서 사용할 수 없다!

2.Celery 컨텍스트 설정 방법

A)자체적인 Celery용 컨텍스트 설정하기 ( celeryContext.py)

celery -A celeryContext.celery worker --loglevel=debug --pool=solo

이제 워커를 실행하면 이렇게 밑에 코드가 실행하니 Celery용 자체적으로 설정한 컨텍스트가 생성된다. 

지금은 컨텍스트에 SqlAlchemy 객체만 초기화하고 설정해두었다. 

import os
from celery import Celery
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from config.Config import DevelopmentConfig
from dotenv import load_dotenv

# include는 Celery 작업(Task)이 정의된 파일 이름적고 만약 하위 디렉토리에 있다면 myapp.tasks.controllertest 이런식으로 적는다
celery = Celery(__name__, broker='redis://localhost:6379/0', backend='redis://localhost:6379/0',include=['controllertest'])

# Flask 애플리케이션 팩토리 함수
load_dotenv()

celeryDb = SQLAlchemy()

def create_celery_app():
  flask_app = Flask(__name__)
  flask_app.config.from_object(DevelopmentConfig)

  # SQLAlchemy 초기화
  celeryDb.init_app(flask_app)
  flask_app.config['celeryDb'] = celeryDb

  return flask_app

# Flask 애플리케이션 생성 및 Celery 설정
flask_app = create_celery_app()
celery.conf.update(flask_app.config)

# ContextTask 설정 (Flask 애플리케이션 컨텍스트 활성화)
# __call__은 파이썬에서 클래스의 객체가 함수처럼 호출될 때 실행되는 특별한 메소드이고 여기서  __call__는 Celery 작업이 실행될 때 실행되는 메소드를 오버라이드 하는거다
class ContextTask(celery.Task):
  def __call__(self, *args, **kwargs):
    with flask_app.app_context(): # Flask 앱 컨텍스트 활성화
      return self.run(*args, **kwargs)

# Celery의 기본 작업 클래스(celery.Task)를  위에서 만든 커스텀 클래스인 ContextTask로 교체하는 코드
# 이제 모든 Celery 작업이 ContextTask를 기반으로 동작하며, Flask 앱 컨텍스트 내에서 실행
celery.Task = ContextTask

 

B)Flask 앱 컨텍스트 설정한 내용만 그대로 가져다가 Celery컨텍스트에 넣어서 설정하기  

마찬가지로 워커 실행후 Flask 서버 실행하면

celery = make_celery(app)

이 부분에 의해 Celery 컨텍스트에는 flask서버의 컨텍스트에 설정한 gemini,크로마,임베딩등등 설정이 복사되는거로 보면된다.

import os
from flask import Flask, g
from dotenv import load_dotenv
import google.generativeai as genai
from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings
from config.Config import Config, DevelopmentConfig
from flask_sqlalchemy import SQLAlchemy
from flask_restx import Api
from celery import Celery

# .env 파일 로드
load_dotenv()

# SQLAlchemy 초기화
db = SQLAlchemy()

# Celery 초기화 함수
def make_celery(app):
  celery = Celery(
      app.import_name,
      broker='redis://localhost:6379/0',
      backend='redis://localhost:6379/0',
      include=['controllertest']
  )
  celery.conf.update(app.config)

  # Flask 컨텍스트를 Celery 작업에서 사용하도록 설정
  class ContextTask(celery.Task):
    def __call__(self, *args, **kwargs):
      with app.app_context():
        return self.run(*args, **kwargs)

  celery.Task = ContextTask
  return celery

# Flask 애플리케이션 초기화 함수
def create_app():
  app = Flask(__name__)

  # Config 클래스 적용
  app.config.from_object(DevelopmentConfig)
  Config.init_app(app)

  # SQLAlchemy 초기화 코드

  # Flask-RESTX 초기화 코드
  
  # Generative AI 설정코드
  
  # LangChain 및 임베딩 모델 설정 코드

  # Chroma 설정 코드

  @app.before_request
  def before_request():
    g.db = db

  @app.teardown_request
  def teardown_request(exception=None):
    db.session.remove()

  # Namespace 등록
  from controllertest import fileNamespace
  api.add_namespace(fileNamespace, path='/video')

  return app

# 애플리케이션과 Celery 초기화
app = create_app()
celery = make_celery(app)

# Flask 애플리케이션 실행
if __name__ == '__main__':
  app.run(debug=True)

 

3. Celery에 필요한 리소스만 설정 (A방식) VS  Flask 컨텍스트 설정 Celery와 공유(B방식)

Flask 컨텍스트 설정을 Celery와 공유하는 경우 

사용하기 간편하고 유지보수가 쉽고, Celery 작업에서 Flask 애플리케이션의 여러 리소스와 설정에 접근해야 하는 경우 적합하다.

프로젝트가 간단하거나, 메모리 사용량이 크게 문제가 되지 않는 경우 권장

단점: Celery 작업에서 Flask 애플리케이션의 불필요한 설정과 리소스 Gemini, Chroma, Embeddings 등 포함해서 메모리 사용량이 약간 증가

 

Celery에 필요한 리소스만 설정하는 경우

Celery 작업에서 필요한 최소한의 리소스만 로드하므로 성능적으로 더 효율적이다.

프로젝트가 크고, Celery 작업의 리소스 사용을 명확히 관리하고 싶을 때 적합하다.

단점: Flask 애플리케이션과 Celery의 설정을 분리해서 관리해야 하므로 중복코드 발생이나 유지보수가 어려울 수도 있다.

4.Task 코드 

2번 설명에 A방법으로 할때는 그냥 바로 이렇게 celeryContext.py에 celery를 임포트 해오고 task 코드에서 바로 

dbInstance = current_app.config['celeryDb'] 이렇게 가져다가 사용하면 끝이다.

from celeryContext import celery

@celery.task()
def videoSplitBackground(save_path, originalFilename, segmentSaveDir, segment_duration):
    dbInstance = current_app.config['celeryDb']
    segments = fileService.videoSplit(
        save_path, originalFilename, segmentSaveDir, segment_duration,dbInstance
    )
    return {
      "message": "Video split successfully!",
      "splitVideos": segments
    }

 

 

2번 설명에 B방법으로 할시인데 현재 코드는 순환참조  에러 일어난다.

app.py와 FileController.py가 서로를 import하려다 보니 초기화되지 않은 상태에서 다른 모듈을 참조하려고 해서 그렇다.

나중 이 방식으로 할때 순환참조 문제는 따로 알아보고 해결하고, 순환 참조 해결했을경우 이제 하는방법은

with current_app.app_context():

Celery의 독립적인 컨텍스트에서 Flask 컨텍스트의 일부 정보만 가져다 사용하기 위해  Flask 애플리케이션 컨텍스트를 명시적으로 활성화하면 된다.

from flask import Flask, jsonify,Blueprint,request,current_app
from app import celery


@celery.task()
def videoSplitBackground(save_path, originalFilename, segmentSaveDir, segment_duration):
  with current_app.app_context():
    dbInstance = current_app.config['celeryDb']
    segments = fileService.videoSplit(
        save_path, originalFilename, segmentSaveDir, segment_duration,dbInstance
    )
    return {
      "message": "Video split successfully!",
      "splitVideos": segments
    }

 

5. SqlAlchemy의 모델 코드

이제 백그라운드 작업중 DB에 저장하는 작업한다했을때 SqlAlchemy는 모델파일(스프링의 엔티티 같은 역활) 사용하는데 이부분은 아까 2번 설명에 B 방식으로 하면 수정없이 그대로 해도 되나, A방식으로 할때

클래스 매개변수에  모델 클래스 넣어야해서 컨텍스트에 설정한 DB 객체 넣어주면 되는데, A방식 독립적인 설정한거 넣어줘야한다.

그래서 A 방식으로 할경우 기존 Flask 전용에 사용했던 VidelModel.py 의 코드 복사후 celery 전용 모델 CeleryVideoModel.py 생성후 코드 복붙하고 매개변수만 수정하면 된다. 

A 방식의 단점이 이 중복 코드가 발생한다.

from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime
from celeryContext import celeryDb


class Video(celeryDb.Model):
  __tablename__ = 'video'
  
   # Primary Key
  id = Column(Integer, primary_key=True, autoincrement=True, name="video_id")

  # Video title
  title = Column(String(255), nullable=False)
  
  ....

 

6.  추가) A 방식 코드  해석 GPT 답변 

class ContextTask(celery.Task):
  def __call__(self, *args, **kwargs):
    with flask_app.app_context(): 
      return self.run(*args, **kwargs)

celery.Task = ContextTask

(1). ContextTask 클래스가 무엇인가요?

ContextTask는 Celery의 기본 작업 클래스(celery.Task)를 확장한 커스텀 작업 클래스입니다. 여기서 목적은 Celery 작업이 실행될 때, Flask 애플리케이션의 "앱 컨텍스트(app context)"를 포함해서 실행되도록 하는 것입니다.

앱 컨텍스트가 뭔가요?

  • Flask는 특정 데이터나 환경 설정(예: 현재 사용자 정보, 데이터베이스 연결 등)을 "앱 컨텍스트" 안에서 관리합니다.
  • Flask 애플리케이션은 기본적으로 요청(request) 처리 중에만 앱 컨텍스트를 활성화합니다.
  • 하지만 Celery 작업은 Flask 애플리케이션 외부에서 실행되기 때문에 기본적으로 이 컨텍스트가 활성화되지 않습니다.
  • 따라서 Celery 작업이 Flask 애플리케이션 데이터를 제대로 사용하려면 앱 컨텍스트를 수동으로 활성화해야 합니다.

(2). __call__ 메서드의 역할

__call__은 파이썬에서 클래스의 객체가 함수처럼 호출될 때 실행되는 특별한 메서드입니다. 여기서 __call__은 Celery 작업이 실행될 때 실행되는 메서드를 오버라이드(재정의)합니다.

  • with flask_app.app_context(): Flask 애플리케이션의 앱 컨텍스트를 활성화합니다.
  • self.run(*args, **kwargs): Celery 작업의 실제 로직(할 일을 정의한 코드)을 실행합니다.

(3).celery.Task = ContextTask의 의미

이 줄은 Celery의 기본 작업 클래스(celery.Task)를 우리가 만든 커스텀 클래스인 ContextTask로 교체(replace)하는 코드입니다.
이제 모든 Celery 작업이 ContextTask를 기반으로 동작하며, Flask 앱 컨텍스트 내에서 실행됩니다.


(4).전체 코드의 의미

이 코드를 사용하면 Flask 애플리케이션의 설정, 데이터베이스 연결, 기타 컨텍스트 의존성 등을 Celery 작업에서 안전하게 사용할 수 있습니다.

'Python' 카테고리의 다른 글

Flask에서 celery+redis 이용한 비동기 처리  (0) 2025.01.10
파이썬 비동기 프로그래밍 asyncio  (0) 2025.01.08
Python Ray 라이브러리 (병렬처리)  (0) 2025.01.07
파이썬 복습  (0) 2023.11.11