asyncio를 사용합니다.생산자-소비자 흐름 대기열
어떻게 사용해야 할지 헷갈립니다.asyncio.Queue
생산자와 소비자 모두가 동시에 독립적으로 작동하는 특정 생산자-소비자 패턴의 경우.
먼저 의 문서에 나와 있는 다음 예제를 살펴보겠습니다.
import asyncio
import random
import time
async def worker(name, queue):
while True:
sleep_for = await queue.get()
await asyncio.sleep(sleep_for)
queue.task_done()
print(f'{name} has slept for {sleep_for:0.2f} seconds')
async def main(n):
queue = asyncio.Queue()
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
tasks = []
for i in range(n):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
if __name__ == '__main__':
import sys
n = 3 if len(sys.argv) == 1 else sys.argv[1]
asyncio.run(main())
이 스크립트에 대한 자세한 내용은 항목이 동시에 대기열에 배치된다는 것입니다.queue.put_nowait(sleep_for)
전통적인 포 루프에서.
내 목표는 다음을 사용하는 스크립트를 만드는 것입니다.async def worker()
(또는)consumer()
) 및async def producer()
둘 다 동시에 실행되도록 예약해야 합니다.어떤 소비자 코루틴도 생산자와 명시적으로 연결되거나 연결되지 않습니다.
생산자가 소비자/근로자와 동시에 일정을 잡을 수 있는 자체 코루틴이 되도록 위의 프로그램을 수정하려면 어떻게 해야 합니까?
PYMOTW의 두 번째 예가 있습니다.그것은 생산자가 소비자의 수를 미리 알 것을 요구하고, 사용합니다.None
생산이 완료되었다는 신호로 소비자에게 전달됩니다.
생산자가 소비자/근로자와 동시에 일정을 잡을 수 있는 자체 코루틴이 되도록 위의 프로그램을 수정하려면 어떻게 해야 합니까?
이 예제는 기본 논리를 변경하지 않고 일반화할 수 있습니다.
- 삽입 루프를 별도의 생산자 코루틴으로 이동합니다.
- 소비자가 제품을 생산할 때 제품을 처리할 수 있도록 백그라운드에서 시작합니다.
- 소비자가 작동하는 상태에서 생산자를 시작하고 제품 생산이 끝날 때까지 기다립니다.
await producer()
또는await gather(*producers)
,기타. - 모든 생산자가 완료되면 소비자가 나머지 품목을 처리할 때까지 기다립니다.
await queue.join()
. - 소비자들을 취소하세요. 소비자들은 이제 생산자들이 다했다는 것을 알기 때문에 결코 도착하지 않을 다음 상품을 배달하기 위해 대기하고 있습니다.
다음은 위의 내용을 구현하는 예입니다.
import asyncio, random
async def rnd_sleep(t):
# sleep for T seconds on average
await asyncio.sleep(t * random.random() * 2)
async def producer(queue):
while True:
# produce a token and send it to a consumer
token = random.random()
print(f'produced {token}')
if token < .05:
break
await queue.put(token)
await rnd_sleep(.1)
async def consumer(queue):
while True:
token = await queue.get()
# process the token received from a producer
await rnd_sleep(.3)
queue.task_done()
print(f'consumed {token}')
async def main():
queue = asyncio.Queue()
# fire up the both producers and consumers
producers = [asyncio.create_task(producer(queue))
for _ in range(3)]
consumers = [asyncio.create_task(consumer(queue))
for _ in range(10)]
# with both producers and consumers running, wait for
# the producers to finish
await asyncio.gather(*producers)
print('---- done producing')
# wait for the remaining tasks to be processed
await queue.join()
# cancel the consumers, which are now idle
for c in consumers:
c.cancel()
asyncio.run(main())
실제 생산자와 소비자, 특히 네트워크 액세스와 관련된 소비자의 경우 처리 중에 발생하는 IO 관련 예외를 포착하고 싶을 수 있습니다.대부분의 네트워크 관련 예외와 마찬가지로 예외를 복구할 수 있는 경우에는 예외를 탐지하고 오류를 기록하기만 하면 됩니다.계속 호출해야 합니다.task_done()
그렇지 않으면queue.join()
처리되지 않은 항목으로 인해 중단됩니다.항목을 다시 처리하는 것이 타당한 경우 호출하기 전에 해당 항목을 대기열로 되돌릴 수 있습니다.task_done()
예:
# like the above, but handling exceptions during processing:
async def consumer(queue):
while True:
token = await queue.get()
try:
# this uses aiohttp or whatever
await process(token)
except aiohttp.ClientError as e:
print(f"Error processing token {token}: {e}")
# If it makes sense, return the token to the queue to be
# processed again. (You can use a counter to avoid
# processing a faulty token infinitely.)
#await queue.put(token)
queue.task_done()
print(f'consumed {token}')
언급URL : https://stackoverflow.com/questions/52582685/using-asyncio-queue-for-producer-consumer-flow
'source' 카테고리의 다른 글
Xcode 4 스킴의 이름을 바꿀 수 있는 방법이 있습니까? (0) | 2023.05.24 |
---|---|
LINQ를 사용하여 개체 목록에서 고유한 속성 목록을 가져오려면 어떻게 해야 합니까? (0) | 2023.05.24 |
STA와 MTA에 대해 설명해 주시겠습니까? (0) | 2023.05.24 |
시스템 변환.그림그리기.색상에서 RGB 및 16진수 값으로 (0) | 2023.05.24 |
애플리케이션 통찰력 지연? (0) | 2023.05.24 |