Luồng

Source code: Lib/asyncio/streams.py


Luồng là các nguyên gốc không đồng bộ/sẵn sàng chờ cấp cao để hoạt động với các kết nối mạng. Các luồng cho phép gửi và nhận dữ liệu mà không cần sử dụng lệnh gọi lại hoặc các giao thức và vận chuyển cấp thấp.

Dưới đây là ví dụ về ứng dụng khách TCP echo được viết bằng luồng asyncio

nhập asyncio

async def tcp_echo_client(tin nhắn):
    người đọc, người viết = đang chờ asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    đang chờ writer.drain()

    dữ liệu = đang chờ reader.read(100)
    print(f'Đã nhận: {data.decode()!r}')

    print('Đóng kết nối')
    nhà văn.close()
    đang chờ nhà văn.wait_closed()

asyncio.run(tcp_echo_client('Xin chào thế giới!'))

Xem thêm phần Examples bên dưới.

Chức năng truyền phát

Bạn có thể sử dụng các hàm asyncio cấp cao nhất sau đây để tạo và làm việc với luồng:

async asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)

Thiết lập kết nối mạng và trả về một cặp đối tượng (reader, writer).

Các đối tượng readerwriter được trả về là các thể hiện của các lớp StreamReaderStreamWriter.

limit xác định giới hạn kích thước bộ đệm được sử dụng bởi phiên bản StreamReader được trả về. Theo mặc định, limit được đặt thành 64 KiB.

Các đối số còn lại được chuyển trực tiếp tới loop.create_connection().

Ghi chú

Đối số sock chuyển quyền sở hữu socket sang StreamWriter đã tạo. Để đóng socket, hãy gọi phương thức close() của nó.

Thay đổi trong phiên bản 3.7: Đã thêm tham số ssl_handshake_timeout.

Thay đổi trong phiên bản 3.8: Đã thêm thông số happy_eyeballs_delayinterleave.

Thay đổi trong phiên bản 3.10: Đã xóa tham số loop.

Thay đổi trong phiên bản 3.11: Đã thêm tham số ssl_shutdown_timeout.

async asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, keep_alive=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

Khởi động một máy chủ socket.

Lệnh gọi lại client_connected_cb được gọi bất cứ khi nào kết nối máy khách mới được thiết lập. Nó nhận một cặp (reader, writer) làm hai đối số, các thể hiện của lớp StreamReaderStreamWriter.

client_connected_cb có thể là một lệnh gọi đơn giản hoặc coroutine function; nếu là hàm coroutine, nó sẽ tự động được lên lịch dưới dạng Task.

limit xác định giới hạn kích thước bộ đệm được sử dụng bởi phiên bản StreamReader được trả về. Theo mặc định, limit được đặt thành 64 KiB.

Các đối số còn lại được chuyển trực tiếp tới loop.create_server().

Ghi chú

Đối số sock chuyển quyền sở hữu socket sang máy chủ đã tạo. Để đóng socket, hãy gọi phương thức close() của máy chủ.

Thay đổi trong phiên bản 3.7: Đã thêm thông số ssl_handshake_timeoutstart_serving.

Thay đổi trong phiên bản 3.10: Đã xóa tham số loop.

Thay đổi trong phiên bản 3.11: Đã thêm tham số ssl_shutdown_timeout.

Thay đổi trong phiên bản 3.13: Đã thêm tham số keep_alive.

Ổ cắm Unix

async asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

Thiết lập kết nối ổ cắm Unix và trả về một cặp (reader, writer).

Tương tự như open_connection() nhưng hoạt động trên socket Unix.

Xem thêm tài liệu của loop.create_unix_connection().

Ghi chú

Đối số sock chuyển quyền sở hữu socket sang StreamWriter đã tạo. Để đóng socket, hãy gọi phương thức close() của nó.

sẵn có: Unix.

Thay đổi trong phiên bản 3.7: Đã thêm tham số ssl_handshake_timeout. Tham số path bây giờ có thể là path-like object

Thay đổi trong phiên bản 3.10: Đã xóa tham số loop.

Thay đổi trong phiên bản 3.11: Đã thêm tham số ssl_shutdown_timeout.

async asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True, cleanup_socket=True)

Khởi động một máy chủ socket Unix.

Tương tự như start_server() nhưng hoạt động với socket Unix.

Nếu cleanup_socket là đúng thì ổ cắm Unix sẽ tự động bị xóa khỏi hệ thống tập tin khi máy chủ đóng, trừ khi ổ cắm đó đã được thay thế sau khi máy chủ được tạo.

Xem thêm tài liệu của loop.create_unix_server().

Ghi chú

Đối số sock chuyển quyền sở hữu socket sang máy chủ đã tạo. Để đóng socket, hãy gọi phương thức close() của máy chủ.

sẵn có: Unix.

Thay đổi trong phiên bản 3.7: Đã thêm thông số ssl_handshake_timeoutstart_serving. Tham số path bây giờ có thể là path-like object.

Thay đổi trong phiên bản 3.10: Đã xóa tham số loop.

Thay đổi trong phiên bản 3.11: Đã thêm tham số ssl_shutdown_timeout.

Thay đổi trong phiên bản 3.13: Đã thêm tham số cleanup_socket.

StreamReader

class asyncio.StreamReader

Đại diện cho một đối tượng reader cung cấp các API để đọc dữ liệu từ luồng IO. Là một asynchronous iterable, đối tượng hỗ trợ câu lệnh async for.

Không nên khởi tạo trực tiếp các đối tượng StreamReader; thay vào đó hãy sử dụng open_connection()start_server().

feed_eof()

Thừa nhận EOF.

async read(n=-1)

Đọc tới n byte từ luồng.

Nếu n không được cung cấp hoặc được đặt thành -1, hãy đọc cho đến EOF, sau đó trả về tất cả bytes đã đọc. Nếu EOF được nhận và bộ đệm bên trong trống, hãy trả về một đối tượng bytes trống.

Nếu n0, hãy trả về một đối tượng bytes trống ngay lập tức.

Nếu n là dương, hãy trả về tối đa n bytes có sẵn ngay khi có ít nhất 1 byte trong bộ đệm trong. Nếu nhận được EOF trước khi đọc bất kỳ byte nào, hãy trả về một đối tượng bytes trống.

async readline()

Đọc một dòng, trong đó "dòng" là một chuỗi byte kết thúc bằng \n.

Nếu nhận được EOF và không tìm thấy \n, phương thức sẽ trả về dữ liệu đã đọc một phần.

Nếu nhận được EOF và bộ đệm bên trong trống, hãy trả về một đối tượng bytes trống.

async readexactly(n)

Đọc chính xác byte n.

Tăng IncompleteReadError nếu đạt đến EOF trước khi có thể đọc được n. Sử dụng thuộc tính IncompleteReadError.partial để lấy dữ liệu đã đọc một phần.

async readuntil(separator=b'\n')

Đọc dữ liệu từ luồng cho đến khi tìm thấy separator.

Nếu thành công, dữ liệu và dấu phân cách sẽ bị xóa khỏi bộ đệm bên trong (đã tiêu thụ). Dữ liệu trả về sẽ bao gồm dấu phân cách ở cuối.

Nếu lượng dữ liệu được đọc vượt quá giới hạn luồng đã định cấu hình, ngoại lệ LimitOverrunError sẽ xuất hiện và dữ liệu sẽ được lưu lại trong bộ đệm bên trong và có thể được đọc lại.

Nếu đạt đến EOF trước khi tìm thấy dấu phân cách hoàn chỉnh, một ngoại lệ IncompleteReadError sẽ xuất hiện và bộ đệm bên trong được đặt lại. Thuộc tính IncompleteReadError.partial có thể chứa một phần dấu phân cách.

Zz001zz cũng có thể là một bộ dấu phân cách. Trong trường hợp này, giá trị trả về sẽ là giá trị ngắn nhất có thể có bất kỳ dấu phân cách nào làm hậu tố. Vì mục đích của LimitOverrunError, dấu phân cách ngắn nhất có thể được coi là dấu phân cách phù hợp.

Added in version 3.5.2.

Thay đổi trong phiên bản 3.13: Tham số separator bây giờ có thể là tuple của dấu phân cách.

at_eof()

Trả về True nếu bộ đệm trống và feed_eof() được gọi.

StreamWriter

class asyncio.StreamWriter

Đại diện cho một đối tượng nhà văn cung cấp API để ghi dữ liệu vào luồng IO.

Không nên khởi tạo trực tiếp các đối tượng StreamWriter; thay vào đó hãy sử dụng open_connection()start_server().

write(data)

Phương thức này cố gắng ghi data vào ổ cắm bên dưới ngay lập tức. Nếu không thành công, dữ liệu sẽ được xếp hàng đợi trong bộ đệm ghi bên trong cho đến khi có thể gửi đi.

Bộ đệm data phải là đối tượng byte, bytearray hoặc đối tượng xem bộ nhớ một chiều liền kề C.

Phương thức này nên được sử dụng cùng với phương thức drain()

luồng.write(dữ liệu)
đang chờ luồng.drain()
writelines(data)

Phương thức này ghi ngay một danh sách (hoặc bất kỳ byte nào có thể lặp lại) vào ổ cắm bên dưới. Nếu không thành công, dữ liệu sẽ được xếp hàng đợi trong bộ đệm ghi bên trong cho đến khi có thể gửi đi.

Phương thức này nên được sử dụng cùng với phương thức drain()

suối.writelines(dòng)
đang chờ luồng.drain()
close()

Phương thức này đóng luồng và ổ cắm bên dưới.

Phương pháp này nên được sử dụng, mặc dù không bắt buộc, cùng với phương pháp wait_closed():

luồng.close()
đang chờ luồng.wait_closed()
can_write_eof()

Trả về True nếu phương thức vận chuyển cơ bản hỗ trợ phương thức write_eof(), ngược lại là False.

write_eof()

Đóng phần cuối ghi của luồng sau khi xóa dữ liệu ghi vào bộ đệm.

transport

Trả lại vận chuyển asyncio cơ bản.

get_extra_info(name, default=None)

Truy cập thông tin vận chuyển tùy chọn; xem BaseTransport.get_extra_info() để biết chi tiết.

async drain()

Đợi cho đến khi thích hợp để tiếp tục ghi vào luồng. Ví dụ:

writer.write(dữ liệu)
đang chờ writer.drain()

Đây là phương pháp kiểm soát luồng tương tác với bộ đệm ghi IO bên dưới. Khi kích thước của bộ đệm đạt đến hình mờ cao, drain() sẽ chặn cho đến khi kích thước của bộ đệm giảm xuống hình mờ thấp và có thể tiếp tục ghi. Khi không có gì phải chờ đợi, drain() sẽ quay trở lại ngay lập tức.

async start_tls(sslcontext, *, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

Nâng cấp kết nối dựa trên luồng hiện có lên TLS.

Thông số:

  • sslcontext: phiên bản được định cấu hình của SSLContext.

  • server_hostname: đặt hoặc ghi đè tên máy chủ mà chứng chỉ của máy chủ mục tiêu sẽ được so khớp.

  • ssl_handshake_timeout là thời gian tính bằng giây để chờ quá trình bắt tay TLS hoàn tất trước khi hủy kết nối. 60.0 giây nếu None (mặc định).

  • ssl_shutdown_timeout là thời gian tính bằng giây để chờ quá trình tắt SSL hoàn tất trước khi hủy kết nối. 30.0 giây nếu None (mặc định).

Added in version 3.11.

Thay đổi trong phiên bản 3.12: Đã thêm tham số ssl_shutdown_timeout.

is_closing()

Trả về True nếu luồng bị đóng hoặc đang trong quá trình đóng.

Added in version 3.7.

async wait_closed()

Đợi cho đến khi luồng được đóng lại.

Nên gọi sau close() để đợi cho đến khi kết nối cơ bản bị đóng, đảm bảo rằng tất cả dữ liệu đã bị xóa trước đó, ví dụ: thoát khỏi chương trình.

Added in version 3.7.

Ví dụ

TCP echo client sử dụng luồng

TCP echo client sử dụng chức năng asyncio.open_connection():

nhập asyncio

async def tcp_echo_client(tin nhắn):
    người đọc, người viết = đang chờ asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    đang chờ writer.drain()

    dữ liệu = đang chờ reader.read(100)
    print(f'Đã nhận: {data.decode()!r}')

    print('Đóng kết nối')
    nhà văn.close()
    đang chờ nhà văn.wait_closed()

asyncio.run(tcp_echo_client('Xin chào thế giới!'))

Xem thêm

Ví dụ TCP echo client protocol sử dụng phương pháp loop.create_connection() cấp thấp.

máy chủ echo TCP sử dụng luồng

Máy chủ echo TCP sử dụng chức năng asyncio.start_server()

nhập asyncio

async def hand_echo(người đọc, người viết):
    dữ liệu = đang chờ reader.read(100)
    tin nhắn = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Đã nhận được {tin nhắn!r} từ {addr!r}")

    print(f"Gửi: {message!r}")
    writer.write(dữ liệu)
    đang chờ writer.drain()

    print("Đóng kết nối")
    nhà văn.close()
    đang chờ nhà văn.wait_closed()

async def main():
    máy chủ = đang chờ asyncio.start_server(
        hand_echo, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) cho sock trong server.sockets)
    print(f'Phục vụ trên {addrs}')

    không đồng bộ với máy chủ:
        đang chờ server.serve_forever()

asyncio.run(chính())

Xem thêm

Ví dụ TCP echo server protocol sử dụng phương thức loop.create_server().

Nhận tiêu đề HTTP

Ví dụ đơn giản truy vấn các tiêu đề HTTP của URL được truyền trên dòng lệnh

nhập asyncio
nhập urllib.parse
hệ thống nhập khẩu

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    nếu url.scheme == 'https':
        người đọc, người viết = đang chờ asyncio.open_connection(
            url.tên máy chủ, 443, ssl=True)
    khác:
        người đọc, người viết = đang chờ asyncio.open_connection(
            url.tên máy chủ, 80)

    truy vấn = (
        f"HEAD {url.path hoặc '/'} HTTP/1.0\r\n"
        f"Máy chủ: {url.tên máy chủ}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    trong khi Đúng:
        dòng = đang chờ reader.readline()
        nếu không phải dòng:
            phá vỡ

        line = line.decode('latin1').rstrip()
        nếu dòng:
            print(tiêu đề f'HTTP> {line}')

    # Ignore thân máy, đóng ổ cắm lại
    nhà văn.close()
    đang chờ nhà văn.wait_closed()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Cách sử dụng:

 dụ về python.py http://example.com/path/page.html

hoặc với HTTPS:

 dụ về python.py https://example.com/path/page.html

Đăng ký một ổ cắm mở để chờ dữ liệu bằng luồng

Coroutine chờ cho đến khi socket nhận được dữ liệu bằng hàm open_connection()

nhập asyncio
 cắm nhập khẩu

async def wait_for_data():
    # Get tham chiếu đến vòng lặp sự kiện hiện tại vì
    # we muốn truy cập API cấp thấp.
    vòng lặp = asyncio.get_running_loop()

    # Create một cặp ổ cắm được kết nối.
    rsock, wsock = socket.socketpair()

    # Register ổ cắm mở để chờ dữ liệu.
    người đọc, người viết = đang chờ asyncio.open_connection(sock=rsock)

    # Simulate việc tiếp nhận dữ liệu từ mạng
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait cho dữ liệu
    dữ liệu = đang chờ reader.read(100)

    Dữ liệu # Got, chúng ta đã hoàn tất: đóng ổ cắm
    print("Đã nhận:", data.decode())
    nhà văn.close()
    đang chờ nhà văn.wait_closed()

    # Close ổ cắm thứ hai
    wsock.close()

asyncio.run(wait_for_data())

Xem thêm

Ví dụ register an open socket to wait for data using a protocol sử dụng giao thức cấp thấp và phương thức loop.create_connection().

Ví dụ watch a file descriptor for read events sử dụng phương pháp loop.add_reader() cấp thấp để xem bộ mô tả tệp.