본문 바로가기
Python

Python Ray 라이브러리 (병렬처리)

by shulk 2025. 1. 7.

1.병렬처리 이전의 방법 multiprocessing에 대해

multiprocessing라이브러리 경우 프로세스 스포닝(Process Spawning)을 지원하여 자원 내에서 사용 가능한 다중 프로세서를 활용 가능하게 하고 생성한 프로세스 풀을 제어하는 프로세스 풀 객체(pool)을 통해 병렬 처리를 한다.  
[프로세스 스포닝: 부모 프로세스가 운영 체제에 요청하여 자식 프로세스를 생성하는 과정]

그러나 multiprocessing는 자식 프로세스한테 작업해야하는 큰 데이터를 전달할때 pickle을 사용해 직렬화(데이터를 바이트로 변환)한 뒤 전달한다.

만약 20개 프로세스에서 작업하도록 설정했다면 그 만큼 직렬화한 데이터 복사본을 만들어야 하므로 큰 메모리 사용하고,직렬화 데이터 받은 프로세스는 원래 데이터로 변환인 역직렬화도 해야하니 오버헤드가 발생한다.

multiprocessing의 문제들을 해결 해주는 라이브러리 Ray라는게 있다. 병렬 처리를 위해 코드를 뜯어 고칠 필요가 없고, multiprocessing에서 발생하는 직렬화 오버헤드 문제가 발생하지 않고 게다가 속도도 훨배 빠르다.

 

2. Ray 예시 코드

일단 쉽게 이해하기 위해 개념적보다 예시로 먼저 본다.

import ray
import time
import numpy as np

ray.init(num_cpus=10)

@ray.remote
def mul(x):
  return x * 10

# 10만 개의 랜덤 정수 생성 (0부터 100 사이)
arr = np.random.randint(0, 101, size=100000000)

start_time = time.time()

arr = ray.put(arr)
result = ray.get(mul.remote(arr))

end_time = time.time()
elapsed_time = end_time - start_time

print(f"결과: {result[:20]}...") # 결과가 기므로 앞 20개만 출력
print(f"실행 시간: {elapsed_time:.6f} 초")

ray.shutdown()

예시 코드를 보면서 알아보면 일단 arr에 100000000개의 요소가 들어 있는데 이 각 요소에 10을 곱하는 작업을 병렬로 하려한다.

[예시코드 해석]

ray.init(num_cpus=10)

Ray 시스템을 초기화 한다. 이 코드는 병렬 처리를 위한 작업 환경을 설정하고, @ray.remote 표시가 붙은 함수들은 여러 CPU 코어에서 동시에 실행된다.

@ray.remote
def mul(x):
  return x * 10

@ray.remote: mul 함수를 Ray 원격 함수로 만든다. 즉, 이 함수는 별도의 프로세스에서 병렬로 실행될 수 있다.

arr = ray.put(arr)

arr = ray.put(arr): 배열 arr를 Ray의 공유 객체 저장소(shared object store)에 저장한다. ray.put()은 객체의 복사본을 공유 메모리에 생성하고, 해당 객체를 가리키는 ObjectRef (객체 참조)를 반환한다.

이제 arr 변수는 실제 데이터가 아닌 ObjectRef를 가지고 있어서, 이렇게 함으로써, 다른 Ray 작업들이 이 데이터에 접근할 수 있다.

result = ray.get(mul.remote(arr))

mul.remote(arr): mul 함수를 원격으로 실행하고 arr(ObjectRef)가 인자로 전달된다.

Ray는 arr이 참조하는 데이터를 mul 함수가 실행되는 프로세스로 자동 전송하고, 이 함수 호출은 즉시 ObjectRef를 반환하며, 실제 계산은 백그라운드에서 비동기적으로 수행된다

ray.get(...): mul.remote(arr) 작업을 완료한 결과를 가져오는것이다.

mul 함수의 실행이 완료될 때까지 기다린 후, 그 결과(여기서는 10이 곱해진 배열)를 result 변수에 저장한다.

ray.shutdown()

 

ray.shutdown(): 병렬 처리를 위해 만들었던 작업 환경을 정리하고, 사용했던 자원(CPU, 메모리 등)을 반납한다.

 

[Ray의 개념]

Ray는 multiprocessing 에서 발생하는 직렬화 오버헤드를 해결하기 위해 Apache Arrow를 사용한다.. Apache Arrow는 행(Row) 기반이 아닌 컬럼 기반의 인메모리 포맷으로 Zero-Copy 직렬화를 수행하고 또한 직렬화된 데이터를 인메모리 객체 저장소 (In-Memory Object Store)인 Plasma를 이용해 직렬화된 데이터를 빠르게 공유한다.

 

3. Ray의 구성 요소 Task, Object, Actor

1. Task (작업):

  • Ray에서 Task는 하나의 함수 실행 단위이다. @ray.remote 데코레이터로 표시된 함수를 호출하면 Task가 생성된다.
  • 쉽게 말해, 병렬로 처리하고 싶은 하나의 작업이라고 생각하면 된다. 마치 요리할 때 "감자 썰기", "양파 볶기" 같은 개별 작업과 유사하다.
  • 예를 들어, mul.remote(arr)는 mul 함수를 병렬로 실행하는 Task를 생성한다.

2. Object (객체):

  • Ray에서 Object는 Task의 입력 또는 출력으로 사용되는 데이터이다. ray.put()으로 생성된 값이나, Task 실행 결과로 생성된 값을 말한다.[변경 불가능(immutable)]
  • 공유 메모리에 저장되어 여러 Task에서 접근할 수 있는 데이터 조각이라고 생각하면 된다. 요리 재료 중 "썰어둔 감자", "볶은 양파"와 같이 여러 요리에 사용될 수 있는 중간 결과물과 유사하다.
  • 아까 위해 예시코드를 생각해보면 ray.put(arr)는 arr 배열을 Ray Object로 만들고, result = ray.get(mul.remote(arr))에서 result는 mul Task의 실행 결과인 Object이다..

3. Actor (액터):

  • Ray에서 Actor는 상태(state)를 가지는 객체이다. 클래스에 @ray.remote 데코레이터를 붙여 Actor를 정의할 수 있다.
  • 쉽게 말해, 여러 Task에서 공유하고 변경할 수 있는 변수를 가진 객체라고 생각하면 된다. 마치 요리사가 사용하는 "믹서기"나 "오븐"처럼 여러 요리에 사용되지만, 사용 후에는 상태가 변할 수 있는 도구와 유사 하다.
  • Actor는 여러 번 호출될 수 있고, 각 호출은 병렬로 실행되며, Actor 내부의 상태를 변경할 수 있다.
  • 위에 예시 코드에는 액터를 사용 안해서 모르니 예시 코드 보면
import ray

ray.init()

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

# 액터 상태를 여러 프로세스에서 사용하는 경우(순차적 처리)
# 10개의 작업을 실행
results = [counter.increment.remote() for _ in range(10)]

# 작업 결과를 가져옴
final_results = ray.get(results)
print(final_results)  # [1, 2, 3, ..., 10]

#만약 여러 액터를 생성한다면(병렬적 처리)
# 여러 액터 생성
counters = [Counter.remote() for _ in range(10)]

# 각각의 액터에서 작업 실행
results = [counter.increment.remote() for counter in counters]
final_results = ray.get(results)
print(final_results)  # [1, 1, 1, ..., 1]

 

 

참고 사이트