multiprocessing — Prozessbasierte Parallelität

Quellcode: Lib/multiprocessing/


Verfügbarkeit: nicht Android, nicht iOS, nicht WASI.

Dieses Modul wird auf mobilen Plattformen oder WebAssembly-Plattformen nicht unterstützt.

Einleitung

multiprocessing ist ein Paket, das das Erzeugen von Prozessen mit einer API unterstützt, die der des threading-Moduls ähnelt. Das Paket multiprocessing bietet sowohl lokale als auch entfernte Nebenläufigkeit und umgeht effektiv den Global Interpreter Lock, indem es Unterprozesse anstelle von Threads verwendet. Aus diesem Grund ermöglicht das Modul multiprocessing dem Programmierer, mehrere Prozessoren auf einem gegebenen Computer vollständig auszunutzen. Es läuft sowohl unter POSIX als auch unter Windows.

Das Modul multiprocessing führt auch APIs ein, die keine Entsprechungen im Modul threading haben. Ein Paradebeispiel dafür ist das Objekt Pool, das eine bequeme Möglichkeit bietet, die Ausführung einer Funktion über mehrere Eingabewerte hinweg zu parallelisieren, indem die Eingabedaten auf Prozesse verteilt werden (Datenparallelität). Das folgende Beispiel demonstriert die gängige Praxis, solche Funktionen in einem Modul zu definieren, damit Kindprozesse dieses Modul erfolgreich importieren können. Dieses grundlegende Beispiel für Datenparallelität mit Pool,

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

wird auf die Standardausgabe ausgegeben.

[1, 4, 9]

Siehe auch

concurrent.futures.ProcessPoolExecutor bietet eine Schnittstelle auf höherer Ebene, um Aufgaben an einen Hintergrundprozess zu übergeben, ohne die Ausführung des aufrufenden Prozesses zu blockieren. Verglichen mit der direkten Verwendung der Pool-Schnittstelle ermöglicht die API von concurrent.futures die Einreichung von Arbeit an den zugrunde liegenden Prozesspool leichter von der Wartezeit auf die Ergebnisse zu trennen.

Die Klasse Process

In multiprocessing werden Prozesse erzeugt, indem ein Process-Objekt erstellt und dann dessen Methode start() aufgerufen wird. Process folgt der API von threading.Thread. Ein triviales Beispiel für ein Multiprocess-Programm ist

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Um die beteiligten einzelnen Prozess-IDs anzuzeigen, hier ein erweitertes Beispiel.

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Eine Erklärung, warum der Teil if __name__ == '__main__' notwendig ist, finden Sie unter Programmierrichtlinien.

Die Argumente für Process müssen im Kindprozess normalerweise nicht serialisierbar sein. Wenn Sie versuchen, das obige Beispiel direkt in einem REPL einzugeben, kann dies zu einem AttributeError im Kindprozess führen, der versucht, die Funktion *f* im Modul __main__ zu finden.

Kontexte und Startmethoden

Abhängig von der Plattform unterstützt multiprocessing drei Möglichkeiten, einen Prozess zu starten. Diese *Startmethoden* sind

spawn

Der Elternprozess startet einen neuen Python-Interpreter-Prozess. Der Kindprozess erbt nur die Ressourcen, die für die Ausführung der Methode run() des Prozessobjekts notwendig sind. Insbesondere werden unnötige Dateideskriptoren und Handles vom Elternprozess nicht geerbt. Das Starten eines Prozesses mit dieser Methode ist im Vergleich zur Verwendung von *fork* oder *forkserver* ziemlich langsam.

Verfügbar auf POSIX- und Windows-Plattformen. Standard unter Windows und macOS.

fork

Der Elternprozess verwendet os.fork(), um den Python-Interpreter zu verzweigen. Der Kindprozess ist, wenn er beginnt, praktisch identisch mit dem Elternprozess. Alle Ressourcen des Elternteils werden vom Kindprozess geerbt. Beachten Sie, dass das sichere Verzweigen eines Multithread-Prozesses problematisch ist.

Verfügbar auf POSIX-Systemen.

Geändert in Version 3.14: Dies ist auf keiner Plattform mehr die Standard-Startmethode. Code, der *fork* benötigt, muss dies explizit über get_context() oder set_start_method() angeben.

Geändert in Version 3.12: Wenn Python feststellen kann, dass Ihr Prozess mehrere Threads hat, löst die Funktion os.fork(), die diese Startmethode intern aufruft, eine DeprecationWarning aus. Verwenden Sie eine andere Startmethode. Weitere Erläuterungen finden Sie in der Dokumentation zu os.fork().

forkserver

Wenn das Programm startet und die *forkserver*-Startmethode auswählt, wird ein Serverprozess gestartet. Von da an, wenn ein neuer Prozess benötigt wird, verbindet sich der Elternprozess mit dem Server und fordert diesen auf, einen neuen Prozess zu verzweigen. Der Fork-Server-Prozess ist Single-Threaded, es sei denn, Systembibliotheken oder vorab geladene Importe erstellen als Nebeneffekt Threads, sodass er os.fork() sicher verwenden kann. Es werden keine unnötigen Ressourcen geerbt.

Verfügbar auf POSIX-Plattformen, die das Übergeben von Dateideskriptoren über Unix-Pipes unterstützen, wie z.B. Linux. Standard auf diesen.

Geändert in Version 3.14: Dies wurde zur Standard-Startmethode auf POSIX-Plattformen.

Geändert in Version 3.4: *spawn* wurde auf allen POSIX-Plattformen hinzugefügt, und *forkserver* wurde für einige POSIX-Plattformen hinzugefügt. Kindprozesse erben keine alle von den Eltern vererbten Handles unter Windows mehr.

Geändert in Version 3.8: Unter macOS ist die Startmethode *spawn* jetzt Standard. Die Startmethode *fork* sollte als unsicher angesehen werden, da sie zu Abstürzen des Unterprozesses führen kann, da macOS-Systembibliotheken Threads starten können. Siehe bpo-33725.

Geändert in Version 3.14: Auf POSIX-Plattformen wurde die Standard-Startmethode von *fork* auf *forkserver* geändert, um die Leistung beizubehalten, aber gängige Inkompatibilitäten von Multithread-Prozessen zu vermeiden. Siehe gh-84559.

Auf POSIX starten die Startmethoden *spawn* oder *forkserver* auch einen *Ressourcen-Tracker*-Prozess, der die nicht verknüpften benannten Systemressourcen (wie benannte Semaphoren oder SharedMemory-Objekte) verfolgt, die von Prozessen des Programms erstellt wurden. Wenn alle Prozesse beendet sind, verknüpft der Ressourcen-Tracker verbleibende verfolgte Objekte. Normalerweise sollten keine vorhanden sein, aber wenn ein Prozess durch ein Signal getötet wurde, können einige Ressourcen "verloren" gegangen sein. (Weder verlorene Semaphoren noch Shared-Memory-Segmente werden automatisch verknüpft, bis zum nächsten Neustart. Dies ist problematisch für beide Objekte, da das System nur eine begrenzte Anzahl benannter Semaphoren zulässt und Shared-Memory-Segmente etwas Platz im Hauptspeicher belegen.)

Um eine Startmethode auszuwählen, verwenden Sie set_start_method() in der Klausel if __name__ == '__main__' des Hauptmoduls. Zum Beispiel

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() darf im Programm nicht mehr als einmal verwendet werden.

Alternativ können Sie get_context() verwenden, um ein Kontextobjekt zu erhalten. Kontextobjekte haben dieselbe API wie das Modul multiprocessing und ermöglichen die Verwendung mehrerer Startmethoden im selben Programm.

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

Beachten Sie, dass Objekte, die zu einem Kontext gehören, möglicherweise nicht mit Prozessen eines anderen Kontexts kompatibel sind. Insbesondere können Sperren, die mit dem *fork*-Kontext erstellt wurden, nicht an Prozesse übergeben werden, die mit den Startmethoden *spawn* oder *forkserver* gestartet wurden.

Bibliotheken, die multiprocessing oder ProcessPoolExecutor verwenden, sollten so gestaltet sein, dass sie ihren Benutzern die Bereitstellung eigener Multiprocessing-Kontexte ermöglichen. Die Verwendung eines bestimmten eigenen Kontexts innerhalb einer Bibliothek kann zu Inkompatibilitäten mit der Anwendung des Benutzers der Bibliothek führen. Dokumentieren Sie immer, wenn Ihre Bibliothek eine bestimmte Startmethode erfordert.

Warnung

Die Startmethoden 'spawn' und 'forkserver' können im Allgemeinen nicht mit "eingefrorenen" ausführbaren Dateien (d.h. Binärdateien, die von Paketen wie **PyInstaller** und **cx_Freeze** erstellt wurden) unter POSIX-Systemen verwendet werden. Die Startmethode 'fork' funktioniert möglicherweise, wenn der Code keine Threads verwendet.

Austausch von Objekten zwischen Prozessen

multiprocessing unterstützt zwei Arten von Kommunikationskanälen zwischen Prozessen

Queues

Die Klasse Queue ist eine nahezu exakte Kopie von queue.Queue. Zum Beispiel

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

Queues sind Thread- und Prozesssicher. Jedes Objekt, das in eine multiprocessing-Queue eingefügt wird, wird serialisiert.

Pipes

Die Funktion Pipe() gibt ein Paar von Verbindungsobjekten zurück, die durch eine Pipe verbunden sind, die standardmäßig Duplex (bidirektional) ist. Zum Beispiel

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

Die beiden von Pipe() zurückgegebenen Verbindungsobjekte repräsentieren die beiden Enden der Pipe. Jedes Verbindungsobjekt verfügt über die Methoden send() und recv() (unter anderem). Beachten Sie, dass Daten in einer Pipe beschädigt werden können, wenn zwei Prozesse (oder Threads) versuchen, gleichzeitig vom *selben* Ende der Pipe zu lesen oder in dieses zu schreiben. Natürlich besteht kein Risiko einer Beschädigung, wenn Prozesse gleichzeitig verschiedene Enden der Pipe verwenden.

Die Methode send() serialisiert das Objekt und recv() erstellt das Objekt neu.

Synchronisation zwischen Prozessen

multiprocessing enthält Entsprechungen für alle Synchronisationsprimitive aus threading. Zum Beispiel kann eine Sperre verwendet werden, um sicherzustellen, dass nur ein Prozess gleichzeitig auf die Standardausgabe schreibt

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

Ohne die Sperre ist die Ausgabe aus den verschiedenen Prozessen wahrscheinlich durcheinander.

Gemeinsame Nutzung von Zustand zwischen Prozessen

Wie oben erwähnt, ist es bei der Nebenläufigkeitsprogrammierung normalerweise am besten, die gemeinsame Nutzung von Zustand so weit wie möglich zu vermeiden. Dies gilt insbesondere bei der Verwendung mehrerer Prozesse.

Wenn Sie jedoch tatsächlich Daten gemeinsam nutzen müssen, bietet multiprocessing einige Möglichkeiten, dies zu tun.

Gemeinsamer Speicher

Daten können in einer Shared-Memory-Map mit Value oder Array gespeichert werden. Zum Beispiel der folgende Code

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

wird ausgeben

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Die Argumente 'd' und 'i', die bei der Erstellung von num und arr verwendet werden, sind Typencodes im Stil des Moduls array: 'd' bedeutet ein Gleitkommazahl mit doppelter Genauigkeit und 'i' bedeutet eine vorzeichenbehaftete Ganzzahl. Diese gemeinsamen Objekte sind prozess- und threadsicher.

Für mehr Flexibilität bei der Verwendung von Shared Memory können Sie das Modul multiprocessing.sharedctypes verwenden, das die Erstellung beliebiger ctypes-Objekte unterstützt, die aus Shared Memory zugewiesen werden.

Serverprozess

Ein Manager-Objekt, das von Manager() zurückgegeben wird, steuert einen Serverprozess, der Python-Objekte hält und anderen Prozessen erlaubt, diese über Proxys zu manipulieren.

Ein von Manager() zurückgegebener Manager unterstützt die Typen list, dict, set, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value und Array. Zum Beispiel,

from multiprocessing import Process, Manager

def f(d, l, s):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()
    s.add('a')
    s.add('b')

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))
        s = manager.set()

        p = Process(target=f, args=(d, l, s))
        p.start()
        p.join()

        print(d)
        print(l)
        print(s)

wird ausgeben

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
{'a', 'b'}

Serverprozess-Manager sind flexibler als die Verwendung von Shared-Memory-Objekten, da sie beliebige Objekttypen unterstützen können. Außerdem kann ein einziger Manager von Prozessen auf verschiedenen Computern über ein Netzwerk geteilt werden. Sie sind jedoch langsamer als Shared Memory.

Verwendung eines Pools von Arbeitern

Die Klasse Pool repräsentiert einen Pool von Worker-Prozessen. Sie verfügt über Methoden, die es ermöglichen, Aufgaben auf verschiedene Weise an die Worker-Prozesse auszulagern.

Zum Beispiel

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 seconds
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

Beachten Sie, dass die Methoden eines Pools nur von dem Prozess verwendet werden sollten, der ihn erstellt hat.

Hinweis

Funktionalität innerhalb dieses Pakets erfordert, dass das Modul __main__ von den Kindern importiert werden kann. Dies wird unter Programmierrichtlinien behandelt, ist aber hier erwähnenswert. Das bedeutet, dass einige Beispiele, wie die Beispiele für multiprocessing.pool.Pool, im interaktiven Interpreter nicht funktionieren. Zum Beispiel

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> with p:
...     p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>

(Wenn Sie dies versuchen, werden tatsächlich drei vollständige Tracebacks in einer halbzufälligen Reihenfolge ausgegeben, und dann müssen Sie möglicherweise den Elternprozess irgendwie stoppen.)

Referenz

Das Paket multiprocessing repliziert größtenteils die API des Moduls threading.

Process und Ausnahmen

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Process-Objekte repräsentieren Aktivitäten, die in einem separaten Prozess ausgeführt werden. Die Klasse Process hat Entsprechungen für alle Methoden von threading.Thread.

Der Konstruktor sollte immer mit Schlüsselwortargumenten aufgerufen werden. *group* sollte immer None sein; er existiert ausschließlich zur Kompatibilität mit threading.Thread. *target* ist das aufrufbare Objekt, das von der Methode run() aufgerufen wird. Es ist standardmäßig None, was bedeutet, dass nichts aufgerufen wird. *name* ist der Prozessname (siehe name für weitere Details). *args* ist das Argument-Tupel für den Zielaufruf. *kwargs* ist ein Dictionary von Schlüsselwortargumenten für den Zielaufruf. Wenn das schlüsselwort-only Argument *daemon* angegeben ist, wird das daemon-Flag des Prozesses auf True oder False gesetzt. Wenn es None (Standard) ist, wird dieses Flag vom erzeugenden Prozess übernommen.

Standardmäßig werden keine Argumente an *target* übergeben. Das Argument *args*, das standardmäßig () ist, kann verwendet werden, um eine Liste oder ein Tupel der an *target* zu übergebenden Argumente anzugeben.

Wenn eine Unterklasse den Konstruktor überschreibt, muss sie sicherstellen, dass sie den Konstruktor der Basisklasse (super().__init__()) aufruft, bevor sie weitere Aktionen am Prozess vornimmt.

Hinweis

Im Allgemeinen müssen alle Argumente an Process serialisierbar sein. Dies wird häufig beobachtet, wenn versucht wird, ein Process-Objekt zu erstellen oder einen concurrent.futures.ProcessPoolExecutor aus einem REPL mit einer lokal definierten *target*-Funktion zu verwenden.

Die Übergabe eines aufrufbaren Objekts, das in der aktuellen REPL-Sitzung definiert ist, führt dazu, dass der Kindprozess mit einer unbehandelten AttributeError-Ausnahme stirbt, da *target* innerhalb eines importierbaren Moduls definiert sein muss, um beim Deserialisieren geladen zu werden.

Beispiel für diesen nicht abfangbaren Fehler aus dem Kindprozess

>>> import multiprocessing as mp
>>> def knigit():
...     print("Ni!")
...
>>> process = mp.Process(target=knigit)
>>> process.start()
>>> Traceback (most recent call last):
  File ".../multiprocessing/spawn.py", line ..., in spawn_main
  File ".../multiprocessing/spawn.py", line ..., in _main
AttributeError: module '__main__' has no attribute 'knigit'
>>> process
<SpawnProcess name='SpawnProcess-1' pid=379473 parent=378707 stopped exitcode=1>

Siehe Die Startmethoden spawn und forkserver. Während diese Einschränkung nicht gilt, wenn die Startmethode "fork" verwendet wird, ist dies ab Python 3.14 nicht mehr die Standardmethode auf jeder Plattform. Siehe Kontexte und Startmethoden. Siehe auch gh-132898.

Geändert in Version 3.3: Das Argument *daemon* wurde hinzugefügt.

run()

Methode, die die Aktivität des Prozesses repräsentiert.

Sie können diese Methode in einer Unterklasse überschreiben. Die Standardmethode run() ruft das im Konstruktor des Objekts übergebene aufrufbare Objekt als *target*-Argument auf, falls vorhanden, mit sequenziellen und Schlüsselwortargumenten, die aus den Argumenten *args* bzw. *kwargs* entnommen werden.

Die Verwendung einer Liste oder eines Tupels als Argument *args*, das an Process übergeben wird, erzielt denselben Effekt.

Beispiel

>>> from multiprocessing import Process
>>> p = Process(target=print, args=[1])
>>> p.run()
1
>>> p = Process(target=print, args=(1,))
>>> p.run()
1
start()

Startet die Aktivität des Prozesses.

Dies darf pro Prozessobjekt höchstens einmal aufgerufen werden. Es wird dafür gesorgt, dass die Methode run() des Objekts in einem separaten Prozess aufgerufen wird.

join([timeout])

Wenn das optionale Argument timeout None (Standard) ist, blockiert die Methode, bis der Prozess, dessen join()-Methode aufgerufen wird, beendet ist. Wenn timeout eine positive Zahl ist, blockiert sie höchstens timeout Sekunden. Beachten Sie, dass die Methode None zurückgibt, wenn ihr Prozess beendet wird oder die Methode abläuft. Überprüfen Sie den exitcode des Prozesses, um festzustellen, ob er beendet wurde.

Ein Prozess kann mehrmals gejoint werden.

Ein Prozess kann sich nicht selbst joinen, da dies zu einer Deadlock-Situation führen würde. Es ist ein Fehler, zu versuchen, einen Prozess zu joinen, bevor er gestartet wurde.

name

Der Name des Prozesses. Der Name ist eine Zeichenkette, die nur zur Identifizierung dient. Er hat keine weitere Bedeutung. Mehreren Prozessen kann derselbe Name zugewiesen werden.

Der anfängliche Name wird vom Konstruktor gesetzt. Wenn kein expliziter Name an den Konstruktor übergeben wird, wird ein Name der Form „Process-N1:N2:...:Nk“ gebildet, wobei jedes Nk das N-te Kind seines Elternteils ist.

is_alive()

Gibt zurück, ob der Prozess aktiv ist.

Grob gesagt ist ein Prozessobjekt ab dem Zeitpunkt, an dem die start()-Methode zurückkehrt, bis zum Ende des Kindprozesses aktiv.

daemon

Das Daemon-Flag des Prozesses, ein boolescher Wert. Dies muss gesetzt werden, bevor start() aufgerufen wird.

Der anfängliche Wert wird vom erzeugenden Prozess übernommen.

Wenn ein Prozess beendet wird, versucht er, alle seine Daemon-Kindprozesse zu beenden.

Beachten Sie, dass einem Daemon-Prozess keine Kindprozesse erlaubt sind. Andernfalls würde ein Daemon-Prozess seine Kinder verwaist zurücklassen, wenn er beim Beenden seines Elternprozesses beendet wird. Außerdem sind dies **nicht** Unix-Daemons oder -Dienste, sondern normale Prozesse, die beendet werden (und nicht gejoint werden), wenn nicht-daemmonische Prozesse beendet wurden.

Zusätzlich zur threading.Thread API unterstützen Process-Objekte auch die folgenden Attribute und Methoden

pid

Gibt die Prozess-ID zurück. Bevor der Prozess gestartet wird, ist dies None.

exitcode

Der Exit-Code des Kindes. Dies ist None, wenn der Prozess noch nicht beendet wurde.

Wenn die run()-Methode des Kindes normal zurückgekehrt ist, ist der Exit-Code 0. Wenn sie über sys.exit() mit einem ganzzahligen Argument N beendet wurde, ist der Exit-Code N.

Wenn das Kind aufgrund einer Ausnahme beendet wurde, die nicht innerhalb von run() abgefangen wurde, ist der Exit-Code 1. Wenn es durch das Signal N beendet wurde, ist der Exit-Code der negative Wert -N.

authkey

Der Authentifizierungsschlüssel des Prozesses (eine Byte-Zeichenkette).

Wenn multiprocessing initialisiert wird, erhält der Hauptprozess eine zufällige Zeichenkette über os.urandom().

Wenn ein Process-Objekt erstellt wird, erbt es den Authentifizierungsschlüssel seines Elternprozesses, obwohl dieser durch Zuweisung von authkey an eine andere Byte-Zeichenkette geändert werden kann.

Siehe Authentifizierungsschlüssel.

sentinel

Ein numerischer Handle eines Systemobjekts, das „bereit“ wird, wenn der Prozess endet.

Sie können diesen Wert verwenden, wenn Sie mit multiprocessing.connection.wait() auf mehrere Ereignisse gleichzeitig warten möchten. Andernfalls ist der Aufruf von join() einfacher.

Unter Windows ist dies ein OS-Handle, das mit der API-Familie WaitForSingleObject und WaitForMultipleObjects verwendet werden kann. Unter POSIX ist dies ein Dateideskriptor, der mit primitiven Funktionen aus dem select-Modul verwendet werden kann.

Hinzugefügt in Version 3.3.

interrupt()

Beendet den Prozess. Funktioniert unter POSIX über das Signal SIGINT. Das Verhalten unter Windows ist undefiniert.

Standardmäßig beendet dies den Kindprozess, indem es KeyboardInterrupt auslöst. Dieses Verhalten kann durch Setzen des entsprechenden Signalhandlers in der Kindprozess- signal.signal()-Funktion für SIGINT geändert werden.

Hinweis: Wenn der Kindprozess KeyboardInterrupt abfängt und ignoriert, wird der Prozess nicht beendet.

Hinweis: Das Standardverhalten setzt auch exitcode auf 1, als ob eine nicht abgefangene Ausnahme im Kindprozess aufgetreten wäre. Um einen anderen exitcode zu erhalten, können Sie einfach KeyboardInterrupt abfangen und exit(your_code) aufrufen.

Hinzugefügt in Version 3.14.

terminate()

Beendet den Prozess. Unter POSIX geschieht dies über das Signal SIGTERM; unter Windows wird TerminateProcess() verwendet. Beachten Sie, dass Exit-Handler und finally-Klauseln usw. nicht ausgeführt werden.

Beachten Sie, dass die Nachfolgeprozesse des Prozesses **nicht** beendet werden – sie werden einfach verwaist.

Warnung

Wenn diese Methode verwendet wird, während der zugehörige Prozess eine Pipe oder Warteschlange verwendet, besteht die Gefahr, dass die Pipe oder Warteschlange beschädigt wird und von anderen Prozessen nicht mehr verwendet werden kann. Ebenso, wenn der Prozess eine Sperre oder ein Semaphor usw. erworben hat, kann seine Beendigung dazu führen, dass andere Prozesse in eine Deadlock-Situation geraten.

kill()

Dasselbe wie terminate(), aber unter POSIX mit dem Signal SIGKILL.

Hinzugefügt in Version 3.7.

close()

Schließt das Process-Objekt und gibt alle damit verbundenen Ressourcen frei. ValueError wird ausgelöst, wenn der zugrundeliegende Prozess noch läuft. Sobald close() erfolgreich zurückkehrt, lösen die meisten anderen Methoden und Attribute des Process-Objekts ValueError aus.

Hinzugefügt in Version 3.7.

Beachten Sie, dass die Methoden start(), join(), is_alive(), terminate() und das Attribut exitcode nur von dem Prozess aufgerufen werden sollten, der das Prozessobjekt erstellt hat.

Beispiel für die Verwendung einiger Methoden von Process

>>> import multiprocessing, time, signal
>>> mp_context = multiprocessing.get_context('spawn')
>>> p = mp_context.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<...Process ... initial> False
>>> p.start()
>>> print(p, p.is_alive())
<...Process ... started> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<...Process ... stopped exitcode=-SIGTERM> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.ProcessError

Die Basisklasse aller multiprocessing-Ausnahmen.

exception multiprocessing.BufferTooShort

Ausnahme, die von Connection.recv_bytes_into() ausgelöst wird, wenn das bereitgestellte Pufferobjekt zu klein für die gelesene Nachricht ist.

Wenn e eine Instanz von BufferTooShort ist, gibt e.args[0] die Nachricht als Byte-Zeichenkette zurück.

exception multiprocessing.AuthenticationError

Ausgelöst bei einem Authentifizierungsfehler.

exception multiprocessing.TimeoutError

Ausgelöst von Methoden mit Timeout, wenn der Timeout abläuft.

Pipes und Queues

Bei der Verwendung mehrerer Prozesse werden normalerweise Nachrichtenübergabeverfahren zur Kommunikation zwischen Prozessen verwendet, um die Verwendung von Synchronisierungsprimitiven wie Sperren zu vermeiden.

Für die Übergabe von Nachrichten kann man Pipe() (für eine Verbindung zwischen zwei Prozessen) oder eine Queue (die mehrere Produzenten und Konsumenten zulässt) verwenden.

Die Typen Queue, SimpleQueue und JoinableQueue sind Multi-Producer, Multi-Consumer FIFO-Queues, die auf der Klasse queue.Queue in der Standardbibliothek basieren. Sie unterscheiden sich dadurch, dass Queue die Methoden task_done() und join() vermisst, die in Pythons queue.Queue-Klasse (ab Python 2.5) eingeführt wurden.

Wenn Sie JoinableQueue verwenden, **müssen** Sie für jede aus der Queue entfernte Aufgabe JoinableQueue.task_done() aufrufen, sonst kann das zur Zählung der unbearbeiteten Aufgaben verwendete Semaphor überlaufen und eine Ausnahme auslösen.

Ein Unterschied zu anderen Python-Queue-Implementierungen ist, dass multiprocessing-Queues alle Objekte, die in sie eingefügt werden, mit pickle serialisieren. Das von der get-Methode zurückgegebene Objekt ist ein neu erstelltes Objekt, das keinen Speicher mit dem Originalobjekt teilt.

Beachten Sie, dass man auch eine gemeinsam genutzte Queue erstellen kann, indem man ein Manager-Objekt verwendet – siehe Manager.

Hinweis

multiprocessing verwendet die üblichen Ausnahmen queue.Empty und queue.Full, um einen Timeout zu signalisieren. Diese sind nicht im multiprocessing-Namespace verfügbar, daher müssen Sie sie aus queue importieren.

Hinweis

Wenn ein Objekt in eine Queue gelegt wird, wird das Objekt gepickelt und ein Hintergrundthread leert später die gepickelten Daten in eine darunterliegende Pipe. Dies hat einige überraschende Konsequenzen, die aber keine praktischen Schwierigkeiten verursachen sollten – wenn sie Sie wirklich stören, können Sie stattdessen eine Queue verwenden, die mit einem Manager erstellt wurde.

  1. Nachdem ein Objekt in eine leere Queue gelegt wurde, kann es zu einer winzigen Verzögerung kommen, bevor die empty()-Methode der Queue False zurückgibt und get_nowait() ohne Auslösen von queue.Empty zurückkehren kann.

  2. Wenn mehrere Prozesse Objekte in die Queue legen, ist es möglich, dass die Objekte am anderen Ende in einer anderen Reihenfolge empfangen werden. Objekte, die vom selben Prozess in die Queue gelegt wurden, sind jedoch immer in Bezug aufeinander in der erwarteten Reihenfolge.

Warnung

Wenn ein Prozess mit Process.terminate() oder os.kill() beendet wird, während er versucht, eine Queue zu verwenden, können die Daten in der Queue beschädigt werden. Dies kann dazu führen, dass jeder andere Prozess eine Ausnahme erhält, wenn er später versucht, die Queue zu verwenden.

Warnung

Wie oben erwähnt, wird ein Kindprozess, der Elemente in eine Queue gelegt hat (und JoinableQueue.cancel_join_thread nicht verwendet hat), erst beendet, wenn alle gepufferten Elemente in die Pipe geleert wurden.

Das bedeutet, dass Sie bei einem Deadlock landen können, wenn Sie versuchen, diesen Prozess zu joinen, es sei denn, Sie sind sicher, dass alle Elemente, die in die Queue gelegt wurden, auch verbraucht wurden. Ebenso, wenn der Kindprozess nicht-daemmonisch ist, kann der Elternprozess beim Beenden hängen bleiben, wenn er versucht, alle seine nicht-daemmonischen Kinder zu joinen.

Beachten Sie, dass eine mit einem Manager erstellte Queue dieses Problem nicht hat. Siehe Programmieranleitungen.

Ein Beispiel für die Verwendung von Queues für die Interprozesskommunikation finden Sie unter Beispiele.

multiprocessing.Pipe([duplex])

Gibt ein Paar (conn1, conn2) von Connection-Objekten zurück, die die Enden einer Pipe darstellen.

Wenn duplex True (Standard) ist, ist die Pipe bidirektional. Wenn duplex False ist, ist die Pipe unidirektional: conn1 kann nur zum Empfangen von Nachrichten verwendet werden und conn2 kann nur zum Senden von Nachrichten verwendet werden.

Die Methode send() serialisiert das Objekt mit pickle und recv() erstellt das Objekt neu.

class multiprocessing.Queue([maxsize])

Gibt eine prozessgeteilte Queue zurück, die über eine Pipe und einige Sperren/Semaphore implementiert ist. Wenn ein Prozess zum ersten Mal ein Element in die Queue legt, wird ein Feeder-Thread gestartet, der Objekte aus einem Puffer in die Pipe überträgt.

Die üblichen Ausnahmen queue.Empty und queue.Full aus dem Modul queue der Standardbibliothek werden ausgelöst, um Timeouts zu signalisieren.

Queue implementiert alle Methoden von queue.Queue außer task_done() und join().

qsize()

Gibt die ungefähre Größe der Queue zurück. Aufgrund von Multithreading-/Multiprocessing-Semantik ist diese Zahl nicht zuverlässig.

Beachten Sie, dass dies auf Plattformen wie macOS, wo sem_getvalue() nicht implementiert ist, eine NotImplementedError auslösen kann.

empty()

Gibt True zurück, wenn die Queue leer ist, andernfalls False. Aufgrund von Multithreading-/Multiprocessing-Semantik ist dies nicht zuverlässig.

Kann bei geschlossenen Queues eine OSError auslösen. (nicht garantiert)

full()

Gibt True zurück, wenn die Queue voll ist, andernfalls False. Aufgrund von Multithreading-/Multiprocessing-Semantik ist dies nicht zuverlässig.

put(obj[, block[, timeout]])

Legt obj in die Queue. Wenn das optionale Argument block True (Standard) ist und timeout None (Standard) ist, wird bei Bedarf blockiert, bis ein freier Platz verfügbar ist. Wenn timeout eine positive Zahl ist, blockiert es höchstens timeout Sekunden und löst die Ausnahme queue.Full aus, wenn innerhalb dieser Zeit kein freier Platz verfügbar war. Andernfalls (block ist False) wird ein Element in die Queue gelegt, wenn ein freier Platz sofort verfügbar ist, andernfalls wird die Ausnahme queue.Full ausgelöst (timeout wird in diesem Fall ignoriert).

Geändert in Version 3.8: Wenn die Queue geschlossen ist, wird anstelle von AssertionError eine ValueError ausgelöst.

put_nowait(obj)

Entspricht put(obj, False).

get([block[, timeout]])

Entfernt und gibt ein Element aus der Queue zurück. Wenn das optionale Argument block True (Standard) ist und timeout None (Standard) ist, wird bei Bedarf blockiert, bis ein Element verfügbar ist. Wenn timeout eine positive Zahl ist, blockiert es höchstens timeout Sekunden und löst die Ausnahme queue.Empty aus, wenn innerhalb dieser Zeit kein Element verfügbar war. Andernfalls (block ist False) wird ein Element zurückgegeben, wenn es sofort verfügbar ist, andernfalls wird die Ausnahme queue.Empty ausgelöst (timeout wird in diesem Fall ignoriert).

Geändert in Version 3.8: Wenn die Queue geschlossen ist, wird anstelle von OSError eine ValueError ausgelöst.

get_nowait()

Entspricht get(False).

multiprocessing.Queue hat einige zusätzliche Methoden, die in queue.Queue nicht vorhanden sind. Diese Methoden sind für die meisten Codes normalerweise nicht erforderlich

close()

Schließt die Queue: Gibt interne Ressourcen frei.

Eine Queue darf nach dem Schließen nicht mehr verwendet werden. Beispielsweise dürfen die Methoden get(), put() und empty() nicht mehr aufgerufen werden.

Der Hintergrundthread wird beendet, sobald alle gepufferten Daten in die Pipe geleert wurden. Dies wird automatisch aufgerufen, wenn die Queue garbage collected wird.

join_thread()

Verbindet den Hintergrundthread. Dies kann nur nach dem Aufruf von close() verwendet werden. Es blockiert, bis der Hintergrundthread beendet ist, und stellt sicher, dass alle Daten im Puffer in die Pipe geleert wurden.

Standardmäßig versucht ein Prozess, der nicht der Ersteller der Queue ist, beim Beenden den Hintergrundthread der Queue zu joinen. Der Prozess kann cancel_join_thread() aufrufen, um join_thread() nichts tun zu lassen.

cancel_join_thread()

Verhindert, dass join_thread() blockiert. Insbesondere verhindert dies, dass der Hintergrundthread beim Beenden des Prozesses automatisch gejoint wird – siehe join_thread().

Ein besserer Name für diese Methode wäre allow_exit_without_flush(). Es ist wahrscheinlich, dass dadurch in die Warteschlange gestellte Daten verloren gehen, und Sie werden sie fast sicher nicht benötigen. Sie ist wirklich nur dafür da, falls Sie den aktuellen Prozess sofort beenden müssen, ohne darauf zu warten, dass in die Warteschlange gestellte Daten an das zugrunde liegende Pipe geleert werden, und Ihnen verlorene Daten egal sind.

Hinweis

Die Funktionalität dieser Klasse erfordert eine funktionierende Implementierung von Shared Semaphoren im Host-Betriebssystem. Ohne diese wird die Funktionalität dieser Klasse deaktiviert, und Versuche, eine Queue zu instanziieren, führen zu einem ImportError. Weitere Informationen finden Sie unter bpo-3770. Dasselbe gilt für alle unten aufgeführten spezialisierten Warteschlangentypen.

class multiprocessing.SimpleQueue

Dies ist ein vereinfachter Queue-Typ, der einer gesperrten Pipe sehr nahe kommt.

close()

Schließt die Queue: Gibt interne Ressourcen frei.

Eine Warteschlange darf nach dem Schließen nicht mehr verwendet werden. Beispielsweise dürfen die Methoden get(), put() und empty() nicht mehr aufgerufen werden.

Hinzugefügt in Version 3.9.

empty()

Gibt True zurück, wenn die Warteschlange leer ist, andernfalls False.

Löst immer eine OSError aus, wenn die SimpleQueue geschlossen ist.

get()

Entfernt und gibt ein Element aus der Warteschlange zurück.

put(item)

Fügt das item in die Warteschlange ein.

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue, eine Unterklasse von Queue, ist eine Warteschlange, die zusätzlich die Methoden task_done() und join() besitzt.

task_done()

Zeigt an, dass eine zuvor in die Warteschlange gestellte Aufgabe abgeschlossen ist. Wird von Warteschlangen-Konsumenten verwendet. Für jedes mit get() abgerufene Element, teilt ein nachfolgender Aufruf von task_done() der Warteschlange mit, dass die Verarbeitung der Aufgabe abgeschlossen ist.

Wenn eine join() aktuell blockiert, wird sie fortgesetzt, sobald alle Elemente verarbeitet wurden (d. h. ein task_done()-Aufruf für jedes Element empfangen wurde, das in die Warteschlange put() wurde).

Löst einen ValueError aus, wenn öfter aufgerufen als Elemente in die Warteschlange gelegt wurden.

join()

Blockiert, bis alle Elemente in der Warteschlange abgerufen und verarbeitet wurden.

Die Anzahl der noch nicht abgeschlossenen Aufgaben erhöht sich jedes Mal, wenn ein Element zur Warteschlange hinzugefügt wird. Die Anzahl verringert sich jedes Mal, wenn ein Konsument task_done() aufruft, um anzuzeigen, dass das Element abgerufen wurde und alle Arbeiten daran abgeschlossen sind. Wenn die Anzahl der noch nicht abgeschlossenen Aufgaben auf Null fällt, wird join() fortgesetzt.

Sonstiges

multiprocessing.active_children()

Gibt eine Liste aller aktiven Kindprozesse des aktuellen Prozesses zurück.

Der Aufruf hat den Nebeneffekt, dass bereits beendete Prozesse "verknüpft" werden.

multiprocessing.cpu_count()

Gibt die Anzahl der CPUs im System zurück.

Diese Zahl ist nicht gleich der Anzahl der CPUs, die der aktuelle Prozess nutzen kann. Die Anzahl der nutzbaren CPUs kann mit os.process_cpu_count() (oder len(os.sched_getaffinity(0))) ermittelt werden.

Wenn die Anzahl der CPUs nicht ermittelt werden kann, wird ein NotImplementedError ausgelöst.

Geändert in Version 3.13: Der Rückgabewert kann auch durch die Option -X cpu_count oder die Umgebungsvariable PYTHON_CPU_COUNT überschrieben werden, da dies lediglich ein Wrapper um die os CPU-Count-APIs ist.

multiprocessing.current_process()

Gibt das Process-Objekt zurück, das dem aktuellen Prozess entspricht.

Ein Analogon zu threading.current_thread().

multiprocessing.parent_process()

Gibt das Process-Objekt des Elternprozesses von current_process() zurück. Für den Hauptprozess ist parent_process None.

Hinzugefügt in Version 3.8.

multiprocessing.freeze_support()

Fügt Unterstützung hinzu, wenn ein Programm, das multiprocessing verwendet, zu einer ausführbaren Datei gefroren wurde. (Getestet mit **py2exe**, **PyInstaller** und **cx_Freeze**.)

Sie müssen diese Funktion direkt nach der Zeile if __name__ == '__main__' des Hauptmoduls aufrufen. Zum Beispiel

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

Wenn die Zeile freeze_support() weggelassen wird, löst der Versuch, die gefrorene ausführbare Datei auszuführen, einen RuntimeError aus.

Der Aufruf von freeze_support() hat keine Auswirkung, wenn die Startmethode nicht spawn ist. Darüber hinaus hat freeze_support() keine Auswirkung, wenn das Modul normal vom Python-Interpreter ausgeführt wird (das Programm wurde nicht gefroren).

multiprocessing.get_all_start_methods()

Gibt eine Liste der unterstützten Startmethoden zurück, wobei die erste die Standardmethode ist. Die möglichen Startmethoden sind 'fork', 'spawn' und 'forkserver'. Nicht alle Plattformen unterstützen alle Methoden. Siehe Kontexte und Startmethoden.

Hinzugefügt in Version 3.4.

multiprocessing.get_context(method=None)

Gibt ein Kontextobjekt zurück, das die gleichen Attribute wie das Modul multiprocessing besitzt.

Wenn method None ist, wird der Standardkontext zurückgegeben. Beachten Sie, dass, wenn die globale Startmethode nicht festgelegt wurde, diese auf die Standardmethode gesetzt wird. Andernfalls sollte method 'fork', 'spawn' oder 'forkserver' sein. Ein ValueError wird ausgelöst, wenn die angegebene Startmethode nicht verfügbar ist. Siehe Kontexte und Startmethoden.

Hinzugefügt in Version 3.4.

multiprocessing.get_start_method(allow_none=False)

Gibt den Namen der Startmethode zurück, die zum Starten von Kindprozessen verwendet wird.

Wenn die globale Startmethode nicht festgelegt wurde und allow_none False ist, wird die Startmethode auf den Standardwert gesetzt und der Name zurückgegeben. Wenn die Startmethode nicht festgelegt wurde und allow_none True ist, wird None zurückgegeben.

Der Rückgabewert kann 'fork', 'spawn', 'forkserver' oder None sein. Siehe Kontexte und Startmethoden.

Hinzugefügt in Version 3.4.

Geändert in Version 3.8: Unter macOS ist die Startmethode spawn jetzt die Standardmethode. Die Startmethode fork sollte als unsicher betrachtet werden, da sie zu Abstürzen des Subprozesses führen kann. Siehe bpo-33725.

multiprocessing.set_executable(executable)

Legt den Pfad des Python-Interpreters fest, der beim Start eines Kindprozesses verwendet werden soll. (Standardmäßig wird sys.executable verwendet). Einbettende Programme müssen wahrscheinlich etwas tun wie

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

bevor sie Kindprozesse erstellen können.

Geändert in Version 3.4: Unter POSIX wird nun die Startmethode 'spawn' unterstützt.

Geändert in Version 3.11: Akzeptiert ein pfadähnliches Objekt.

multiprocessing.set_forkserver_preload(module_names)

Legt eine Liste von Modulnamen fest, die der Forkserver-Hauptprozess zu importieren versucht, damit ihr bereits importierter Zustand von den geforkten Prozessen übernommen wird. Jegliche ImportError wird dabei stillschweigend ignoriert. Dies kann als Leistungssteigerung verwendet werden, um wiederholte Arbeit in jedem Prozess zu vermeiden.

Damit dies funktioniert, muss es aufgerufen werden, bevor der Forkserver-Prozess gestartet wurde (bevor ein Pool erstellt oder ein Process gestartet wird).

Nur sinnvoll, wenn die Startmethode 'forkserver' verwendet wird. Siehe Kontexte und Startmethoden.

Hinzugefügt in Version 3.4.

multiprocessing.set_start_method(method, force=False)

Legt die Methode fest, die zum Starten von Kindprozessen verwendet werden soll. Das Argument method kann 'fork', 'spawn' oder 'forkserver' sein. Löst einen RuntimeError aus, wenn die Startmethode bereits festgelegt wurde und force nicht True ist. Wenn method None ist und force True ist, wird die Startmethode auf None gesetzt. Wenn method None ist und force False ist, wird der Kontext auf den Standardkontext gesetzt.

Beachten Sie, dass dies höchstens einmal aufgerufen werden sollte und innerhalb der Klausel if __name__ == '__main__' des Hauptmoduls geschützt sein sollte.

Siehe Kontexte und Startmethoden.

Hinzugefügt in Version 3.4.

Verbindungsobjekte

Verbindungsobjekte ermöglichen das Senden und Empfangen von serialisierbaren Objekten oder Zeichenketten. Sie können als nachrichtenorientierte verbundene Sockets betrachtet werden.

Verbindungsobjekte werden normalerweise mit Pipe erstellt – siehe auch Listener und Clients.

class multiprocessing.connection.Connection
send(obj)

Sendet ein Objekt an das andere Ende der Verbindung, das mit recv() gelesen werden sollte.

Das Objekt muss serialisierbar sein. Sehr große serialisierte Objekte (ungefähr 32 MiB oder mehr, abhängig vom Betriebssystem) können eine ValueError-Ausnahme auslösen.

recv()

Gibt ein vom anderen Ende der Verbindung mit send() gesendetes Objekt zurück. Blockiert, bis etwas zu empfangen ist. Löst EOFError aus, wenn nichts mehr zu empfangen ist und das andere Ende geschlossen wurde.

fileno()

Gibt den von der Verbindung verwendeten Dateideskriptor oder Handle zurück.

close()

Schließt die Verbindung.

Dies wird automatisch aufgerufen, wenn das Verbindungsobjekt vom Garbage Collector bereinigt wird.

poll([timeout])

Gibt zurück, ob Daten zum Lesen verfügbar sind.

Wenn timeout nicht angegeben ist, wird sofort zurückgegeben. Wenn timeout eine Zahl ist, gibt dies die maximale Zeit in Sekunden an, die blockiert werden soll. Wenn timeout None ist, wird eine unendliche Timeout-Zeit verwendet.

Beachten Sie, dass mehrere Verbindungsobjekte gleichzeitig mit multiprocessing.connection.wait() abgefragt werden können.

send_bytes(buffer[, offset[, size]])

Sendet Byte-Daten aus einem byteähnlichen Objekt als vollständige Nachricht.

Wenn offset angegeben ist, werden die Daten ab dieser Position im buffer gelesen. Wenn size angegeben ist, werden so viele Bytes aus dem Puffer gelesen. Sehr große Puffer (ungefähr 32 MiB+, abhängig vom Betriebssystem) können eine ValueError-Ausnahme auslösen.

recv_bytes([maxlength])

Gibt eine vollständige Nachricht von Byte-Daten zurück, die vom anderen Ende der Verbindung gesendet wurde, als Zeichenkette. Blockiert, bis etwas zu empfangen ist. Löst EOFError aus, wenn nichts mehr zu empfangen ist und das andere Ende geschlossen wurde.

Wenn maxlength angegeben ist und die Nachricht länger als maxlength ist, wird eine OSError ausgelöst und die Verbindung kann nicht mehr gelesen werden.

Geändert in Version 3.3: Diese Funktion löste früher IOError aus, was nun ein Alias für OSError ist.

recv_bytes_into(buffer[, offset])

Liest eine vollständige Nachricht von Byte-Daten, die vom anderen Ende der Verbindung gesendet wurde, in buffer und gibt die Anzahl der Bytes in der Nachricht zurück. Blockiert, bis etwas zu empfangen ist. Löst EOFError aus, wenn nichts mehr zu empfangen ist und das andere Ende geschlossen wurde.

buffer muss ein beschreibbares byteähnliches Objekt sein. Wenn offset angegeben ist, wird die Nachricht ab dieser Position in den Puffer geschrieben. Offset muss eine nicht-negative ganze Zahl kleiner als die Länge von buffer (in Bytes) sein.

Wenn der Puffer zu kurz ist, wird eine BufferTooShort-Ausnahme ausgelöst und die vollständige Nachricht ist als e.args[0] verfügbar, wobei e die Ausnahmeinstanz ist.

Geändert in Version 3.3: Verbindungsobjekte können nun zwischen Prozessen übermittelt werden, indem Connection.send() und Connection.recv() verwendet werden.

Verbindungsobjekte unterstützen nun auch das Protokoll für Kontextmanager – siehe Kontextmanager-Typen. __enter__() gibt das Verbindungsobjekt zurück, und __exit__() ruft close() auf.

Zum Beispiel

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

Warnung

Die Methode Connection.recv() deserialisiert die empfangenen Daten automatisch, was ein Sicherheitsrisiko darstellen kann, es sei denn, Sie können dem Prozess, der die Nachricht gesendet hat, vertrauen.

Daher sollten Sie, es sei denn, das Verbindungsobjekt wurde mit Pipe() erstellt, die Methoden recv() und send() nur nach Durchführung einer Art von Authentifizierung verwenden. Siehe Authentifizierungsschlüssel.

Warnung

Wenn ein Prozess beim Lesen oder Schreiben einer Pipe getötet wird, sind die Daten in der Pipe wahrscheinlich beschädigt, da es unmöglich werden kann, die Nachrichtenbegrenzungen sicher zu erkennen.

Synchronisationsprimitive

Im Allgemeinen sind Synchronisationsprimitive in einem Multiprocess-Programm weniger notwendig als in einem Multithread-Programm. Siehe die Dokumentation für das Modul threading.

Beachten Sie, dass Synchronisationsprimitive auch mit einem Manager-Objekt erstellt werden können – siehe Manager.

class multiprocessing.Barrier(parties[, action[, timeout]])

Ein Barrierenobjekt: eine Kopie von threading.Barrier.

Hinzugefügt in Version 3.3.

class multiprocessing.BoundedSemaphore([value])

Ein begrenztes Semaphorobjekt: ein enges Analogon zu threading.BoundedSemaphore.

Ein einziger Unterschied zu seinem engen Analogon besteht darin, dass das erste Argument seiner `acquire`-Methode block genannt wird, wie es mit Lock.acquire() konsistent ist.

Hinweis

Unter macOS ist dies von Semaphore nicht zu unterscheiden, da sem_getvalue() auf dieser Plattform nicht implementiert ist.

class multiprocessing.Condition([lock])

Eine Bedingungsvariable: ein Alias für threading.Condition.

Wenn lock angegeben wird, sollte es sich um ein Lock- oder RLock-Objekt aus multiprocessing handeln.

Geändert in Version 3.3: Die Methode wait_for() wurde hinzugefügt.

class multiprocessing.Event

Eine Kopie von threading.Event.

class multiprocessing.Lock

Ein nicht-rekursives Schloss-Objekt: ein enges Analogon zu threading.Lock. Sobald ein Prozess oder Thread ein Schloss erworben hat, blockieren weitere Versuche, es von einem beliebigen Prozess oder Thread aus zu erwerben, bis es freigegeben wird; jeder Prozess oder Thread kann es freigeben. Die Konzepte und Verhaltensweisen von threading.Lock, wie sie sich auf Threads beziehen, werden hier in multiprocessing.Lock, wie sie sich auf Prozesse oder Threads beziehen, nachgebildet, mit Ausnahme der unten genannten Punkte.

Beachten Sie, dass Lock tatsächlich eine Factory-Funktion ist, die eine Instanz von multiprocessing.synchronize.Lock mit einem Standardkontext zurückgibt.

Lock unterstützt das Protokoll für Kontextmanager und kann daher in with-Anweisungen verwendet werden.

acquire(block=True, timeout=None)

Erwirbt ein Schloss, blockierend oder nicht-blockierend.

Wenn das Argument block auf True (Standard) gesetzt ist, blockiert der Methodenaufruf, bis das Schloss in einem entsperrten Zustand ist, setzt es dann auf gesperrt und gibt True zurück. Beachten Sie, dass der Name dieses ersten Arguments vomjenigen in threading.Lock.acquire() abweicht.

Wenn das Argument block auf False gesetzt ist, blockiert der Methodenaufruf nicht. Wenn das Schloss derzeit in einem gesperrten Zustand ist, gibt False zurück; andernfalls wird das Schloss in einen gesperrten Zustand versetzt und True zurückgegeben.

Bei Aufruf mit einem positiven, Gleitkommawert für timeout wird solange blockiert, wie das Schloss nicht erworben werden kann, maximal jedoch die durch timeout angegebene Anzahl von Sekunden. Aufrufe mit einem negativen Wert für timeout entsprechen einem timeout von Null. Aufrufe mit einem timeout-Wert von None (Standard) setzen die Timeout-Periode auf unendlich. Beachten Sie, dass die Behandlung negativer oder None-Werte für timeout vom implementierten Verhalten in threading.Lock.acquire() abweicht. Das timeout-Argument hat keine praktischen Auswirkungen, wenn das Argument block auf False gesetzt ist und wird daher ignoriert. Gibt True zurück, wenn das Schloss erworben wurde, oder False, wenn die Timeout-Periode abgelaufen ist.

release()

Gibt ein Schloss frei. Dies kann von jedem Prozess oder Thread aufgerufen werden, nicht nur von dem Prozess oder Thread, der das Schloss ursprünglich erworben hat.

Das Verhalten ist dasselbe wie in threading.Lock.release(), außer dass beim Aufruf eines ungesperrten Schlosses ein ValueError ausgelöst wird.

locked()

Gibt einen booleschen Wert zurück, der angibt, ob dieses Objekt gerade gesperrt ist.

Hinzugefügt in Version 3.14.

class multiprocessing.RLock

Ein rekursives Schloss-Objekt: ein enges Analogon zu threading.RLock. Ein rekursives Schloss muss von dem Prozess oder Thread freigegeben werden, der es erworben hat. Sobald ein Prozess oder Thread ein rekursives Schloss erworben hat, kann derselbe Prozess oder Thread es erneut erwerben, ohne zu blockieren. Dieser Prozess oder Thread muss es einmal für jede Erwerbung freigeben.

Beachten Sie, dass RLock tatsächlich eine Factory-Funktion ist, die eine Instanz von multiprocessing.synchronize.RLock mit einem Standardkontext zurückgibt.

RLock unterstützt das Protokoll für Kontextmanager und kann daher in with-Anweisungen verwendet werden.

acquire(block=True, timeout=None)

Erwirbt ein Schloss, blockierend oder nicht-blockierend.

Wenn das Argument block auf True gesetzt ist, wird blockiert, bis das Schloss in einem entsperrten Zustand ist (nicht im Besitz eines Prozesses oder Threads), es sei denn, das Schloss ist bereits im Besitz des aktuellen Prozesses oder Threads. Der aktuelle Prozess oder Thread übernimmt dann den Besitz des Schlosses (falls er ihn nicht bereits hat) und der Rekursionsgrad innerhalb des Schlosses erhöht sich um eins, was zu einem Rückgabewert von True führt. Beachten Sie, dass es mehrere Unterschiede im Verhalten dieses ersten Arguments im Vergleich zur Implementierung von threading.RLock.acquire() gibt, beginnend mit dem Namen des Arguments selbst.

Wenn das Argument block auf False gesetzt ist, wird nicht blockiert. Wenn das Schloss bereits von einem anderen Prozess oder Thread erworben wurde (und somit im Besitz ist), übernimmt der aktuelle Prozess oder Thread keinen Besitz und der Rekursionsgrad innerhalb des Schlosses wird nicht geändert, was zu einem Rückgabewert von False führt. Wenn das Schloss in einem entsperrten Zustand ist, übernimmt der aktuelle Prozess oder Thread den Besitz und der Rekursionsgrad wird erhöht, was zu einem Rückgabewert von True führt.

Die Verwendung und das Verhalten des Arguments timeout sind dieselben wie in Lock.acquire(). Beachten Sie, dass einige dieser Verhaltensweisen von timeout von den implementierten Verhaltensweisen in threading.RLock.acquire() abweichen.

release()

Gibt ein Schloss frei und dekrementiert den Rekursionsgrad. Wenn nach der Dekrementierung der Rekursionsgrad null ist, wird das Schloss auf "entsperrt" (nicht im Besitz eines Prozesses oder Threads) zurückgesetzt. Wenn andere Prozesse oder Threads darauf warten, dass das Schloss entsperrt wird, wird genau einer von ihnen zugelassen. Wenn der Rekursionsgrad nach der Dekrementierung immer noch ungleich null ist, bleibt das Schloss gesperrt und im Besitz des aufrufenden Prozesses oder Threads.

Rufen Sie diese Methode nur auf, wenn der aufrufende Prozess oder Thread das Schloss besitzt. Ein AssertionError wird ausgelöst, wenn diese Methode von einem anderen Prozess oder Thread als dem Besitzer aufgerufen wird oder wenn das Schloss in einem entsperrten (nicht besessenen) Zustand ist. Beachten Sie, dass die Art der in dieser Situation ausgelösten Ausnahme vom implementierten Verhalten in threading.RLock.release() abweicht.

locked()

Gibt einen booleschen Wert zurück, der angibt, ob dieses Objekt gerade gesperrt ist.

Hinzugefügt in Version 3.14.

class multiprocessing.Semaphore([value])

Ein Semaphor-Objekt: ein enges Analogon zu threading.Semaphore.

Ein einziger Unterschied zu seinem engen Analogon besteht darin, dass das erste Argument seiner `acquire`-Methode block genannt wird, wie es mit Lock.acquire() konsistent ist.

Hinweis

Unter macOS wird sem_timedwait nicht unterstützt, daher wird der Aufruf von acquire() mit einem Timeout das Verhalten dieser Funktion durch eine Schleife mit Schlafzeiten emulieren.

Hinweis

Einige Funktionen dieses Pakets erfordern eine funktionierende Implementierung von gemeinsam genutzten Semaphoren im Host-Betriebssystem. Ohne eine solche wird das Modul multiprocessing.synchronize deaktiviert, und Versuche, es zu importieren, führen zu einem ImportError. Siehe bpo-3770 für zusätzliche Informationen.

Gemeinsam genutzte ctypes-Objekte

Es ist möglich, gemeinsam genutzte Objekte mittels Shared Memory zu erstellen, die von Kindprozessen geerbt werden können.

multiprocessing.Value(typecode_or_type, *args, lock=True)

Gibt ein ctypes-Objekt zurück, das aus Shared Memory zugewiesen wird. Standardmäßig ist der Rückgabewert eigentlich ein synchronisierter Wrapper für das Objekt. Das Objekt selbst kann über das Attribut value eines Value-Objekts abgerufen werden.

typecode_or_type bestimmt den Typ des zurückgegebenen Objekts: es ist entweder ein ctypes-Typ oder ein einbuchstabiger Typencode, wie er vom Modul array verwendet wird. *args wird an den Konstruktor für den Typ übergeben.

Wenn lock True (Standard) ist, wird ein neues rekursives Schloss-Objekt erstellt, um den Zugriff auf den Wert zu synchronisieren. Wenn lock ein Lock- oder RLock-Objekt ist, wird dieses zur Synchronisierung des Zugriffs auf den Wert verwendet. Wenn lock False ist, wird der Zugriff auf das zurückgegebene Objekt nicht automatisch durch ein Schloss geschützt, sodass es nicht unbedingt "prozesssicher" ist.

Operationen wie +=, die ein Lesen und Schreiben umfassen, sind nicht atomar. Wenn Sie also beispielsweise einen gemeinsam genutzten Wert atomar inkrementieren möchten, reicht es nicht aus, nur

counter.value += 1

Unter der Annahme, dass das zugehörige Schloss rekursiv ist (was standardmäßig der Fall ist), können Sie stattdessen

with counter.get_lock():
    counter.value += 1

Beachten Sie, dass lock ein Keyword-Only-Argument ist.

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

Gibt ein ctypes-Array zurück, das aus Shared Memory zugewiesen wird. Standardmäßig ist der Rückgabewert eigentlich ein synchronisierter Wrapper für das Array.

typecode_or_type bestimmt den Typ der Elemente des zurückgegebenen Arrays: es ist entweder ein ctypes-Typ oder ein einbuchstabiger Typencode, wie er vom Modul array verwendet wird. Wenn size_or_initializer eine Ganzzahl ist, bestimmt sie die Länge des Arrays, und das Array wird zunächst mit Nullen gefüllt. Andernfalls ist size_or_initializer eine Sequenz, die zur Initialisierung des Arrays verwendet wird und deren Länge die Länge des Arrays bestimmt.

Wenn lock True (Standard) ist, wird ein neues Schloss-Objekt erstellt, um den Zugriff auf den Wert zu synchronisieren. Wenn lock ein Lock- oder RLock-Objekt ist, wird dieses zur Synchronisierung des Zugriffs auf den Wert verwendet. Wenn lock False ist, wird der Zugriff auf das zurückgegebene Objekt nicht automatisch durch ein Schloss geschützt, sodass es nicht unbedingt "prozesssicher" ist.

Beachten Sie, dass lock ein reines Keyword-Argument ist.

Beachten Sie, dass ein Array von ctypes.c_char die Attribute value und raw hat, die es ermöglichen, es zur Speicherung und zum Abrufen von Zeichenketten zu verwenden.

Das Modul multiprocessing.sharedctypes

Das Modul multiprocessing.sharedctypes stellt Funktionen zur Zuweisung von ctypes-Objekten aus Shared Memory bereit, die von Kindprozessen geerbt werden können.

Hinweis

Obwohl es möglich ist, einen Zeiger im Shared Memory zu speichern, denken Sie daran, dass dieser sich auf eine Stelle im Adressraum eines bestimmten Prozesses bezieht. Der Zeiger ist jedoch wahrscheinlich ungültig im Kontext eines zweiten Prozesses, und der Versuch, den Zeiger aus dem zweiten Prozess zu dereferenzieren, kann zu einem Absturz führen.

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

Gibt ein ctypes-Array zurück, das aus Shared Memory zugewiesen wird.

typecode_or_type bestimmt den Typ der Elemente des zurückgegebenen Arrays: es ist entweder ein ctypes-Typ oder ein einbuchstabiger Typencode, wie er vom Modul array verwendet wird. Wenn size_or_initializer eine Ganzzahl ist, bestimmt sie die Länge des Arrays, und das Array wird zunächst mit Nullen gefüllt. Andernfalls ist size_or_initializer eine Sequenz, die zur Initialisierung des Arrays verwendet wird und deren Länge die Länge des Arrays bestimmt.

Beachten Sie, dass das Setzen und Abrufen eines Elements potenziell nicht-atomar ist – verwenden Sie stattdessen Array(), um sicherzustellen, dass der Zugriff automatisch mithilfe eines Schlosses synchronisiert wird.

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

Gibt ein ctypes-Objekt zurück, das aus Shared Memory zugewiesen wird.

typecode_or_type bestimmt den Typ des zurückgegebenen Objekts: es ist entweder ein ctypes-Typ oder ein einbuchstabiger Typencode, wie er vom Modul array verwendet wird. *args wird an den Konstruktor für den Typ übergeben.

Beachten Sie, dass das Setzen und Abrufen des Werts potenziell nicht-atomar ist – verwenden Sie stattdessen Value(), um sicherzustellen, dass der Zugriff automatisch mithilfe eines Schlosses synchronisiert wird.

Beachten Sie, dass ein Array von ctypes.c_char die Attribute value und raw hat, die es ermöglichen, es zur Speicherung und zum Abrufen von Zeichenketten zu verwenden – siehe Dokumentation für ctypes.

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

Dasselbe wie RawArray(), außer dass je nach Wert von lock möglicherweise ein prozesssicherer Synchronisierungs-Wrapper anstelle eines rohen ctypes-Arrays zurückgegeben wird.

Wenn lock True (Standard) ist, wird ein neues Schloss-Objekt erstellt, um den Zugriff auf den Wert zu synchronisieren. Wenn lock ein Lock- oder RLock-Objekt ist, wird dieses zur Synchronisierung des Zugriffs auf den Wert verwendet. Wenn lock False ist, wird der Zugriff auf das zurückgegebene Objekt nicht automatisch durch ein Schloss geschützt, sodass es nicht unbedingt "prozesssicher" ist.

Beachten Sie, dass lock ein Keyword-Only-Argument ist.

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

Dasselbe wie RawValue(), außer dass je nach Wert von lock möglicherweise ein prozesssicherer Synchronisierungs-Wrapper anstelle eines rohen ctypes-Objekts zurückgegeben wird.

Wenn lock True (Standard) ist, wird ein neues Schloss-Objekt erstellt, um den Zugriff auf den Wert zu synchronisieren. Wenn lock ein Lock- oder RLock-Objekt ist, wird dieses zur Synchronisierung des Zugriffs auf den Wert verwendet. Wenn lock False ist, wird der Zugriff auf das zurückgegebene Objekt nicht automatisch durch ein Schloss geschützt, sodass es nicht unbedingt "prozesssicher" ist.

Beachten Sie, dass lock ein Keyword-Only-Argument ist.

multiprocessing.sharedctypes.copy(obj)

Gibt ein ctypes-Objekt zurück, das aus Shared Memory zugewiesen wird und eine Kopie des ctypes-Objekts obj ist.

multiprocessing.sharedctypes.synchronized(obj[, lock])

Gibt ein prozesssicheres Wrapper-Objekt für ein ctypes-Objekt zurück, das lock zur Synchronisierung des Zugriffs verwendet. Wenn lock None (Standard) ist, wird automatisch ein multiprocessing.RLock-Objekt erstellt.

Ein synchronisierter Wrapper hat zusätzlich zu den Methoden des umhüllten Objekts zwei weitere Methoden: get_obj() gibt das umhüllte Objekt zurück und get_lock() gibt das zur Synchronisierung verwendete Schloss-Objekt zurück.

Beachten Sie, dass der Zugriff auf das ctypes-Objekt über den Wrapper viel langsamer sein kann als der Zugriff auf das rohe ctypes-Objekt.

Geändert in Version 3.5: Synchronisierte Objekte unterstützen das Protokoll für Kontextmanager.

Die folgende Tabelle vergleicht die Syntax für die Erstellung von gemeinsam genutzten ctypes-Objekten aus Shared Memory mit der normalen ctypes-Syntax. (In der Tabelle ist MyStruct eine Unterklasse von ctypes.Structure.)

ctypes

sharedctypes mit Typ

sharedctypes mit Typencode

c_double(2.4)

RawValue(c_double, 2.4)

RawValue(‘d’, 2.4)

MyStruct(4, 6)

RawValue(MyStruct, 4, 6)

(c_short * 7)()

RawArray(c_short, 7)

RawArray(‘h’, 7)

(c_int * 3)(9, 2, 8)

RawArray(c_int, (9, 2, 8))

RawArray(‘i’, (9, 2, 8))

Im Folgenden finden Sie ein Beispiel, bei dem eine Reihe von ctypes-Objekten von einem Kindprozess geändert wird.

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

Die ausgegebenen Ergebnisse sind

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

Manager

Manager bieten eine Möglichkeit, Daten zu erstellen, die zwischen verschiedenen Prozessen gemeinsam genutzt werden können, einschließlich der gemeinsamen Nutzung über ein Netzwerk zwischen Prozessen, die auf verschiedenen Rechnern laufen. Ein Manager-Objekt steuert einen Serverprozess, der shared objects verwaltet. Andere Prozesse können auf die shared objects zugreifen, indem sie Proxies verwenden.

multiprocessing.Manager()

Gibt ein gestartetes SyncManager-Objekt zurück, das zum Teilen von Objekten zwischen Prozessen verwendet werden kann. Das zurückgegebene Manager-Objekt entspricht einem gestarteten Kindprozess und verfügt über Methoden, die freigegebene Objekte erstellen und entsprechende Proxys zurückgeben.

Manager-Prozesse werden heruntergefahren, sobald sie garbage collected werden oder ihr Elternprozess beendet wird. Die Manager-Klassen sind im Modul multiprocessing.managers definiert.

class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)

Erstellt ein BaseManager-Objekt.

Nach der Erstellung sollte start() oder get_server().serve_forever() aufgerufen werden, um sicherzustellen, dass das Manager-Objekt auf einen gestarteten Manager-Prozess verweist.

address ist die Adresse, auf der der Manager-Prozess auf neue Verbindungen wartet. Wenn address None ist, wird eine beliebige Adresse gewählt.

authkey ist der Authentifizierungsschlüssel, der zur Überprüfung der Gültigkeit eingehender Verbindungen zum Serverprozess verwendet wird. Wenn authkey None ist, wird current_process().authkey verwendet. Andernfalls wird authkey verwendet und muss eine Byte-Zeichenkette sein.

serializer muss 'pickle' (verwendet pickle-Serialisierung) oder 'xmlrpclib' (verwendet xmlrpc.client-Serialisierung) sein.

ctx ist ein Kontextobjekt oder None (verwendet den aktuellen Kontext). Siehe die Funktion get_context().

shutdown_timeout ist ein Timeout in Sekunden, das verwendet wird, um zu warten, bis der vom Manager verwendete Prozess in der Methode shutdown() beendet ist. Wenn das Herunterfahren zu lange dauert, wird der Prozess beendet. Wenn das Beenden des Prozesses ebenfalls zu lange dauert, wird der Prozess getötet.

Geändert in Version 3.11: Der Parameter shutdown_timeout wurde hinzugefügt.

start([initializer[, initargs]])

Startet einen Unterprozess, um den Manager zu starten. Wenn initializer nicht None ist, ruft der Unterprozess initializer(*initargs) auf, wenn er startet.

get_server()

Gibt ein Server-Objekt zurück, das den tatsächlichen Server unter der Kontrolle des Managers darstellt. Das Server-Objekt unterstützt die Methode serve_forever().

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server hat zusätzlich ein Attribut address.

connect()

Verbindet ein lokales Manager-Objekt mit einem entfernten Manager-Prozess.

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()
shutdown()

Stoppt den vom Manager verwendeten Prozess. Dies ist nur verfügbar, wenn start() zum Starten des Serverprozesses verwendet wurde.

Dies kann mehrmals aufgerufen werden.

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

Eine Klassenmethode, die zur Registrierung eines Typs oder einer aufrufbaren Funktion bei der Manager-Klasse verwendet werden kann.

typeid ist ein „Typ-Identifikator“, der zur Identifizierung eines bestimmten Typs von geteiltem Objekt verwendet wird. Dies muss eine Zeichenkette sein.

callable ist eine aufrufbare Funktion, die zum Erstellen von Objekten für diesen Typ-Identifikator verwendet wird. Wenn eine Manager-Instanz über die Methode connect() mit dem Server verbunden wird oder wenn das Argument create_method False ist, kann dies None bleiben.

proxytype ist eine Unterklasse von BaseProxy, die zum Erstellen von Proxys für geteilte Objekte mit diesem typeid verwendet wird. Wenn None ist, wird automatisch eine Proxy-Klasse erstellt.

exposed wird verwendet, um eine Sequenz von Methodennamen anzugeben, auf die Proxys für diesen typeid über BaseProxy._callmethod() zugreifen dürfen. (Wenn exposed None ist, wird stattdessen proxytype._exposed_ verwendet, falls es existiert.) Wenn keine freigegebene Liste angegeben ist, sind alle „öffentlichen Methoden“ des geteilten Objekts zugänglich. (Hier bedeutet eine „öffentliche Methode“ jedes Attribut, das eine __call__()-Methode hat und dessen Name nicht mit '_' beginnt.)

method_to_typeid ist eine Zuordnung, die verwendet wird, um den Rückgabetyp der freigegebenen Methoden anzugeben, die einen Proxy zurückgeben sollen. Sie ordnet Methodennamen Typ-ID-Zeichenketten zu. (Wenn method_to_typeid None ist, wird stattdessen proxytype._method_to_typeid_ verwendet, falls es existiert.) Wenn der Name einer Methode kein Schlüssel dieser Zuordnung ist oder wenn die Zuordnung None ist, wird das von der Methode zurückgegebene Objekt per Wert kopiert.

create_method bestimmt, ob eine Methode mit dem Namen typeid erstellt werden soll, mit der der Serverprozess angewiesen werden kann, ein neues geteiltes Objekt zu erstellen und einen Proxy dafür zurückzugeben. Standardmäßig ist dies True.

BaseManager-Instanzen verfügen außerdem über eine schreibgeschützte Eigenschaft

address

Die vom Manager verwendete Adresse.

Geändert in Version 3.3: Manager-Objekte unterstützen das Context-Management-Protokoll – siehe Context Manager Types. __enter__() startet den Serverprozess (falls er noch nicht gestartet wurde) und gibt dann das Manager-Objekt zurück. __exit__() ruft shutdown() auf.

In früheren Versionen startete __enter__() den Serverprozess des Managers nicht, wenn dieser noch nicht gestartet war.

class multiprocessing.managers.SyncManager

Eine Unterklasse von BaseManager, die für die Synchronisation von Prozessen verwendet werden kann. Objekte dieses Typs werden von multiprocessing.Manager() zurückgegeben.

Seine Methoden erstellen und geben Proxy-Objekte für eine Reihe häufig verwendeter Datentypen zurück, die über Prozesse hinweg synchronisiert werden sollen. Dazu gehören insbesondere geteilte Listen und Dictionaries.

Barrier(parties[, action[, timeout]])

Erstellt ein geteiltes threading.Barrier-Objekt und gibt einen Proxy dafür zurück.

Hinzugefügt in Version 3.3.

BoundedSemaphore([value])

Erstellt ein geteiltes threading.BoundedSemaphore-Objekt und gibt einen Proxy dafür zurück.

Condition([lock])

Erstellt ein geteiltes threading.Condition-Objekt und gibt einen Proxy dafür zurück.

Wenn lock übergeben wird, sollte es ein Proxy für ein threading.Lock- oder threading.RLock-Objekt sein.

Geändert in Version 3.3: Die Methode wait_for() wurde hinzugefügt.

Event()

Erstellt ein geteiltes threading.Event-Objekt und gibt einen Proxy dafür zurück.

Lock()

Erstellt ein geteiltes threading.Lock-Objekt und gibt einen Proxy dafür zurück.

Namespace()

Erstellt ein geteiltes Namespace-Objekt und gibt einen Proxy dafür zurück.

Queue([maxsize])

Erstellt eine geteilte queue.Queue und gibt einen Proxy dafür zurück.

RLock()

Erstellt ein geteiltes threading.RLock-Objekt und gibt einen Proxy dafür zurück.

Semaphore([value])

Erstellt ein geteiltes threading.Semaphore-Objekt und gibt einen Proxy dafür zurück.

Array(typecode, sequence)

Erstellt ein Array und gibt einen Proxy dafür zurück.

Value(typecode, value)

Erstellt ein Objekt mit einem beschreibbaren Attribut value und gibt einen Proxy dafür zurück.

dict()
dict(mapping)
dict(sequence)

Erstellt ein geteiltes dict-Objekt und gibt einen Proxy dafür zurück.

list()
list(sequence)

Erstellt eine geteilte list und gibt einen Proxy dafür zurück.

set()
set(sequence)
set(mapping)

Erstellt ein geteiltes set-Objekt und gibt einen Proxy dafür zurück.

Hinzugefügt in Version 3.14: Unterstützung für set wurde hinzugefügt.

Geändert in Version 3.6: Geteilte Objekte können verschachtelt werden. Zum Beispiel kann ein geteiltes Container-Objekt wie eine geteilte Liste andere geteilte Objekte enthalten, die alle vom SyncManager verwaltet und synchronisiert werden.

class multiprocessing.managers.Namespace

Ein Typ, der bei SyncManager registriert werden kann.

Ein Namespace-Objekt hat keine öffentlichen Methoden, aber beschreibbare Attribute. Seine Darstellung zeigt die Werte seiner Attribute.

Wenn jedoch ein Proxy für ein Namespace-Objekt verwendet wird, ist ein Attribut, das mit '_' beginnt, ein Attribut des Proxys und nicht ein Attribut des Referenten.

>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

Angepasste Manager

Um einen eigenen Manager zu erstellen, erstellt man eine Unterklasse von BaseManager und verwendet die Klassenmethode register(), um neue Typen oder aufrufbare Funktionen beim Manager zu registrieren. Zum Beispiel

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

Verwendung eines entfernten Managers

Es ist möglich, einen Manager-Server auf einer Maschine laufen zu lassen und Clients von anderen Maschinen darauf zugreifen zu lassen (vorausgesetzt, die beteiligten Firewalls lassen dies zu).

Das Ausführen der folgenden Befehle erstellt einen Server für eine einzelne geteilte Warteschlange, auf die entfernte Clients zugreifen können.

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Ein Client kann wie folgt auf den Server zugreifen:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

Ein anderer Client kann ihn ebenfalls verwenden:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

Lokale Prozesse können auch auf diese Warteschlange zugreifen, indem sie den obigen Code auf dem Client verwenden, um remote darauf zuzugreifen.

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super().__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Proxy-Objekte

Ein Proxy ist ein Objekt, das sich auf ein geteiltes Objekt bezieht, das (vermutlich) in einem anderen Prozess lebt. Das geteilte Objekt wird als Referent des Proxys bezeichnet. Mehrere Proxy-Objekte können denselben Referenten haben.

Ein Proxy-Objekt hat Methoden, die entsprechende Methoden seines Referenten aufrufen (obwohl nicht jede Methode des Referenten notwendigerweise über den Proxy verfügbar ist). Auf diese Weise kann ein Proxy genauso verwendet werden wie sein Referent.

>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

Beachten Sie, dass die Anwendung von str() auf einen Proxy die Darstellung des Referenten zurückgibt, während die Anwendung von repr() die Darstellung des Proxys zurückgibt.

Ein wichtiges Merkmal von Proxy-Objekten ist, dass sie pickelbar sind, damit sie zwischen Prozessen übergeben werden können. Daher kann ein Referent Proxy-Objekte enthalten. Dies ermöglicht die Verschachtelung dieser verwalteten Listen, Dictionaries und anderer Proxy-Objekte.

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

Ebenso können Dictionary- und Listen-Proxys ineinander verschachtelt werden.

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

Wenn Standard- (Nicht-Proxy-) list- oder dict-Objekte in einem Referenten enthalten sind, werden Änderungen an diesen veränderlichen Werten nicht über den Manager weitergegeben, da der Proxy nicht weiß, wann die darin enthaltenen Werte geändert werden. Das Speichern eines Wertes in einem Container-Proxy (was ein __setitem__ auf dem Proxy-Objekt auslöst) wird jedoch über den Manager und damit an den Referenten weitergegeben. Um ein solches Element effektiv zu ändern, könnte man den geänderten Wert dem Container-Proxy neu zuweisen.

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d

Dieser Ansatz ist für die meisten Anwendungsfälle vielleicht weniger bequem als die Verwendung verschachtelter Proxy-Objekte, zeigt aber auch ein gewisses Maß an Kontrolle über die Synchronisation.

Hinweis

Die Proxy-Typen in multiprocessing tun nichts, um Vergleiche nach Wert zu unterstützen. So haben wir zum Beispiel:

>>> manager.list([1,2,3]) == [1,2,3]
False

Man sollte einfach eine Kopie des Referenten verwenden, wenn man Vergleiche durchführt.

class multiprocessing.managers.BaseProxy

Proxy-Objekte sind Instanzen von Unterklassen von BaseProxy.

_callmethod(methodname[, args[, kwds]])

Ruft eine Methode des Referenten des Proxys auf und gibt deren Ergebnis zurück.

Wenn proxy ein Proxy ist, dessen Referent obj ist, dann wird der Ausdruck

proxy._callmethod(methodname, args, kwds)

wertet den Ausdruck aus

getattr(obj, methodname)(*args, **kwds)

im Prozess des Managers.

Der zurückgegebene Wert ist eine Kopie des Ergebnisses des Aufrufs oder ein Proxy für ein neues geteiltes Objekt – siehe Dokumentation für das Argument method_to_typeid von BaseManager.register().

Wenn eine Ausnahme durch den Aufruf ausgelöst wird, wird sie von _callmethod() erneut ausgelöst. Wenn eine andere Ausnahme im Prozess des Managers ausgelöst wird, wird diese in eine RemoteError-Ausnahme umgewandelt und von _callmethod() ausgelöst.

Beachten Sie insbesondere, dass eine Ausnahme ausgelöst wird, wenn methodname nicht exposed wurde.

Ein Beispiel für die Verwendung von _callmethod()

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

Gibt eine Kopie des Referenten zurück.

Wenn der Referent nicht pickelbar ist, löst dies eine Ausnahme aus.

__repr__()

Gibt eine Darstellung des Proxy-Objekts zurück.

__str__()

Gibt die Darstellung des Referenten zurück.

Bereinigung

Ein Proxy-Objekt verwendet einen Weakref-Callback, so dass es sich beim Garbage-Collecting selbst vom Manager abmeldet, dem sein Referent gehört.

Ein geteiltes Objekt wird aus dem Manager-Prozess gelöscht, wenn keine Proxys mehr darauf verweisen.

Prozess-Pools

Man kann einen Pool von Prozessen erstellen, die Aufgaben, die an ihn übergeben werden, mit der Klasse Pool ausführen.

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

Ein Prozess-Pool-Objekt, das einen Pool von Worker-Prozessen steuert, an die Jobs übergeben werden können. Es unterstützt asynchrone Ergebnisse mit Timeouts und Callbacks und verfügt über eine parallele Map-Implementierung.

processes ist die Anzahl der zu verwendenden Worker-Prozesse. Wenn processes None ist, wird die von os.process_cpu_count() zurückgegebene Anzahl verwendet.

Wenn initializer nicht None ist, ruft jeder Worker-Prozess initializer(*initargs) auf, wenn er startet.

maxtasksperchild ist die Anzahl der Aufgaben, die ein Worker-Prozess abschließen kann, bevor er beendet und durch einen neuen Worker-Prozess ersetzt wird, um ungenutzte Ressourcen freizugeben. Der Standardwert für maxtasksperchild ist None, was bedeutet, dass Worker-Prozesse so lange leben wie der Pool.

context kann verwendet werden, um den Kontext anzugeben, der zum Starten der Worker-Prozesse verwendet wird. Normalerweise wird ein Pool über die Funktion multiprocessing.Pool() oder die Methode Pool() eines Kontextobjekts erstellt. In beiden Fällen wird context entsprechend gesetzt.

Beachten Sie, dass die Methoden des Pool-Objekts nur von dem Prozess aufgerufen werden sollten, der den Pool erstellt hat.

Warnung

multiprocessing.pool Objekte haben interne Ressourcen, die ordnungsgemäß verwaltet werden müssen (wie jede andere Ressource), indem der Pool als Kontextmanager verwendet wird oder indem close() und terminate() manuell aufgerufen werden. Wenn dies nicht geschieht, kann der Prozess beim Beenden hängen bleiben.

Beachten Sie, dass es nicht korrekt ist, sich auf den Garbage Collector zu verlassen, um den Pool zu zerstören, da CPython nicht garantiert, dass der Finalizer des Pools aufgerufen wird (siehe object.__del__() für weitere Informationen).

Geändert in Version 3.2: Der Parameter maxtasksperchild wurde hinzugefügt.

Geändert in Version 3.4: Der Parameter context wurde hinzugefügt.

Geändert in Version 3.13: processes verwendet standardmäßig os.process_cpu_count() anstelle von os.cpu_count().

Hinweis

Worker-Prozesse innerhalb eines Pool leben typischerweise für die gesamte Dauer der Arbeitswarteschlange des Pools. Ein häufiges Muster in anderen Systemen (wie Apache, mod_wsgi, etc.), um von Arbeitern gehaltene Ressourcen freizugeben, besteht darin, einem Arbeiter innerhalb eines Pools nur eine bestimmte Menge an Arbeit abzuschließen, bevor er beendet, bereinigt und ein neuer Prozess zur Ersetzung des alten gestartet wird. Das Argument maxtasksperchild zum Pool macht diese Fähigkeit dem Endbenutzer zugänglich.

apply(func[, args[, kwds]])

Ruft func mit den Argumenten args und Schlüsselwortargumenten kwds auf. Es blockiert, bis das Ergebnis bereit ist. Da dies blockiert, ist apply_async() besser geeignet, um parallele Arbeiten auszuführen. Zusätzlich wird func nur in einem der Worker des Pools ausgeführt.

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

Eine Variante der Methode apply(), die ein AsyncResult-Objekt zurückgibt.

Wenn callback angegeben ist, sollte es eine aufrufbare Funktion sein, die ein einzelnes Argument akzeptiert. Wenn das Ergebnis verfügbar ist, wird callback darauf angewendet, es sei denn, der Aufruf ist fehlgeschlagen, in welchem Fall error_callback stattdessen angewendet wird.

Wenn error_callback angegeben ist, sollte es eine aufrufbare Funktion sein, die ein einzelnes Argument akzeptiert. Wenn die Ziel-Funktion fehlschlägt, wird error_callback mit der Ausnahmeinstanz aufgerufen.

Callbacks sollten sofort abgeschlossen werden, da sonst der Thread, der die Ergebnisse verarbeitet, blockiert wird.

map(func, iterable[, chunksize])

Ein paralleles Äquivalent der eingebauten Funktion map() (es unterstützt nur ein iterable Argument, für mehrere Iterables siehe starmap()). Es blockiert, bis das Ergebnis bereit ist.

Diese Methode teilt das Iterable in eine Anzahl von Blöcken auf, die als separate Aufgaben an den Prozesspool übermittelt werden. Die (ungefähre) Größe dieser Blöcke kann durch Setzen von chunksize auf eine positive Ganzzahl festgelegt werden.

Beachten Sie, dass dies bei sehr langen Iterables zu hohem Speicherverbrauch führen kann. Erwägen Sie die Verwendung von imap() oder imap_unordered() mit expliziter chunksize Option für bessere Effizienz.

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

Eine Variante der Methode map(), die ein AsyncResult-Objekt zurückgibt.

Wenn callback angegeben ist, sollte es eine aufrufbare Funktion sein, die ein einzelnes Argument akzeptiert. Wenn das Ergebnis verfügbar ist, wird callback darauf angewendet, es sei denn, der Aufruf ist fehlgeschlagen, in welchem Fall error_callback stattdessen angewendet wird.

Wenn error_callback angegeben ist, sollte es eine aufrufbare Funktion sein, die ein einzelnes Argument akzeptiert. Wenn die Ziel-Funktion fehlschlägt, wird error_callback mit der Ausnahmeinstanz aufgerufen.

Callbacks sollten sofort abgeschlossen werden, da sonst der Thread, der die Ergebnisse verarbeitet, blockiert wird.

imap(func, iterable[, chunksize])

Eine träge Version von map().

Das Argument chunksize ist dasselbe wie das, das von der Methode map() verwendet wird. Bei sehr langen Iterables kann die Verwendung eines großen Wertes für chunksize dazu führen, dass der Job viel schneller abgeschlossen wird als bei Verwendung des Standardwerts von 1.

Auch wenn chunksize 1 ist, hat die Methode next() des von der Methode imap() zurückgegebenen Iterators einen optionalen timeout-Parameter: next(timeout) löst multiprocessing.TimeoutError aus, wenn das Ergebnis nicht innerhalb von timeout Sekunden zurückgegeben werden kann.

imap_unordered(func, iterable[, chunksize])

Das Gleiche wie imap(), nur dass die Reihenfolge der Ergebnisse des zurückgegebenen Iterators als willkürlich betrachtet werden sollte. (Nur wenn es nur einen Worker-Prozess gibt, ist die Reihenfolge garantiert "korrekt".)

starmap(func, iterable[, chunksize])

Wie map(), nur dass die Elemente des iterable als Iterables erwartet werden, die als Argumente entpackt werden.

Daher ergibt ein iterable von [(1,2), (3, 4)] [func(1,2), func(3,4)].

Hinzugefügt in Version 3.3.

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

Eine Kombination aus starmap() und map_async(), die über ein iterable von Iterables iteriert und func mit den entpackten Iterables aufruft. Gibt ein Ergebnisobjekt zurück.

Hinzugefügt in Version 3.3.

close()

Verhindert, dass weitere Aufgaben an den Pool übermittelt werden. Sobald alle Aufgaben abgeschlossen sind, werden die Worker-Prozesse beendet.

terminate()

Stoppt die Worker-Prozesse sofort, ohne ausstehende Arbeit abzuschließen. Wenn das Pool-Objekt vom Garbage Collector eingesammelt wird, wird terminate() sofort aufgerufen.

join()

Wartet darauf, dass die Worker-Prozesse beendet werden. close() oder terminate() muss aufgerufen werden, bevor join() verwendet werden kann.

Geändert in Version 3.3: Pool-Objekte unterstützen nun das Kontextmanager-Protokoll – siehe Context Manager Types. __enter__() gibt das Pool-Objekt zurück, und __exit__() ruft terminate() auf.

class multiprocessing.pool.AsyncResult

Die Klasse des Ergebnisses, das von Pool.apply_async() und Pool.map_async() zurückgegeben wird.

get([timeout])

Gibt das Ergebnis zurück, wenn es eintrifft. Wenn timeout nicht None ist und das Ergebnis nicht innerhalb von timeout Sekunden eintrifft, wird multiprocessing.TimeoutError ausgelöst. Wenn der Remote-Aufruf eine Ausnahme ausgelöst hat, wird diese Ausnahme von get() erneut ausgelöst.

wait([timeout])

Wartet, bis das Ergebnis verfügbar ist oder bis timeout Sekunden vergangen sind.

ready()

Gibt zurück, ob der Aufruf abgeschlossen wurde.

successful()

Gibt zurück, ob der Aufruf ohne Auslösung einer Ausnahme abgeschlossen wurde. Löst ValueError aus, wenn das Ergebnis noch nicht bereit ist.

Geändert in Version 3.7: Wenn das Ergebnis noch nicht bereit ist, wird ValueError anstelle von AssertionError ausgelöst.

Das folgende Beispiel demonstriert die Verwendung eines Pools

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

Listener und Clients

Normalerweise wird die Nachrichtenübertragung zwischen Prozessen über Queues oder durch die Verwendung von Connection-Objekten, die von Pipe() zurückgegeben werden, durchgeführt.

Das Modul multiprocessing.connection bietet jedoch zusätzliche Flexibilität. Es bietet im Grunde eine High-Level-Nachrichten-API für den Umgang mit Sockets oder Windows-Named Pipes. Es unterstützt auch Digest-Authentifizierung mithilfe des Moduls hmac und das gleichzeitige Polling mehrerer Verbindungen.

multiprocessing.connection.deliver_challenge(connection, authkey)

Sendet eine zufällig generierte Nachricht an das andere Ende der Verbindung und wartet auf eine Antwort.

Wenn die Antwort dem Digest der Nachricht mit authkey als Schlüssel entspricht, wird eine Willkommensnachricht an das andere Ende der Verbindung gesendet. Andernfalls wird AuthenticationError ausgelöst.

multiprocessing.connection.answer_challenge(connection, authkey)

Empfängt eine Nachricht, berechnet den Digest der Nachricht mit authkey als Schlüssel und sendet dann den Digest zurück.

Wenn keine Willkommensnachricht empfangen wird, wird AuthenticationError ausgelöst.

multiprocessing.connection.Client(address[, family[, authkey]])

Versucht, eine Verbindung zum Listener herzustellen, der die Adresse address verwendet, und gibt eine Connection zurück.

Der Typ der Verbindung wird durch das Argument family bestimmt, aber dies kann generell weggelassen werden, da es normalerweise aus dem Format von address abgeleitet werden kann. (Siehe Adressformate)

Wenn authkey angegeben und nicht None ist, sollte es ein Byte-String sein und als geheimer Schlüssel für eine HMAC-basierte Authentifizierungsaufforderung verwendet werden. Wenn authkey None ist, findet keine Authentifizierung statt. AuthenticationError wird ausgelöst, wenn die Authentifizierung fehlschlägt. Siehe Authentifizierungsschlüssel.

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

Eine Wrapper-Klasse für einen gebundenen Socket oder eine Windows-Named-Pipe, die auf Verbindungen "lauscht".

address ist die Adresse, die vom gebundenen Socket oder der Named-Pipe des Listener-Objekts verwendet wird.

Hinweis

Wenn die Adresse '0.0.0.0' verwendet wird, ist die Adresse unter Windows kein verbindungsfähiger Endpunkt. Wenn Sie einen verbindungsfähigen Endpunkt benötigen, sollten Sie '127.0.0.1' verwenden.

family ist der Typ des Sockets (oder der Named-Pipe). Dies kann einer der Strings 'AF_INET' (für einen TCP-Socket), 'AF_UNIX' (für einen Unix-Domain-Socket) oder 'AF_PIPE' (für eine Windows-Named-Pipe) sein. Von diesen ist nur der erste garantiert verfügbar. Wenn family None ist, wird die Familie aus dem Format von address abgeleitet. Wenn address ebenfalls None ist, wird ein Standardwert gewählt. Dieser Standardwert ist die Familie, die als die schnellste verfügbare angenommen wird. Siehe Adressformate. Beachten Sie, dass, wenn family 'AF_UNIX' ist und address None ist, der Socket in einem privaten temporären Verzeichnis erstellt wird, das mit tempfile.mkstemp() erstellt wurde.

Wenn das Listener-Objekt einen Socket verwendet, wird backlog (standardmäßig 1) an die Methode listen() des Sockets übergeben, sobald dieser gebunden wurde.

Wenn authkey angegeben und nicht None ist, sollte es ein Byte-String sein und als geheimer Schlüssel für eine HMAC-basierte Authentifizierungsaufforderung verwendet werden. Wenn authkey None ist, findet keine Authentifizierung statt. AuthenticationError wird ausgelöst, wenn die Authentifizierung fehlschlägt. Siehe Authentifizierungsschlüssel.

accept()

Akzeptiert eine Verbindung auf dem gebundenen Socket oder der Named-Pipe des Listener-Objekts und gibt ein Connection-Objekt zurück. Wenn die Authentifizierung versucht und fehlschlägt, wird AuthenticationError ausgelöst.

close()

Schließt den gebundenen Socket oder die Named-Pipe des Listener-Objekts. Dies wird automatisch aufgerufen, wenn der Listener vom Garbage Collector eingesammelt wird. Es ist jedoch ratsam, ihn explizit aufzurufen.

Listener-Objekte haben die folgenden schreibgeschützten Eigenschaften

address

Die Adresse, die vom Listener-Objekt verwendet wird.

last_accepted

Die Adresse, von der die zuletzt akzeptierte Verbindung kam. Wenn dies nicht verfügbar ist, ist es None.

Geändert in Version 3.3: Listener-Objekte unterstützen nun das Kontextmanager-Protokoll – siehe Context Manager Types. __enter__() gibt das Listener-Objekt zurück, und __exit__() ruft close() auf.

multiprocessing.connection.wait(object_list, timeout=None)

Wartet, bis ein Objekt in object_list bereit ist. Gibt die Liste der Objekte in object_list zurück, die bereit sind. Wenn timeout ein Float ist, blockiert der Aufruf für höchstens so viele Sekunden. Wenn timeout None ist, blockiert er unbegrenzt. Ein negativer Timeout ist gleichbedeutend mit einem Null-Timeout.

Sowohl unter POSIX als auch unter Windows kann ein Objekt in object_list erscheinen, wenn es

Eine Verbindung oder ein Socket-Objekt ist bereit, wenn Daten zum Lesen verfügbar sind oder das andere Ende geschlossen wurde.

POSIX: wait(object_list, timeout) ist fast äquivalent zu select.select(object_list, [], [], timeout). Der Unterschied besteht darin, dass, wenn select.select() durch ein Signal unterbrochen wird, es OSError mit der Fehlernummer EINTR auslösen kann, während wait() dies nicht tut.

Windows: Ein Element in object_list muss entweder ein integer Handle sein, das wartbar ist (gemäß der Definition in der Dokumentation der Win32-Funktion WaitForMultipleObjects()), oder es kann ein Objekt mit einer Methode fileno() sein, die ein Socket-Handle oder ein Pipe-Handle zurückgibt. (Beachten Sie, dass Pipe-Handles und Socket-Handles keine wartbaren Handles sind.)

Hinzugefügt in Version 3.3.

Beispiele

Der folgende Server-Code erstellt einen Listener, der 'secret password' als Authentifizierungsschlüssel verwendet. Er wartet dann auf eine Verbindung und sendet einige Daten an den Client.

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

Der folgende Code stellt eine Verbindung zum Server her und empfängt einige Daten vom Server.

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

Der folgende Code verwendet wait(), um gleichzeitig auf Nachrichten von mehreren Prozessen zu warten.

from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

Adressformate

  • Eine 'AF_INET'-Adresse ist ein Tupel der Form (hostname, port), wobei hostname ein String und port eine Ganzzahl ist.

  • Eine 'AF_UNIX'-Adresse ist ein String, der einen Dateinamen im Dateisystem darstellt.

  • Eine Adresse vom Typ 'AF_PIPE' ist ein String der Form r'\\.\pipe\PipeName'. Um Client() zum Verbinden mit einer benannten Pipe auf einem entfernten Computer namens ServerName zu verwenden, sollte stattdessen eine Adresse der Form r'\\ServerName\pipe\PipeName' verwendet werden.

Beachten Sie, dass jeder String, der mit zwei Backslashes beginnt, standardmäßig als eine 'AF_PIPE'-Adresse und nicht als eine 'AF_UNIX'-Adresse angenommen wird.

Authentifizierungsschlüssel

Wenn man Connection.recv verwendet, werden die empfangenen Daten automatisch entpickelt. Das Entpickeln von Daten aus einer nicht vertrauenswürdigen Quelle ist leider ein Sicherheitsrisiko. Daher verwenden Listener und Client() das Modul hmac, um eine Digest-Authentifizierung bereitzustellen.

Ein Authentifizierungsschlüssel ist ein Byte-String, der als Passwort betrachtet werden kann: Sobald eine Verbindung hergestellt ist, verlangt jeder Endpunkt einen Nachweis, dass der andere den Authentifizierungsschlüssel kennt. (Der Nachweis, dass beide Endpunkte denselben Schlüssel verwenden, beinhaltet **nicht** das Senden des Schlüssels über die Verbindung.)

Wenn Authentifizierung angefordert wird, aber kein Authentifizierungsschlüssel angegeben ist, wird der Rückgabewert von current_process().authkey verwendet (siehe Process). Dieser Wert wird automatisch an jedes Process-Objekt vererbt, das der aktuelle Prozess erstellt. Das bedeutet, dass (standardmäßig) alle Prozesse eines Mehrprozessprogramms einen einzigen Authentifizierungsschlüssel teilen, der beim Einrichten von Verbindungen zwischen ihnen verwendet werden kann.

Geeignete Authentifizierungsschlüssel können auch mit os.urandom() generiert werden.

Protokollierung

Es gibt eine gewisse Unterstützung für die Protokollierung. Beachten Sie jedoch, dass das Paket logging keine prozessübergreifenden Sperren verwendet, so dass es möglich ist (abhängig vom Handler-Typ), dass Nachrichten von verschiedenen Prozessen durcheinander geraten.

multiprocessing.get_logger()

Gibt den von multiprocessing verwendeten Logger zurück. Falls erforderlich, wird ein neuer erstellt.

Wenn der Logger zum ersten Mal erstellt wird, hat er die Stufe logging.NOTSET und keinen Standard-Handler. Nachrichten, die an diesen Logger gesendet werden, werden standardmäßig nicht an den Root-Logger weitergeleitet.

Beachten Sie, dass unter Windows Kindprozesse nur die Stufe des Loggers des Elternprozesses erben – jede andere Anpassung des Loggers wird nicht geerbt.

multiprocessing.log_to_stderr(level=None)

Diese Funktion ruft get_logger() auf, fügt aber zusätzlich zum zurückgegebenen Logger einen Handler hinzu, der die Ausgabe über sys.stderr mit dem Format '[%(levelname)s/%(processName)s] %(message)s' sendet. Sie können levelname des Loggers durch Übergabe eines Arguments level ändern.

Unten sehen Sie eine Beispielsitzung mit aktiviertem Logging

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

Eine vollständige Tabelle der Logging-Stufen finden Sie im Modul logging.

Das Modul multiprocessing.dummy

Das Modul multiprocessing.dummy repliziert die API von multiprocessing, ist aber nichts weiter als ein Wrapper um das Modul threading.

Insbesondere gibt die Funktion Pool von multiprocessing.dummy eine Instanz von ThreadPool zurück, die eine Unterklasse von Pool ist und alle gleichen Methodenaufrufe unterstützt, aber eine Gruppe von Worker-Threads anstelle von Worker-Prozessen verwendet.

class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])

Ein Thread-Pool-Objekt, das einen Pool von Worker-Threads steuert, an den Jobs übermittelt werden können. ThreadPool-Instanzen sind vollständig mit Pool-Instanzen kompatibel, und ihre Ressourcen müssen entweder durch Verwendung des Pools als Kontextmanager oder durch manuelles Aufrufen von close() und terminate() ordnungsgemäß verwaltet werden.

processes ist die Anzahl der zu verwendenden Worker-Threads. Wenn processes None ist, wird die von os.process_cpu_count() zurückgegebene Anzahl verwendet.

Wenn initializer nicht None ist, ruft jeder Worker-Prozess initializer(*initargs) auf, wenn er startet.

Im Gegensatz zu Pool können maxtasksperchild und context nicht angegeben werden.

Hinweis

Ein ThreadPool teilt sich die gleiche Schnittstelle wie Pool, die um einen Prozesspool herum aufgebaut ist und vor der Einführung des Moduls concurrent.futures existierte. Daher erbt es einige Operationen, die für einen Thread-basierten Pool keinen Sinn ergeben, und es hat seinen eigenen Typ zur Darstellung des Status von asynchronen Jobs, AsyncResult, der von keiner anderen Bibliothek verstanden wird.

Benutzer sollten generell concurrent.futures.ThreadPoolExecutor bevorzugen, das eine einfachere Schnittstelle hat, die von Anfang an für Threads konzipiert wurde, und das concurrent.futures.Future-Instanzen zurückgibt, die mit vielen anderen Bibliotheken kompatibel sind, einschließlich asyncio.

Programmierrichtlinien

Es gibt bestimmte Richtlinien und Idiome, die bei der Verwendung von multiprocessing beachtet werden sollten.

Alle Startmethoden

Das Folgende gilt für alle Startmethoden.

Vermeiden Sie geteilten Zustand

Soweit wie möglich sollte versucht werden, die Übertragung großer Datenmengen zwischen Prozessen zu vermeiden.

Es ist wahrscheinlich am besten, für die Kommunikation zwischen Prozessen Warteschlangen oder Pipes zu verwenden, anstatt auf synchronisationsprimitive der unteren Ebene zurückzugreifen.

Pickelbarkeit

Stellen Sie sicher, dass die Argumente der Methoden von Proxys pickelbar sind.

Thread-Sicherheit von Proxys

Verwenden Sie ein Proxy-Objekt nicht aus mehr als einem Thread, es sei denn, Sie schützen es mit einer Sperre.

(Es gibt niemals ein Problem, wenn verschiedene Prozesse denselben Proxy verwenden.)

Zombifizierung von Prozessen vermeiden

Auf POSIX-Systemen wird ein beendeter, aber noch nicht gekoppelter Prozess zu einem Zombie. Davon sollte es nie sehr viele geben, da jedes Mal, wenn ein neuer Prozess gestartet wird (oder active_children() aufgerufen wird), alle abgeschlossenen, noch nicht gekoppelten Prozesse gekoppelt werden. Auch das Aufrufen von Process.is_alive eines abgeschlossenen Prozesses koppelt den Prozess. Dennoch ist es wahrscheinlich gute Praxis, alle gestarteten Prozesse explizit zu koppeln.

Besser vererben als picklen/entpicklen

Bei Verwendung der Startmethoden spawn oder forkserver müssen viele Typen aus multiprocessing pickelbar sein, damit Kindprozesse sie verwenden können. Es sollte jedoch im Allgemeinen vermieden werden, geteilte Objekte über Pipes oder Warteschlangen an andere Prozesse zu senden. Stattdessen sollten Sie das Programm so gestalten, dass ein Prozess, der Zugriff auf eine anderswo erstellte geteilte Ressource benötigt, diese von einem Vorfahren erben kann.

Vermeiden Sie das Beenden von Prozessen

Die Verwendung der Methode Process.terminate zum Stoppen eines Prozesses kann dazu führen, dass gemeinsam genutzte Ressourcen (wie Sperren, Semaphoren, Pipes und Warteschlangen), die gerade von dem Prozess verwendet werden, beschädigt oder für andere Prozesse unzugänglich werden.

Daher ist es wahrscheinlich am besten, Process.terminate nur für Prozesse in Betracht zu ziehen, die niemals gemeinsam genutzte Ressourcen verwenden.

Prozesse mit Warteschlangen koppeln

Beachten Sie, dass ein Prozess, der Elemente in eine Warteschlange gelegt hat, wartet, bevor er beendet wird, bis alle gepufferten Elemente vom "Feeder"-Thread an die zugrunde liegende Pipe übergeben wurden. (Der Kindprozess kann die Methode Queue.cancel_join_thread der Warteschlange aufrufen, um dieses Verhalten zu vermeiden.)

Das bedeutet, dass Sie jedes Mal, wenn Sie eine Warteschlange verwenden, sicherstellen müssen, dass alle Elemente, die in die Warteschlange gestellt wurden, schließlich entfernt werden, bevor der Prozess gekoppelt wird. Andernfalls können Sie nicht sicher sein, dass Prozesse, die Elemente in die Warteschlange gelegt haben, beendet werden. Denken Sie auch daran, dass nicht-daimonische Prozesse automatisch gekoppelt werden.

Ein Beispiel, das zu einem Deadlock führt, ist folgendes

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

Eine Korrektur hier wäre, die letzten beiden Zeilen zu tauschen (oder einfach die Zeile p.join() zu entfernen).

Ressourcen explizit an Kindprozesse übergeben

Unter POSIX kann ein Kindprozess bei Verwendung der Startmethode fork eine in einem Elternprozess erstellte gemeinsam genutzte Ressource über eine globale Ressource nutzen. Es ist jedoch besser, das Objekt als Argument an den Konstruktor für den Kindprozess zu übergeben.

Abgesehen davon, dass der Code (potenziell) mit Windows und den anderen Startmethoden kompatibel ist, stellt dies auch sicher, dass das Objekt nicht vom Elternprozess durch die Garbage Collection entfernt wird, solange der Kindprozess noch lebt. Dies kann wichtig sein, wenn eine Ressource freigegeben wird, wenn das Objekt im Elternprozess durch die Garbage Collection entfernt wird.

Zum Beispiel

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

sollte umgeschrieben werden als

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

Vorsicht beim Ersetzen von sys.stdin durch ein „file-like object“

multiprocessing hat ursprünglich bedingungslos aufgerufen

os.close(sys.stdin.fileno())

in der Methode multiprocessing.Process._bootstrap() — dies führte zu Problemen mit Prozessen innerhalb von Prozessen. Dies wurde geändert zu

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

was das grundlegende Problem von kollidierenden Prozessen, die zu einem Fehler mit ungültigem Dateideskriptor führen, löst, aber eine potenzielle Gefahr für Anwendungen birgt, die sys.stdin() durch ein „file-like object“ mit Ausgabe-Pufferung ersetzen. Diese Gefahr besteht darin, dass, wenn mehrere Prozesse close() auf diesem file-like object aufrufen, dies dazu führen kann, dass dieselben Daten mehrmals in das Objekt geschrieben werden, was zu Korruption führt.

Wenn Sie ein file-like object schreiben und Ihren eigenen Cache implementieren, können Sie es fork-sicher machen, indem Sie die PID speichern, wann immer Sie an den Cache anhängen, und den Cache verwerfen, wenn sich die PID ändert. Zum Beispiel

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

Weitere Informationen finden Sie unter bpo-5155, bpo-5313 und bpo-5331

Die Startmethoden spawn und forkserver

Es gibt einige zusätzliche Einschränkungen, die für die Startmethode fork nicht gelten.

Mehr Pickelbarkeit

Stellen Sie sicher, dass alle Argumente für Process pickelbar sind. Wenn Sie zudem Process.__init__ überschreiben, müssen Sie sicherstellen, dass Instanzen pickelbar sind, wenn die Methode Process.start aufgerufen wird.

Globale Variablen

Beachten Sie, dass, wenn Code, der in einem Kindprozess ausgeführt wird, versucht, auf eine globale Variable zuzugreifen, der von ihm gesehene Wert (falls vorhanden) möglicherweise nicht mit dem Wert im Elternprozess zum Zeitpunkt des Aufrufs von Process.start übereinstimmt.

Globale Variablen, die nur Modulkonstanten sind, verursachen jedoch keine Probleme.

Sicheres Importieren des Hauptmoduls

Stellen Sie sicher, dass das Hauptmodul von einem neuen Python-Interpreter sicher importiert werden kann, ohne unbeabsichtigte Nebenwirkungen zu verursachen (wie z. B. den Start eines neuen Prozesses).

Zum Beispiel würde die Ausführung des folgenden Moduls mit der Startmethode spawn oder forkserver mit einem RuntimeError fehlschlagen

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

Stattdessen sollte der „Einstiegspunkt“ des Programms durch Verwendung von if __name__ == '__main__': wie folgt geschützt werden

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

(Die Zeile freeze_support() kann weggelassen werden, wenn das Programm normal und nicht als ausführbare Datei ausgeführt wird.)

Dies ermöglicht es dem neu erzeugten Python-Interpreter, das Modul sicher zu importieren und dann die Funktion foo() des Moduls auszuführen.

Ähnliche Einschränkungen gelten, wenn ein Pool oder Manager im Hauptmodul erstellt wird.

Beispiele

Demonstration der Erstellung und Verwendung von benutzerdefinierten Managern und Proxys

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

Verwendung von Pool

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

Ein Beispiel, das zeigt, wie Warteschlangen verwendet werden, um Aufgaben an eine Sammlung von Worker-Prozessen zu übergeben und die Ergebnisse zu sammeln

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()