Streams¶
Quellcode: Lib/asyncio/streams.py
Streams sind High-Level-Primitives, die für die Arbeit mit Netzwerkverbindungen asynchron/await-fähig sind. Streams ermöglichen das Senden und Empfangen von Daten, ohne Callbacks oder Low-Level-Protokolle und Transports zu verwenden.
Hier ist ein Beispiel für einen TCP-Echo-Client, der mit asyncio Streams geschrieben wurde
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
Siehe auch den Abschnitt Beispiele unten.
Stream-Funktionen
Die folgenden Top-Level-asyncio-Funktionen können zum Erstellen und Arbeiten mit Streams verwendet werden
- async asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)¶
Stellt eine Netzwerkverbindung her und gibt ein Paar von
(reader, writer)Objekten zurück.Die zurückgegebenen *reader*- und *writer*-Objekte sind Instanzen der Klassen
StreamReaderundStreamWriter.Die Größe des Puffers des zurückgegebenen
StreamReader-Objekts wird durch *limit* bestimmt. Standardmäßig ist *limit* auf 64 KiB eingestellt.Die restlichen Argumente werden direkt an
loop.create_connection()übergeben.Hinweis
Das Argument *sock* überträgt den Besitz des Sockets an den erstellten
StreamWriter. Um den Socket zu schließen, rufen Sie seineclose()-Methode auf.Geändert in Version 3.7: Parameter *ssl_handshake_timeout* hinzugefügt.
Geändert in Version 3.8: Parameter *happy_eyeballs_delay* und *interleave* hinzugefügt.
Geändert in Version 3.10: Das Argument loop wurde entfernt.
Geändert in Version 3.11: Parameter *ssl_shutdown_timeout* hinzugefügt.
- async asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, keep_alive=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)¶
Startet einen Socket-Server.
Der Callback *client_connected_cb* wird jedes Mal aufgerufen, wenn eine neue Client-Verbindung hergestellt wird. Er empfängt ein
(reader, writer)-Paar als zwei Argumente, Instanzen der KlassenStreamReaderundStreamWriter.*client_connected_cb* kann ein einfacher Callable oder eine Coroutine-Funktion sein; wenn es sich um eine Coroutine-Funktion handelt, wird sie automatisch als
Taskgeplant.Die Größe des Puffers des zurückgegebenen
StreamReader-Objekts wird durch *limit* bestimmt. Standardmäßig ist *limit* auf 64 KiB eingestellt.Die restlichen Argumente werden direkt an
loop.create_server()übergeben.Hinweis
Das Argument *sock* überträgt den Besitz des Sockets an den erstellten Server. Um den Socket zu schließen, rufen Sie die
close()-Methode des Servers auf.Geändert in Version 3.7: Parameter *ssl_handshake_timeout* und *start_serving* hinzugefügt.
Geändert in Version 3.10: Das Argument loop wurde entfernt.
Geändert in Version 3.11: Parameter *ssl_shutdown_timeout* hinzugefügt.
Geändert in Version 3.13: Parameter *keep_alive* hinzugefügt.
Unix-Sockets
- async asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
Stellt eine Unix-Socket-Verbindung her und gibt ein Paar von
(reader, writer)zurück.Ähnlich wie
open_connection(), operiert aber auf Unix-Sockets.Siehe auch die Dokumentation von
loop.create_unix_connection().Hinweis
Das Argument *sock* überträgt den Besitz des Sockets an den erstellten
StreamWriter. Um den Socket zu schließen, rufen Sie seineclose()-Methode auf.Verfügbarkeit: Unix.
Geändert in Version 3.7: Parameter *ssl_handshake_timeout* hinzugefügt. Der Parameter *path* kann jetzt ein pfadähnliches Objekt sein.
Geändert in Version 3.10: Das Argument loop wurde entfernt.
Geändert in Version 3.11: Parameter *ssl_shutdown_timeout* hinzugefügt.
- async asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True, cleanup_socket=True)¶
Startet einen Unix-Socket-Server.
Ähnlich wie
start_server(), funktioniert aber mit Unix-Sockets.Wenn *cleanup_socket* true ist, wird der Unix-Socket automatisch aus dem Dateisystem entfernt, wenn der Server geschlossen wird, es sei denn, der Socket wurde nach der Erstellung des Servers ersetzt.
Siehe auch die Dokumentation von
loop.create_unix_server().Hinweis
Das Argument *sock* überträgt den Besitz des Sockets an den erstellten Server. Um den Socket zu schließen, rufen Sie die
close()-Methode des Servers auf.Verfügbarkeit: Unix.
Geändert in Version 3.7: Parameter *ssl_handshake_timeout* und *start_serving* hinzugefügt. Der Parameter *path* kann jetzt ein pfadähnliches Objekt sein.
Geändert in Version 3.10: Das Argument loop wurde entfernt.
Geändert in Version 3.11: Parameter *ssl_shutdown_timeout* hinzugefügt.
Geändert in Version 3.13: Parameter *cleanup_socket* hinzugefügt.
StreamReader¶
- class asyncio.StreamReader¶
Stellt ein Reader-Objekt dar, das APIs zum Lesen von Daten aus dem IO-Stream bereitstellt. Als asynchrones Iterable unterstützt das Objekt die
async for-Anweisung.Es wird nicht empfohlen, *StreamReader*-Objekte direkt zu instanziieren; verwenden Sie stattdessen
open_connection()undstart_server().- feed_eof()¶
Bestätigt das EOF (End of File).
- async read(n=-1)¶
Liest bis zu *n* Bytes aus dem Stream.
Wenn *n* nicht angegeben oder auf
-1gesetzt ist, liest bis zum EOF und gibt dann alle gelesenenbyteszurück. Wenn EOF empfangen wurde und der interne Puffer leer ist, wird ein leererbytes-Objekt zurückgegeben.Wenn *n*
0ist, wird ein leererbytes-Objekt sofort zurückgegeben.Wenn *n* positiv ist, werden maximal *n* verfügbare
byteszurückgegeben, sobald mindestens 1 Byte im internen Puffer verfügbar ist. Wenn EOF vor dem Lesen von Bytes empfangen wird, wird ein leererbytes-Objekt zurückgegeben.
- async readline()¶
Liest eine Zeile, wobei eine „Zeile“ eine Byte-Sequenz ist, die mit
\nendet.Wenn EOF empfangen wird und
\nnicht gefunden wurde, gibt die Methode teilweise gelesene Daten zurück.Wenn EOF empfangen wird und der interne Puffer leer ist, wird ein leerer
bytes-Objekt zurückgegeben.
- async readexactly(n)¶
Liest exakt *n* Bytes.
Löst eine
IncompleteReadErroraus, wenn EOF erreicht wird, bevor *n* gelesen werden kann. Verwenden Sie das AttributIncompleteReadError.partial, um die teilweise gelesenen Daten zu erhalten.
- async readuntil(separator=b'\n')¶
Liest Daten aus dem Stream, bis der *separator* gefunden wird.
Bei Erfolg werden die Daten und der Separator aus dem internen Puffer entfernt (konsumiert). Die zurückgegebenen Daten enthalten den Separator am Ende.
Wenn die Menge der gelesenen Daten das konfigurierte Stream-Limit überschreitet, wird eine
LimitOverrunErrorausgelöst, und die Daten verbleiben im internen Puffer und können erneut gelesen werden.Wenn EOF erreicht wird, bevor der vollständige Separator gefunden wurde, wird eine
IncompleteReadErrorausgelöst, und der interne Puffer wird zurückgesetzt. Das AttributIncompleteReadError.partialkann einen Teil des Separators enthalten.Der *separator* kann auch ein Tupel von Separatoren sein. In diesem Fall ist der Rückgabewert der kürzestmögliche, der einen beliebigen Separator als Suffix hat. Für die Zwecke von
LimitOverrunErrorwird der kürzeste mögliche Separator als derjenige betrachtet, der übereinstimmte.Hinzugefügt in Version 3.5.2.
Geändert in Version 3.13: Der Parameter *separator* kann jetzt ein
Tupelvon Separatoren sein.
- at_eof()¶
Gibt
Truezurück, wenn der Puffer leer ist undfeed_eof()aufgerufen wurde.
StreamWriter¶
- class asyncio.StreamWriter¶
Stellt ein Writer-Objekt dar, das APIs zum Schreiben von Daten in den IO-Stream bereitstellt.
Es wird nicht empfohlen, *StreamWriter*-Objekte direkt zu instanziieren; verwenden Sie stattdessen
open_connection()undstart_server().- write(data)¶
Die Methode versucht, die *Daten* sofort an den zugrunde liegenden Socket zu schreiben. Wenn dies fehlschlägt, werden die Daten in einem internen Schreibpuffer gepuffert, bis sie gesendet werden können.
Der *data*-Puffer sollte ein Objekt vom Typ bytes, bytearray oder eine C-kontinuierliche eindimensionale Memoryview sein.
Die Methode sollte zusammen mit der
drain()-Methode verwendet werden.stream.write(data) await stream.drain()
- writelines(data)¶
Die Methode schreibt eine Liste (oder ein beliebiges Iterable) von Bytes sofort an den zugrunde liegenden Socket. Wenn dies fehlschlägt, werden die Daten in einem internen Schreibpuffer gepuffert, bis sie gesendet werden können.
Die Methode sollte zusammen mit der
drain()-Methode verwendet werden.stream.writelines(lines) await stream.drain()
- close()¶
Die Methode schließt den Stream und den zugrunde liegenden Socket.
Die Methode sollte verwendet werden, obwohl nicht zwingend erforderlich, zusammen mit der
wait_closed()-Methode.stream.close() await stream.wait_closed()
- can_write_eof()¶
Gibt
Truezurück, wenn der zugrunde liegende Transport diewrite_eof()-Methode unterstützt, andernfallsFalse.
- write_eof()¶
Schließt das Schreibende des Streams, nachdem die gepufferten Schreibdaten geleert wurden.
- transport¶
Gibt den zugrunde liegenden asyncio-Transport zurück.
- get_extra_info(name, default=None)¶
Greift auf optionale Transportinformationen zu; siehe
BaseTransport.get_extra_info()für Details.
- async drain()¶
Wartet, bis es angemessen ist, mit dem Schreiben in den Stream fortzufahren. Beispiel
writer.write(data) await writer.drain()
Dies ist eine Flusskontrollmethode, die mit dem zugrunde liegenden IO-Schreibpuffer interagiert. Wenn die Größe des Puffers die Obergrenze erreicht, blockiert *drain()*, bis die Größe des Puffers die Untergrenze unterschreitet und das Schreiben wieder aufgenommen werden kann. Wenn nichts zu warten ist, gibt
drain()sofort zurück.
- async start_tls(sslcontext, *, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
Upgrade einer bestehenden Stream-basierten Verbindung zu TLS.
Parameter
*sslcontext*: eine konfigurierte Instanz von
SSLContext.*server_hostname*: setzt oder überschreibt den Hostnamen, gegen den das Zertifikat des Zielservers abgeglichen wird.
*ssl_handshake_timeout* ist die Zeit in Sekunden, die auf den Abschluss des TLS-Handshakes gewartet wird, bevor die Verbindung abgebrochen wird.
60.0Sekunden, wennNone(Standard).*ssl_shutdown_timeout* ist die Zeit in Sekunden, die auf den Abschluss des SSL-Shutdowns gewartet wird, bevor die Verbindung abgebrochen wird.
30.0Sekunden, wennNone(Standard).
Hinzugefügt in Version 3.11.
Geändert in Version 3.12: Parameter *ssl_shutdown_timeout* hinzugefügt.
- is_closing()¶
Gibt
Truezurück, wenn der Stream geschlossen ist oder gerade geschlossen wird.Hinzugefügt in Version 3.7.
Beispiele¶
TCP-Echo-Client mit Streams¶
TCP-Echo-Client unter Verwendung der Funktion asyncio.open_connection()
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
Siehe auch
Das Beispiel TCP-Echo-Client-Protokoll verwendet die Low-Level-Methode loop.create_connection().
TCP-Echo-Server mit Streams¶
TCP-Echo-Server unter Verwendung der Funktion asyncio.start_server()
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Serving on {addrs}')
async with server:
await server.serve_forever()
asyncio.run(main())
Siehe auch
Das Beispiel TCP-Echo-Server-Protokoll verwendet die Methode loop.create_server().
HTTP-Header abrufen¶
Einfaches Beispiel zum Abfragen der HTTP-Header der über die Kommandozeile übergebenen URL
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
await writer.wait_closed()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
Verwendung
python example.py http://example.com/path/page.html
oder mit HTTPS
python example.py https://example.com/path/page.html
Einen geöffneten Socket registrieren, um mit Streams auf Daten zu warten¶
Coroutine, die wartet, bis ein Socket Daten empfängt, unter Verwendung der Funktion open_connection()
import asyncio
import socket
async def wait_for_data():
# Get a reference to the current event loop because
# we want to access low-level APIs.
loop = asyncio.get_running_loop()
# Create a pair of connected sockets.
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = await reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
await writer.wait_closed()
# Close the second socket
wsock.close()
asyncio.run(wait_for_data())
Siehe auch
Das Beispiel einen geöffneten Socket registrieren, um mit einem Protokoll auf Daten zu warten verwendet die Low-Level-Methode loop.create_connection().
Das Beispiel einen Dateideskriptor auf Leseevents überwachen verwendet die Low-Level-Methode loop.add_reader(), um einen Dateideskriptor zu überwachen.