ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • concurrent.futures
    Data Engineer 2023. 12. 7. 00:29
    728x90

    현재 수행중인 프로젝트에서 Azure Blob Storage로부터 WAS에 이미지 데이터를 로드해야 하는 데 시간이 너무 오래 걸리는 현상이 발생했다. 로드하는 함수를 직렬 처리를 하는 대신에 병렬로 하면 어떨까하는 의문이 들어서 적용을 해보기로 했다.

    그래서 이 참에, python의 병렬 함수 처리를 가능케 하는 모듈에 대해 알아보기로 했다.

    concurrent.futures 모듈은 비동기적으로 callable을 실행하는 고수준 인터페이스를 제공한다.

    Python에서 비동기 실행은 Thread 혹은 별도의 Process로 수행된다.

    이 때, Thread로 실행되는 ThreadPoolExecutor와 Process로 실행되는 ProcessPoolExecutor가 존재한다.

    Executor 객체

    class concurrent.futures.Executor

    비동기적으로 호출을 실행하는 메서드를 제공하는 추상 클래스며, 직접 사용하지 않고 구체적인 하위 클래스를 통해 사용한다.

    submit

    : callable을 스케줄링하고, Future 객체를 반환한다.

    with ThreadPoolExecutor(max_workers=1) as executor:
    	future = executor.submit(pow, 323, 1235)
        	print(future.result())

    map(func, *iterables, timeout=None, chunksize=1)

    - map(func, *iterables)

    iterables는 느긋하게 처리되는 것이 아니라 즉시 수집되면, func는 비동기적으로 실행되며 func에 대한 여러 호출이 동시에 이뤄진다.

    returned iterator는 TimeoutError를 발생시킨다.

    ProcessPoolExecutor를 사용할 때, 이 메서드는 iterables를 다수의 덩어리로 잘라서 별도의 작업으로 풀에 제출한다. 이러한 덩어리의 대략적인 크기는 chunksize를 양의 정수로 설정하여 지정할 수 있다. 매우 긴 이터러블의 경우 chunksize에 큰 값을 사용하면 기본 크기인 1에 비해 성능이 크게 향상될 수 있다. ThreadPoolExecutor의 경우, chunksize는 아무 효과가 없다.

    shutdown(wait=True, *, cancel_futures=False)

    현재 계류 중인 future가 실행 완료될 때, 사용중인 모든 자원을 해제해야 한다는 것을 실행기에 알린다. 종료 후에 이루어지는 Executor.submit()과 Executor.map() 호출은 RuntimeError를 시킨다.

    wait가 True면, 계류 중인 모든 Future가 실행을 마치고 실행기와 관련된 자원이 해제될 때까지 이 메서드는 돌아오지 않는다.

    wait가 False면, 이 메서드는 즉시 돌아오고 실행기와 연관된 자원은 계류 중인 모든 future가 실행을 마칠 때 해제된다. wait의 값과 관ㅋ계없이 모든 계류 중인 퓨처가 실행을 마칠 때까지 전체 파이썬 프로그램이 종료되지 않는다.

    import time
    def wait_on_b():
        time.sleep(5)
        print(b.result())  # b will never complete because it is waiting on a.
        return 5
    
    def wait_on_a():
        time.sleep(5)
        print(a.result())  # a will never complete because it is waiting on b.
        return 6
    
    
    executor = ThreadPoolExecutor(max_workers=2)
    a = executor.submit(wait_on_b)
    b = executor.submit(wait_on_a)
    
    -------------------------------------------------------------------------------------------------------------------------
    
    def wait_on_future():
        f = executor.submit(pow, 5, 2)
        # This will never complete because there is only one worker thread and
        # it is executing this function.
        print(f.result())
    
    executor = ThreadPoolExecutor(max_workers=1)
    executor.submit(wait_on_future)

    ThreadPoolExecutor 

    class concurrent.futures.ThreadPoolExecutor(max_workers=Nonethread_name_prefix=''initializer=Noneinitargs=())

    threadPoolExecutor는 스레드 풀을 사용하여 호출을 비동기적으로 실행하는 Executor 서브 클래스이다.

    Future와 관련된 Callable 객체가 다른 Future의 결과를 기다릴 때 교착 상태가 발생할 수 있다.

    ThreadPoolExecutor에 큐잉된 모든 스레드는 인터프리터가 끝나기 전에 모두 join 될 것이다.

    exit 핸들러가 다른 exit 핸들러들이 atexit를 추가하기 전에 실행된다는 점에 유의하자. 이 뜻은 메인 스레드의 예외는  스레드들에게 exit하라는 신호를 보내기 위해서 반드시 잡힐 것이다. 이러한 이율, ThreadPoolExecutor는 긴 실행시간이 필요한 task들에게는 사용되지 않는다.

    initializer는 각 작업 스레드의 시작 부분에서 호출되는 선택적 콜러블이다.

    interargs : initializer에 전달하려는 인자들의 tuple

    initializer가 예외를 발생할 경우, 현재 계류중인 모든 작업과 풀에 추가로 작업을 제출하려는 시도에 대해 BrokenThreadPool을 발생시킨다.

    max_workers : 기본값은 min(32, os.cpu_count() + 4)을 사용하며, 해당 Executor가 CPU 작업보다는 I/O 작업을 동시에 진행하는 데 자주 쓰이고, 작업자의 수가 Process

    import concurrent.futures
    import urllib.request
    
    URLS = ['http://www.foxnews.com/',
            'http://www.cnn.com/',
            'http://europe.wsj.com/',
            'http://www.bbc.co.uk/',
            'http://nonexistant-subdomain.python.org/']
    
    # Retrieve a single page and report the URL and contents
    def load_url(url, timeout):
        with urllib.request.urlopen(url, timeout=timeout) as conn:
            return conn.read()
    
    # We can use a with statement to ensure threads are cleaned up promptly
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Start the load operations and mark each future with its URL
        future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                print('%r page is %d bytes' % (url, len(data)))

    PoolExecutor보다 많아야 한다고 가정하고 있다.

     

    ProcessPoolExecutor

    해당 클래스는 프로세스 풀을 사용하여 호출을 비동기적으로 실행하는 Executor 서브 클래스이다. multiprocessing 모듈을 사용하며, 전역 인터프리터 lock을 피할 수 있으나 오직 pickle 가능한 객체만 실행되고 반환될 수 있음을 의미한다.

    ProcessPoolExecutor에 제출된 Callable에서 Executor나 Future 메서드를 호출하면 교착 상태가 발생한다.

    class concurrent.futures.ProcessPoolExecutor(max_workers=Nonemp_context=Noneinitializer=Noneinitargs=()max_tasks_per_child=None)

    실행되는 Executor 서브 클래스는  많아야 max_workers 개수의 풀을 사용하여 비동기적으로 호출된다. max_workers가 주어지지 않으면, 디폴트로 기기의 프로세서 수가 주어진다. 만약 max_workers가 0 이하라면 ValueError를 발생시킨다.

    mp_context는 multiprocessing context 혹은 None이 될 수 있다. 이것들은 worker들을 실행시키는 데 사용되며, mp_context가 주어지지 않았을 때 디폴트로 multiprocessing context가 사용된다.

    max_tasks_per_child는 선택적 인자로, 다른 프로세스로 대체되거나 종료되기 전에 해당 싱글 프로세스가 실행할 수 있는 최대 작업의 수를 구체화한다. None으로 주어진다면 워커 프로세스는 pool만큼 오래 살아있을 것이라는 뜻이다.

    import concurrent.futures
    import math
    
    PRIMES = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419]
    
    def is_prime(n):
        if n < 2:
            return False
        if n == 2:
            return True
        if n % 2 == 0:
            return False
    
        sqrt_n = int(math.floor(math.sqrt(n)))
        for i in range(3, sqrt_n + 1, 2):
            if n % i == 0:
                return False
        return True
    
    def main():
        with concurrent.futures.ProcessPoolExecutor() as executor:
            for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
                print('%d is prime: %s' % (number, prime))
    
    if __name__ == '__main__':
        main()

     

    Future?

    Future 클래스는 콜러블 객체의 비동기 실행을 캡슐화한다. future 인스턴스는 Excutor.submit()에 의해 생성된다.

    class concurrent.futures.Future

    •  cancel() : 호출을 취소하려고 노력한다. 호출이 실행중이거나 실행 종료 후 취소할 수 없는 경우 False를 반환, 이 경우가 아니라면 호출이 취소되고 메서드는 True를 반환한다.
    • cancelled() : 호출이 성공적으로 취소되었을 시 True를 반환
    • running() : 호출이 현재 실행 중이고 취소할 수 없는 경우 True를 반환한다
    • done() : 호출이 성공적으로 취소되었거나 실행이 완료되었으면 True를 반환한다.
    • result(timeout=none) : 호출의 결과값을 반환한다. 만약 호출이 완료되지 않은경우 timeout 동안 기다리지만, 그 시간 내에 완료되지 않은 경우 TimeoutError를 발생시킨다. None으로 지정시 해당 timeout은 무제한으로 처리된다.
    • exception(timeout=none) : 호출에 의해 발생된 예외를 반환하고, 마찬가지로 timeout 인자에 의해 결과값이 좌우된다. 
    • add_done_callback : 퓨처가 취소되거나 실행이 종료될 때 퓨처를 유일한 인자로 호출한다. 추가한 콜러블은 추가된 순서대로 호출되며, 항상 콜러블을 추가한 프로세스에 속하는 스레드에서 호출된다.
    • set_running_or_notify_cancel() : Future와 관련된 작업을 실행하기 전에 Executor 구현에 의해서만 호출되거나 단위 테스트에서만 호출되어야 한다. Future가 취소되었을 때 False를 반환한다. 이 메서드는 Future.set_result() 또는 Future.set_exception()이 호출된 후에는 호출할 수 없다.
    • set_result(result) : Future와 관련된 작업 결과를 result로 설정한다. Future가 이미 완료되었다면 concurrent.futures.InvalidStateError를 발생 
    • set_exception(result) : Future와 관련된 작업 결과를 Exception으로 설정한다. Future가 이미 완료되었다면 concurrent.futures.InvalidStateError를 발생 

    참고 : https://docs.python.org/ko/3/library/concurrent.futures.html

     

    concurrent.futures — Launching parallel tasks

    Source code: Lib/concurrent/futures/thread.py and Lib/concurrent/futures/process.py The concurrent.futures module provides a high-level interface for asynchronously executing callables. The asynchr...

    docs.python.org

     

    728x90

    'Data Engineer' 카테고리의 다른 글

    Git add / commit / push 취소  (0) 2023.12.15
    정형, 비정형, 반정형  (0) 2023.12.15
    Copy() 메서드  (1) 2023.12.03
    동기 작업과 비동기 작업  (1) 2023.12.03
    Airflow 연결 오류  (0) 2023.11.08
Designed by Tistory.