Synchronisationsprimitive

Quellcode: Lib/asyncio/locks.py


asyncio-Synchronisationsprimitive sind so konzipiert, dass sie denen des threading-Moduls ähneln, mit zwei wichtigen Einschränkungen:

  • asyncio-Primitive sind nicht threadsicher, daher sollten sie nicht zur OS-Thread-Synchronisation verwendet werden (verwenden Sie dafür threading);

  • Methoden dieser Synchronisationsprimitive akzeptieren nicht das Argument timeout; verwenden Sie die Funktion asyncio.wait_for(), um Operationen mit Timeouts durchzuführen.

asyncio verfügt über die folgenden grundlegenden Synchronisationsprimitive:


Lock

class asyncio.Lock

Implementiert einen Mutex-Lock für asyncio-Tasks. Nicht threadsicher.

Ein asyncio-Lock kann verwendet werden, um exklusiven Zugriff auf eine gemeinsam genutzte Ressource zu gewährleisten.

Die bevorzugte Methode zur Verwendung eines Locks ist eine async with-Anweisung

lock = asyncio.Lock()

# ... later
async with lock:
    # access shared state

die äquivalent ist zu

lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
    # access shared state
finally:
    lock.release()

Geändert in Version 3.10: Das Argument loop wurde entfernt.

async acquire()

Erwirbt das Lock.

Diese Methode wartet, bis das Lock entsperrt ist, setzt es auf gesperrt und gibt True zurück.

Wenn mehr als eine Koroutine in acquire() blockiert ist und darauf wartet, dass das Lock entsperrt wird, wird nur eine Koroutine schließlich fortgesetzt.

Das Erwerben eines Locks ist fair: Die Koroutine, die fortgesetzt wird, ist die erste Koroutine, die mit dem Warten auf das Lock begonnen hat.

release()

Gibt das Lock frei.

Wenn das Lock gesperrt ist, wird es auf entsperrt zurückgesetzt und die Methode kehrt zurück.

Wenn das Lock entsperrt ist, wird ein RuntimeError ausgelöst.

locked()

Gibt True zurück, wenn das Lock gesperrt ist.

Event

class asyncio.Event

Ein Event-Objekt. Nicht threadsicher.

Ein asyncio-Event kann verwendet werden, um mehrere asyncio-Tasks darüber zu informieren, dass ein bestimmtes Ereignis eingetreten ist.

Ein Event-Objekt verwaltet ein internes Flag, das mit der Methode set() auf true gesetzt und mit der Methode clear() auf false zurückgesetzt werden kann. Die Methode wait() blockiert, bis das Flag auf true gesetzt ist. Das Flag ist anfänglich auf false gesetzt.

Geändert in Version 3.10: Das Argument loop wurde entfernt.

Beispiel

async def waiter(event):
    print('waiting for it ...')
    await event.wait()
    print('... got it!')

async def main():
    # Create an Event object.
    event = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    waiter_task = asyncio.create_task(waiter(event))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()

    # Wait until the waiter task is finished.
    await waiter_task

asyncio.run(main())
async wait()

Wartet, bis das Event gesetzt ist.

Wenn das Event gesetzt ist, gibt die Methode sofort True zurück. Andernfalls wird blockiert, bis eine andere Aufgabe set() aufruft.

set()

Setzt das Event.

Alle auf das Setzen des Events wartenden Aufgaben werden sofort geweckt.

clear()

Löscht (setzt auf false) das Event.

Nachfolgende Aufgaben, die auf wait() warten, werden nun blockiert, bis die Methode set() erneut aufgerufen wird.

is_set()

Gibt True zurück, wenn das Event gesetzt ist.

Condition

class asyncio.Condition(lock=None)

Ein Condition-Objekt. Nicht threadsicher.

Ein asyncio-Condition-Primitive kann von einer Aufgabe verwendet werden, um auf ein bestimmtes Ereignis zu warten und dann exklusiven Zugriff auf eine gemeinsam genutzte Ressource zu erhalten.

Im Wesentlichen kombiniert ein Condition-Objekt die Funktionalität eines Event und eines Lock. Es ist möglich, dass mehrere Condition-Objekte sich ein Lock teilen, was die koordinierte exklusive Zugriffsverwaltung auf eine gemeinsam genutzte Ressource zwischen verschiedenen Aufgaben ermöglicht, die an bestimmten Zuständen dieser Ressource interessiert sind.

Das optionale Argument lock muss ein Lock-Objekt oder None sein. Im letzteren Fall wird automatisch ein neues Lock-Objekt erstellt.

Geändert in Version 3.10: Das Argument loop wurde entfernt.

Die bevorzugte Methode zur Verwendung eines Conditions ist eine async with-Anweisung.

cond = asyncio.Condition()

# ... later
async with cond:
    await cond.wait()

die äquivalent ist zu

cond = asyncio.Condition()

# ... later
await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()
async acquire()

Erwirbt das zugrunde liegende Lock.

Diese Methode wartet, bis das zugrunde liegende Lock entsperrt ist, setzt es auf gesperrt und gibt True zurück.

notify(n=1)

Weckt n wartende Aufgaben auf (standardmäßig 1). Wenn weniger als n Aufgaben warten, werden sie alle geweckt.

Das Lock muss erworben sein, bevor diese Methode aufgerufen wird, und kurz danach wieder freigegeben werden. Wenn die Methode mit einem entsperrten Lock aufgerufen wird, wird ein RuntimeError ausgelöst.

locked()

Gibt True zurück, wenn das zugrunde liegende Lock erworben wurde.

notify_all()

Weckt alle Aufgaben auf, die auf diese Bedingung warten.

Diese Methode verhält sich wie notify(), weckt jedoch alle wartenden Aufgaben auf.

Das Lock muss erworben sein, bevor diese Methode aufgerufen wird, und kurz danach wieder freigegeben werden. Wenn die Methode mit einem entsperrten Lock aufgerufen wird, wird ein RuntimeError ausgelöst.

release()

Gibt das zugrunde liegende Lock frei.

Wenn die Methode auf einem entsperrten Lock aufgerufen wird, wird ein RuntimeError ausgelöst.

async wait()

Wartet, bis eine Benachrichtigung erfolgt.

Wenn die aufrufende Aufgabe das Lock nicht erworben hat, wenn diese Methode aufgerufen wird, wird ein RuntimeError ausgelöst.

Diese Methode gibt das zugrunde liegende Lock frei und blockiert dann, bis sie durch einen Aufruf von notify() oder notify_all() geweckt wird. Nach dem Aufwecken erwirbt die Bedingung ihr Lock erneut und diese Methode gibt True zurück.

Beachten Sie, dass eine Aufgabe möglicherweise aus diesem Aufruf scheinbar grundlos zurückkehrt. Daher sollte der Aufrufer den Zustand immer erneut prüfen und bereit sein, erneut zu wait()en. Aus diesem Grund ist es möglicherweise besser, stattdessen wait_for() zu verwenden.

async wait_for(predicate)

Wartet, bis eine Bedingung wahr wird.

Die Bedingung muss ein aufrufbares Objekt sein, dessen Ergebnis als boolescher Wert interpretiert wird. Die Methode wird wiederholt wait()en, bis die Bedingung wahr ergibt. Der endgültige Wert ist der Rückgabewert.

Semaphore

class asyncio.Semaphore(value=1)

Ein Semaphore-Objekt. Nicht threadsicher.

Ein Semaphore verwaltet einen internen Zähler, der bei jedem acquire()-Aufruf dekrementiert und bei jedem release()-Aufruf inkrementiert wird. Der Zähler kann niemals unter Null fallen; wenn acquire() feststellt, dass er Null ist, blockiert er und wartet, bis eine andere Aufgabe release() aufruft.

Das optionale Argument value gibt den Anfangswert für den internen Zähler an (standardmäßig 1). Wenn der angegebene Wert kleiner als 0 ist, wird ein ValueError ausgelöst.

Geändert in Version 3.10: Das Argument loop wurde entfernt.

Die bevorzugte Methode zur Verwendung einer Semaphore ist eine async with-Anweisung.

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

die äquivalent ist zu

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()
async acquire()

Erwirbt eine Semaphor.

Wenn der interne Zähler größer als Null ist, wird er um eins dekrementiert und die Methode gibt sofort True zurück. Wenn er Null ist, wird gewartet, bis ein release() aufgerufen wird, und dann wird True zurückgegeben.

locked()

Gibt True zurück, wenn die Semaphor nicht sofort erworben werden kann.

release()

Gibt eine Semaphor frei und inkrementiert den internen Zähler um eins. Kann eine Aufgabe aufwecken, die darauf wartet, die Semaphor zu erwerben.

Im Gegensatz zu BoundedSemaphore erlaubt Semaphore mehr release()-Aufrufe als acquire()-Aufrufe.

BoundedSemaphore

class asyncio.BoundedSemaphore(value=1)

Ein Bounded-Semaphore-Objekt. Nicht threadsicher.

Bounded Semaphore ist eine Version von Semaphore, die in release() einen ValueError auslöst, wenn der interne Zähler über den anfänglichen value hinaus erhöht wird.

Geändert in Version 3.10: Das Argument loop wurde entfernt.

Barrier

class asyncio.Barrier(parties)

Ein Barrier-Objekt. Nicht threadsicher.

Eine Barriere ist ein einfaches Synchronisationsprimitive, das es ermöglicht, zu blockieren, bis eine bestimmte Anzahl von Parteien darauf wartet. Aufgaben können auf die Methode wait() warten und würden blockiert, bis die angegebene Anzahl von Aufgaben auf wait() wartet. Zu diesem Zeitpunkt werden alle wartenden Aufgaben gleichzeitig freigeschaltet.

async with kann als Alternative zur Wartezeit auf wait() verwendet werden.

Die Barriere kann beliebig oft wiederverwendet werden.

Beispiel

async def example_barrier():
   # barrier with 3 parties
   b = asyncio.Barrier(3)

   # create 2 new waiting tasks
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   await asyncio.sleep(0)
   print(b)

   # The third .wait() call passes the barrier
   await b.wait()
   print(b)
   print("barrier passed")

   await asyncio.sleep(0)
   print(b)

asyncio.run(example_barrier())

Das Ergebnis dieses Beispiels ist

<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
barrier passed
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>

Hinzugefügt in Version 3.11.

async wait()

Passiert die Barriere. Wenn alle Parteien der Barriere diese Funktion aufgerufen haben, werden sie alle gleichzeitig freigeschaltet.

Wenn eine wartende oder blockierte Aufgabe in der Barriere abgebrochen wird, verlässt diese Aufgabe die Barriere, die im gleichen Zustand verbleibt. Wenn der Zustand der Barriere "füllend" ist, verringert sich die Anzahl der wartenden Aufgaben um 1.

Der Rückgabewert ist eine Ganzzahl im Bereich von 0 bis parties-1, die für jede Aufgabe unterschiedlich ist. Dies kann verwendet werden, um eine Aufgabe für spezielle Aufräumarbeiten auszuwählen, z. B.

...
async with barrier as position:
   if position == 0:
      # Only one task prints this
      print('End of *draining phase*')

Diese Methode kann eine Ausnahme vom Typ BrokenBarrierError auslösen, wenn die Barriere zurückgesetzt oder gebrochen wird, während eine Aufgabe wartet. Sie kann eine CancelledError auslösen, wenn eine Aufgabe abgebrochen wird.

async reset()

Setzt die Barriere in den Standardzustand (leer) zurück. Alle darauf wartenden Aufgaben erhalten die Ausnahme BrokenBarrierError.

Wenn eine Barriere gebrochen ist, ist es besser, sie einfach zu belassen und eine neue zu erstellen.

async abort()

Setzt die Barriere in einen gebrochenen Zustand. Dies bewirkt, dass alle aktiven oder zukünftigen Aufrufe von wait() mit der Ausnahme BrokenBarrierError fehlschlagen. Verwenden Sie dies zum Beispiel, wenn eine der Aufgaben abgebrochen werden muss, um unendliches Warten zu vermeiden.

parties

Die Anzahl der Aufgaben, die für das Passieren der Barriere erforderlich sind.

n_waiting

Die Anzahl der Aufgaben, die sich derzeit in der Barriere im Füllzustand befinden.

broken

Ein boolescher Wert, der True ist, wenn die Barriere im gebrochenen Zustand ist.

exception asyncio.BrokenBarrierError

Diese Ausnahme, eine Unterklasse von RuntimeError, wird ausgelöst, wenn das Barrier-Objekt zurückgesetzt oder gebrochen wird.


Geändert in Version 3.9: Das Erwerben eines Locks mit await lock oder yield from lock und/oder der with-Anweisung (with await lock, with (yield from lock)) wurde entfernt. Verwenden Sie stattdessen async with lock.