Luồng¶
Source code: Lib/asyncio/streams.py
Luồng là các nguyên gốc không đồng bộ/sẵn sàng chờ cấp cao để hoạt động với các kết nối mạng. Các luồng cho phép gửi và nhận dữ liệu mà không cần sử dụng lệnh gọi lại hoặc các giao thức và vận chuyển cấp thấp.
Dưới đây là ví dụ về ứng dụng khách TCP echo được viết bằng luồng asyncio
nhập asyncio
async def tcp_echo_client(tin nhắn):
người đọc, người viết = đang chờ asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
đang chờ writer.drain()
dữ liệu = đang chờ reader.read(100)
print(f'Đã nhận: {data.decode()!r}')
print('Đóng kết nối')
nhà văn.close()
đang chờ nhà văn.wait_closed()
asyncio.run(tcp_echo_client('Xin chào thế giới!'))
Xem thêm phần Examples bên dưới.
Chức năng truyền phát
Bạn có thể sử dụng các hàm asyncio cấp cao nhất sau đây để tạo và làm việc với luồng:
- async asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)¶
Thiết lập kết nối mạng và trả về một cặp đối tượng
(reader, writer).Các đối tượng reader và writer được trả về là các thể hiện của các lớp
StreamReadervàStreamWriter.limit xác định giới hạn kích thước bộ đệm được sử dụng bởi phiên bản
StreamReaderđược trả về. Theo mặc định, limit được đặt thành 64 KiB.Các đối số còn lại được chuyển trực tiếp tới
loop.create_connection().Ghi chú
Đối số sock chuyển quyền sở hữu socket sang
StreamWriterđã tạo. Để đóng socket, hãy gọi phương thứcclose()của nó.Thay đổi trong phiên bản 3.7: Đã thêm tham số ssl_handshake_timeout.
Thay đổi trong phiên bản 3.8: Đã thêm thông số happy_eyeballs_delay và interleave.
Thay đổi trong phiên bản 3.10: Đã xóa tham số loop.
Thay đổi trong phiên bản 3.11: Đã thêm tham số ssl_shutdown_timeout.
- async asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, keep_alive=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)¶
Khởi động một máy chủ socket.
Lệnh gọi lại client_connected_cb được gọi bất cứ khi nào kết nối máy khách mới được thiết lập. Nó nhận một cặp
(reader, writer)làm hai đối số, các thể hiện của lớpStreamReadervàStreamWriter.client_connected_cb có thể là một lệnh gọi đơn giản hoặc coroutine function; nếu là hàm coroutine, nó sẽ tự động được lên lịch dưới dạng
Task.limit xác định giới hạn kích thước bộ đệm được sử dụng bởi phiên bản
StreamReaderđược trả về. Theo mặc định, limit được đặt thành 64 KiB.Các đối số còn lại được chuyển trực tiếp tới
loop.create_server().Ghi chú
Đối số sock chuyển quyền sở hữu socket sang máy chủ đã tạo. Để đóng socket, hãy gọi phương thức
close()của máy chủ.Thay đổi trong phiên bản 3.7: Đã thêm thông số ssl_handshake_timeout và start_serving.
Thay đổi trong phiên bản 3.10: Đã xóa tham số loop.
Thay đổi trong phiên bản 3.11: Đã thêm tham số ssl_shutdown_timeout.
Thay đổi trong phiên bản 3.13: Đã thêm tham số keep_alive.
Ổ cắm Unix
- async asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
Thiết lập kết nối ổ cắm Unix và trả về một cặp
(reader, writer).Tương tự như
open_connection()nhưng hoạt động trên socket Unix.Xem thêm tài liệu của
loop.create_unix_connection().Ghi chú
Đối số sock chuyển quyền sở hữu socket sang
StreamWriterđã tạo. Để đóng socket, hãy gọi phương thứcclose()của nó.sẵn có: Unix.
Thay đổi trong phiên bản 3.7: Đã thêm tham số ssl_handshake_timeout. Tham số path bây giờ có thể là path-like object
Thay đổi trong phiên bản 3.10: Đã xóa tham số loop.
Thay đổi trong phiên bản 3.11: Đã thêm tham số ssl_shutdown_timeout.
- async asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True, cleanup_socket=True)¶
Khởi động một máy chủ socket Unix.
Tương tự như
start_server()nhưng hoạt động với socket Unix.Nếu cleanup_socket là đúng thì ổ cắm Unix sẽ tự động bị xóa khỏi hệ thống tập tin khi máy chủ đóng, trừ khi ổ cắm đó đã được thay thế sau khi máy chủ được tạo.
Xem thêm tài liệu của
loop.create_unix_server().Ghi chú
Đối số sock chuyển quyền sở hữu socket sang máy chủ đã tạo. Để đóng socket, hãy gọi phương thức
close()của máy chủ.sẵn có: Unix.
Thay đổi trong phiên bản 3.7: Đã thêm thông số ssl_handshake_timeout và start_serving. Tham số path bây giờ có thể là path-like object.
Thay đổi trong phiên bản 3.10: Đã xóa tham số loop.
Thay đổi trong phiên bản 3.11: Đã thêm tham số ssl_shutdown_timeout.
Thay đổi trong phiên bản 3.13: Đã thêm tham số cleanup_socket.
StreamReader¶
- class asyncio.StreamReader¶
Đại diện cho một đối tượng reader cung cấp các API để đọc dữ liệu từ luồng IO. Là một asynchronous iterable, đối tượng hỗ trợ câu lệnh
async for.Không nên khởi tạo trực tiếp các đối tượng StreamReader; thay vào đó hãy sử dụng
open_connection()vàstart_server().- feed_eof()¶
Thừa nhận EOF.
- async read(n=-1)¶
Đọc tới n byte từ luồng.
Nếu n không được cung cấp hoặc được đặt thành
-1, hãy đọc cho đến EOF, sau đó trả về tất cảbytesđã đọc. Nếu EOF được nhận và bộ đệm bên trong trống, hãy trả về một đối tượngbytestrống.Nếu n là
0, hãy trả về một đối tượngbytestrống ngay lập tức.Nếu n là dương, hãy trả về tối đa n
bytescó sẵn ngay khi có ít nhất 1 byte trong bộ đệm trong. Nếu nhận được EOF trước khi đọc bất kỳ byte nào, hãy trả về một đối tượngbytestrống.
- async readline()¶
Đọc một dòng, trong đó "dòng" là một chuỗi byte kết thúc bằng
\n.Nếu nhận được EOF và không tìm thấy
\n, phương thức sẽ trả về dữ liệu đã đọc một phần.Nếu nhận được EOF và bộ đệm bên trong trống, hãy trả về một đối tượng
bytestrống.
- async readexactly(n)¶
Đọc chính xác byte n.
Tăng
IncompleteReadErrornếu đạt đến EOF trước khi có thể đọc được n. Sử dụng thuộc tínhIncompleteReadError.partialđể lấy dữ liệu đã đọc một phần.
- async readuntil(separator=b'\n')¶
Đọc dữ liệu từ luồng cho đến khi tìm thấy separator.
Nếu thành công, dữ liệu và dấu phân cách sẽ bị xóa khỏi bộ đệm bên trong (đã tiêu thụ). Dữ liệu trả về sẽ bao gồm dấu phân cách ở cuối.
Nếu lượng dữ liệu được đọc vượt quá giới hạn luồng đã định cấu hình, ngoại lệ
LimitOverrunErrorsẽ xuất hiện và dữ liệu sẽ được lưu lại trong bộ đệm bên trong và có thể được đọc lại.Nếu đạt đến EOF trước khi tìm thấy dấu phân cách hoàn chỉnh, một ngoại lệ
IncompleteReadErrorsẽ xuất hiện và bộ đệm bên trong được đặt lại. Thuộc tínhIncompleteReadError.partialcó thể chứa một phần dấu phân cách.Zz001zz cũng có thể là một bộ dấu phân cách. Trong trường hợp này, giá trị trả về sẽ là giá trị ngắn nhất có thể có bất kỳ dấu phân cách nào làm hậu tố. Vì mục đích của
LimitOverrunError, dấu phân cách ngắn nhất có thể được coi là dấu phân cách phù hợp.Added in version 3.5.2.
Thay đổi trong phiên bản 3.13: Tham số separator bây giờ có thể là
tuplecủa dấu phân cách.
- at_eof()¶
Trả về
Truenếu bộ đệm trống vàfeed_eof()được gọi.
StreamWriter¶
- class asyncio.StreamWriter¶
Đại diện cho một đối tượng nhà văn cung cấp API để ghi dữ liệu vào luồng IO.
Không nên khởi tạo trực tiếp các đối tượng StreamWriter; thay vào đó hãy sử dụng
open_connection()vàstart_server().- write(data)¶
Phương thức này cố gắng ghi data vào ổ cắm bên dưới ngay lập tức. Nếu không thành công, dữ liệu sẽ được xếp hàng đợi trong bộ đệm ghi bên trong cho đến khi có thể gửi đi.
Bộ đệm data phải là đối tượng byte, bytearray hoặc đối tượng xem bộ nhớ một chiều liền kề C.
Phương thức này nên được sử dụng cùng với phương thức
drain()luồng.write(dữ liệu) đang chờ luồng.drain()
- writelines(data)¶
Phương thức này ghi ngay một danh sách (hoặc bất kỳ byte nào có thể lặp lại) vào ổ cắm bên dưới. Nếu không thành công, dữ liệu sẽ được xếp hàng đợi trong bộ đệm ghi bên trong cho đến khi có thể gửi đi.
Phương thức này nên được sử dụng cùng với phương thức
drain()suối.writelines(dòng) đang chờ luồng.drain()
- close()¶
Phương thức này đóng luồng và ổ cắm bên dưới.
Phương pháp này nên được sử dụng, mặc dù không bắt buộc, cùng với phương pháp
wait_closed():luồng.close() đang chờ luồng.wait_closed()
- can_write_eof()¶
Trả về
Truenếu phương thức vận chuyển cơ bản hỗ trợ phương thứcwrite_eof(), ngược lại làFalse.
- write_eof()¶
Đóng phần cuối ghi của luồng sau khi xóa dữ liệu ghi vào bộ đệm.
- transport¶
Trả lại vận chuyển asyncio cơ bản.
- get_extra_info(name, default=None)¶
Truy cập thông tin vận chuyển tùy chọn; xem
BaseTransport.get_extra_info()để biết chi tiết.
- async drain()¶
Đợi cho đến khi thích hợp để tiếp tục ghi vào luồng. Ví dụ:
writer.write(dữ liệu) đang chờ writer.drain()
Đây là phương pháp kiểm soát luồng tương tác với bộ đệm ghi IO bên dưới. Khi kích thước của bộ đệm đạt đến hình mờ cao, drain() sẽ chặn cho đến khi kích thước của bộ đệm giảm xuống hình mờ thấp và có thể tiếp tục ghi. Khi không có gì phải chờ đợi,
drain()sẽ quay trở lại ngay lập tức.
- async start_tls(sslcontext, *, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
Nâng cấp kết nối dựa trên luồng hiện có lên TLS.
Thông số:
sslcontext: phiên bản được định cấu hình của
SSLContext.server_hostname: đặt hoặc ghi đè tên máy chủ mà chứng chỉ của máy chủ mục tiêu sẽ được so khớp.
ssl_handshake_timeout là thời gian tính bằng giây để chờ quá trình bắt tay TLS hoàn tất trước khi hủy kết nối.
60.0giây nếuNone(mặc định).ssl_shutdown_timeout là thời gian tính bằng giây để chờ quá trình tắt SSL hoàn tất trước khi hủy kết nối.
30.0giây nếuNone(mặc định).
Added in version 3.11.
Thay đổi trong phiên bản 3.12: Đã thêm tham số ssl_shutdown_timeout.
- is_closing()¶
Trả về
Truenếu luồng bị đóng hoặc đang trong quá trình đóng.Added in version 3.7.
Ví dụ¶
TCP echo client sử dụng luồng¶
TCP echo client sử dụng chức năng asyncio.open_connection():
nhập asyncio
async def tcp_echo_client(tin nhắn):
người đọc, người viết = đang chờ asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
đang chờ writer.drain()
dữ liệu = đang chờ reader.read(100)
print(f'Đã nhận: {data.decode()!r}')
print('Đóng kết nối')
nhà văn.close()
đang chờ nhà văn.wait_closed()
asyncio.run(tcp_echo_client('Xin chào thế giới!'))
Xem thêm
Ví dụ TCP echo client protocol sử dụng phương pháp loop.create_connection() cấp thấp.
máy chủ echo TCP sử dụng luồng¶
Máy chủ echo TCP sử dụng chức năng asyncio.start_server()
nhập asyncio
async def hand_echo(người đọc, người viết):
dữ liệu = đang chờ reader.read(100)
tin nhắn = data.decode()
addr = writer.get_extra_info('peername')
print(f"Đã nhận được {tin nhắn!r} từ {addr!r}")
print(f"Gửi: {message!r}")
writer.write(dữ liệu)
đang chờ writer.drain()
print("Đóng kết nối")
nhà văn.close()
đang chờ nhà văn.wait_closed()
async def main():
máy chủ = đang chờ asyncio.start_server(
hand_echo, '127.0.0.1', 8888)
addrs = ', '.join(str(sock.getsockname()) cho sock trong server.sockets)
print(f'Phục vụ trên {addrs}')
không đồng bộ với máy chủ:
đang chờ server.serve_forever()
asyncio.run(chính())
Xem thêm
Ví dụ TCP echo server protocol sử dụng phương thức loop.create_server().
Nhận tiêu đề HTTP¶
Ví dụ đơn giản truy vấn các tiêu đề HTTP của URL được truyền trên dòng lệnh
nhập asyncio
nhập urllib.parse
hệ thống nhập khẩu
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
nếu url.scheme == 'https':
người đọc, người viết = đang chờ asyncio.open_connection(
url.tên máy chủ, 443, ssl=True)
khác:
người đọc, người viết = đang chờ asyncio.open_connection(
url.tên máy chủ, 80)
truy vấn = (
f"HEAD {url.path hoặc '/'} HTTP/1.0\r\n"
f"Máy chủ: {url.tên máy chủ}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
trong khi Đúng:
dòng = đang chờ reader.readline()
nếu không phải dòng:
phá vỡ
line = line.decode('latin1').rstrip()
nếu dòng:
print(tiêu đề f'HTTP> {line}')
# Ignore thân máy, đóng ổ cắm lại
nhà văn.close()
đang chờ nhà văn.wait_closed()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
Cách sử dụng:
ví dụ về python.py http://example.com/path/page.html
hoặc với HTTPS:
ví dụ về python.py https://example.com/path/page.html
Đăng ký một ổ cắm mở để chờ dữ liệu bằng luồng¶
Coroutine chờ cho đến khi socket nhận được dữ liệu bằng hàm open_connection()
nhập asyncio
ổ cắm nhập khẩu
async def wait_for_data():
# Get tham chiếu đến vòng lặp sự kiện hiện tại vì
# we muốn truy cập API cấp thấp.
vòng lặp = asyncio.get_running_loop()
# Create một cặp ổ cắm được kết nối.
rsock, wsock = socket.socketpair()
# Register ổ cắm mở để chờ dữ liệu.
người đọc, người viết = đang chờ asyncio.open_connection(sock=rsock)
# Simulate việc tiếp nhận dữ liệu từ mạng
loop.call_soon(wsock.send, 'abc'.encode())
# Wait cho dữ liệu
dữ liệu = đang chờ reader.read(100)
Dữ liệu # Got, chúng ta đã hoàn tất: đóng ổ cắm
print("Đã nhận:", data.decode())
nhà văn.close()
đang chờ nhà văn.wait_closed()
# Close ổ cắm thứ hai
wsock.close()
asyncio.run(wait_for_data())
Xem thêm
Ví dụ register an open socket to wait for data using a protocol sử dụng giao thức cấp thấp và phương thức loop.create_connection().
Ví dụ watch a file descriptor for read events sử dụng phương pháp loop.add_reader() cấp thấp để xem bộ mô tả tệp.