Warteschlangen¶
Quellcode: Lib/asyncio/queues.py
asyncio-Warteschlangen sind so konzipiert, dass sie den Klassen des queue-Moduls ähneln. Obwohl asyncio-Warteschlangen nicht threadsicher sind, sind sie speziell für die Verwendung in async/await-Code konzipiert.
Beachten Sie, dass die Methoden von asyncio-Warteschlangen keinen timeout-Parameter haben; verwenden Sie die Funktion asyncio.wait_for(), um Warteschlangenoperationen mit einem Timeout durchzuführen.
Siehe auch den Abschnitt Beispiele unten.
Queue¶
- class asyncio.Queue(maxsize=0)¶
Eine First-In, First-Out (FIFO)-Warteschlange.
Wenn maxsize kleiner oder gleich Null ist, ist die Warteschlangengröße unendlich. Wenn es sich um eine ganze Zahl größer als
0handelt, blockiertawait put(), wenn die Warteschlange maxsize erreicht, bis ein Element vonget()entfernt wird.Im Gegensatz zur Standardbibliothek-Threading
queueist die Größe der Warteschlange immer bekannt und kann durch Aufrufen der Methodeqsize()zurückgegeben werden.Geändert in Version 3.10: Das Argument loop wurde entfernt.
Diese Klasse ist nicht threadsicher.
- maxsize¶
Anzahl der in der Warteschlange zulässigen Elemente.
- empty()¶
Gibt
Truezurück, wenn die Warteschlange leer ist, andernfallsFalse.
- full()¶
Gibt
Truezurück, wenn sichmaxsizeElemente in der Warteschlange befinden.Wenn die Warteschlange mit
maxsize=0(Standard) initialisiert wurde, gibtfull()niemalsTruezurück.
- async get()¶
Entfernt ein Element aus der Warteschlange und gibt es zurück. Wenn die Warteschlange leer ist, wird gewartet, bis ein Element verfügbar ist.
Löst
QueueShutDownaus, wenn die Warteschlange heruntergefahren wurde und leer ist, oder wenn die Warteschlange sofort heruntergefahren wurde.
- get_nowait()¶
Gibt ein Element zurück, wenn eines sofort verfügbar ist, andernfalls wird
QueueEmptyausgelöst.
- async join()¶
Blockiert, bis alle Elemente in der Warteschlange empfangen und verarbeitet wurden.
Die Anzahl der unfertigen Aufgaben erhöht sich jedes Mal, wenn ein Element zur Warteschlange hinzugefügt wird. Die Anzahl verringert sich jedes Mal, wenn eine Konsumenten-Coroutine
task_done()aufruft, um anzuzeigen, dass das Element abgerufen und die gesamte Arbeit daran abgeschlossen wurde. Wenn die Anzahl der unfertigen Aufgaben auf Null sinkt, wirdjoin()aufgehoben.
- async put(item)¶
Fügt ein Element in die Warteschlange ein. Wenn die Warteschlange voll ist, wird gewartet, bis ein freier Platz verfügbar ist, bevor das Element hinzugefügt wird.
Löst
QueueShutDownaus, wenn die Warteschlange heruntergefahren wurde.
- put_nowait(item)¶
Fügt ein Element in die Warteschlange ein, ohne zu blockieren.
Wenn kein freier Platz sofort verfügbar ist, wird
QueueFullausgelöst.
- qsize()¶
Gibt die Anzahl der Elemente in der Warteschlange zurück.
- shutdown(immediate=False)¶
Versetzt eine
Queue-Instanz in den Herunterfahr-Modus.Die Warteschlange kann nicht mehr wachsen. Zukünftige Aufrufe von
put()lösenQueueShutDownaus. Aktuell blockierte Aufrufer vonput()werden aufgehoben und lösenQueueShutDownim vormals blockierten Thread aus.Wenn immediate falsch ist (Standard), kann die Warteschlange normal mit
get()-Aufrufen abgewickelt werden, um bereits geladene Aufgaben abzurufen.Und wenn
task_done()für jede verbleibende Aufgabe aufgerufen wird, wird ein ausstehendesjoin()normal aufgehoben.Sobald die Warteschlange leer ist, lösen zukünftige Aufrufe von
get()QueueShutDownaus.Wenn immediate wahr ist, wird die Warteschlange sofort beendet. Die Warteschlange wird geleert, um vollständig leer zu sein. Alle Aufrufer von
join()werden aufgehoben, unabhängig von der Anzahl der unfertigen Aufgaben. Blockierte Aufrufer vonget()werden aufgehoben und lösenQueueShutDownaus, da die Warteschlange leer ist.Seien Sie vorsichtig bei der Verwendung von
join()mit immediate auf wahr gesetzt. Dies hebt die Sperre von join auf, auch wenn keine Arbeit an den Aufgaben geleistet wurde, was gegen die übliche Invariante für das Beitreten einer Warteschlange verstößt.Hinzugefügt in Version 3.13.
- task_done()¶
Zeigt an, dass ein zuvor in die Warteschlange gestelltes Arbeitselement abgeschlossen ist.
Wird von Warteschlangen-Konsumenten verwendet. Für jeden Aufruf von
get()zum Abrufen eines Arbeitselements gibt ein nachfolgender Aufruf vontask_done()an die Warteschlange weiter, dass die Verarbeitung des Arbeitselements abgeschlossen ist.Wenn ein
join()derzeit blockiert, wird es fortgesetzt, wenn alle Elemente verarbeitet wurden (was bedeutet, dass für jedes Element, das in die Warteschlangeput()wurde, eintask_done()-Aufruf empfangen wurde).Löst
ValueErroraus, wenn sie öfter aufgerufen wird, als Elemente in die Warteschlange gelegt wurden.
Prioritätswarteschlange¶
LIFO-Warteschlange¶
Ausnahmen¶
- exception asyncio.QueueEmpty¶
Diese Ausnahme wird ausgelöst, wenn die Methode
get_nowait()für eine leere Warteschlange aufgerufen wird.
- exception asyncio.QueueFull¶
Ausnahme, die ausgelöst wird, wenn die Methode
put_nowait()für eine Warteschlange aufgerufen wird, die ihre maxsize erreicht hat.
Beispiele¶
Warteschlangen können verwendet werden, um Arbeitslasten zwischen mehreren gleichzeitigen Aufgaben zu verteilen
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())