multiprocessing — Prozessbasierte Parallelität¶
Quellcode: Lib/multiprocessing/
Verfügbarkeit: nicht Android, nicht iOS, nicht WASI.
Dieses Modul wird auf mobilen Plattformen oder WebAssembly-Plattformen nicht unterstützt.
Einleitung¶
multiprocessing ist ein Paket, das das Erzeugen von Prozessen mit einer API unterstützt, die der des threading-Moduls ähnelt. Das Paket multiprocessing bietet sowohl lokale als auch entfernte Nebenläufigkeit und umgeht effektiv den Global Interpreter Lock, indem es Unterprozesse anstelle von Threads verwendet. Aus diesem Grund ermöglicht das Modul multiprocessing dem Programmierer, mehrere Prozessoren auf einem gegebenen Computer vollständig auszunutzen. Es läuft sowohl unter POSIX als auch unter Windows.
Das Modul multiprocessing führt auch APIs ein, die keine Entsprechungen im Modul threading haben. Ein Paradebeispiel dafür ist das Objekt Pool, das eine bequeme Möglichkeit bietet, die Ausführung einer Funktion über mehrere Eingabewerte hinweg zu parallelisieren, indem die Eingabedaten auf Prozesse verteilt werden (Datenparallelität). Das folgende Beispiel demonstriert die gängige Praxis, solche Funktionen in einem Modul zu definieren, damit Kindprozesse dieses Modul erfolgreich importieren können. Dieses grundlegende Beispiel für Datenparallelität mit Pool,
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
wird auf die Standardausgabe ausgegeben.
[1, 4, 9]
Siehe auch
concurrent.futures.ProcessPoolExecutor bietet eine Schnittstelle auf höherer Ebene, um Aufgaben an einen Hintergrundprozess zu übergeben, ohne die Ausführung des aufrufenden Prozesses zu blockieren. Verglichen mit der direkten Verwendung der Pool-Schnittstelle ermöglicht die API von concurrent.futures die Einreichung von Arbeit an den zugrunde liegenden Prozesspool leichter von der Wartezeit auf die Ergebnisse zu trennen.
Die Klasse Process¶
In multiprocessing werden Prozesse erzeugt, indem ein Process-Objekt erstellt und dann dessen Methode start() aufgerufen wird. Process folgt der API von threading.Thread. Ein triviales Beispiel für ein Multiprocess-Programm ist
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
Um die beteiligten einzelnen Prozess-IDs anzuzeigen, hier ein erweitertes Beispiel.
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
Eine Erklärung, warum der Teil if __name__ == '__main__' notwendig ist, finden Sie unter Programmierrichtlinien.
Die Argumente für Process müssen im Kindprozess normalerweise nicht serialisierbar sein. Wenn Sie versuchen, das obige Beispiel direkt in einem REPL einzugeben, kann dies zu einem AttributeError im Kindprozess führen, der versucht, die Funktion *f* im Modul __main__ zu finden.
Kontexte und Startmethoden¶
Abhängig von der Plattform unterstützt multiprocessing drei Möglichkeiten, einen Prozess zu starten. Diese *Startmethoden* sind
- spawn
Der Elternprozess startet einen neuen Python-Interpreter-Prozess. Der Kindprozess erbt nur die Ressourcen, die für die Ausführung der Methode
run()des Prozessobjekts notwendig sind. Insbesondere werden unnötige Dateideskriptoren und Handles vom Elternprozess nicht geerbt. Das Starten eines Prozesses mit dieser Methode ist im Vergleich zur Verwendung von *fork* oder *forkserver* ziemlich langsam.Verfügbar auf POSIX- und Windows-Plattformen. Standard unter Windows und macOS.
- fork
Der Elternprozess verwendet
os.fork(), um den Python-Interpreter zu verzweigen. Der Kindprozess ist, wenn er beginnt, praktisch identisch mit dem Elternprozess. Alle Ressourcen des Elternteils werden vom Kindprozess geerbt. Beachten Sie, dass das sichere Verzweigen eines Multithread-Prozesses problematisch ist.Verfügbar auf POSIX-Systemen.
Geändert in Version 3.14: Dies ist auf keiner Plattform mehr die Standard-Startmethode. Code, der *fork* benötigt, muss dies explizit über
get_context()oderset_start_method()angeben.Geändert in Version 3.12: Wenn Python feststellen kann, dass Ihr Prozess mehrere Threads hat, löst die Funktion
os.fork(), die diese Startmethode intern aufruft, eineDeprecationWarningaus. Verwenden Sie eine andere Startmethode. Weitere Erläuterungen finden Sie in der Dokumentation zuos.fork().
- forkserver
Wenn das Programm startet und die *forkserver*-Startmethode auswählt, wird ein Serverprozess gestartet. Von da an, wenn ein neuer Prozess benötigt wird, verbindet sich der Elternprozess mit dem Server und fordert diesen auf, einen neuen Prozess zu verzweigen. Der Fork-Server-Prozess ist Single-Threaded, es sei denn, Systembibliotheken oder vorab geladene Importe erstellen als Nebeneffekt Threads, sodass er
os.fork()sicher verwenden kann. Es werden keine unnötigen Ressourcen geerbt.Verfügbar auf POSIX-Plattformen, die das Übergeben von Dateideskriptoren über Unix-Pipes unterstützen, wie z.B. Linux. Standard auf diesen.
Geändert in Version 3.14: Dies wurde zur Standard-Startmethode auf POSIX-Plattformen.
Geändert in Version 3.4: *spawn* wurde auf allen POSIX-Plattformen hinzugefügt, und *forkserver* wurde für einige POSIX-Plattformen hinzugefügt. Kindprozesse erben keine alle von den Eltern vererbten Handles unter Windows mehr.
Geändert in Version 3.8: Unter macOS ist die Startmethode *spawn* jetzt Standard. Die Startmethode *fork* sollte als unsicher angesehen werden, da sie zu Abstürzen des Unterprozesses führen kann, da macOS-Systembibliotheken Threads starten können. Siehe bpo-33725.
Geändert in Version 3.14: Auf POSIX-Plattformen wurde die Standard-Startmethode von *fork* auf *forkserver* geändert, um die Leistung beizubehalten, aber gängige Inkompatibilitäten von Multithread-Prozessen zu vermeiden. Siehe gh-84559.
Auf POSIX starten die Startmethoden *spawn* oder *forkserver* auch einen *Ressourcen-Tracker*-Prozess, der die nicht verknüpften benannten Systemressourcen (wie benannte Semaphoren oder SharedMemory-Objekte) verfolgt, die von Prozessen des Programms erstellt wurden. Wenn alle Prozesse beendet sind, verknüpft der Ressourcen-Tracker verbleibende verfolgte Objekte. Normalerweise sollten keine vorhanden sein, aber wenn ein Prozess durch ein Signal getötet wurde, können einige Ressourcen "verloren" gegangen sein. (Weder verlorene Semaphoren noch Shared-Memory-Segmente werden automatisch verknüpft, bis zum nächsten Neustart. Dies ist problematisch für beide Objekte, da das System nur eine begrenzte Anzahl benannter Semaphoren zulässt und Shared-Memory-Segmente etwas Platz im Hauptspeicher belegen.)
Um eine Startmethode auszuwählen, verwenden Sie set_start_method() in der Klausel if __name__ == '__main__' des Hauptmoduls. Zum Beispiel
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
set_start_method() darf im Programm nicht mehr als einmal verwendet werden.
Alternativ können Sie get_context() verwenden, um ein Kontextobjekt zu erhalten. Kontextobjekte haben dieselbe API wie das Modul multiprocessing und ermöglichen die Verwendung mehrerer Startmethoden im selben Programm.
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
Beachten Sie, dass Objekte, die zu einem Kontext gehören, möglicherweise nicht mit Prozessen eines anderen Kontexts kompatibel sind. Insbesondere können Sperren, die mit dem *fork*-Kontext erstellt wurden, nicht an Prozesse übergeben werden, die mit den Startmethoden *spawn* oder *forkserver* gestartet wurden.
Bibliotheken, die multiprocessing oder ProcessPoolExecutor verwenden, sollten so gestaltet sein, dass sie ihren Benutzern die Bereitstellung eigener Multiprocessing-Kontexte ermöglichen. Die Verwendung eines bestimmten eigenen Kontexts innerhalb einer Bibliothek kann zu Inkompatibilitäten mit der Anwendung des Benutzers der Bibliothek führen. Dokumentieren Sie immer, wenn Ihre Bibliothek eine bestimmte Startmethode erfordert.
Warnung
Die Startmethoden 'spawn' und 'forkserver' können im Allgemeinen nicht mit "eingefrorenen" ausführbaren Dateien (d.h. Binärdateien, die von Paketen wie **PyInstaller** und **cx_Freeze** erstellt wurden) unter POSIX-Systemen verwendet werden. Die Startmethode 'fork' funktioniert möglicherweise, wenn der Code keine Threads verwendet.
Austausch von Objekten zwischen Prozessen¶
multiprocessing unterstützt zwei Arten von Kommunikationskanälen zwischen Prozessen
Queues
Die Klasse
Queueist eine nahezu exakte Kopie vonqueue.Queue. Zum Beispielfrom multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()Queues sind Thread- und Prozesssicher. Jedes Objekt, das in eine
multiprocessing-Queue eingefügt wird, wird serialisiert.
Pipes
Die Funktion
Pipe()gibt ein Paar von Verbindungsobjekten zurück, die durch eine Pipe verbunden sind, die standardmäßig Duplex (bidirektional) ist. Zum Beispielfrom multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()Die beiden von
Pipe()zurückgegebenen Verbindungsobjekte repräsentieren die beiden Enden der Pipe. Jedes Verbindungsobjekt verfügt über die Methodensend()undrecv()(unter anderem). Beachten Sie, dass Daten in einer Pipe beschädigt werden können, wenn zwei Prozesse (oder Threads) versuchen, gleichzeitig vom *selben* Ende der Pipe zu lesen oder in dieses zu schreiben. Natürlich besteht kein Risiko einer Beschädigung, wenn Prozesse gleichzeitig verschiedene Enden der Pipe verwenden.Die Methode
send()serialisiert das Objekt undrecv()erstellt das Objekt neu.
Synchronisation zwischen Prozessen¶
multiprocessing enthält Entsprechungen für alle Synchronisationsprimitive aus threading. Zum Beispiel kann eine Sperre verwendet werden, um sicherzustellen, dass nur ein Prozess gleichzeitig auf die Standardausgabe schreibt
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
Ohne die Sperre ist die Ausgabe aus den verschiedenen Prozessen wahrscheinlich durcheinander.
Verwendung eines Pools von Arbeitern¶
Die Klasse Pool repräsentiert einen Pool von Worker-Prozessen. Sie verfügt über Methoden, die es ermöglichen, Aufgaben auf verschiedene Weise an die Worker-Prozesse auszulagern.
Zum Beispiel
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 seconds
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
Beachten Sie, dass die Methoden eines Pools nur von dem Prozess verwendet werden sollten, der ihn erstellt hat.
Hinweis
Funktionalität innerhalb dieses Pakets erfordert, dass das Modul __main__ von den Kindern importiert werden kann. Dies wird unter Programmierrichtlinien behandelt, ist aber hier erwähnenswert. Das bedeutet, dass einige Beispiele, wie die Beispiele für multiprocessing.pool.Pool, im interaktiven Interpreter nicht funktionieren. Zum Beispiel
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> with p:
... p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
(Wenn Sie dies versuchen, werden tatsächlich drei vollständige Tracebacks in einer halbzufälligen Reihenfolge ausgegeben, und dann müssen Sie möglicherweise den Elternprozess irgendwie stoppen.)
Referenz¶
Das Paket multiprocessing repliziert größtenteils die API des Moduls threading.
Process und Ausnahmen¶
- class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶
Process-Objekte repräsentieren Aktivitäten, die in einem separaten Prozess ausgeführt werden. Die Klasse
Processhat Entsprechungen für alle Methoden vonthreading.Thread.Der Konstruktor sollte immer mit Schlüsselwortargumenten aufgerufen werden. *group* sollte immer
Nonesein; er existiert ausschließlich zur Kompatibilität mitthreading.Thread. *target* ist das aufrufbare Objekt, das von der Methoderun()aufgerufen wird. Es ist standardmäßigNone, was bedeutet, dass nichts aufgerufen wird. *name* ist der Prozessname (siehenamefür weitere Details). *args* ist das Argument-Tupel für den Zielaufruf. *kwargs* ist ein Dictionary von Schlüsselwortargumenten für den Zielaufruf. Wenn das schlüsselwort-only Argument *daemon* angegeben ist, wird dasdaemon-Flag des Prozesses aufTrueoderFalsegesetzt. Wenn esNone(Standard) ist, wird dieses Flag vom erzeugenden Prozess übernommen.Standardmäßig werden keine Argumente an *target* übergeben. Das Argument *args*, das standardmäßig
()ist, kann verwendet werden, um eine Liste oder ein Tupel der an *target* zu übergebenden Argumente anzugeben.Wenn eine Unterklasse den Konstruktor überschreibt, muss sie sicherstellen, dass sie den Konstruktor der Basisklasse (
super().__init__()) aufruft, bevor sie weitere Aktionen am Prozess vornimmt.Hinweis
Im Allgemeinen müssen alle Argumente an
Processserialisierbar sein. Dies wird häufig beobachtet, wenn versucht wird, einProcess-Objekt zu erstellen oder einenconcurrent.futures.ProcessPoolExecutoraus einem REPL mit einer lokal definierten *target*-Funktion zu verwenden.Die Übergabe eines aufrufbaren Objekts, das in der aktuellen REPL-Sitzung definiert ist, führt dazu, dass der Kindprozess mit einer unbehandelten
AttributeError-Ausnahme stirbt, da *target* innerhalb eines importierbaren Moduls definiert sein muss, um beim Deserialisieren geladen zu werden.Beispiel für diesen nicht abfangbaren Fehler aus dem Kindprozess
>>> import multiprocessing as mp >>> def knigit(): ... print("Ni!") ... >>> process = mp.Process(target=knigit) >>> process.start() >>> Traceback (most recent call last): File ".../multiprocessing/spawn.py", line ..., in spawn_main File ".../multiprocessing/spawn.py", line ..., in _main AttributeError: module '__main__' has no attribute 'knigit' >>> process <SpawnProcess name='SpawnProcess-1' pid=379473 parent=378707 stopped exitcode=1>
Siehe Die Startmethoden spawn und forkserver. Während diese Einschränkung nicht gilt, wenn die Startmethode
"fork"verwendet wird, ist dies ab Python3.14nicht mehr die Standardmethode auf jeder Plattform. Siehe Kontexte und Startmethoden. Siehe auch gh-132898.Geändert in Version 3.3: Das Argument *daemon* wurde hinzugefügt.
- run()¶
Methode, die die Aktivität des Prozesses repräsentiert.
Sie können diese Methode in einer Unterklasse überschreiben. Die Standardmethode
run()ruft das im Konstruktor des Objekts übergebene aufrufbare Objekt als *target*-Argument auf, falls vorhanden, mit sequenziellen und Schlüsselwortargumenten, die aus den Argumenten *args* bzw. *kwargs* entnommen werden.Die Verwendung einer Liste oder eines Tupels als Argument *args*, das an
Processübergeben wird, erzielt denselben Effekt.Beispiel
>>> from multiprocessing import Process >>> p = Process(target=print, args=[1]) >>> p.run() 1 >>> p = Process(target=print, args=(1,)) >>> p.run() 1
- start()¶
Startet die Aktivität des Prozesses.
Dies darf pro Prozessobjekt höchstens einmal aufgerufen werden. Es wird dafür gesorgt, dass die Methode
run()des Objekts in einem separaten Prozess aufgerufen wird.
- join([timeout])¶
Wenn das optionale Argument timeout
None(Standard) ist, blockiert die Methode, bis der Prozess, dessenjoin()-Methode aufgerufen wird, beendet ist. Wenn timeout eine positive Zahl ist, blockiert sie höchstens timeout Sekunden. Beachten Sie, dass die MethodeNonezurückgibt, wenn ihr Prozess beendet wird oder die Methode abläuft. Überprüfen Sie denexitcodedes Prozesses, um festzustellen, ob er beendet wurde.Ein Prozess kann mehrmals gejoint werden.
Ein Prozess kann sich nicht selbst joinen, da dies zu einer Deadlock-Situation führen würde. Es ist ein Fehler, zu versuchen, einen Prozess zu joinen, bevor er gestartet wurde.
- name¶
Der Name des Prozesses. Der Name ist eine Zeichenkette, die nur zur Identifizierung dient. Er hat keine weitere Bedeutung. Mehreren Prozessen kann derselbe Name zugewiesen werden.
Der anfängliche Name wird vom Konstruktor gesetzt. Wenn kein expliziter Name an den Konstruktor übergeben wird, wird ein Name der Form „Process-N1:N2:...:Nk“ gebildet, wobei jedes Nk das N-te Kind seines Elternteils ist.
- is_alive()¶
Gibt zurück, ob der Prozess aktiv ist.
Grob gesagt ist ein Prozessobjekt ab dem Zeitpunkt, an dem die
start()-Methode zurückkehrt, bis zum Ende des Kindprozesses aktiv.
- daemon¶
Das Daemon-Flag des Prozesses, ein boolescher Wert. Dies muss gesetzt werden, bevor
start()aufgerufen wird.Der anfängliche Wert wird vom erzeugenden Prozess übernommen.
Wenn ein Prozess beendet wird, versucht er, alle seine Daemon-Kindprozesse zu beenden.
Beachten Sie, dass einem Daemon-Prozess keine Kindprozesse erlaubt sind. Andernfalls würde ein Daemon-Prozess seine Kinder verwaist zurücklassen, wenn er beim Beenden seines Elternprozesses beendet wird. Außerdem sind dies **nicht** Unix-Daemons oder -Dienste, sondern normale Prozesse, die beendet werden (und nicht gejoint werden), wenn nicht-daemmonische Prozesse beendet wurden.
Zusätzlich zur
threading.ThreadAPI unterstützenProcess-Objekte auch die folgenden Attribute und Methoden- pid¶
Gibt die Prozess-ID zurück. Bevor der Prozess gestartet wird, ist dies
None.
- exitcode¶
Der Exit-Code des Kindes. Dies ist
None, wenn der Prozess noch nicht beendet wurde.Wenn die
run()-Methode des Kindes normal zurückgekehrt ist, ist der Exit-Code 0. Wenn sie übersys.exit()mit einem ganzzahligen Argument N beendet wurde, ist der Exit-Code N.Wenn das Kind aufgrund einer Ausnahme beendet wurde, die nicht innerhalb von
run()abgefangen wurde, ist der Exit-Code 1. Wenn es durch das Signal N beendet wurde, ist der Exit-Code der negative Wert -N.
- authkey¶
Der Authentifizierungsschlüssel des Prozesses (eine Byte-Zeichenkette).
Wenn
multiprocessinginitialisiert wird, erhält der Hauptprozess eine zufällige Zeichenkette überos.urandom().Wenn ein
Process-Objekt erstellt wird, erbt es den Authentifizierungsschlüssel seines Elternprozesses, obwohl dieser durch Zuweisung vonauthkeyan eine andere Byte-Zeichenkette geändert werden kann.Siehe Authentifizierungsschlüssel.
- sentinel¶
Ein numerischer Handle eines Systemobjekts, das „bereit“ wird, wenn der Prozess endet.
Sie können diesen Wert verwenden, wenn Sie mit
multiprocessing.connection.wait()auf mehrere Ereignisse gleichzeitig warten möchten. Andernfalls ist der Aufruf vonjoin()einfacher.Unter Windows ist dies ein OS-Handle, das mit der API-Familie
WaitForSingleObjectundWaitForMultipleObjectsverwendet werden kann. Unter POSIX ist dies ein Dateideskriptor, der mit primitiven Funktionen aus demselect-Modul verwendet werden kann.Hinzugefügt in Version 3.3.
- interrupt()¶
Beendet den Prozess. Funktioniert unter POSIX über das Signal
SIGINT. Das Verhalten unter Windows ist undefiniert.Standardmäßig beendet dies den Kindprozess, indem es
KeyboardInterruptauslöst. Dieses Verhalten kann durch Setzen des entsprechenden Signalhandlers in der Kindprozess-signal.signal()-Funktion fürSIGINTgeändert werden.Hinweis: Wenn der Kindprozess
KeyboardInterruptabfängt und ignoriert, wird der Prozess nicht beendet.Hinweis: Das Standardverhalten setzt auch
exitcodeauf1, als ob eine nicht abgefangene Ausnahme im Kindprozess aufgetreten wäre. Um einen anderenexitcodezu erhalten, können Sie einfachKeyboardInterruptabfangen undexit(your_code)aufrufen.Hinzugefügt in Version 3.14.
- terminate()¶
Beendet den Prozess. Unter POSIX geschieht dies über das Signal
SIGTERM; unter Windows wirdTerminateProcess()verwendet. Beachten Sie, dass Exit-Handler und finally-Klauseln usw. nicht ausgeführt werden.Beachten Sie, dass die Nachfolgeprozesse des Prozesses **nicht** beendet werden – sie werden einfach verwaist.
Warnung
Wenn diese Methode verwendet wird, während der zugehörige Prozess eine Pipe oder Warteschlange verwendet, besteht die Gefahr, dass die Pipe oder Warteschlange beschädigt wird und von anderen Prozessen nicht mehr verwendet werden kann. Ebenso, wenn der Prozess eine Sperre oder ein Semaphor usw. erworben hat, kann seine Beendigung dazu führen, dass andere Prozesse in eine Deadlock-Situation geraten.
- kill()¶
Dasselbe wie
terminate(), aber unter POSIX mit dem SignalSIGKILL.Hinzugefügt in Version 3.7.
- close()¶
Schließt das
Process-Objekt und gibt alle damit verbundenen Ressourcen frei.ValueErrorwird ausgelöst, wenn der zugrundeliegende Prozess noch läuft. Sobaldclose()erfolgreich zurückkehrt, lösen die meisten anderen Methoden und Attribute desProcess-ObjektsValueErroraus.Hinzugefügt in Version 3.7.
Beachten Sie, dass die Methoden
start(),join(),is_alive(),terminate()und das Attributexitcodenur von dem Prozess aufgerufen werden sollten, der das Prozessobjekt erstellt hat.Beispiel für die Verwendung einiger Methoden von
Process>>> import multiprocessing, time, signal >>> mp_context = multiprocessing.get_context('spawn') >>> p = mp_context.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <...Process ... initial> False >>> p.start() >>> print(p, p.is_alive()) <...Process ... started> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <...Process ... stopped exitcode=-SIGTERM> False >>> p.exitcode == -signal.SIGTERM True
- exception multiprocessing.ProcessError¶
Die Basisklasse aller
multiprocessing-Ausnahmen.
- exception multiprocessing.BufferTooShort¶
Ausnahme, die von
Connection.recv_bytes_into()ausgelöst wird, wenn das bereitgestellte Pufferobjekt zu klein für die gelesene Nachricht ist.Wenn
eeine Instanz vonBufferTooShortist, gibte.args[0]die Nachricht als Byte-Zeichenkette zurück.
- exception multiprocessing.AuthenticationError¶
Ausgelöst bei einem Authentifizierungsfehler.
- exception multiprocessing.TimeoutError¶
Ausgelöst von Methoden mit Timeout, wenn der Timeout abläuft.
Pipes und Queues¶
Bei der Verwendung mehrerer Prozesse werden normalerweise Nachrichtenübergabeverfahren zur Kommunikation zwischen Prozessen verwendet, um die Verwendung von Synchronisierungsprimitiven wie Sperren zu vermeiden.
Für die Übergabe von Nachrichten kann man Pipe() (für eine Verbindung zwischen zwei Prozessen) oder eine Queue (die mehrere Produzenten und Konsumenten zulässt) verwenden.
Die Typen Queue, SimpleQueue und JoinableQueue sind Multi-Producer, Multi-Consumer FIFO-Queues, die auf der Klasse queue.Queue in der Standardbibliothek basieren. Sie unterscheiden sich dadurch, dass Queue die Methoden task_done() und join() vermisst, die in Pythons queue.Queue-Klasse (ab Python 2.5) eingeführt wurden.
Wenn Sie JoinableQueue verwenden, **müssen** Sie für jede aus der Queue entfernte Aufgabe JoinableQueue.task_done() aufrufen, sonst kann das zur Zählung der unbearbeiteten Aufgaben verwendete Semaphor überlaufen und eine Ausnahme auslösen.
Ein Unterschied zu anderen Python-Queue-Implementierungen ist, dass multiprocessing-Queues alle Objekte, die in sie eingefügt werden, mit pickle serialisieren. Das von der get-Methode zurückgegebene Objekt ist ein neu erstelltes Objekt, das keinen Speicher mit dem Originalobjekt teilt.
Beachten Sie, dass man auch eine gemeinsam genutzte Queue erstellen kann, indem man ein Manager-Objekt verwendet – siehe Manager.
Hinweis
multiprocessing verwendet die üblichen Ausnahmen queue.Empty und queue.Full, um einen Timeout zu signalisieren. Diese sind nicht im multiprocessing-Namespace verfügbar, daher müssen Sie sie aus queue importieren.
Hinweis
Wenn ein Objekt in eine Queue gelegt wird, wird das Objekt gepickelt und ein Hintergrundthread leert später die gepickelten Daten in eine darunterliegende Pipe. Dies hat einige überraschende Konsequenzen, die aber keine praktischen Schwierigkeiten verursachen sollten – wenn sie Sie wirklich stören, können Sie stattdessen eine Queue verwenden, die mit einem Manager erstellt wurde.
Nachdem ein Objekt in eine leere Queue gelegt wurde, kann es zu einer winzigen Verzögerung kommen, bevor die
empty()-Methode der QueueFalsezurückgibt undget_nowait()ohne Auslösen vonqueue.Emptyzurückkehren kann.Wenn mehrere Prozesse Objekte in die Queue legen, ist es möglich, dass die Objekte am anderen Ende in einer anderen Reihenfolge empfangen werden. Objekte, die vom selben Prozess in die Queue gelegt wurden, sind jedoch immer in Bezug aufeinander in der erwarteten Reihenfolge.
Warnung
Wenn ein Prozess mit Process.terminate() oder os.kill() beendet wird, während er versucht, eine Queue zu verwenden, können die Daten in der Queue beschädigt werden. Dies kann dazu führen, dass jeder andere Prozess eine Ausnahme erhält, wenn er später versucht, die Queue zu verwenden.
Warnung
Wie oben erwähnt, wird ein Kindprozess, der Elemente in eine Queue gelegt hat (und JoinableQueue.cancel_join_thread nicht verwendet hat), erst beendet, wenn alle gepufferten Elemente in die Pipe geleert wurden.
Das bedeutet, dass Sie bei einem Deadlock landen können, wenn Sie versuchen, diesen Prozess zu joinen, es sei denn, Sie sind sicher, dass alle Elemente, die in die Queue gelegt wurden, auch verbraucht wurden. Ebenso, wenn der Kindprozess nicht-daemmonisch ist, kann der Elternprozess beim Beenden hängen bleiben, wenn er versucht, alle seine nicht-daemmonischen Kinder zu joinen.
Beachten Sie, dass eine mit einem Manager erstellte Queue dieses Problem nicht hat. Siehe Programmieranleitungen.
Ein Beispiel für die Verwendung von Queues für die Interprozesskommunikation finden Sie unter Beispiele.
- multiprocessing.Pipe([duplex])¶
Gibt ein Paar
(conn1, conn2)vonConnection-Objekten zurück, die die Enden einer Pipe darstellen.Wenn duplex
True(Standard) ist, ist die Pipe bidirektional. Wenn duplexFalseist, ist die Pipe unidirektional:conn1kann nur zum Empfangen von Nachrichten verwendet werden undconn2kann nur zum Senden von Nachrichten verwendet werden.Die Methode
send()serialisiert das Objekt mitpickleundrecv()erstellt das Objekt neu.
- class multiprocessing.Queue([maxsize])¶
Gibt eine prozessgeteilte Queue zurück, die über eine Pipe und einige Sperren/Semaphore implementiert ist. Wenn ein Prozess zum ersten Mal ein Element in die Queue legt, wird ein Feeder-Thread gestartet, der Objekte aus einem Puffer in die Pipe überträgt.
Die üblichen Ausnahmen
queue.Emptyundqueue.Fullaus dem Modulqueueder Standardbibliothek werden ausgelöst, um Timeouts zu signalisieren.Queueimplementiert alle Methoden vonqueue.Queueaußertask_done()undjoin().- qsize()¶
Gibt die ungefähre Größe der Queue zurück. Aufgrund von Multithreading-/Multiprocessing-Semantik ist diese Zahl nicht zuverlässig.
Beachten Sie, dass dies auf Plattformen wie macOS, wo
sem_getvalue()nicht implementiert ist, eineNotImplementedErrorauslösen kann.
- empty()¶
Gibt
Truezurück, wenn die Queue leer ist, andernfallsFalse. Aufgrund von Multithreading-/Multiprocessing-Semantik ist dies nicht zuverlässig.Kann bei geschlossenen Queues eine
OSErrorauslösen. (nicht garantiert)
- full()¶
Gibt
Truezurück, wenn die Queue voll ist, andernfallsFalse. Aufgrund von Multithreading-/Multiprocessing-Semantik ist dies nicht zuverlässig.
- put(obj[, block[, timeout]])¶
Legt obj in die Queue. Wenn das optionale Argument block
True(Standard) ist und timeoutNone(Standard) ist, wird bei Bedarf blockiert, bis ein freier Platz verfügbar ist. Wenn timeout eine positive Zahl ist, blockiert es höchstens timeout Sekunden und löst die Ausnahmequeue.Fullaus, wenn innerhalb dieser Zeit kein freier Platz verfügbar war. Andernfalls (block istFalse) wird ein Element in die Queue gelegt, wenn ein freier Platz sofort verfügbar ist, andernfalls wird die Ausnahmequeue.Fullausgelöst (timeout wird in diesem Fall ignoriert).Geändert in Version 3.8: Wenn die Queue geschlossen ist, wird anstelle von
AssertionErroreineValueErrorausgelöst.
- put_nowait(obj)¶
Entspricht
put(obj, False).
- get([block[, timeout]])¶
Entfernt und gibt ein Element aus der Queue zurück. Wenn das optionale Argument block
True(Standard) ist und timeoutNone(Standard) ist, wird bei Bedarf blockiert, bis ein Element verfügbar ist. Wenn timeout eine positive Zahl ist, blockiert es höchstens timeout Sekunden und löst die Ausnahmequeue.Emptyaus, wenn innerhalb dieser Zeit kein Element verfügbar war. Andernfalls (block istFalse) wird ein Element zurückgegeben, wenn es sofort verfügbar ist, andernfalls wird die Ausnahmequeue.Emptyausgelöst (timeout wird in diesem Fall ignoriert).Geändert in Version 3.8: Wenn die Queue geschlossen ist, wird anstelle von
OSErroreineValueErrorausgelöst.
- get_nowait()¶
Entspricht
get(False).
multiprocessing.Queuehat einige zusätzliche Methoden, die inqueue.Queuenicht vorhanden sind. Diese Methoden sind für die meisten Codes normalerweise nicht erforderlich- close()¶
Schließt die Queue: Gibt interne Ressourcen frei.
Eine Queue darf nach dem Schließen nicht mehr verwendet werden. Beispielsweise dürfen die Methoden
get(),put()undempty()nicht mehr aufgerufen werden.Der Hintergrundthread wird beendet, sobald alle gepufferten Daten in die Pipe geleert wurden. Dies wird automatisch aufgerufen, wenn die Queue garbage collected wird.
- join_thread()¶
Verbindet den Hintergrundthread. Dies kann nur nach dem Aufruf von
close()verwendet werden. Es blockiert, bis der Hintergrundthread beendet ist, und stellt sicher, dass alle Daten im Puffer in die Pipe geleert wurden.Standardmäßig versucht ein Prozess, der nicht der Ersteller der Queue ist, beim Beenden den Hintergrundthread der Queue zu joinen. Der Prozess kann
cancel_join_thread()aufrufen, umjoin_thread()nichts tun zu lassen.
- cancel_join_thread()¶
Verhindert, dass
join_thread()blockiert. Insbesondere verhindert dies, dass der Hintergrundthread beim Beenden des Prozesses automatisch gejoint wird – siehejoin_thread().Ein besserer Name für diese Methode wäre
allow_exit_without_flush(). Es ist wahrscheinlich, dass dadurch in die Warteschlange gestellte Daten verloren gehen, und Sie werden sie fast sicher nicht benötigen. Sie ist wirklich nur dafür da, falls Sie den aktuellen Prozess sofort beenden müssen, ohne darauf zu warten, dass in die Warteschlange gestellte Daten an das zugrunde liegende Pipe geleert werden, und Ihnen verlorene Daten egal sind.
Hinweis
Die Funktionalität dieser Klasse erfordert eine funktionierende Implementierung von Shared Semaphoren im Host-Betriebssystem. Ohne diese wird die Funktionalität dieser Klasse deaktiviert, und Versuche, eine
Queuezu instanziieren, führen zu einemImportError. Weitere Informationen finden Sie unter bpo-3770. Dasselbe gilt für alle unten aufgeführten spezialisierten Warteschlangentypen.
- class multiprocessing.SimpleQueue¶
Dies ist ein vereinfachter
Queue-Typ, der einer gesperrtenPipesehr nahe kommt.- close()¶
Schließt die Queue: Gibt interne Ressourcen frei.
Eine Warteschlange darf nach dem Schließen nicht mehr verwendet werden. Beispielsweise dürfen die Methoden
get(),put()undempty()nicht mehr aufgerufen werden.Hinzugefügt in Version 3.9.
- empty()¶
Gibt
Truezurück, wenn die Warteschlange leer ist, andernfallsFalse.Löst immer eine
OSErroraus, wenn die SimpleQueue geschlossen ist.
- get()¶
Entfernt und gibt ein Element aus der Warteschlange zurück.
- put(item)¶
Fügt das item in die Warteschlange ein.
- class multiprocessing.JoinableQueue([maxsize])¶
JoinableQueue, eine Unterklasse vonQueue, ist eine Warteschlange, die zusätzlich die Methodentask_done()undjoin()besitzt.- task_done()¶
Zeigt an, dass eine zuvor in die Warteschlange gestellte Aufgabe abgeschlossen ist. Wird von Warteschlangen-Konsumenten verwendet. Für jedes mit
get()abgerufene Element, teilt ein nachfolgender Aufruf vontask_done()der Warteschlange mit, dass die Verarbeitung der Aufgabe abgeschlossen ist.Wenn eine
join()aktuell blockiert, wird sie fortgesetzt, sobald alle Elemente verarbeitet wurden (d. h. eintask_done()-Aufruf für jedes Element empfangen wurde, das in die Warteschlangeput()wurde).Löst einen
ValueErroraus, wenn öfter aufgerufen als Elemente in die Warteschlange gelegt wurden.
- join()¶
Blockiert, bis alle Elemente in der Warteschlange abgerufen und verarbeitet wurden.
Die Anzahl der noch nicht abgeschlossenen Aufgaben erhöht sich jedes Mal, wenn ein Element zur Warteschlange hinzugefügt wird. Die Anzahl verringert sich jedes Mal, wenn ein Konsument
task_done()aufruft, um anzuzeigen, dass das Element abgerufen wurde und alle Arbeiten daran abgeschlossen sind. Wenn die Anzahl der noch nicht abgeschlossenen Aufgaben auf Null fällt, wirdjoin()fortgesetzt.
Sonstiges¶
- multiprocessing.active_children()¶
Gibt eine Liste aller aktiven Kindprozesse des aktuellen Prozesses zurück.
Der Aufruf hat den Nebeneffekt, dass bereits beendete Prozesse "verknüpft" werden.
- multiprocessing.cpu_count()¶
Gibt die Anzahl der CPUs im System zurück.
Diese Zahl ist nicht gleich der Anzahl der CPUs, die der aktuelle Prozess nutzen kann. Die Anzahl der nutzbaren CPUs kann mit
os.process_cpu_count()(oderlen(os.sched_getaffinity(0))) ermittelt werden.Wenn die Anzahl der CPUs nicht ermittelt werden kann, wird ein
NotImplementedErrorausgelöst.Siehe auch
Geändert in Version 3.13: Der Rückgabewert kann auch durch die Option
-X cpu_countoder die UmgebungsvariablePYTHON_CPU_COUNTüberschrieben werden, da dies lediglich ein Wrapper um dieosCPU-Count-APIs ist.
- multiprocessing.current_process()¶
Gibt das
Process-Objekt zurück, das dem aktuellen Prozess entspricht.Ein Analogon zu
threading.current_thread().
- multiprocessing.parent_process()¶
Gibt das
Process-Objekt des Elternprozesses voncurrent_process()zurück. Für den Hauptprozess istparent_processNone.Hinzugefügt in Version 3.8.
- multiprocessing.freeze_support()¶
Fügt Unterstützung hinzu, wenn ein Programm, das
multiprocessingverwendet, zu einer ausführbaren Datei gefroren wurde. (Getestet mit **py2exe**, **PyInstaller** und **cx_Freeze**.)Sie müssen diese Funktion direkt nach der Zeile
if __name__ == '__main__'des Hauptmoduls aufrufen. Zum Beispielfrom multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
Wenn die Zeile
freeze_support()weggelassen wird, löst der Versuch, die gefrorene ausführbare Datei auszuführen, einenRuntimeErroraus.Der Aufruf von
freeze_support()hat keine Auswirkung, wenn die Startmethode nicht spawn ist. Darüber hinaus hatfreeze_support()keine Auswirkung, wenn das Modul normal vom Python-Interpreter ausgeführt wird (das Programm wurde nicht gefroren).
- multiprocessing.get_all_start_methods()¶
Gibt eine Liste der unterstützten Startmethoden zurück, wobei die erste die Standardmethode ist. Die möglichen Startmethoden sind
'fork','spawn'und'forkserver'. Nicht alle Plattformen unterstützen alle Methoden. Siehe Kontexte und Startmethoden.Hinzugefügt in Version 3.4.
- multiprocessing.get_context(method=None)¶
Gibt ein Kontextobjekt zurück, das die gleichen Attribute wie das Modul
multiprocessingbesitzt.Wenn method
Noneist, wird der Standardkontext zurückgegeben. Beachten Sie, dass, wenn die globale Startmethode nicht festgelegt wurde, diese auf die Standardmethode gesetzt wird. Andernfalls sollte method'fork','spawn'oder'forkserver'sein. EinValueErrorwird ausgelöst, wenn die angegebene Startmethode nicht verfügbar ist. Siehe Kontexte und Startmethoden.Hinzugefügt in Version 3.4.
- multiprocessing.get_start_method(allow_none=False)¶
Gibt den Namen der Startmethode zurück, die zum Starten von Kindprozessen verwendet wird.
Wenn die globale Startmethode nicht festgelegt wurde und allow_none
Falseist, wird die Startmethode auf den Standardwert gesetzt und der Name zurückgegeben. Wenn die Startmethode nicht festgelegt wurde und allow_noneTrueist, wirdNonezurückgegeben.Der Rückgabewert kann
'fork','spawn','forkserver'oderNonesein. Siehe Kontexte und Startmethoden.Hinzugefügt in Version 3.4.
Geändert in Version 3.8: Unter macOS ist die Startmethode spawn jetzt die Standardmethode. Die Startmethode fork sollte als unsicher betrachtet werden, da sie zu Abstürzen des Subprozesses führen kann. Siehe bpo-33725.
- multiprocessing.set_executable(executable)¶
Legt den Pfad des Python-Interpreters fest, der beim Start eines Kindprozesses verwendet werden soll. (Standardmäßig wird
sys.executableverwendet). Einbettende Programme müssen wahrscheinlich etwas tun wieset_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
bevor sie Kindprozesse erstellen können.
Geändert in Version 3.4: Unter POSIX wird nun die Startmethode
'spawn'unterstützt.Geändert in Version 3.11: Akzeptiert ein pfadähnliches Objekt.
- multiprocessing.set_forkserver_preload(module_names)¶
Legt eine Liste von Modulnamen fest, die der Forkserver-Hauptprozess zu importieren versucht, damit ihr bereits importierter Zustand von den geforkten Prozessen übernommen wird. Jegliche
ImportErrorwird dabei stillschweigend ignoriert. Dies kann als Leistungssteigerung verwendet werden, um wiederholte Arbeit in jedem Prozess zu vermeiden.Damit dies funktioniert, muss es aufgerufen werden, bevor der Forkserver-Prozess gestartet wurde (bevor ein
Poolerstellt oder einProcessgestartet wird).Nur sinnvoll, wenn die Startmethode
'forkserver'verwendet wird. Siehe Kontexte und Startmethoden.Hinzugefügt in Version 3.4.
- multiprocessing.set_start_method(method, force=False)¶
Legt die Methode fest, die zum Starten von Kindprozessen verwendet werden soll. Das Argument method kann
'fork','spawn'oder'forkserver'sein. Löst einenRuntimeErroraus, wenn die Startmethode bereits festgelegt wurde und force nichtTrueist. Wenn methodNoneist und forceTrueist, wird die Startmethode aufNonegesetzt. Wenn methodNoneist und forceFalseist, wird der Kontext auf den Standardkontext gesetzt.Beachten Sie, dass dies höchstens einmal aufgerufen werden sollte und innerhalb der Klausel
if __name__ == '__main__'des Hauptmoduls geschützt sein sollte.Siehe Kontexte und Startmethoden.
Hinzugefügt in Version 3.4.
Hinweis
multiprocessing enthält keine Analoga zu threading.active_count(), threading.enumerate(), threading.settrace(), threading.setprofile(), threading.Timer oder threading.local.
Verbindungsobjekte¶
Verbindungsobjekte ermöglichen das Senden und Empfangen von serialisierbaren Objekten oder Zeichenketten. Sie können als nachrichtenorientierte verbundene Sockets betrachtet werden.
Verbindungsobjekte werden normalerweise mit Pipe erstellt – siehe auch Listener und Clients.
- class multiprocessing.connection.Connection¶
- send(obj)¶
Sendet ein Objekt an das andere Ende der Verbindung, das mit
recv()gelesen werden sollte.Das Objekt muss serialisierbar sein. Sehr große serialisierte Objekte (ungefähr 32 MiB oder mehr, abhängig vom Betriebssystem) können eine
ValueError-Ausnahme auslösen.
- recv()¶
Gibt ein vom anderen Ende der Verbindung mit
send()gesendetes Objekt zurück. Blockiert, bis etwas zu empfangen ist. LöstEOFErroraus, wenn nichts mehr zu empfangen ist und das andere Ende geschlossen wurde.
- fileno()¶
Gibt den von der Verbindung verwendeten Dateideskriptor oder Handle zurück.
- close()¶
Schließt die Verbindung.
Dies wird automatisch aufgerufen, wenn das Verbindungsobjekt vom Garbage Collector bereinigt wird.
- poll([timeout])¶
Gibt zurück, ob Daten zum Lesen verfügbar sind.
Wenn timeout nicht angegeben ist, wird sofort zurückgegeben. Wenn timeout eine Zahl ist, gibt dies die maximale Zeit in Sekunden an, die blockiert werden soll. Wenn timeout
Noneist, wird eine unendliche Timeout-Zeit verwendet.Beachten Sie, dass mehrere Verbindungsobjekte gleichzeitig mit
multiprocessing.connection.wait()abgefragt werden können.
- send_bytes(buffer[, offset[, size]])¶
Sendet Byte-Daten aus einem byteähnlichen Objekt als vollständige Nachricht.
Wenn offset angegeben ist, werden die Daten ab dieser Position im buffer gelesen. Wenn size angegeben ist, werden so viele Bytes aus dem Puffer gelesen. Sehr große Puffer (ungefähr 32 MiB+, abhängig vom Betriebssystem) können eine
ValueError-Ausnahme auslösen.
- recv_bytes([maxlength])¶
Gibt eine vollständige Nachricht von Byte-Daten zurück, die vom anderen Ende der Verbindung gesendet wurde, als Zeichenkette. Blockiert, bis etwas zu empfangen ist. Löst
EOFErroraus, wenn nichts mehr zu empfangen ist und das andere Ende geschlossen wurde.Wenn maxlength angegeben ist und die Nachricht länger als maxlength ist, wird eine
OSErrorausgelöst und die Verbindung kann nicht mehr gelesen werden.
- recv_bytes_into(buffer[, offset])¶
Liest eine vollständige Nachricht von Byte-Daten, die vom anderen Ende der Verbindung gesendet wurde, in buffer und gibt die Anzahl der Bytes in der Nachricht zurück. Blockiert, bis etwas zu empfangen ist. Löst
EOFErroraus, wenn nichts mehr zu empfangen ist und das andere Ende geschlossen wurde.buffer muss ein beschreibbares byteähnliches Objekt sein. Wenn offset angegeben ist, wird die Nachricht ab dieser Position in den Puffer geschrieben. Offset muss eine nicht-negative ganze Zahl kleiner als die Länge von buffer (in Bytes) sein.
Wenn der Puffer zu kurz ist, wird eine
BufferTooShort-Ausnahme ausgelöst und die vollständige Nachricht ist alse.args[0]verfügbar, wobeiedie Ausnahmeinstanz ist.
Geändert in Version 3.3: Verbindungsobjekte können nun zwischen Prozessen übermittelt werden, indem
Connection.send()undConnection.recv()verwendet werden.Verbindungsobjekte unterstützen nun auch das Protokoll für Kontextmanager – siehe Kontextmanager-Typen.
__enter__()gibt das Verbindungsobjekt zurück, und__exit__()ruftclose()auf.
Zum Beispiel
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Warnung
Die Methode Connection.recv() deserialisiert die empfangenen Daten automatisch, was ein Sicherheitsrisiko darstellen kann, es sei denn, Sie können dem Prozess, der die Nachricht gesendet hat, vertrauen.
Daher sollten Sie, es sei denn, das Verbindungsobjekt wurde mit Pipe() erstellt, die Methoden recv() und send() nur nach Durchführung einer Art von Authentifizierung verwenden. Siehe Authentifizierungsschlüssel.
Warnung
Wenn ein Prozess beim Lesen oder Schreiben einer Pipe getötet wird, sind die Daten in der Pipe wahrscheinlich beschädigt, da es unmöglich werden kann, die Nachrichtenbegrenzungen sicher zu erkennen.
Synchronisationsprimitive¶
Im Allgemeinen sind Synchronisationsprimitive in einem Multiprocess-Programm weniger notwendig als in einem Multithread-Programm. Siehe die Dokumentation für das Modul threading.
Beachten Sie, dass Synchronisationsprimitive auch mit einem Manager-Objekt erstellt werden können – siehe Manager.
- class multiprocessing.Barrier(parties[, action[, timeout]])¶
Ein Barrierenobjekt: eine Kopie von
threading.Barrier.Hinzugefügt in Version 3.3.
- class multiprocessing.BoundedSemaphore([value])¶
Ein begrenztes Semaphorobjekt: ein enges Analogon zu
threading.BoundedSemaphore.Ein einziger Unterschied zu seinem engen Analogon besteht darin, dass das erste Argument seiner `acquire`-Methode block genannt wird, wie es mit
Lock.acquire()konsistent ist.Hinweis
Unter macOS ist dies von
Semaphorenicht zu unterscheiden, dasem_getvalue()auf dieser Plattform nicht implementiert ist.
- class multiprocessing.Condition([lock])¶
Eine Bedingungsvariable: ein Alias für
threading.Condition.Wenn lock angegeben wird, sollte es sich um ein
Lock- oderRLock-Objekt ausmultiprocessinghandeln.Geändert in Version 3.3: Die Methode
wait_for()wurde hinzugefügt.
- class multiprocessing.Event¶
Eine Kopie von
threading.Event.
- class multiprocessing.Lock¶
Ein nicht-rekursives Schloss-Objekt: ein enges Analogon zu
threading.Lock. Sobald ein Prozess oder Thread ein Schloss erworben hat, blockieren weitere Versuche, es von einem beliebigen Prozess oder Thread aus zu erwerben, bis es freigegeben wird; jeder Prozess oder Thread kann es freigeben. Die Konzepte und Verhaltensweisen vonthreading.Lock, wie sie sich auf Threads beziehen, werden hier inmultiprocessing.Lock, wie sie sich auf Prozesse oder Threads beziehen, nachgebildet, mit Ausnahme der unten genannten Punkte.Beachten Sie, dass
Locktatsächlich eine Factory-Funktion ist, die eine Instanz vonmultiprocessing.synchronize.Lockmit einem Standardkontext zurückgibt.Lockunterstützt das Protokoll für Kontextmanager und kann daher inwith-Anweisungen verwendet werden.- acquire(block=True, timeout=None)¶
Erwirbt ein Schloss, blockierend oder nicht-blockierend.
Wenn das Argument block auf
True(Standard) gesetzt ist, blockiert der Methodenaufruf, bis das Schloss in einem entsperrten Zustand ist, setzt es dann auf gesperrt und gibtTruezurück. Beachten Sie, dass der Name dieses ersten Arguments vomjenigen inthreading.Lock.acquire()abweicht.Wenn das Argument block auf
Falsegesetzt ist, blockiert der Methodenaufruf nicht. Wenn das Schloss derzeit in einem gesperrten Zustand ist, gibtFalsezurück; andernfalls wird das Schloss in einen gesperrten Zustand versetzt undTruezurückgegeben.Bei Aufruf mit einem positiven, Gleitkommawert für timeout wird solange blockiert, wie das Schloss nicht erworben werden kann, maximal jedoch die durch timeout angegebene Anzahl von Sekunden. Aufrufe mit einem negativen Wert für timeout entsprechen einem timeout von Null. Aufrufe mit einem timeout-Wert von
None(Standard) setzen die Timeout-Periode auf unendlich. Beachten Sie, dass die Behandlung negativer oderNone-Werte für timeout vom implementierten Verhalten inthreading.Lock.acquire()abweicht. Das timeout-Argument hat keine praktischen Auswirkungen, wenn das Argument block aufFalsegesetzt ist und wird daher ignoriert. GibtTruezurück, wenn das Schloss erworben wurde, oderFalse, wenn die Timeout-Periode abgelaufen ist.
- release()¶
Gibt ein Schloss frei. Dies kann von jedem Prozess oder Thread aufgerufen werden, nicht nur von dem Prozess oder Thread, der das Schloss ursprünglich erworben hat.
Das Verhalten ist dasselbe wie in
threading.Lock.release(), außer dass beim Aufruf eines ungesperrten Schlosses einValueErrorausgelöst wird.
- locked()¶
Gibt einen booleschen Wert zurück, der angibt, ob dieses Objekt gerade gesperrt ist.
Hinzugefügt in Version 3.14.
- class multiprocessing.RLock¶
Ein rekursives Schloss-Objekt: ein enges Analogon zu
threading.RLock. Ein rekursives Schloss muss von dem Prozess oder Thread freigegeben werden, der es erworben hat. Sobald ein Prozess oder Thread ein rekursives Schloss erworben hat, kann derselbe Prozess oder Thread es erneut erwerben, ohne zu blockieren. Dieser Prozess oder Thread muss es einmal für jede Erwerbung freigeben.Beachten Sie, dass
RLocktatsächlich eine Factory-Funktion ist, die eine Instanz vonmultiprocessing.synchronize.RLockmit einem Standardkontext zurückgibt.RLockunterstützt das Protokoll für Kontextmanager und kann daher inwith-Anweisungen verwendet werden.- acquire(block=True, timeout=None)¶
Erwirbt ein Schloss, blockierend oder nicht-blockierend.
Wenn das Argument block auf
Truegesetzt ist, wird blockiert, bis das Schloss in einem entsperrten Zustand ist (nicht im Besitz eines Prozesses oder Threads), es sei denn, das Schloss ist bereits im Besitz des aktuellen Prozesses oder Threads. Der aktuelle Prozess oder Thread übernimmt dann den Besitz des Schlosses (falls er ihn nicht bereits hat) und der Rekursionsgrad innerhalb des Schlosses erhöht sich um eins, was zu einem Rückgabewert vonTrueführt. Beachten Sie, dass es mehrere Unterschiede im Verhalten dieses ersten Arguments im Vergleich zur Implementierung vonthreading.RLock.acquire()gibt, beginnend mit dem Namen des Arguments selbst.Wenn das Argument block auf
Falsegesetzt ist, wird nicht blockiert. Wenn das Schloss bereits von einem anderen Prozess oder Thread erworben wurde (und somit im Besitz ist), übernimmt der aktuelle Prozess oder Thread keinen Besitz und der Rekursionsgrad innerhalb des Schlosses wird nicht geändert, was zu einem Rückgabewert vonFalseführt. Wenn das Schloss in einem entsperrten Zustand ist, übernimmt der aktuelle Prozess oder Thread den Besitz und der Rekursionsgrad wird erhöht, was zu einem Rückgabewert vonTrueführt.Die Verwendung und das Verhalten des Arguments timeout sind dieselben wie in
Lock.acquire(). Beachten Sie, dass einige dieser Verhaltensweisen von timeout von den implementierten Verhaltensweisen inthreading.RLock.acquire()abweichen.
- release()¶
Gibt ein Schloss frei und dekrementiert den Rekursionsgrad. Wenn nach der Dekrementierung der Rekursionsgrad null ist, wird das Schloss auf "entsperrt" (nicht im Besitz eines Prozesses oder Threads) zurückgesetzt. Wenn andere Prozesse oder Threads darauf warten, dass das Schloss entsperrt wird, wird genau einer von ihnen zugelassen. Wenn der Rekursionsgrad nach der Dekrementierung immer noch ungleich null ist, bleibt das Schloss gesperrt und im Besitz des aufrufenden Prozesses oder Threads.
Rufen Sie diese Methode nur auf, wenn der aufrufende Prozess oder Thread das Schloss besitzt. Ein
AssertionErrorwird ausgelöst, wenn diese Methode von einem anderen Prozess oder Thread als dem Besitzer aufgerufen wird oder wenn das Schloss in einem entsperrten (nicht besessenen) Zustand ist. Beachten Sie, dass die Art der in dieser Situation ausgelösten Ausnahme vom implementierten Verhalten inthreading.RLock.release()abweicht.
- locked()¶
Gibt einen booleschen Wert zurück, der angibt, ob dieses Objekt gerade gesperrt ist.
Hinzugefügt in Version 3.14.
- class multiprocessing.Semaphore([value])¶
Ein Semaphor-Objekt: ein enges Analogon zu
threading.Semaphore.Ein einziger Unterschied zu seinem engen Analogon besteht darin, dass das erste Argument seiner `acquire`-Methode block genannt wird, wie es mit
Lock.acquire()konsistent ist.
Hinweis
Unter macOS wird sem_timedwait nicht unterstützt, daher wird der Aufruf von acquire() mit einem Timeout das Verhalten dieser Funktion durch eine Schleife mit Schlafzeiten emulieren.
Hinweis
Einige Funktionen dieses Pakets erfordern eine funktionierende Implementierung von gemeinsam genutzten Semaphoren im Host-Betriebssystem. Ohne eine solche wird das Modul multiprocessing.synchronize deaktiviert, und Versuche, es zu importieren, führen zu einem ImportError. Siehe bpo-3770 für zusätzliche Informationen.
Manager¶
Manager bieten eine Möglichkeit, Daten zu erstellen, die zwischen verschiedenen Prozessen gemeinsam genutzt werden können, einschließlich der gemeinsamen Nutzung über ein Netzwerk zwischen Prozessen, die auf verschiedenen Rechnern laufen. Ein Manager-Objekt steuert einen Serverprozess, der shared objects verwaltet. Andere Prozesse können auf die shared objects zugreifen, indem sie Proxies verwenden.
- multiprocessing.Manager()¶
Gibt ein gestartetes
SyncManager-Objekt zurück, das zum Teilen von Objekten zwischen Prozessen verwendet werden kann. Das zurückgegebene Manager-Objekt entspricht einem gestarteten Kindprozess und verfügt über Methoden, die freigegebene Objekte erstellen und entsprechende Proxys zurückgeben.
Manager-Prozesse werden heruntergefahren, sobald sie garbage collected werden oder ihr Elternprozess beendet wird. Die Manager-Klassen sind im Modul multiprocessing.managers definiert.
- class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)¶
Erstellt ein BaseManager-Objekt.
Nach der Erstellung sollte
start()oderget_server().serve_forever()aufgerufen werden, um sicherzustellen, dass das Manager-Objekt auf einen gestarteten Manager-Prozess verweist.address ist die Adresse, auf der der Manager-Prozess auf neue Verbindungen wartet. Wenn address
Noneist, wird eine beliebige Adresse gewählt.authkey ist der Authentifizierungsschlüssel, der zur Überprüfung der Gültigkeit eingehender Verbindungen zum Serverprozess verwendet wird. Wenn authkey
Noneist, wirdcurrent_process().authkeyverwendet. Andernfalls wird authkey verwendet und muss eine Byte-Zeichenkette sein.serializer muss
'pickle'(verwendetpickle-Serialisierung) oder'xmlrpclib'(verwendetxmlrpc.client-Serialisierung) sein.ctx ist ein Kontextobjekt oder
None(verwendet den aktuellen Kontext). Siehe die Funktionget_context().shutdown_timeout ist ein Timeout in Sekunden, das verwendet wird, um zu warten, bis der vom Manager verwendete Prozess in der Methode
shutdown()beendet ist. Wenn das Herunterfahren zu lange dauert, wird der Prozess beendet. Wenn das Beenden des Prozesses ebenfalls zu lange dauert, wird der Prozess getötet.Geändert in Version 3.11: Der Parameter shutdown_timeout wurde hinzugefügt.
- start([initializer[, initargs]])¶
Startet einen Unterprozess, um den Manager zu starten. Wenn initializer nicht
Noneist, ruft der Unterprozessinitializer(*initargs)auf, wenn er startet.
- get_server()¶
Gibt ein
Server-Objekt zurück, das den tatsächlichen Server unter der Kontrolle des Managers darstellt. DasServer-Objekt unterstützt die Methodeserve_forever().>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Serverhat zusätzlich ein Attributaddress.
- connect()¶
Verbindet ein lokales Manager-Objekt mit einem entfernten Manager-Prozess.
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
- shutdown()¶
Stoppt den vom Manager verwendeten Prozess. Dies ist nur verfügbar, wenn
start()zum Starten des Serverprozesses verwendet wurde.Dies kann mehrmals aufgerufen werden.
- register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶
Eine Klassenmethode, die zur Registrierung eines Typs oder einer aufrufbaren Funktion bei der Manager-Klasse verwendet werden kann.
typeid ist ein „Typ-Identifikator“, der zur Identifizierung eines bestimmten Typs von geteiltem Objekt verwendet wird. Dies muss eine Zeichenkette sein.
callable ist eine aufrufbare Funktion, die zum Erstellen von Objekten für diesen Typ-Identifikator verwendet wird. Wenn eine Manager-Instanz über die Methode
connect()mit dem Server verbunden wird oder wenn das Argument create_methodFalseist, kann diesNonebleiben.proxytype ist eine Unterklasse von
BaseProxy, die zum Erstellen von Proxys für geteilte Objekte mit diesem typeid verwendet wird. WennNoneist, wird automatisch eine Proxy-Klasse erstellt.exposed wird verwendet, um eine Sequenz von Methodennamen anzugeben, auf die Proxys für diesen typeid über
BaseProxy._callmethod()zugreifen dürfen. (Wenn exposedNoneist, wird stattdessenproxytype._exposed_verwendet, falls es existiert.) Wenn keine freigegebene Liste angegeben ist, sind alle „öffentlichen Methoden“ des geteilten Objekts zugänglich. (Hier bedeutet eine „öffentliche Methode“ jedes Attribut, das eine__call__()-Methode hat und dessen Name nicht mit'_'beginnt.)method_to_typeid ist eine Zuordnung, die verwendet wird, um den Rückgabetyp der freigegebenen Methoden anzugeben, die einen Proxy zurückgeben sollen. Sie ordnet Methodennamen Typ-ID-Zeichenketten zu. (Wenn method_to_typeid
Noneist, wird stattdessenproxytype._method_to_typeid_verwendet, falls es existiert.) Wenn der Name einer Methode kein Schlüssel dieser Zuordnung ist oder wenn die ZuordnungNoneist, wird das von der Methode zurückgegebene Objekt per Wert kopiert.create_method bestimmt, ob eine Methode mit dem Namen typeid erstellt werden soll, mit der der Serverprozess angewiesen werden kann, ein neues geteiltes Objekt zu erstellen und einen Proxy dafür zurückzugeben. Standardmäßig ist dies
True.
BaseManager-Instanzen verfügen außerdem über eine schreibgeschützte Eigenschaft- address¶
Die vom Manager verwendete Adresse.
Geändert in Version 3.3: Manager-Objekte unterstützen das Context-Management-Protokoll – siehe Context Manager Types.
__enter__()startet den Serverprozess (falls er noch nicht gestartet wurde) und gibt dann das Manager-Objekt zurück.__exit__()ruftshutdown()auf.In früheren Versionen startete
__enter__()den Serverprozess des Managers nicht, wenn dieser noch nicht gestartet war.
- class multiprocessing.managers.SyncManager¶
Eine Unterklasse von
BaseManager, die für die Synchronisation von Prozessen verwendet werden kann. Objekte dieses Typs werden vonmultiprocessing.Manager()zurückgegeben.Seine Methoden erstellen und geben Proxy-Objekte für eine Reihe häufig verwendeter Datentypen zurück, die über Prozesse hinweg synchronisiert werden sollen. Dazu gehören insbesondere geteilte Listen und Dictionaries.
- Barrier(parties[, action[, timeout]])¶
Erstellt ein geteiltes
threading.Barrier-Objekt und gibt einen Proxy dafür zurück.Hinzugefügt in Version 3.3.
- BoundedSemaphore([value])¶
Erstellt ein geteiltes
threading.BoundedSemaphore-Objekt und gibt einen Proxy dafür zurück.
- Condition([lock])¶
Erstellt ein geteiltes
threading.Condition-Objekt und gibt einen Proxy dafür zurück.Wenn lock übergeben wird, sollte es ein Proxy für ein
threading.Lock- oderthreading.RLock-Objekt sein.Geändert in Version 3.3: Die Methode
wait_for()wurde hinzugefügt.
- Event()¶
Erstellt ein geteiltes
threading.Event-Objekt und gibt einen Proxy dafür zurück.
- Lock()¶
Erstellt ein geteiltes
threading.Lock-Objekt und gibt einen Proxy dafür zurück.
- Queue([maxsize])¶
Erstellt eine geteilte
queue.Queueund gibt einen Proxy dafür zurück.
- RLock()¶
Erstellt ein geteiltes
threading.RLock-Objekt und gibt einen Proxy dafür zurück.
- Semaphore([value])¶
Erstellt ein geteiltes
threading.Semaphore-Objekt und gibt einen Proxy dafür zurück.
- Array(typecode, sequence)¶
Erstellt ein Array und gibt einen Proxy dafür zurück.
- Value(typecode, value)¶
Erstellt ein Objekt mit einem beschreibbaren Attribut
valueund gibt einen Proxy dafür zurück.
- dict()¶
- dict(mapping)
- dict(sequence)
Erstellt ein geteiltes
dict-Objekt und gibt einen Proxy dafür zurück.
- set()¶
- set(sequence)
- set(mapping)
Erstellt ein geteiltes
set-Objekt und gibt einen Proxy dafür zurück.Hinzugefügt in Version 3.14: Unterstützung für
setwurde hinzugefügt.
Geändert in Version 3.6: Geteilte Objekte können verschachtelt werden. Zum Beispiel kann ein geteiltes Container-Objekt wie eine geteilte Liste andere geteilte Objekte enthalten, die alle vom
SyncManagerverwaltet und synchronisiert werden.
- class multiprocessing.managers.Namespace¶
Ein Typ, der bei
SyncManagerregistriert werden kann.Ein Namespace-Objekt hat keine öffentlichen Methoden, aber beschreibbare Attribute. Seine Darstellung zeigt die Werte seiner Attribute.
Wenn jedoch ein Proxy für ein Namespace-Objekt verwendet wird, ist ein Attribut, das mit
'_'beginnt, ein Attribut des Proxys und nicht ein Attribut des Referenten.>>> mp_context = multiprocessing.get_context('spawn') >>> manager = mp_context.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
Angepasste Manager¶
Um einen eigenen Manager zu erstellen, erstellt man eine Unterklasse von BaseManager und verwendet die Klassenmethode register(), um neue Typen oder aufrufbare Funktionen beim Manager zu registrieren. Zum Beispiel
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
Verwendung eines entfernten Managers¶
Es ist möglich, einen Manager-Server auf einer Maschine laufen zu lassen und Clients von anderen Maschinen darauf zugreifen zu lassen (vorausgesetzt, die beteiligten Firewalls lassen dies zu).
Das Ausführen der folgenden Befehle erstellt einen Server für eine einzelne geteilte Warteschlange, auf die entfernte Clients zugreifen können.
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Ein Client kann wie folgt auf den Server zugreifen:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
Ein anderer Client kann ihn ebenfalls verwenden:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
Lokale Prozesse können auch auf diese Warteschlange zugreifen, indem sie den obigen Code auf dem Client verwenden, um remote darauf zuzugreifen.
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super().__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Proxy-Objekte¶
Ein Proxy ist ein Objekt, das sich auf ein geteiltes Objekt bezieht, das (vermutlich) in einem anderen Prozess lebt. Das geteilte Objekt wird als Referent des Proxys bezeichnet. Mehrere Proxy-Objekte können denselben Referenten haben.
Ein Proxy-Objekt hat Methoden, die entsprechende Methoden seines Referenten aufrufen (obwohl nicht jede Methode des Referenten notwendigerweise über den Proxy verfügbar ist). Auf diese Weise kann ein Proxy genauso verwendet werden wie sein Referent.
>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
Beachten Sie, dass die Anwendung von str() auf einen Proxy die Darstellung des Referenten zurückgibt, während die Anwendung von repr() die Darstellung des Proxys zurückgibt.
Ein wichtiges Merkmal von Proxy-Objekten ist, dass sie pickelbar sind, damit sie zwischen Prozessen übergeben werden können. Daher kann ein Referent Proxy-Objekte enthalten. Dies ermöglicht die Verschachtelung dieser verwalteten Listen, Dictionaries und anderer Proxy-Objekte.
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
Ebenso können Dictionary- und Listen-Proxys ineinander verschachtelt werden.
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
Wenn Standard- (Nicht-Proxy-) list- oder dict-Objekte in einem Referenten enthalten sind, werden Änderungen an diesen veränderlichen Werten nicht über den Manager weitergegeben, da der Proxy nicht weiß, wann die darin enthaltenen Werte geändert werden. Das Speichern eines Wertes in einem Container-Proxy (was ein __setitem__ auf dem Proxy-Objekt auslöst) wird jedoch über den Manager und damit an den Referenten weitergegeben. Um ein solches Element effektiv zu ändern, könnte man den geänderten Wert dem Container-Proxy neu zuweisen.
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
Dieser Ansatz ist für die meisten Anwendungsfälle vielleicht weniger bequem als die Verwendung verschachtelter Proxy-Objekte, zeigt aber auch ein gewisses Maß an Kontrolle über die Synchronisation.
Hinweis
Die Proxy-Typen in multiprocessing tun nichts, um Vergleiche nach Wert zu unterstützen. So haben wir zum Beispiel:
>>> manager.list([1,2,3]) == [1,2,3]
False
Man sollte einfach eine Kopie des Referenten verwenden, wenn man Vergleiche durchführt.
- class multiprocessing.managers.BaseProxy¶
Proxy-Objekte sind Instanzen von Unterklassen von
BaseProxy.- _callmethod(methodname[, args[, kwds]])¶
Ruft eine Methode des Referenten des Proxys auf und gibt deren Ergebnis zurück.
Wenn
proxyein Proxy ist, dessen Referentobjist, dann wird der Ausdruckproxy._callmethod(methodname, args, kwds)
wertet den Ausdruck aus
getattr(obj, methodname)(*args, **kwds)
im Prozess des Managers.
Der zurückgegebene Wert ist eine Kopie des Ergebnisses des Aufrufs oder ein Proxy für ein neues geteiltes Objekt – siehe Dokumentation für das Argument method_to_typeid von
BaseManager.register().Wenn eine Ausnahme durch den Aufruf ausgelöst wird, wird sie von
_callmethod()erneut ausgelöst. Wenn eine andere Ausnahme im Prozess des Managers ausgelöst wird, wird diese in eineRemoteError-Ausnahme umgewandelt und von_callmethod()ausgelöst.Beachten Sie insbesondere, dass eine Ausnahme ausgelöst wird, wenn methodname nicht exposed wurde.
Ein Beispiel für die Verwendung von
_callmethod()>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
- _getvalue()¶
Gibt eine Kopie des Referenten zurück.
Wenn der Referent nicht pickelbar ist, löst dies eine Ausnahme aus.
- __repr__()¶
Gibt eine Darstellung des Proxy-Objekts zurück.
- __str__()¶
Gibt die Darstellung des Referenten zurück.
Bereinigung¶
Ein Proxy-Objekt verwendet einen Weakref-Callback, so dass es sich beim Garbage-Collecting selbst vom Manager abmeldet, dem sein Referent gehört.
Ein geteiltes Objekt wird aus dem Manager-Prozess gelöscht, wenn keine Proxys mehr darauf verweisen.
Prozess-Pools¶
Man kann einen Pool von Prozessen erstellen, die Aufgaben, die an ihn übergeben werden, mit der Klasse Pool ausführen.
- class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶
Ein Prozess-Pool-Objekt, das einen Pool von Worker-Prozessen steuert, an die Jobs übergeben werden können. Es unterstützt asynchrone Ergebnisse mit Timeouts und Callbacks und verfügt über eine parallele Map-Implementierung.
processes ist die Anzahl der zu verwendenden Worker-Prozesse. Wenn processes
Noneist, wird die vonos.process_cpu_count()zurückgegebene Anzahl verwendet.Wenn initializer nicht
Noneist, ruft jeder Worker-Prozessinitializer(*initargs)auf, wenn er startet.maxtasksperchild ist die Anzahl der Aufgaben, die ein Worker-Prozess abschließen kann, bevor er beendet und durch einen neuen Worker-Prozess ersetzt wird, um ungenutzte Ressourcen freizugeben. Der Standardwert für maxtasksperchild ist
None, was bedeutet, dass Worker-Prozesse so lange leben wie der Pool.context kann verwendet werden, um den Kontext anzugeben, der zum Starten der Worker-Prozesse verwendet wird. Normalerweise wird ein Pool über die Funktion
multiprocessing.Pool()oder die MethodePool()eines Kontextobjekts erstellt. In beiden Fällen wird context entsprechend gesetzt.Beachten Sie, dass die Methoden des Pool-Objekts nur von dem Prozess aufgerufen werden sollten, der den Pool erstellt hat.
Warnung
multiprocessing.poolObjekte haben interne Ressourcen, die ordnungsgemäß verwaltet werden müssen (wie jede andere Ressource), indem der Pool als Kontextmanager verwendet wird oder indemclose()undterminate()manuell aufgerufen werden. Wenn dies nicht geschieht, kann der Prozess beim Beenden hängen bleiben.Beachten Sie, dass es nicht korrekt ist, sich auf den Garbage Collector zu verlassen, um den Pool zu zerstören, da CPython nicht garantiert, dass der Finalizer des Pools aufgerufen wird (siehe
object.__del__()für weitere Informationen).Geändert in Version 3.2: Der Parameter maxtasksperchild wurde hinzugefügt.
Geändert in Version 3.4: Der Parameter context wurde hinzugefügt.
Geändert in Version 3.13: processes verwendet standardmäßig
os.process_cpu_count()anstelle vonos.cpu_count().Hinweis
Worker-Prozesse innerhalb eines
Poolleben typischerweise für die gesamte Dauer der Arbeitswarteschlange des Pools. Ein häufiges Muster in anderen Systemen (wie Apache, mod_wsgi, etc.), um von Arbeitern gehaltene Ressourcen freizugeben, besteht darin, einem Arbeiter innerhalb eines Pools nur eine bestimmte Menge an Arbeit abzuschließen, bevor er beendet, bereinigt und ein neuer Prozess zur Ersetzung des alten gestartet wird. Das Argument maxtasksperchild zumPoolmacht diese Fähigkeit dem Endbenutzer zugänglich.- apply(func[, args[, kwds]])¶
Ruft func mit den Argumenten args und Schlüsselwortargumenten kwds auf. Es blockiert, bis das Ergebnis bereit ist. Da dies blockiert, ist
apply_async()besser geeignet, um parallele Arbeiten auszuführen. Zusätzlich wird func nur in einem der Worker des Pools ausgeführt.
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])¶
Eine Variante der Methode
apply(), die einAsyncResult-Objekt zurückgibt.Wenn callback angegeben ist, sollte es eine aufrufbare Funktion sein, die ein einzelnes Argument akzeptiert. Wenn das Ergebnis verfügbar ist, wird callback darauf angewendet, es sei denn, der Aufruf ist fehlgeschlagen, in welchem Fall error_callback stattdessen angewendet wird.
Wenn error_callback angegeben ist, sollte es eine aufrufbare Funktion sein, die ein einzelnes Argument akzeptiert. Wenn die Ziel-Funktion fehlschlägt, wird error_callback mit der Ausnahmeinstanz aufgerufen.
Callbacks sollten sofort abgeschlossen werden, da sonst der Thread, der die Ergebnisse verarbeitet, blockiert wird.
- map(func, iterable[, chunksize])¶
Ein paralleles Äquivalent der eingebauten Funktion
map()(es unterstützt nur ein iterable Argument, für mehrere Iterables siehestarmap()). Es blockiert, bis das Ergebnis bereit ist.Diese Methode teilt das Iterable in eine Anzahl von Blöcken auf, die als separate Aufgaben an den Prozesspool übermittelt werden. Die (ungefähre) Größe dieser Blöcke kann durch Setzen von chunksize auf eine positive Ganzzahl festgelegt werden.
Beachten Sie, dass dies bei sehr langen Iterables zu hohem Speicherverbrauch führen kann. Erwägen Sie die Verwendung von
imap()oderimap_unordered()mit expliziter chunksize Option für bessere Effizienz.
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
Eine Variante der Methode
map(), die einAsyncResult-Objekt zurückgibt.Wenn callback angegeben ist, sollte es eine aufrufbare Funktion sein, die ein einzelnes Argument akzeptiert. Wenn das Ergebnis verfügbar ist, wird callback darauf angewendet, es sei denn, der Aufruf ist fehlgeschlagen, in welchem Fall error_callback stattdessen angewendet wird.
Wenn error_callback angegeben ist, sollte es eine aufrufbare Funktion sein, die ein einzelnes Argument akzeptiert. Wenn die Ziel-Funktion fehlschlägt, wird error_callback mit der Ausnahmeinstanz aufgerufen.
Callbacks sollten sofort abgeschlossen werden, da sonst der Thread, der die Ergebnisse verarbeitet, blockiert wird.
- imap(func, iterable[, chunksize])¶
Eine träge Version von
map().Das Argument chunksize ist dasselbe wie das, das von der Methode
map()verwendet wird. Bei sehr langen Iterables kann die Verwendung eines großen Wertes für chunksize dazu führen, dass der Job viel schneller abgeschlossen wird als bei Verwendung des Standardwerts von1.Auch wenn chunksize
1ist, hat die Methodenext()des von der Methodeimap()zurückgegebenen Iterators einen optionalen timeout-Parameter:next(timeout)löstmultiprocessing.TimeoutErroraus, wenn das Ergebnis nicht innerhalb von timeout Sekunden zurückgegeben werden kann.
- imap_unordered(func, iterable[, chunksize])¶
Das Gleiche wie
imap(), nur dass die Reihenfolge der Ergebnisse des zurückgegebenen Iterators als willkürlich betrachtet werden sollte. (Nur wenn es nur einen Worker-Prozess gibt, ist die Reihenfolge garantiert "korrekt".)
- starmap(func, iterable[, chunksize])¶
Wie
map(), nur dass die Elemente des iterable als Iterables erwartet werden, die als Argumente entpackt werden.Daher ergibt ein iterable von
[(1,2), (3, 4)][func(1,2), func(3,4)].Hinzugefügt in Version 3.3.
- starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
Eine Kombination aus
starmap()undmap_async(), die über ein iterable von Iterables iteriert und func mit den entpackten Iterables aufruft. Gibt ein Ergebnisobjekt zurück.Hinzugefügt in Version 3.3.
- close()¶
Verhindert, dass weitere Aufgaben an den Pool übermittelt werden. Sobald alle Aufgaben abgeschlossen sind, werden die Worker-Prozesse beendet.
- terminate()¶
Stoppt die Worker-Prozesse sofort, ohne ausstehende Arbeit abzuschließen. Wenn das Pool-Objekt vom Garbage Collector eingesammelt wird, wird
terminate()sofort aufgerufen.
- join()¶
Wartet darauf, dass die Worker-Prozesse beendet werden.
close()oderterminate()muss aufgerufen werden, bevorjoin()verwendet werden kann.
Geändert in Version 3.3: Pool-Objekte unterstützen nun das Kontextmanager-Protokoll – siehe Context Manager Types.
__enter__()gibt das Pool-Objekt zurück, und__exit__()ruftterminate()auf.
- class multiprocessing.pool.AsyncResult¶
Die Klasse des Ergebnisses, das von
Pool.apply_async()undPool.map_async()zurückgegeben wird.- get([timeout])¶
Gibt das Ergebnis zurück, wenn es eintrifft. Wenn timeout nicht
Noneist und das Ergebnis nicht innerhalb von timeout Sekunden eintrifft, wirdmultiprocessing.TimeoutErrorausgelöst. Wenn der Remote-Aufruf eine Ausnahme ausgelöst hat, wird diese Ausnahme vonget()erneut ausgelöst.
- wait([timeout])¶
Wartet, bis das Ergebnis verfügbar ist oder bis timeout Sekunden vergangen sind.
- ready()¶
Gibt zurück, ob der Aufruf abgeschlossen wurde.
- successful()¶
Gibt zurück, ob der Aufruf ohne Auslösung einer Ausnahme abgeschlossen wurde. Löst
ValueErroraus, wenn das Ergebnis noch nicht bereit ist.Geändert in Version 3.7: Wenn das Ergebnis noch nicht bereit ist, wird
ValueErroranstelle vonAssertionErrorausgelöst.
Das folgende Beispiel demonstriert die Verwendung eines Pools
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
Listener und Clients¶
Normalerweise wird die Nachrichtenübertragung zwischen Prozessen über Queues oder durch die Verwendung von Connection-Objekten, die von Pipe() zurückgegeben werden, durchgeführt.
Das Modul multiprocessing.connection bietet jedoch zusätzliche Flexibilität. Es bietet im Grunde eine High-Level-Nachrichten-API für den Umgang mit Sockets oder Windows-Named Pipes. Es unterstützt auch Digest-Authentifizierung mithilfe des Moduls hmac und das gleichzeitige Polling mehrerer Verbindungen.
- multiprocessing.connection.deliver_challenge(connection, authkey)¶
Sendet eine zufällig generierte Nachricht an das andere Ende der Verbindung und wartet auf eine Antwort.
Wenn die Antwort dem Digest der Nachricht mit authkey als Schlüssel entspricht, wird eine Willkommensnachricht an das andere Ende der Verbindung gesendet. Andernfalls wird
AuthenticationErrorausgelöst.
- multiprocessing.connection.answer_challenge(connection, authkey)¶
Empfängt eine Nachricht, berechnet den Digest der Nachricht mit authkey als Schlüssel und sendet dann den Digest zurück.
Wenn keine Willkommensnachricht empfangen wird, wird
AuthenticationErrorausgelöst.
- multiprocessing.connection.Client(address[, family[, authkey]])¶
Versucht, eine Verbindung zum Listener herzustellen, der die Adresse address verwendet, und gibt eine
Connectionzurück.Der Typ der Verbindung wird durch das Argument family bestimmt, aber dies kann generell weggelassen werden, da es normalerweise aus dem Format von address abgeleitet werden kann. (Siehe Adressformate)
Wenn authkey angegeben und nicht
Noneist, sollte es ein Byte-String sein und als geheimer Schlüssel für eine HMAC-basierte Authentifizierungsaufforderung verwendet werden. Wenn authkeyNoneist, findet keine Authentifizierung statt.AuthenticationErrorwird ausgelöst, wenn die Authentifizierung fehlschlägt. Siehe Authentifizierungsschlüssel.
- class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])¶
Eine Wrapper-Klasse für einen gebundenen Socket oder eine Windows-Named-Pipe, die auf Verbindungen "lauscht".
address ist die Adresse, die vom gebundenen Socket oder der Named-Pipe des Listener-Objekts verwendet wird.
Hinweis
Wenn die Adresse '0.0.0.0' verwendet wird, ist die Adresse unter Windows kein verbindungsfähiger Endpunkt. Wenn Sie einen verbindungsfähigen Endpunkt benötigen, sollten Sie '127.0.0.1' verwenden.
family ist der Typ des Sockets (oder der Named-Pipe). Dies kann einer der Strings
'AF_INET'(für einen TCP-Socket),'AF_UNIX'(für einen Unix-Domain-Socket) oder'AF_PIPE'(für eine Windows-Named-Pipe) sein. Von diesen ist nur der erste garantiert verfügbar. Wenn familyNoneist, wird die Familie aus dem Format von address abgeleitet. Wenn address ebenfallsNoneist, wird ein Standardwert gewählt. Dieser Standardwert ist die Familie, die als die schnellste verfügbare angenommen wird. Siehe Adressformate. Beachten Sie, dass, wenn family'AF_UNIX'ist und addressNoneist, der Socket in einem privaten temporären Verzeichnis erstellt wird, das mittempfile.mkstemp()erstellt wurde.Wenn das Listener-Objekt einen Socket verwendet, wird backlog (standardmäßig 1) an die Methode
listen()des Sockets übergeben, sobald dieser gebunden wurde.Wenn authkey angegeben und nicht
Noneist, sollte es ein Byte-String sein und als geheimer Schlüssel für eine HMAC-basierte Authentifizierungsaufforderung verwendet werden. Wenn authkeyNoneist, findet keine Authentifizierung statt.AuthenticationErrorwird ausgelöst, wenn die Authentifizierung fehlschlägt. Siehe Authentifizierungsschlüssel.- accept()¶
Akzeptiert eine Verbindung auf dem gebundenen Socket oder der Named-Pipe des Listener-Objekts und gibt ein
Connection-Objekt zurück. Wenn die Authentifizierung versucht und fehlschlägt, wirdAuthenticationErrorausgelöst.
- close()¶
Schließt den gebundenen Socket oder die Named-Pipe des Listener-Objekts. Dies wird automatisch aufgerufen, wenn der Listener vom Garbage Collector eingesammelt wird. Es ist jedoch ratsam, ihn explizit aufzurufen.
Listener-Objekte haben die folgenden schreibgeschützten Eigenschaften
- address¶
Die Adresse, die vom Listener-Objekt verwendet wird.
- last_accepted¶
Die Adresse, von der die zuletzt akzeptierte Verbindung kam. Wenn dies nicht verfügbar ist, ist es
None.
Geändert in Version 3.3: Listener-Objekte unterstützen nun das Kontextmanager-Protokoll – siehe Context Manager Types.
__enter__()gibt das Listener-Objekt zurück, und__exit__()ruftclose()auf.
- multiprocessing.connection.wait(object_list, timeout=None)¶
Wartet, bis ein Objekt in object_list bereit ist. Gibt die Liste der Objekte in object_list zurück, die bereit sind. Wenn timeout ein Float ist, blockiert der Aufruf für höchstens so viele Sekunden. Wenn timeout
Noneist, blockiert er unbegrenzt. Ein negativer Timeout ist gleichbedeutend mit einem Null-Timeout.Sowohl unter POSIX als auch unter Windows kann ein Objekt in object_list erscheinen, wenn es
ein lesbares
Connection-Objekt ist;ein verbundenes und lesbares
socket.socket-Objekt ist; oder
Eine Verbindung oder ein Socket-Objekt ist bereit, wenn Daten zum Lesen verfügbar sind oder das andere Ende geschlossen wurde.
POSIX:
wait(object_list, timeout)ist fast äquivalent zuselect.select(object_list, [], [], timeout). Der Unterschied besteht darin, dass, wennselect.select()durch ein Signal unterbrochen wird, esOSErrormit der FehlernummerEINTRauslösen kann, währendwait()dies nicht tut.Windows: Ein Element in object_list muss entweder ein integer Handle sein, das wartbar ist (gemäß der Definition in der Dokumentation der Win32-Funktion
WaitForMultipleObjects()), oder es kann ein Objekt mit einer Methodefileno()sein, die ein Socket-Handle oder ein Pipe-Handle zurückgibt. (Beachten Sie, dass Pipe-Handles und Socket-Handles keine wartbaren Handles sind.)Hinzugefügt in Version 3.3.
Beispiele
Der folgende Server-Code erstellt einen Listener, der 'secret password' als Authentifizierungsschlüssel verwendet. Er wartet dann auf eine Verbindung und sendet einige Daten an den Client.
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
Der folgende Code stellt eine Verbindung zum Server her und empfängt einige Daten vom Server.
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
Der folgende Code verwendet wait(), um gleichzeitig auf Nachrichten von mehreren Prozessen zu warten.
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
Adressformate¶
Eine
'AF_INET'-Adresse ist ein Tupel der Form(hostname, port), wobei hostname ein String und port eine Ganzzahl ist.Eine
'AF_UNIX'-Adresse ist ein String, der einen Dateinamen im Dateisystem darstellt.Eine Adresse vom Typ
'AF_PIPE'ist ein String der Formr'\\.\pipe\PipeName'. UmClient()zum Verbinden mit einer benannten Pipe auf einem entfernten Computer namens ServerName zu verwenden, sollte stattdessen eine Adresse der Formr'\\ServerName\pipe\PipeName'verwendet werden.
Beachten Sie, dass jeder String, der mit zwei Backslashes beginnt, standardmäßig als eine 'AF_PIPE'-Adresse und nicht als eine 'AF_UNIX'-Adresse angenommen wird.
Authentifizierungsschlüssel¶
Wenn man Connection.recv verwendet, werden die empfangenen Daten automatisch entpickelt. Das Entpickeln von Daten aus einer nicht vertrauenswürdigen Quelle ist leider ein Sicherheitsrisiko. Daher verwenden Listener und Client() das Modul hmac, um eine Digest-Authentifizierung bereitzustellen.
Ein Authentifizierungsschlüssel ist ein Byte-String, der als Passwort betrachtet werden kann: Sobald eine Verbindung hergestellt ist, verlangt jeder Endpunkt einen Nachweis, dass der andere den Authentifizierungsschlüssel kennt. (Der Nachweis, dass beide Endpunkte denselben Schlüssel verwenden, beinhaltet **nicht** das Senden des Schlüssels über die Verbindung.)
Wenn Authentifizierung angefordert wird, aber kein Authentifizierungsschlüssel angegeben ist, wird der Rückgabewert von current_process().authkey verwendet (siehe Process). Dieser Wert wird automatisch an jedes Process-Objekt vererbt, das der aktuelle Prozess erstellt. Das bedeutet, dass (standardmäßig) alle Prozesse eines Mehrprozessprogramms einen einzigen Authentifizierungsschlüssel teilen, der beim Einrichten von Verbindungen zwischen ihnen verwendet werden kann.
Geeignete Authentifizierungsschlüssel können auch mit os.urandom() generiert werden.
Protokollierung¶
Es gibt eine gewisse Unterstützung für die Protokollierung. Beachten Sie jedoch, dass das Paket logging keine prozessübergreifenden Sperren verwendet, so dass es möglich ist (abhängig vom Handler-Typ), dass Nachrichten von verschiedenen Prozessen durcheinander geraten.
- multiprocessing.get_logger()¶
Gibt den von
multiprocessingverwendeten Logger zurück. Falls erforderlich, wird ein neuer erstellt.Wenn der Logger zum ersten Mal erstellt wird, hat er die Stufe
logging.NOTSETund keinen Standard-Handler. Nachrichten, die an diesen Logger gesendet werden, werden standardmäßig nicht an den Root-Logger weitergeleitet.Beachten Sie, dass unter Windows Kindprozesse nur die Stufe des Loggers des Elternprozesses erben – jede andere Anpassung des Loggers wird nicht geerbt.
- multiprocessing.log_to_stderr(level=None)¶
Diese Funktion ruft
get_logger()auf, fügt aber zusätzlich zum zurückgegebenen Logger einen Handler hinzu, der die Ausgabe übersys.stderrmit dem Format'[%(levelname)s/%(processName)s] %(message)s'sendet. Sie könnenlevelnamedes Loggers durch Übergabe eines Argumentsleveländern.
Unten sehen Sie eine Beispielsitzung mit aktiviertem Logging
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
Eine vollständige Tabelle der Logging-Stufen finden Sie im Modul logging.
Das Modul multiprocessing.dummy¶
Das Modul multiprocessing.dummy repliziert die API von multiprocessing, ist aber nichts weiter als ein Wrapper um das Modul threading.
Insbesondere gibt die Funktion Pool von multiprocessing.dummy eine Instanz von ThreadPool zurück, die eine Unterklasse von Pool ist und alle gleichen Methodenaufrufe unterstützt, aber eine Gruppe von Worker-Threads anstelle von Worker-Prozessen verwendet.
- class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])¶
Ein Thread-Pool-Objekt, das einen Pool von Worker-Threads steuert, an den Jobs übermittelt werden können.
ThreadPool-Instanzen sind vollständig mitPool-Instanzen kompatibel, und ihre Ressourcen müssen entweder durch Verwendung des Pools als Kontextmanager oder durch manuelles Aufrufen vonclose()undterminate()ordnungsgemäß verwaltet werden.processes ist die Anzahl der zu verwendenden Worker-Threads. Wenn processes
Noneist, wird die vonos.process_cpu_count()zurückgegebene Anzahl verwendet.Wenn initializer nicht
Noneist, ruft jeder Worker-Prozessinitializer(*initargs)auf, wenn er startet.Im Gegensatz zu
Poolkönnen maxtasksperchild und context nicht angegeben werden.Hinweis
Ein
ThreadPoolteilt sich die gleiche Schnittstelle wiePool, die um einen Prozesspool herum aufgebaut ist und vor der Einführung des Modulsconcurrent.futuresexistierte. Daher erbt es einige Operationen, die für einen Thread-basierten Pool keinen Sinn ergeben, und es hat seinen eigenen Typ zur Darstellung des Status von asynchronen Jobs,AsyncResult, der von keiner anderen Bibliothek verstanden wird.Benutzer sollten generell
concurrent.futures.ThreadPoolExecutorbevorzugen, das eine einfachere Schnittstelle hat, die von Anfang an für Threads konzipiert wurde, und dasconcurrent.futures.Future-Instanzen zurückgibt, die mit vielen anderen Bibliotheken kompatibel sind, einschließlichasyncio.
Programmierrichtlinien¶
Es gibt bestimmte Richtlinien und Idiome, die bei der Verwendung von multiprocessing beachtet werden sollten.
Alle Startmethoden¶
Das Folgende gilt für alle Startmethoden.
Vermeiden Sie geteilten Zustand
Soweit wie möglich sollte versucht werden, die Übertragung großer Datenmengen zwischen Prozessen zu vermeiden.
Es ist wahrscheinlich am besten, für die Kommunikation zwischen Prozessen Warteschlangen oder Pipes zu verwenden, anstatt auf synchronisationsprimitive der unteren Ebene zurückzugreifen.
Pickelbarkeit
Stellen Sie sicher, dass die Argumente der Methoden von Proxys pickelbar sind.
Thread-Sicherheit von Proxys
Verwenden Sie ein Proxy-Objekt nicht aus mehr als einem Thread, es sei denn, Sie schützen es mit einer Sperre.
(Es gibt niemals ein Problem, wenn verschiedene Prozesse denselben Proxy verwenden.)
Zombifizierung von Prozessen vermeiden
Auf POSIX-Systemen wird ein beendeter, aber noch nicht gekoppelter Prozess zu einem Zombie. Davon sollte es nie sehr viele geben, da jedes Mal, wenn ein neuer Prozess gestartet wird (oder
active_children()aufgerufen wird), alle abgeschlossenen, noch nicht gekoppelten Prozesse gekoppelt werden. Auch das Aufrufen vonProcess.is_aliveeines abgeschlossenen Prozesses koppelt den Prozess. Dennoch ist es wahrscheinlich gute Praxis, alle gestarteten Prozesse explizit zu koppeln.
Besser vererben als picklen/entpicklen
Bei Verwendung der Startmethoden spawn oder forkserver müssen viele Typen aus
multiprocessingpickelbar sein, damit Kindprozesse sie verwenden können. Es sollte jedoch im Allgemeinen vermieden werden, geteilte Objekte über Pipes oder Warteschlangen an andere Prozesse zu senden. Stattdessen sollten Sie das Programm so gestalten, dass ein Prozess, der Zugriff auf eine anderswo erstellte geteilte Ressource benötigt, diese von einem Vorfahren erben kann.
Vermeiden Sie das Beenden von Prozessen
Die Verwendung der Methode
Process.terminatezum Stoppen eines Prozesses kann dazu führen, dass gemeinsam genutzte Ressourcen (wie Sperren, Semaphoren, Pipes und Warteschlangen), die gerade von dem Prozess verwendet werden, beschädigt oder für andere Prozesse unzugänglich werden.Daher ist es wahrscheinlich am besten,
Process.terminatenur für Prozesse in Betracht zu ziehen, die niemals gemeinsam genutzte Ressourcen verwenden.
Prozesse mit Warteschlangen koppeln
Beachten Sie, dass ein Prozess, der Elemente in eine Warteschlange gelegt hat, wartet, bevor er beendet wird, bis alle gepufferten Elemente vom "Feeder"-Thread an die zugrunde liegende Pipe übergeben wurden. (Der Kindprozess kann die Methode
Queue.cancel_join_threadder Warteschlange aufrufen, um dieses Verhalten zu vermeiden.)Das bedeutet, dass Sie jedes Mal, wenn Sie eine Warteschlange verwenden, sicherstellen müssen, dass alle Elemente, die in die Warteschlange gestellt wurden, schließlich entfernt werden, bevor der Prozess gekoppelt wird. Andernfalls können Sie nicht sicher sein, dass Prozesse, die Elemente in die Warteschlange gelegt haben, beendet werden. Denken Sie auch daran, dass nicht-daimonische Prozesse automatisch gekoppelt werden.
Ein Beispiel, das zu einem Deadlock führt, ist folgendes
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()Eine Korrektur hier wäre, die letzten beiden Zeilen zu tauschen (oder einfach die Zeile
p.join()zu entfernen).
Ressourcen explizit an Kindprozesse übergeben
Unter POSIX kann ein Kindprozess bei Verwendung der Startmethode fork eine in einem Elternprozess erstellte gemeinsam genutzte Ressource über eine globale Ressource nutzen. Es ist jedoch besser, das Objekt als Argument an den Konstruktor für den Kindprozess zu übergeben.
Abgesehen davon, dass der Code (potenziell) mit Windows und den anderen Startmethoden kompatibel ist, stellt dies auch sicher, dass das Objekt nicht vom Elternprozess durch die Garbage Collection entfernt wird, solange der Kindprozess noch lebt. Dies kann wichtig sein, wenn eine Ressource freigegeben wird, wenn das Objekt im Elternprozess durch die Garbage Collection entfernt wird.
Zum Beispiel
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()sollte umgeschrieben werden als
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
Vorsicht beim Ersetzen von sys.stdin durch ein „file-like object“
multiprocessing hat ursprünglich bedingungslos aufgerufen
os.close(sys.stdin.fileno())in der Methode
multiprocessing.Process._bootstrap()— dies führte zu Problemen mit Prozessen innerhalb von Prozessen. Dies wurde geändert zusys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)was das grundlegende Problem von kollidierenden Prozessen, die zu einem Fehler mit ungültigem Dateideskriptor führen, löst, aber eine potenzielle Gefahr für Anwendungen birgt, die
sys.stdin()durch ein „file-like object“ mit Ausgabe-Pufferung ersetzen. Diese Gefahr besteht darin, dass, wenn mehrere Prozesseclose()auf diesem file-like object aufrufen, dies dazu führen kann, dass dieselben Daten mehrmals in das Objekt geschrieben werden, was zu Korruption führt.Wenn Sie ein file-like object schreiben und Ihren eigenen Cache implementieren, können Sie es fork-sicher machen, indem Sie die PID speichern, wann immer Sie an den Cache anhängen, und den Cache verwerfen, wenn sich die PID ändert. Zum Beispiel
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cacheWeitere Informationen finden Sie unter bpo-5155, bpo-5313 und bpo-5331
Die Startmethoden spawn und forkserver¶
Es gibt einige zusätzliche Einschränkungen, die für die Startmethode fork nicht gelten.
Mehr Pickelbarkeit
Stellen Sie sicher, dass alle Argumente für
Processpickelbar sind. Wenn Sie zudemProcess.__init__überschreiben, müssen Sie sicherstellen, dass Instanzen pickelbar sind, wenn die MethodeProcess.startaufgerufen wird.
Globale Variablen
Beachten Sie, dass, wenn Code, der in einem Kindprozess ausgeführt wird, versucht, auf eine globale Variable zuzugreifen, der von ihm gesehene Wert (falls vorhanden) möglicherweise nicht mit dem Wert im Elternprozess zum Zeitpunkt des Aufrufs von
Process.startübereinstimmt.Globale Variablen, die nur Modulkonstanten sind, verursachen jedoch keine Probleme.
Sicheres Importieren des Hauptmoduls
Stellen Sie sicher, dass das Hauptmodul von einem neuen Python-Interpreter sicher importiert werden kann, ohne unbeabsichtigte Nebenwirkungen zu verursachen (wie z. B. den Start eines neuen Prozesses).
Zum Beispiel würde die Ausführung des folgenden Moduls mit der Startmethode spawn oder forkserver mit einem
RuntimeErrorfehlschlagenfrom multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()Stattdessen sollte der „Einstiegspunkt“ des Programms durch Verwendung von
if __name__ == '__main__':wie folgt geschützt werdenfrom multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(Die Zeile
freeze_support()kann weggelassen werden, wenn das Programm normal und nicht als ausführbare Datei ausgeführt wird.)Dies ermöglicht es dem neu erzeugten Python-Interpreter, das Modul sicher zu importieren und dann die Funktion
foo()des Moduls auszuführen.Ähnliche Einschränkungen gelten, wenn ein Pool oder Manager im Hauptmodul erstellt wird.
Beispiele¶
Demonstration der Erstellung und Verwendung von benutzerdefinierten Managern und Proxys
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
Verwendung von Pool
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
Ein Beispiel, das zeigt, wie Warteschlangen verwendet werden, um Aufgaben an eine Sammlung von Worker-Prozessen zu übergeben und die Ergebnisse zu sammeln
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()