Streams

Quellcode: Lib/asyncio/streams.py


Streams sind High-Level-Primitives, die für die Arbeit mit Netzwerkverbindungen asynchron/await-fähig sind. Streams ermöglichen das Senden und Empfangen von Daten, ohne Callbacks oder Low-Level-Protokolle und Transports zu verwenden.

Hier ist ein Beispiel für einen TCP-Echo-Client, der mit asyncio Streams geschrieben wurde

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

Siehe auch den Abschnitt Beispiele unten.

Stream-Funktionen

Die folgenden Top-Level-asyncio-Funktionen können zum Erstellen und Arbeiten mit Streams verwendet werden

async asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)

Stellt eine Netzwerkverbindung her und gibt ein Paar von (reader, writer) Objekten zurück.

Die zurückgegebenen *reader*- und *writer*-Objekte sind Instanzen der Klassen StreamReader und StreamWriter.

Die Größe des Puffers des zurückgegebenen StreamReader-Objekts wird durch *limit* bestimmt. Standardmäßig ist *limit* auf 64 KiB eingestellt.

Die restlichen Argumente werden direkt an loop.create_connection() übergeben.

Hinweis

Das Argument *sock* überträgt den Besitz des Sockets an den erstellten StreamWriter. Um den Socket zu schließen, rufen Sie seine close()-Methode auf.

Geändert in Version 3.7: Parameter *ssl_handshake_timeout* hinzugefügt.

Geändert in Version 3.8: Parameter *happy_eyeballs_delay* und *interleave* hinzugefügt.

Geändert in Version 3.10: Das Argument loop wurde entfernt.

Geändert in Version 3.11: Parameter *ssl_shutdown_timeout* hinzugefügt.

async asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, keep_alive=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

Startet einen Socket-Server.

Der Callback *client_connected_cb* wird jedes Mal aufgerufen, wenn eine neue Client-Verbindung hergestellt wird. Er empfängt ein (reader, writer)-Paar als zwei Argumente, Instanzen der Klassen StreamReader und StreamWriter.

*client_connected_cb* kann ein einfacher Callable oder eine Coroutine-Funktion sein; wenn es sich um eine Coroutine-Funktion handelt, wird sie automatisch als Task geplant.

Die Größe des Puffers des zurückgegebenen StreamReader-Objekts wird durch *limit* bestimmt. Standardmäßig ist *limit* auf 64 KiB eingestellt.

Die restlichen Argumente werden direkt an loop.create_server() übergeben.

Hinweis

Das Argument *sock* überträgt den Besitz des Sockets an den erstellten Server. Um den Socket zu schließen, rufen Sie die close()-Methode des Servers auf.

Geändert in Version 3.7: Parameter *ssl_handshake_timeout* und *start_serving* hinzugefügt.

Geändert in Version 3.10: Das Argument loop wurde entfernt.

Geändert in Version 3.11: Parameter *ssl_shutdown_timeout* hinzugefügt.

Geändert in Version 3.13: Parameter *keep_alive* hinzugefügt.

Unix-Sockets

async asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

Stellt eine Unix-Socket-Verbindung her und gibt ein Paar von (reader, writer) zurück.

Ähnlich wie open_connection(), operiert aber auf Unix-Sockets.

Siehe auch die Dokumentation von loop.create_unix_connection().

Hinweis

Das Argument *sock* überträgt den Besitz des Sockets an den erstellten StreamWriter. Um den Socket zu schließen, rufen Sie seine close()-Methode auf.

Geändert in Version 3.7: Parameter *ssl_handshake_timeout* hinzugefügt. Der Parameter *path* kann jetzt ein pfadähnliches Objekt sein.

Geändert in Version 3.10: Das Argument loop wurde entfernt.

Geändert in Version 3.11: Parameter *ssl_shutdown_timeout* hinzugefügt.

async asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True, cleanup_socket=True)

Startet einen Unix-Socket-Server.

Ähnlich wie start_server(), funktioniert aber mit Unix-Sockets.

Wenn *cleanup_socket* true ist, wird der Unix-Socket automatisch aus dem Dateisystem entfernt, wenn der Server geschlossen wird, es sei denn, der Socket wurde nach der Erstellung des Servers ersetzt.

Siehe auch die Dokumentation von loop.create_unix_server().

Hinweis

Das Argument *sock* überträgt den Besitz des Sockets an den erstellten Server. Um den Socket zu schließen, rufen Sie die close()-Methode des Servers auf.

Geändert in Version 3.7: Parameter *ssl_handshake_timeout* und *start_serving* hinzugefügt. Der Parameter *path* kann jetzt ein pfadähnliches Objekt sein.

Geändert in Version 3.10: Das Argument loop wurde entfernt.

Geändert in Version 3.11: Parameter *ssl_shutdown_timeout* hinzugefügt.

Geändert in Version 3.13: Parameter *cleanup_socket* hinzugefügt.

StreamReader

class asyncio.StreamReader

Stellt ein Reader-Objekt dar, das APIs zum Lesen von Daten aus dem IO-Stream bereitstellt. Als asynchrones Iterable unterstützt das Objekt die async for-Anweisung.

Es wird nicht empfohlen, *StreamReader*-Objekte direkt zu instanziieren; verwenden Sie stattdessen open_connection() und start_server().

feed_eof()

Bestätigt das EOF (End of File).

async read(n=-1)

Liest bis zu *n* Bytes aus dem Stream.

Wenn *n* nicht angegeben oder auf -1 gesetzt ist, liest bis zum EOF und gibt dann alle gelesenen bytes zurück. Wenn EOF empfangen wurde und der interne Puffer leer ist, wird ein leerer bytes-Objekt zurückgegeben.

Wenn *n* 0 ist, wird ein leerer bytes-Objekt sofort zurückgegeben.

Wenn *n* positiv ist, werden maximal *n* verfügbare bytes zurückgegeben, sobald mindestens 1 Byte im internen Puffer verfügbar ist. Wenn EOF vor dem Lesen von Bytes empfangen wird, wird ein leerer bytes-Objekt zurückgegeben.

async readline()

Liest eine Zeile, wobei eine „Zeile“ eine Byte-Sequenz ist, die mit \n endet.

Wenn EOF empfangen wird und \n nicht gefunden wurde, gibt die Methode teilweise gelesene Daten zurück.

Wenn EOF empfangen wird und der interne Puffer leer ist, wird ein leerer bytes-Objekt zurückgegeben.

async readexactly(n)

Liest exakt *n* Bytes.

Löst eine IncompleteReadError aus, wenn EOF erreicht wird, bevor *n* gelesen werden kann. Verwenden Sie das Attribut IncompleteReadError.partial, um die teilweise gelesenen Daten zu erhalten.

async readuntil(separator=b'\n')

Liest Daten aus dem Stream, bis der *separator* gefunden wird.

Bei Erfolg werden die Daten und der Separator aus dem internen Puffer entfernt (konsumiert). Die zurückgegebenen Daten enthalten den Separator am Ende.

Wenn die Menge der gelesenen Daten das konfigurierte Stream-Limit überschreitet, wird eine LimitOverrunError ausgelöst, und die Daten verbleiben im internen Puffer und können erneut gelesen werden.

Wenn EOF erreicht wird, bevor der vollständige Separator gefunden wurde, wird eine IncompleteReadError ausgelöst, und der interne Puffer wird zurückgesetzt. Das Attribut IncompleteReadError.partial kann einen Teil des Separators enthalten.

Der *separator* kann auch ein Tupel von Separatoren sein. In diesem Fall ist der Rückgabewert der kürzestmögliche, der einen beliebigen Separator als Suffix hat. Für die Zwecke von LimitOverrunError wird der kürzeste mögliche Separator als derjenige betrachtet, der übereinstimmte.

Hinzugefügt in Version 3.5.2.

Geändert in Version 3.13: Der Parameter *separator* kann jetzt ein Tupel von Separatoren sein.

at_eof()

Gibt True zurück, wenn der Puffer leer ist und feed_eof() aufgerufen wurde.

StreamWriter

class asyncio.StreamWriter

Stellt ein Writer-Objekt dar, das APIs zum Schreiben von Daten in den IO-Stream bereitstellt.

Es wird nicht empfohlen, *StreamWriter*-Objekte direkt zu instanziieren; verwenden Sie stattdessen open_connection() und start_server().

write(data)

Die Methode versucht, die *Daten* sofort an den zugrunde liegenden Socket zu schreiben. Wenn dies fehlschlägt, werden die Daten in einem internen Schreibpuffer gepuffert, bis sie gesendet werden können.

Der *data*-Puffer sollte ein Objekt vom Typ bytes, bytearray oder eine C-kontinuierliche eindimensionale Memoryview sein.

Die Methode sollte zusammen mit der drain()-Methode verwendet werden.

stream.write(data)
await stream.drain()
writelines(data)

Die Methode schreibt eine Liste (oder ein beliebiges Iterable) von Bytes sofort an den zugrunde liegenden Socket. Wenn dies fehlschlägt, werden die Daten in einem internen Schreibpuffer gepuffert, bis sie gesendet werden können.

Die Methode sollte zusammen mit der drain()-Methode verwendet werden.

stream.writelines(lines)
await stream.drain()
close()

Die Methode schließt den Stream und den zugrunde liegenden Socket.

Die Methode sollte verwendet werden, obwohl nicht zwingend erforderlich, zusammen mit der wait_closed()-Methode.

stream.close()
await stream.wait_closed()
can_write_eof()

Gibt True zurück, wenn der zugrunde liegende Transport die write_eof()-Methode unterstützt, andernfalls False.

write_eof()

Schließt das Schreibende des Streams, nachdem die gepufferten Schreibdaten geleert wurden.

transport

Gibt den zugrunde liegenden asyncio-Transport zurück.

get_extra_info(name, default=None)

Greift auf optionale Transportinformationen zu; siehe BaseTransport.get_extra_info() für Details.

async drain()

Wartet, bis es angemessen ist, mit dem Schreiben in den Stream fortzufahren. Beispiel

writer.write(data)
await writer.drain()

Dies ist eine Flusskontrollmethode, die mit dem zugrunde liegenden IO-Schreibpuffer interagiert. Wenn die Größe des Puffers die Obergrenze erreicht, blockiert *drain()*, bis die Größe des Puffers die Untergrenze unterschreitet und das Schreiben wieder aufgenommen werden kann. Wenn nichts zu warten ist, gibt drain() sofort zurück.

async start_tls(sslcontext, *, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

Upgrade einer bestehenden Stream-basierten Verbindung zu TLS.

Parameter

  • *sslcontext*: eine konfigurierte Instanz von SSLContext.

  • *server_hostname*: setzt oder überschreibt den Hostnamen, gegen den das Zertifikat des Zielservers abgeglichen wird.

  • *ssl_handshake_timeout* ist die Zeit in Sekunden, die auf den Abschluss des TLS-Handshakes gewartet wird, bevor die Verbindung abgebrochen wird. 60.0 Sekunden, wenn None (Standard).

  • *ssl_shutdown_timeout* ist die Zeit in Sekunden, die auf den Abschluss des SSL-Shutdowns gewartet wird, bevor die Verbindung abgebrochen wird. 30.0 Sekunden, wenn None (Standard).

Hinzugefügt in Version 3.11.

Geändert in Version 3.12: Parameter *ssl_shutdown_timeout* hinzugefügt.

is_closing()

Gibt True zurück, wenn der Stream geschlossen ist oder gerade geschlossen wird.

Hinzugefügt in Version 3.7.

async wait_closed()

Wartet, bis der Stream geschlossen ist.

Sollte nach close() aufgerufen werden, um zu warten, bis die zugrunde liegende Verbindung geschlossen ist, und sicherzustellen, dass alle Daten geleert wurden, bevor z. B. das Programm beendet wird.

Hinzugefügt in Version 3.7.

Beispiele

TCP-Echo-Client mit Streams

TCP-Echo-Client unter Verwendung der Funktion asyncio.open_connection()

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

Siehe auch

Das Beispiel TCP-Echo-Client-Protokoll verwendet die Low-Level-Methode loop.create_connection().

TCP-Echo-Server mit Streams

TCP-Echo-Server unter Verwendung der Funktion asyncio.start_server()

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

Siehe auch

Das Beispiel TCP-Echo-Server-Protokoll verwendet die Methode loop.create_server().

HTTP-Header abrufen

Einfaches Beispiel zum Abfragen der HTTP-Header der über die Kommandozeile übergebenen URL

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()
    await writer.wait_closed()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Verwendung

python example.py http://example.com/path/page.html

oder mit HTTPS

python example.py https://example.com/path/page.html

Einen geöffneten Socket registrieren, um mit Streams auf Daten zu warten

Coroutine, die wartet, bis ein Socket Daten empfängt, unter Verwendung der Funktion open_connection()

import asyncio
import socket

async def wait_for_data():
    # Get a reference to the current event loop because
    # we want to access low-level APIs.
    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    # Register the open socket to wait for data.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = await reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()
    await writer.wait_closed()

    # Close the second socket
    wsock.close()

asyncio.run(wait_for_data())

Siehe auch

Das Beispiel einen geöffneten Socket registrieren, um mit einem Protokoll auf Daten zu warten verwendet die Low-Level-Methode loop.create_connection().

Das Beispiel einen Dateideskriptor auf Leseevents überwachen verwendet die Low-Level-Methode loop.add_reader(), um einen Dateideskriptor zu überwachen.