source

asyncio를 사용합니다.생산자-소비자 흐름 대기열

factcode 2023. 5. 24. 22:27
반응형

asyncio를 사용합니다.생산자-소비자 흐름 대기열

어떻게 사용해야 할지 헷갈립니다.asyncio.Queue생산자와 소비자 모두가 동시에 독립적으로 작동하는 특정 생산자-소비자 패턴의 경우.

먼저 의 문서에 나와 있는 다음 예제를 살펴보겠습니다.

import asyncio
import random
import time

async def worker(name, queue):
    while True:
        sleep_for = await queue.get()
        await asyncio.sleep(sleep_for)
        queue.task_done()
        print(f'{name} has slept for {sleep_for:0.2f} seconds')

async def main(n):
    queue = asyncio.Queue()
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)
    tasks = []
    for i in range(n):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)
    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

if __name__ == '__main__':
    import sys
    n = 3 if len(sys.argv) == 1 else sys.argv[1]
    asyncio.run(main())

이 스크립트에 대한 자세한 내용은 항목이 동시에 대기열에 배치된다는 것입니다.queue.put_nowait(sleep_for)전통적인 포 루프에서.

내 목표는 다음을 사용하는 스크립트를 만드는 것입니다.async def worker()(또는)consumer()) 및async def producer()둘 다 동시에 실행되도록 예약해야 합니다.어떤 소비자 코루틴도 생산자와 명시적으로 연결되거나 연결되지 않습니다.

생산자가 소비자/근로자와 동시에 일정을 잡을 수 있는 자체 코루틴이 되도록 위의 프로그램을 수정하려면 어떻게 해야 합니까?


PYMOTW의 두 번째 예가 있습니다.그것은 생산자가 소비자의 수를 미리 알 것을 요구하고, 사용합니다.None생산이 완료되었다는 신호로 소비자에게 전달됩니다.

생산자가 소비자/근로자와 동시에 일정을 잡을 수 있는 자체 코루틴이 되도록 위의 프로그램을 수정하려면 어떻게 해야 합니까?

이 예제는 기본 논리를 변경하지 않고 일반화할 수 있습니다.

  • 삽입 루프를 별도의 생산자 코루틴으로 이동합니다.
  • 소비자가 제품을 생산할 때 제품을 처리할 수 있도록 백그라운드에서 시작합니다.
  • 소비자가 작동하는 상태에서 생산자를 시작하고 제품 생산이 끝날 때까지 기다립니다.await producer()또는await gather(*producers),기타.
  • 모든 생산자가 완료되면 소비자가 나머지 품목을 처리할 때까지 기다립니다.await queue.join().
  • 소비자들을 취소하세요. 소비자들은 이제 생산자들이 다했다는 것을 알기 때문에 결코 도착하지 않을 다음 상품을 배달하기 위해 대기하고 있습니다.

다음은 위의 내용을 구현하는 예입니다.

import asyncio, random
 
async def rnd_sleep(t):
    # sleep for T seconds on average
    await asyncio.sleep(t * random.random() * 2)
 
async def producer(queue):
    while True:
        # produce a token and send it to a consumer
        token = random.random()
        print(f'produced {token}')
        if token < .05:
            break
        await queue.put(token)
        await rnd_sleep(.1)
 
async def consumer(queue):
    while True:
        token = await queue.get()
        # process the token received from a producer
        await rnd_sleep(.3)
        queue.task_done()
        print(f'consumed {token}')
 
async def main():
    queue = asyncio.Queue()
 
    # fire up the both producers and consumers
    producers = [asyncio.create_task(producer(queue))
                 for _ in range(3)]
    consumers = [asyncio.create_task(consumer(queue))
                 for _ in range(10)]
 
    # with both producers and consumers running, wait for
    # the producers to finish
    await asyncio.gather(*producers)
    print('---- done producing')
 
    # wait for the remaining tasks to be processed
    await queue.join()
 
    # cancel the consumers, which are now idle
    for c in consumers:
        c.cancel()
 
asyncio.run(main())

실제 생산자와 소비자, 특히 네트워크 액세스와 관련된 소비자의 경우 처리 중에 발생하는 IO 관련 예외를 포착하고 싶을 수 있습니다.대부분의 네트워크 관련 예외와 마찬가지로 예외를 복구할 수 있는 경우에는 예외를 탐지하고 오류를 기록하기만 하면 됩니다.계속 호출해야 합니다.task_done()그렇지 않으면queue.join()처리되지 않은 항목으로 인해 중단됩니다.항목을 다시 처리하는 것이 타당한 경우 호출하기 전에 해당 항목을 대기열로 되돌릴 수 있습니다.task_done()예:

# like the above, but handling exceptions during processing:
async def consumer(queue):
    while True:
        token = await queue.get()
        try:
            # this uses aiohttp or whatever
            await process(token)
        except aiohttp.ClientError as e:
            print(f"Error processing token {token}: {e}")
            # If it makes sense, return the token to the queue to be
            # processed again. (You can use a counter to avoid
            # processing a faulty token infinitely.)
            #await queue.put(token)
        queue.task_done()
        print(f'consumed {token}')

언급URL : https://stackoverflow.com/questions/52582685/using-asyncio-queue-for-producer-consumer-flow

반응형