Transports und Protokolle

Vorwort

Transports und Protokolle werden von den **Low-Level** Event-Loop-APIs wie loop.create_connection() verwendet. Sie verwenden einen Callback-basierten Programmierstil und ermöglichen Hochleistungs-Implementierungen von Netzwerk- oder IPC-Protokollen (z.B. HTTP).

Im Wesentlichen sollten Transports und Protokolle nur in Bibliotheken und Frameworks verwendet werden und niemals in High-Level-Asyncio-Anwendungen.

Diese Dokumentationsseite behandelt sowohl Transports als auch Protokolle.

Introduction

Auf höchster Ebene beschäftigt sich der Transport damit, *wie* Bytes übertragen werden, während das Protokoll bestimmt, *welche* Bytes übertragen werden sollen (und bis zu einem gewissen Grad wann).

Eine andere Formulierung desselben Gedankens: Ein Transport ist eine Abstraktion für einen Socket (oder einen ähnlichen E/A-Endpunkt), während ein Protokoll eine Abstraktion für eine Anwendung ist, aus Sicht des Transports.

Noch eine andere Sichtweise ist, dass die Transport- und Protokollschnittstellen zusammen eine abstrakte Schnittstelle für die Verwendung von Netzwerk-E/A und Interprozess-E/A definieren.

Zwischen Transport- und Protokollobjekten besteht immer eine 1:1-Beziehung: Das Protokoll ruft Transportmethoden auf, um Daten zu senden, während der Transport Protokollmethoden aufruft, um ihm empfangene Daten zu übergeben.

Die meisten verbindungsorientierten Event-Loop-Methoden (wie loop.create_connection()) akzeptieren normalerweise ein *protocol_factory*-Argument, das verwendet wird, um ein *Protocol*-Objekt für eine akzeptierte Verbindung zu erstellen, die durch ein *Transport*-Objekt dargestellt wird. Solche Methoden geben normalerweise ein Tupel von (transport, protocol) zurück.

Inhalt

Diese Dokumentationsseite enthält die folgenden Abschnitte

Transports

Quellcode: Lib/asyncio/transports.py


Transports sind Klassen, die von asyncio bereitgestellt werden, um verschiedene Arten von Kommunikationskanälen zu abstrahieren.

Transportobjekte werden immer von einer asyncio Event-Loop instanziiert.

asyncio implementiert Transports für TCP, UDP, SSL und Subprozess-Pipes. Die auf einem Transport verfügbaren Methoden hängen von der Art des Transports ab.

Die Transportklassen sind nicht Thread-sicher.

Hierarchie der Transports

class asyncio.BaseTransport

Basisklasse für alle Transports. Enthält Methoden, die alle asyncio-Transports gemeinsam haben.

class asyncio.WriteTransport(BaseTransport)

Ein Basis-Transport für Verbindungen, die nur zum Schreiben dienen.

Instanzen der Klasse *WriteTransport* werden von der Event-Loop-Methode loop.connect_write_pipe() zurückgegeben und auch von Subprozess-bezogenen Methoden wie loop.subprocess_exec() verwendet.

class asyncio.ReadTransport(BaseTransport)

Ein Basis-Transport für Lese-only-Verbindungen.

Instanzen der Klasse *ReadTransport* werden von der Event-Loop-Methode loop.connect_read_pipe() zurückgegeben und auch von Subprozess-bezogenen Methoden wie loop.subprocess_exec() verwendet.

class asyncio.Transport(WriteTransport, ReadTransport)

Schnittstelle, die einen bidirektionalen Transport darstellt, wie z.B. eine TCP-Verbindung.

Der Benutzer instanziiert einen Transport nicht direkt; er ruft eine Hilfsfunktion auf, übergibt ihr eine Protokollfabrik und andere notwendige Informationen, um den Transport und das Protokoll zu erstellen.

Instanzen der Klasse *Transport* werden von oder durch Event-Loop-Methoden wie loop.create_connection(), loop.create_unix_connection(), loop.create_server(), loop.sendfile() usw. zurückgegeben.

class asyncio.DatagramTransport(BaseTransport)

Ein Transport für Datagramm-(UDP)-Verbindungen.

Instanzen der Klasse *DatagramTransport* werden von der Event-Loop-Methode loop.create_datagram_endpoint() zurückgegeben.

class asyncio.SubprocessTransport(BaseTransport)

Eine Abstraktion zur Darstellung einer Verbindung zwischen einem Eltern- und seinem Kind-Betriebssystemprozess.

Instanzen der Klasse *SubprocessTransport* werden von den Event-Loop-Methoden loop.subprocess_shell() und loop.subprocess_exec() zurückgegeben.

Basis-Transport

BaseTransport.close()

Schließt den Transport.

Wenn der Transport einen Puffer für ausgehende Daten hat, werden gepufferte Daten asynchron geleert. Es werden keine weiteren Daten empfangen. Nachdem alle gepufferten Daten geleert sind, wird die Methode protocol.connection_lost() des Protokolls mit None als Argument aufgerufen. Der Transport sollte nicht mehr verwendet werden, sobald er geschlossen ist.

BaseTransport.is_closing()

Gibt True zurück, wenn der Transport geschlossen wird oder bereits geschlossen ist.

BaseTransport.get_extra_info(name, default=None)

Gibt Informationen über den Transport oder die zugrunde liegenden Ressourcen zurück, die er verwendet.

name ist ein String, der das Stück transport-spezifischer Informationen darstellt, das abgefragt werden soll.

default ist der Wert, der zurückgegeben wird, wenn die Information nicht verfügbar ist oder wenn der Transport sie mit der gegebenen Drittanbieter-Event-Loop-Implementierung oder auf der aktuellen Plattform nicht abfragen kann.

Zum Beispiel versucht der folgende Code, das zugrunde liegende Socket-Objekt des Transports abzurufen

sock = transport.get_extra_info('socket')
if sock is not None:
    print(sock.getsockopt(...))

Kategorien von Informationen, die bei einigen Transports abgefragt werden können

BaseTransport.set_protocol(protocol)

Legt ein neues Protokoll fest.

Das Wechseln des Protokolls sollte nur erfolgen, wenn beide Protokolle den Wechsel unterstützen.

BaseTransport.get_protocol()

Gibt das aktuelle Protokoll zurück.

Schreibgeschützte Transports

ReadTransport.is_reading()

Gibt True zurück, wenn der Transport neue Daten empfängt.

Hinzugefügt in Version 3.7.

ReadTransport.pause_reading()

Pausiert die Empfangsseite des Transports. Es werden keine Daten an die Methode protocol.data_received() des Protokolls weitergegeben, bis resume_reading() aufgerufen wird.

Geändert in Version 3.7: Die Methode ist idempotent, d.h. sie kann aufgerufen werden, wenn der Transport bereits pausiert oder geschlossen ist.

ReadTransport.resume_reading()

Setzt den Empfang fort. Die Methode protocol.data_received() des Protokolls wird wieder aufgerufen, wenn Daten zum Lesen verfügbar sind.

Geändert in Version 3.7: Die Methode ist idempotent, d.h. sie kann aufgerufen werden, wenn der Transport bereits liest.

Nur Schreib-Transports

WriteTransport.abort()

Schließt den Transport sofort, ohne auf den Abschluss ausstehender Operationen zu warten. Gepufferte Daten gehen verloren. Es werden keine weiteren Daten empfangen. Die Methode protocol.connection_lost() des Protokolls wird schließlich mit None als Argument aufgerufen.

WriteTransport.can_write_eof()

Gibt True zurück, wenn der Transport write_eof() unterstützt, False wenn nicht.

WriteTransport.get_write_buffer_size()

Gibt die aktuelle Größe des vom Transport verwendeten Ausgabepuffers zurück.

WriteTransport.get_write_buffer_limits()

Ruft die oberen und unteren Schwellenwerte (Watermarks) für die Schreibflusskontrolle ab. Gibt ein Tupel (low, high) zurück, wobei low und high positive Byte-Zahlen sind.

Verwenden Sie set_write_buffer_limits(), um die Grenzen festzulegen.

Hinzugefügt in Version 3.4.2.

WriteTransport.set_write_buffer_limits(high=None, low=None)

Setzt die oberen und unteren Schwellenwerte (Watermarks) für die Schreibflusskontrolle.

Diese beiden Werte (gemessen in Bytes) steuern, wann die Methoden protocol.pause_writing() und protocol.resume_writing() des Protokolls aufgerufen werden. Wenn angegeben, muss der niedrige Schwellenwert kleiner oder gleich dem hohen Schwellenwert sein. Weder *high* noch *low* dürfen negativ sein.

pause_writing() wird aufgerufen, wenn die Puffergröße den *high*-Wert erreicht oder überschreitet. Wenn das Schreiben pausiert wurde, wird resume_writing() aufgerufen, wenn die Puffergröße den *low*-Wert erreicht oder unterschreitet.

Die Standardwerte sind implementierungsspezifisch. Wenn nur der hohe Schwellenwert angegeben wird, ist der niedrige Schwellenwert standardmäßig ein implementierungsspezifischer Wert, der kleiner oder gleich dem hohen Schwellenwert ist. Das Setzen von *high* auf Null zwingt auch *low* auf Null und führt dazu, dass pause_writing() aufgerufen wird, wann immer der Puffer nicht leer ist. Das Setzen von *low* auf Null bewirkt, dass resume_writing() nur aufgerufen wird, wenn der Puffer leer ist. Die Verwendung von Null für einen der Grenzwerte ist im Allgemeinen suboptimal, da sie die Möglichkeiten zur gleichzeitigen Ausführung von E/A und Berechnungen einschränkt.

Verwenden Sie get_write_buffer_limits(), um die Grenzen abzurufen.

WriteTransport.write(data)

Schreibt einige *data*-Bytes in den Transport.

Diese Methode blockiert nicht; sie puffert die Daten und arrangiert, dass sie asynchron gesendet werden.

WriteTransport.writelines(list_of_data)

Schreibt eine Liste (oder ein beliebiges iterierbares Objekt) von Datenbytes in den Transport. Dies ist funktional äquivalent zum Aufruf von write() für jedes von der Iterierbaren gelieferte Element, kann aber effizienter implementiert sein.

WriteTransport.write_eof()

Schließt das Schreibende des Transports nach dem Leeren aller gepufferten Daten. Daten können immer noch empfangen werden.

Diese Methode kann NotImplementedError auslösen, wenn der Transport (z.B. SSL) keine halb geschlossenen Verbindungen unterstützt.

Datagramm-Transports

DatagramTransport.sendto(data, addr=None)

Sendet die *data*-Bytes an den Remote-Peer, der durch addr angegeben wird (eine transportabhängige Zieladresse). Wenn addr None ist, werden die Daten an die bei der Transporterstellung angegebene Zieladresse gesendet.

Diese Methode blockiert nicht; sie puffert die Daten und arrangiert, dass sie asynchron gesendet werden.

Geändert in Version 3.13: Diese Methode kann mit einem leeren Bytes-Objekt aufgerufen werden, um ein Null-Längen-Datagramm zu senden. Die für die Flusskontrolle verwendete Puffergrößenberechnung wird ebenfalls aktualisiert, um den Datagramm-Header zu berücksichtigen.

DatagramTransport.abort()

Schließt den Transport sofort, ohne auf den Abschluss ausstehender Operationen zu warten. Gepufferte Daten gehen verloren. Es werden keine weiteren Daten empfangen. Die Methode protocol.connection_lost() des Protokolls wird schließlich mit None als Argument aufgerufen.

Subprozess-Transports

SubprocessTransport.get_pid()

Gibt die Prozess-ID des Subprozesses als Integer zurück.

SubprocessTransport.get_pipe_transport(fd)

Gibt den Transport für die Kommunikationspipe zurück, die dem ganzzahligen Dateideskriptor *fd* entspricht.

  • 0: beschreibbarer Streaming-Transport der Standardeingabe (*stdin*) oder None, wenn der Subprozess nicht mit stdin=PIPE erstellt wurde.

  • 1: lesbarer Streaming-Transport der Standardausgabe (*stdout*) oder None, wenn der Subprozess nicht mit stdout=PIPE erstellt wurde.

  • 2: lesbarer Streaming-Transport des Standardfehlers (*stderr*) oder None, wenn der Subprozess nicht mit stderr=PIPE erstellt wurde.

  • anderes *fd*: None

SubprocessTransport.get_returncode()

Gibt den Rückgabecode des Subprozesses als Integer oder None zurück, wenn er noch nicht zurückgekehrt ist, was dem Attribut subprocess.Popen.returncode ähnelt.

SubprocessTransport.kill()

Beendet den Subprozess.

Auf POSIX-Systemen sendet die Funktion SIGKILL an den Subprozess. Unter Windows ist diese Methode ein Alias für terminate().

Siehe auch subprocess.Popen.kill().

SubprocessTransport.send_signal(signal)

Sendet die Signalnummer *signal* an den Subprozess, wie in subprocess.Popen.send_signal().

SubprocessTransport.terminate()

Stoppt den Subprozess.

Auf POSIX-Systemen sendet diese Methode SIGTERM an den Subprozess. Unter Windows wird die Windows-API-Funktion TerminateProcess() aufgerufen, um den Subprozess zu beenden.

Siehe auch subprocess.Popen.terminate().

SubprocessTransport.close()

Beenden Sie den Subprozess durch Aufrufen der Methode kill().

Wenn der Subprozess noch nicht zurückgekehrt ist, schließen Sie die Transporte der Pipes stdin, stdout und stderr.

Protokolle

Quellcode: Lib/asyncio/protocols.py


asyncio bietet eine Reihe von abstrakten Basisklassen, die zur Implementierung von Netzwerkprotokollen verwendet werden sollten. Diese Klassen sind für die Verwendung zusammen mit Transports gedacht.

Unterklassen von abstrakten Basisprotokollklassen können einige oder alle Methoden implementieren. Alle diese Methoden sind Rückrufe (callbacks): sie werden von Transports bei bestimmten Ereignissen aufgerufen, zum Beispiel, wenn Daten empfangen werden. Eine Basisprotokollmethode sollte vom entsprechenden Transport aufgerufen werden.

Basisprotokolle

class asyncio.BaseProtocol

Basisprotokoll mit Methoden, die alle Protokolle gemeinsam haben.

class asyncio.Protocol(BaseProtocol)

Die Basisklasse für die Implementierung von Streaming-Protokollen (TCP, Unix-Sockets usw.).

class asyncio.BufferedProtocol(BaseProtocol)

Eine Basisklasse für die Implementierung von Streaming-Protokollen mit manueller Steuerung des Empfangspuffers.

class asyncio.DatagramProtocol(BaseProtocol)

Die Basisklasse für die Implementierung von Datagramm- (UDP-) Protokollen.

class asyncio.SubprocessProtocol(BaseProtocol)

Die Basisklasse für die Implementierung von Protokollen zur Kommunikation mit Kindprozessen (unidirektionale Pipes).

Basisprotokoll

Alle asyncio-Protokolle können Basisprotokoll-Rückrufe implementieren.

Verbindungsrückrufe

Verbindungsrückrufe werden für alle Protokolle aufgerufen, genau einmal pro erfolgreicher Verbindung. Alle anderen Protokollrückrufe können nur zwischen diesen beiden Methoden aufgerufen werden.

BaseProtocol.connection_made(transport)

Aufgerufen, wenn eine Verbindung hergestellt wird.

Das Argument transport ist der Transport, der die Verbindung repräsentiert. Das Protokoll ist dafür verantwortlich, eine Referenz auf seinen Transport zu speichern.

BaseProtocol.connection_lost(exc)

Aufgerufen, wenn die Verbindung verloren geht oder geschlossen wird.

Das Argument ist entweder ein Ausnahmeobjekt oder None. Letzteres bedeutet, dass ein reguläres EOF empfangen wurde oder die Verbindung von dieser Seite der Verbindung abgebrochen oder geschlossen wurde.

Flusskontrollrückrufe

Flusskontrollrückrufe können von Transports aufgerufen werden, um das vom Protokoll durchgeführte Schreiben zu pausieren oder fortzusetzen.

Weitere Details finden Sie in der Dokumentation der Methode set_write_buffer_limits().

BaseProtocol.pause_writing()

Aufgerufen, wenn der Puffer des Transports die obere Schwelle überschreitet.

BaseProtocol.resume_writing()

Aufgerufen, wenn der Puffer des Transports unter die untere Schwelle fällt.

Wenn die Puffergröße der oberen Schwelle entspricht, wird pause_writing() nicht aufgerufen: die Puffergröße muss strenger überschritten werden.

Umgekehrt wird resume_writing() aufgerufen, wenn die Puffergröße gleich oder kleiner als die untere Schwelle ist. Diese Endbedingungen sind wichtig, um sicherzustellen, dass die Dinge wie erwartet ablaufen, wenn eine der Marken Null ist.

Streaming-Protokolle

Ereignismethoden wie loop.create_server(), loop.create_unix_server(), loop.create_connection(), loop.create_unix_connection(), loop.connect_accepted_socket(), loop.connect_read_pipe() und loop.connect_write_pipe() akzeptieren Fabriken, die Streaming-Protokolle zurückgeben.

Protocol.data_received(data)

Aufgerufen, wenn Daten empfangen werden. data ist ein nicht leerer Bytes-Objekt, das die eingehenden Daten enthält.

Ob die Daten gepuffert, stückweise oder wieder zusammengefügt werden, hängt vom Transport ab. Im Allgemeinen sollten Sie sich nicht auf bestimmte Semantiken verlassen und stattdessen Ihre Analysen generisch und flexibel gestalten. Die Daten werden jedoch immer in der richtigen Reihenfolge empfangen.

Die Methode kann beliebig oft aufgerufen werden, solange eine Verbindung geöffnet ist.

Jedoch wird protocol.eof_received() höchstens einmal aufgerufen. Sobald eof_received() aufgerufen wird, wird data_received() nicht mehr aufgerufen.

Protocol.eof_received()

Aufgerufen, wenn das andere Ende signalisiert, dass es keine weiteren Daten senden wird (z. B. durch Aufruf von transport.write_eof(), wenn das andere Ende ebenfalls asyncio verwendet).

Diese Methode kann einen falschen Wert (einschließlich None) zurückgeben, in welchem Fall der Transport sich selbst schließt. Umgekehrt, wenn diese Methode einen wahren Wert zurückgibt, bestimmt das verwendete Protokoll, ob der Transport geschlossen wird. Da die Standardimplementierung None zurückgibt, schließt sie implizit die Verbindung.

Einige Transporte, einschließlich SSL, unterstützen keine halb geschlossenen Verbindungen. In diesem Fall führt die Rückgabe von True aus dieser Methode dazu, dass die Verbindung geschlossen wird.

Zustandsautomat

start -> connection_made
    [-> data_received]*
    [-> eof_received]?
-> connection_lost -> end

Gepufferte Streaming-Protokolle

Hinzugefügt in Version 3.7.

Gepufferte Protokolle können mit jeder Ereignisschleifenmethode verwendet werden, die Streaming-Protokolle unterstützt.

BufferedProtocol-Implementierungen ermöglichen die explizite manuelle Zuweisung und Steuerung des Empfangspuffers. Ereignisschleifen können dann den vom Protokoll bereitgestellten Puffer verwenden, um unnötige Datenkopien zu vermeiden. Dies kann zu einer spürbaren Leistungssteigerung bei Protokollen führen, die große Datenmengen empfangen. Anspruchsvolle Protokollimplementierungen können die Anzahl der Pufferzuweisungen erheblich reduzieren.

Die folgenden Rückrufe werden für BufferedProtocol-Instanzen aufgerufen

BufferedProtocol.get_buffer(sizehint)

Aufgerufen, um einen neuen Empfangspuffer zuzuweisen.

sizehint ist die empfohlene Mindestgröße für den zurückgegebenen Puffer. Es ist akzeptabel, kleinere oder größere Puffer als von sizehint vorgeschlagen zurückzugeben. Wenn auf -1 gesetzt, kann die Puffergröße beliebig sein. Die Rückgabe eines Puffers mit Nullgröße ist ein Fehler.

get_buffer() muss ein Objekt zurückgeben, das das Pufferprotokoll implementiert.

BufferedProtocol.buffer_updated(nbytes)

Aufgerufen, wenn der Puffer mit den empfangenen Daten aktualisiert wurde.

nbytes ist die Gesamtzahl der Bytes, die in den Puffer geschrieben wurden.

BufferedProtocol.eof_received()

Siehe die Dokumentation der Methode protocol.eof_received().

get_buffer() kann während einer Verbindung beliebig oft aufgerufen werden. Jedoch wird protocol.eof_received() höchstens einmal aufgerufen und, wenn es aufgerufen wird, werden get_buffer() und buffer_updated() danach nicht mehr aufgerufen.

Zustandsautomat

start -> connection_made
    [-> get_buffer
        [-> buffer_updated]?
    ]*
    [-> eof_received]?
-> connection_lost -> end

Datagramm-Protokolle

Datagramm-Protokollinstanzen sollten von Protokollfabriken erstellt werden, die an die Methode loop.create_datagram_endpoint() übergeben werden.

DatagramProtocol.datagram_received(data, addr)

Aufgerufen, wenn ein Datagramm empfangen wird. data ist ein Bytes-Objekt, das die eingehenden Daten enthält. addr ist die Adresse des Peers, der die Daten sendet; das genaue Format hängt vom Transport ab.

DatagramProtocol.error_received(exc)

Aufgerufen, wenn eine vorherige Sende- oder Empfangsoperation eine OSError auslöst. exc ist die OSError-Instanz.

Diese Methode wird unter seltenen Bedingungen aufgerufen, wenn der Transport (z. B. UDP) feststellt, dass ein Datagramm nicht an seinen Empfänger zugestellt werden konnte. In vielen Fällen werden jedoch nicht zustellbare Datagramme stillschweigend verworfen.

Hinweis

Auf BSD-Systemen (macOS, FreeBSD usw.) wird die Flusskontrolle für Datagramm-Protokolle nicht unterstützt, da es keine zuverlässige Möglichkeit gibt, Sendeausfälle zu erkennen, die durch das Schreiben zu vieler Pakete verursacht werden.

Der Socket erscheint immer als 'bereit' und überschüssige Pakete werden verworfen. Eine OSError mit errno auf errno.ENOBUFS gesetzt, kann ausgelöst werden oder auch nicht; wenn sie ausgelöst wird, wird sie an DatagramProtocol.error_received() gemeldet, aber ansonsten ignoriert.

Subprozess-Protokolle

Subprozess-Protokollinstanzen sollten von Protokollfabriken erstellt werden, die an die Methoden loop.subprocess_exec() und loop.subprocess_shell() übergeben werden.

SubprocessProtocol.pipe_data_received(fd, data)

Aufgerufen, wenn der Kindprozess Daten in seine stdout- oder stderr-Pipe schreibt.

fd ist der ganzzahlige Dateideskriptor der Pipe.

data ist ein nicht leerer Bytes-Objekt, das die empfangenen Daten enthält.

SubprocessProtocol.pipe_connection_lost(fd, exc)

Aufgerufen, wenn eine der Pipes, die mit dem Kindprozess kommunizieren, geschlossen wird.

fd ist der ganzzahlige Dateideskriptor, der geschlossen wurde.

SubprocessProtocol.process_exited()

Aufgerufen, wenn der Kindprozess beendet wurde.

Sie kann vor den Methoden pipe_data_received() und pipe_connection_lost() aufgerufen werden.

Beispiele

TCP-Echo-Server

Erstellen Sie einen TCP-Echo-Server mit der Methode loop.create_server(), senden Sie empfangene Daten zurück und schließen Sie die Verbindung.

import asyncio


class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        EchoServerProtocol,
        '127.0.0.1', 8888)

    async with server:
        await server.serve_forever()


asyncio.run(main())

Siehe auch

Das Beispiel TCP-Echo-Server mit Streams verwendet die High-Level-Funktion asyncio.start_server().

TCP-Echo-Client

Ein TCP-Echo-Client, der die Methode loop.create_connection() verwendet, Daten sendet und wartet, bis die Verbindung geschlossen ist.

import asyncio


class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = 'Hello World!'

    transport, protocol = await loop.create_connection(
        lambda: EchoClientProtocol(message, on_con_lost),
        '127.0.0.1', 8888)

    # Wait until the protocol signals that the connection
    # is lost and close the transport.
    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

Siehe auch

Das Beispiel TCP-Echo-Client mit Streams verwendet die High-Level-Funktion asyncio.open_connection().

UDP-Echo-Server

Ein UDP-Echo-Server, der die Methode loop.create_datagram_endpoint() verwendet und empfangene Daten zurücksendet.

import asyncio


class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)


async def main():
    print("Starting UDP server")

    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    # One protocol instance will be created to serve all
    # client requests.
    transport, protocol = await loop.create_datagram_endpoint(
        EchoServerProtocol,
        local_addr=('127.0.0.1', 9999))

    try:
        await asyncio.sleep(3600)  # Serve for 1 hour.
    finally:
        transport.close()


asyncio.run(main())

UDP-Echo-Client

Ein UDP-Echo-Client, der die Methode loop.create_datagram_endpoint() verwendet, Daten sendet und den Transport schließt, wenn die Antwort empfangen wird.

import asyncio


class EchoClientProtocol:
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Connection closed")
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = "Hello World!"

    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, on_con_lost),
        remote_addr=('127.0.0.1', 9999))

    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

Verbindung zu bestehenden Sockets

Warten, bis ein Socket Daten empfängt, mit der Methode loop.create_connection() mit einem Protokoll.

import asyncio
import socket


class MyProtocol(asyncio.Protocol):

    def __init__(self, on_con_lost):
        self.transport = None
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport;
        # connection_lost() will be called automatically.
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()

    # Create a pair of connected sockets
    rsock, wsock = socket.socketpair()

    # Register the socket to wait for data.
    transport, protocol = await loop.create_connection(
        lambda: MyProtocol(on_con_lost), sock=rsock)

    # Simulate the reception of data from the network.
    loop.call_soon(wsock.send, 'abc'.encode())

    try:
        await protocol.on_con_lost
    finally:
        transport.close()
        wsock.close()

asyncio.run(main())

Siehe auch

Das Beispiel einen Dateideskriptor auf Leseevents überwachen verwendet die Low-Level-Methode loop.add_reader(), um eine FD zu registrieren.

Das Beispiel einen offenen Socket zum Warten auf Daten mit Streams registrieren verwendet High-Level-Streams, die von der Funktion open_connection() in einer Coroutine erstellt werden.

loop.subprocess_exec() und SubprocessProtocol

Ein Beispiel für ein Subprozess-Protokoll, das verwendet wird, um die Ausgabe eines Subprozesses zu erhalten und auf dessen Beendigung zu warten.

Der Subprozess wird von der Methode loop.subprocess_exec() erstellt.

import asyncio
import sys

class DateProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future):
        self.exit_future = exit_future
        self.output = bytearray()
        self.pipe_closed = False
        self.exited = False

    def pipe_connection_lost(self, fd, exc):
        self.pipe_closed = True
        self.check_for_exit()

    def pipe_data_received(self, fd, data):
        self.output.extend(data)

    def process_exited(self):
        self.exited = True
        # process_exited() method can be called before
        # pipe_connection_lost() method: wait until both methods are
        # called.
        self.check_for_exit()

    def check_for_exit(self):
        if self.pipe_closed and self.exited:
            self.exit_future.set_result(True)

async def get_date():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    code = 'import datetime; print(datetime.datetime.now())'
    exit_future = asyncio.Future(loop=loop)

    # Create the subprocess controlled by DateProtocol;
    # redirect the standard output into a pipe.
    transport, protocol = await loop.subprocess_exec(
        lambda: DateProtocol(exit_future),
        sys.executable, '-c', code,
        stdin=None, stderr=None)

    # Wait for the subprocess exit using the process_exited()
    # method of the protocol.
    await exit_future

    # Close the stdout pipe.
    transport.close()

    # Read the output which was collected by the
    # pipe_data_received() method of the protocol.
    data = bytes(protocol.output)
    return data.decode('ascii').rstrip()

date = asyncio.run(get_date())
print(f"Current date: {date}")

Siehe auch das gleiche Beispiel, das mit High-Level-APIs geschrieben wurde.