queue — Eine synchronisierte Queue-Klasse

Quellcode: Lib/queue.py


Das Modul queue implementiert Multi-Producer, Multi-Consumer Queues. Es ist besonders nützlich in der Thread-Programmierung, wenn Informationen sicher zwischen mehreren Threads ausgetauscht werden müssen. Die Klasse Queue in diesem Modul implementiert alle erforderlichen Sperrsemantiken.

Das Modul implementiert drei Arten von Queues, die sich nur in der Reihenfolge unterscheiden, in der die Einträge abgerufen werden. In einer FIFO-Queue werden die zuerst hinzugefügten Aufgaben zuerst abgerufen. In einer LIFO-Queue wird der zuletzt hinzugefügte Eintrag zuerst abgerufen (funktioniert wie ein Stack). Bei einer Priority-Queue werden die Einträge sortiert gehalten (unter Verwendung des Moduls heapq) und der Eintrag mit dem niedrigsten Wert wird zuerst abgerufen.

Intern verwenden diese drei Arten von Queues Sperren, um konkurrierende Threads vorübergehend zu blockieren; sie sind jedoch nicht dafür ausgelegt, Reentrancy innerhalb eines Threads zu handhaben.

Zusätzlich implementiert das Modul einen „einfachen“ FIFO-Queue-Typ, SimpleQueue, dessen spezifische Implementierung zusätzliche Garantien im Austausch für die geringere Funktionalität bietet.

Das Modul queue definiert die folgenden Klassen und Ausnahmen

class queue.Queue(maxsize=0)

Konstruktor für eine FIFO-Queue. maxsize ist eine Ganzzahl, die die Obergrenze für die Anzahl der Elemente festlegt, die in die Queue gestellt werden können. Das Einfügen blockiert, sobald diese Größe erreicht ist, bis die Queue-Elemente verbraucht sind. Wenn maxsize kleiner oder gleich Null ist, ist die Queue-Größe unendlich.

class queue.LifoQueue(maxsize=0)

Konstruktor für eine LIFO-Queue. maxsize ist eine Ganzzahl, die die Obergrenze für die Anzahl der Elemente festlegt, die in die Queue gestellt werden können. Das Einfügen blockiert, sobald diese Größe erreicht ist, bis die Queue-Elemente verbraucht sind. Wenn maxsize kleiner oder gleich Null ist, ist die Queue-Größe unendlich.

class queue.PriorityQueue(maxsize=0)

Konstruktor für eine Priority-Queue. maxsize ist eine Ganzzahl, die die Obergrenze für die Anzahl der Elemente festlegt, die in die Queue gestellt werden können. Das Einfügen blockiert, sobald diese Größe erreicht ist, bis die Queue-Elemente verbraucht sind. Wenn maxsize kleiner oder gleich Null ist, ist die Queue-Größe unendlich.

Die Einträge mit dem niedrigsten Wert werden zuerst abgerufen (der Eintrag mit dem niedrigsten Wert ist derjenige, der von min(entries) zurückgegeben würde). Ein typisches Muster für Einträge ist ein Tupel in der Form: (prioritätszahl, daten).

Wenn die daten-Elemente nicht vergleichbar sind, können die Daten in einer Klasse verpackt werden, die das Datenelement ignoriert und nur die Prioritätszahl vergleicht.

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)
class queue.SimpleQueue

Konstruktor für eine ungebundene FIFO-Queue. Simple Queues haben keine fortgeschrittenen Funktionen wie Task-Tracking.

Hinzugefügt in Version 3.7.

exception queue.Empty

Ausnahme ausgelöst, wenn nicht-blockierend get() (oder get_nowait()) auf ein Queue-Objekt aufgerufen wird, das leer ist.

exception queue.Full

Ausnahme ausgelöst, wenn nicht-blockierend put() (oder put_nowait()) auf ein Queue-Objekt aufgerufen wird, das voll ist.

exception queue.ShutDown

Ausnahme ausgelöst, wenn put() oder get() auf ein Queue-Objekt aufgerufen wird, das heruntergefahren wurde.

Hinzugefügt in Version 3.13.

Queue-Objekte

Queue-Objekte (Queue, LifoQueue oder PriorityQueue) bieten die unten beschriebenen öffentlichen Methoden.

Queue.qsize()

Gibt die ungefähre Größe der Queue zurück. Hinweis: qsize() > 0 garantiert nicht, dass ein nachfolgender get() nicht blockiert, noch garantiert qsize() < maxsize, dass put() nicht blockiert.

Queue.empty()

Gibt True zurück, wenn die Queue leer ist, andernfalls False. Wenn empty() True zurückgibt, garantiert dies nicht, dass ein nachfolgender Aufruf von put() nicht blockiert. Ebenso garantiert empty() <code class="docutils literal notranslate">False nicht, dass ein nachfolgender Aufruf von get() nicht blockiert.

Queue.full()

Gibt True zurück, wenn die Queue voll ist, andernfalls False. Wenn full() True zurückgibt, garantiert dies nicht, dass ein nachfolgender Aufruf von get() nicht blockiert. Ebenso garantiert full() <code class="docutils literal notranslate">False nicht, dass ein nachfolgender Aufruf von put() nicht blockiert.

Queue.put(item, block=True, timeout=None)

Setzt item in die Queue. Wenn die optionalen Argumente block wahr sind und timeout None (Standard) ist, wird bei Bedarf blockiert, bis ein freier Platz verfügbar ist. Wenn timeout eine positive Zahl ist, blockiert es höchstens timeout Sekunden und löst die Ausnahme Full aus, wenn innerhalb dieser Zeit kein freier Platz verfügbar war. Andernfalls (block ist falsch), wird ein Element in die Queue gelegt, wenn sofort ein freier Platz verfügbar ist, andernfalls wird die Ausnahme Full ausgelöst (timeout wird in diesem Fall ignoriert).

Löst ShutDown aus, wenn die Queue heruntergefahren wurde.

Queue.put_nowait(item)

Entspricht put(item, block=False).

Queue.get(block=True, timeout=None)

Entfernt ein Element aus der Queue und gibt es zurück. Wenn die optionalen Argumente block wahr sind und timeout None (Standard) ist, wird bei Bedarf blockiert, bis ein Element verfügbar ist. Wenn timeout eine positive Zahl ist, blockiert es höchstens timeout Sekunden und löst die Ausnahme Empty aus, wenn innerhalb dieser Zeit kein Element verfügbar war. Andernfalls (block ist falsch), wird ein Element zurückgegeben, wenn es sofort verfügbar ist, andernfalls wird die Ausnahme Empty ausgelöst (timeout wird in diesem Fall ignoriert).

Vor Version 3.0 auf POSIX-Systemen und für alle Versionen unter Windows, wenn block wahr ist und timeout None ist, geht diese Operation in eine nicht unterbrechbare Warteschleife an einem darunter liegenden Lock. Das bedeutet, dass keine Ausnahmen auftreten können und insbesondere ein SIGINT keine KeyboardInterrupt auslöst.

Löst ShutDown aus, wenn die Queue heruntergefahren wurde und leer ist oder wenn die Queue sofort heruntergefahren wurde.

Queue.get_nowait()

Entspricht get(False).

Zwei Methoden werden angeboten, um zu verfolgen, ob in die Queue gestellte Aufgaben von Daemon-Verbraucher-Threads vollständig verarbeitet wurden.

Queue.task_done()

Zeigt an, dass eine zuvor in die Queue gestellte Aufgabe abgeschlossen ist. Wird von den Queue-Verbraucher-Threads verwendet. Für jedes get(), das zum Abrufen einer Aufgabe verwendet wird, teilt ein nachfolgender Aufruf von task_done() der Queue mit, dass die Verarbeitung der Aufgabe abgeschlossen ist.

Wenn ein join() gerade blockiert, wird es fortgesetzt, sobald alle Elemente verarbeitet wurden (d. h. ein task_done()-Aufruf für jedes Element empfangen wurde, das in die Queue put() wurde).

Löst einen ValueError aus, wenn er öfter aufgerufen wird, als Elemente in die Queue gelegt wurden.

Queue.join()

Blockiert, bis alle Elemente in der Queue geholt und verarbeitet wurden.

Die Anzahl der unerledigten Aufgaben steigt jedes Mal, wenn ein Element zur Queue hinzugefügt wird. Die Anzahl sinkt jedes Mal, wenn ein Verbraucher-Thread task_done() aufruft, um anzuzeigen, dass das Element abgerufen wurde und alle Arbeiten daran abgeschlossen sind. Wenn die Anzahl der unerledigten Aufgaben auf Null sinkt, wird join() fortgesetzt.

Auf Aufgabenabschluss warten

Beispiel, wie man auf den Abschluss von in die Queue gestellten Aufgaben wartet

import threading
import queue

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

# Turn-on the worker thread.
threading.Thread(target=worker, daemon=True).start()

# Send thirty task requests to the worker.
for item in range(30):
    q.put(item)

# Block until all tasks are done.
q.join()
print('All work completed')

Queues beenden

Wenn nicht mehr benötigt, können Queue-Objekte bis zur Leerung oder durch sofortiges Beenden mit einem Hard Shutdown heruntergefahren werden.

Queue.shutdown(immediate=False)

Setzt eine Queue-Instanz in den Shutdown-Modus.

Die Queue kann nicht mehr wachsen. Zukünftige Aufrufe von put() lösen ShutDown aus. Aktuell blockierte Aufrufer von put() werden fortgesetzt und lösen ShutDown im zuvor blockierten Thread aus.

Wenn immediate falsch ist (Standard), kann die Queue normal mit get()-Aufrufen zum Extrahieren bereits geladener Aufgaben heruntergefahren werden.

Und wenn task_done() für jede verbleibende Aufgabe aufgerufen wird, wird ein ausstehendes join() normal fortgesetzt.

Sobald die Queue leer ist, lösen zukünftige Aufrufe von get() ShutDown aus.

Wenn immediate wahr ist, wird die Queue sofort beendet. Die Queue wird geleert, um vollständig leer zu sein. Alle Aufrufer von join() werden unabhängig von der Anzahl der unerledigten Aufgaben fortgesetzt. Blockierte Aufrufer von get() werden fortgesetzt und lösen ShutDown aus, da die Queue leer ist.

Seien Sie vorsichtig bei der Verwendung von join() mit immediate auf wahr gesetzt. Dies setzt den Join fort, auch wenn keine Arbeit an den Aufgaben geleistet wurde, was die übliche Invariante für das Beitreten einer Queue verletzt.

Hinzugefügt in Version 3.13.

SimpleQueue-Objekte

SimpleQueue-Objekte bieten die unten beschriebenen öffentlichen Methoden.

SimpleQueue.qsize()

Gibt die ungefähre Größe der Queue zurück. Hinweis: qsize() > 0 garantiert nicht, dass ein nachfolgender get() nicht blockiert.

SimpleQueue.empty()

Gibt True zurück, wenn die Queue leer ist, andernfalls False. Wenn empty() False zurückgibt, garantiert dies nicht, dass ein nachfolgender Aufruf von get() nicht blockiert.

SimpleQueue.put(item, block=True, timeout=None)

Setzt item in die Queue. Die Methode blockiert niemals und gelingt immer (außer bei potenziellen Low-Level-Fehlern wie dem Scheitern der Speicherzuweisung). Die optionalen Argumente block und timeout werden ignoriert und nur zur Kompatibilität mit Queue.put() bereitgestellt.

CPython-Implementierungsdetail: Diese Methode verfügt über eine C-Implementierung, die reentrant ist. Das heißt, ein Aufruf von put() oder get() kann durch einen anderen put()-Aufruf im selben Thread unterbrochen werden, ohne Deadlocks oder interne Zustandskorruption innerhalb der Queue zu verursachen. Dies macht sie für die Verwendung in Destruktoren wie __del__-Methoden oder weakref-Callbacks geeignet.

SimpleQueue.put_nowait(item)

Entspricht put(item, block=False), zur Kompatibilität mit Queue.put_nowait().

SimpleQueue.get(block=True, timeout=None)

Entfernt ein Element aus der Queue und gibt es zurück. Wenn die optionalen Argumente block wahr sind und timeout None (Standard) ist, wird bei Bedarf blockiert, bis ein Element verfügbar ist. Wenn timeout eine positive Zahl ist, blockiert es höchstens timeout Sekunden und löst die Ausnahme Empty aus, wenn innerhalb dieser Zeit kein Element verfügbar war. Andernfalls (block ist falsch), wird ein Element zurückgegeben, wenn es sofort verfügbar ist, andernfalls wird die Ausnahme Empty ausgelöst (timeout wird in diesem Fall ignoriert).

SimpleQueue.get_nowait()

Entspricht get(False).

Siehe auch

Klasse multiprocessing.Queue

Eine Queue-Klasse zur Verwendung in einem Multi-Processing- (anstelle eines Multi-Threading-) Kontext.

collections.deque ist eine alternative Implementierung von ungebundenen Queues mit schnellen atomaren append() und popleft() Operationen, die keine Sperren erfordern und auch Indizierung unterstützen.