Vận chuyển và giao thức¶
Lời nói đầu
Phương thức vận chuyển và Giao thức được sử dụng bởi các API vòng lặp sự kiện low-level như loop.create_connection(). Họ sử dụng phong cách lập trình dựa trên cuộc gọi lại và cho phép triển khai mạng hoặc giao thức IPC hiệu suất cao (ví dụ: HTTP).
Về cơ bản, việc vận chuyển và giao thức chỉ nên được sử dụng trong các thư viện và khung công tác chứ không bao giờ được sử dụng trong các ứng dụng asyncio cấp cao.
Trang tài liệu này bao gồm cả Transports và Protocols.
Giới thiệu
Ở mức cao nhất, quá trình vận chuyển liên quan đến các byte how được truyền đi, trong khi giao thức xác định các byte which sẽ truyền (và ở một mức độ nào đó khi nào).
Một cách khác để nói điều tương tự: vận chuyển là một sự trừu tượng hóa cho một ổ cắm (hoặc điểm cuối I/O tương tự) trong khi giao thức là một sự trừu tượng hóa cho một ứng dụng, theo quan điểm của phương tiện vận chuyển.
Tuy nhiên, một cách nhìn khác là các giao diện truyền tải và giao thức cùng nhau xác định một giao diện trừu tượng để sử dụng I/O mạng và I/O liên tiến trình.
Luôn có mối quan hệ 1:1 giữa các đối tượng vận chuyển và giao thức: giao thức gọi các phương thức vận chuyển để gửi dữ liệu, trong khi phương thức vận chuyển gọi các phương thức giao thức để truyền dữ liệu đã nhận được.
Hầu hết các phương thức vòng lặp sự kiện hướng kết nối (chẳng hạn như loop.create_connection()) thường chấp nhận đối số protocol_factory được sử dụng để tạo đối tượng Protocol cho kết nối được chấp nhận, được biểu thị bằng đối tượng Transport. Các phương thức như vậy thường trả về một bộ (transport, protocol).
Nội dung
Trang tài liệu này chứa các phần sau:
Phần Transports ghi lại các lớp asyncio
BaseTransport,ReadTransport,WriteTransport,Transport,DatagramTransportvàSubprocessTransport.Phần Protocols ghi lại các lớp asyncio
BaseProtocol,Protocol,BufferedProtocol,DatagramProtocolvàSubprocessProtocol.Phần Examples giới thiệu cách làm việc với các phương tiện truyền tải, giao thức và API vòng lặp sự kiện cấp thấp.
Vận chuyển¶
Source code: Lib/asyncio/transports.py
Vận chuyển là các lớp do asyncio cung cấp nhằm trừu tượng hóa các loại kênh liên lạc khác nhau.
Các đối tượng vận chuyển luôn được khởi tạo bởi asyncio event loop.
asyncio triển khai vận chuyển cho các ống TCP, UDP, SSL và subprocess. Các phương pháp có sẵn trên một phương tiện vận tải phụ thuộc vào loại phương tiện vận chuyển.
Các lớp vận chuyển là not thread safe.
Hệ thống phân cấp vận chuyển¶
- class asyncio.BaseTransport¶
Lớp cơ sở cho tất cả các phương tiện vận chuyển. Chứa các phương thức mà tất cả các phương thức vận chuyển asyncio đều chia sẻ.
- class asyncio.WriteTransport(BaseTransport)¶
Một phương thức vận chuyển cơ sở cho các kết nối chỉ ghi.
Các phiên bản của lớp WriteTransport được trả về từ phương thức vòng lặp sự kiện
loop.connect_write_pipe()và cũng được sử dụng bởi các phương thức liên quan đến quy trình con nhưloop.subprocess_exec().
- class asyncio.ReadTransport(BaseTransport)¶
Một phương tiện vận chuyển cơ sở cho các kết nối chỉ đọc.
Các phiên bản của lớp ReadTransport được trả về từ phương thức vòng lặp sự kiện
loop.connect_read_pipe()và cũng được sử dụng bởi các phương thức liên quan đến quy trình con nhưloop.subprocess_exec().
- class asyncio.Transport(WriteTransport, ReadTransport)¶
Giao diện đại diện cho việc vận chuyển hai chiều, chẳng hạn như kết nối TCP.
Người dùng không trực tiếp khởi tạo việc vận chuyển; họ gọi một hàm tiện ích, truyền cho nó một nhà máy giao thức và các thông tin khác cần thiết để tạo ra phương thức vận chuyển và giao thức.
Các thể hiện của lớp Transport được trả về hoặc sử dụng bởi các phương thức vòng lặp sự kiện như
loop.create_connection(),loop.create_unix_connection(),loop.create_server(),loop.sendfile(), v.v.
- class asyncio.DatagramTransport(BaseTransport)¶
Một phương thức vận chuyển cho các kết nối datagram (UDP).
Các phiên bản của lớp DatagramTransport được trả về từ phương thức vòng lặp sự kiện
loop.create_datagram_endpoint().
- class asyncio.SubprocessTransport(BaseTransport)¶
Một sự trừu tượng hóa để thể hiện kết nối giữa tiến trình hệ điều hành gốc và hệ điều hành con của nó.
Các thể hiện của lớp SubprocessTransport được trả về từ các phương thức vòng lặp sự kiện
loop.subprocess_shell()vàloop.subprocess_exec().
Vận chuyển cơ sở¶
- BaseTransport.close()¶
Đóng phương tiện vận chuyển.
Nếu phương tiện vận chuyển có bộ đệm cho dữ liệu gửi đi, dữ liệu được lưu trong bộ đệm sẽ bị xóa không đồng bộ. Sẽ không có thêm dữ liệu nào được nhận. Sau khi tất cả dữ liệu được lưu vào bộ đệm được xóa, phương thức
protocol.connection_lost()của giao thức sẽ được gọi vớiNonelàm đối số. Việc vận chuyển không nên được sử dụng khi nó đã đóng cửa.
- BaseTransport.is_closing()¶
Trả về
Truenếu quá trình vận chuyển đang đóng hoặc đã đóng.
- BaseTransport.get_extra_info(name, default=None)¶
Trả về thông tin về việc vận chuyển hoặc các tài nguyên cơ bản mà nó sử dụng.
name là một chuỗi biểu thị phần thông tin dành riêng cho phương tiện giao thông cần lấy.
default là giá trị trả về nếu thông tin không có sẵn hoặc nếu phương tiện truyền tải không hỗ trợ truy vấn thông tin đó bằng cách triển khai vòng lặp sự kiện của bên thứ ba nhất định hoặc trên nền tảng hiện tại.
Ví dụ: đoạn mã sau cố gắng lấy đối tượng socket cơ bản của phương tiện truyền tải:
sock = Transport.get_extra_info('socket') nếu tất không phải là Không có: print(sock.getsockopt(...))
Các loại thông tin có thể được truy vấn trên một số phương tiện vận chuyển:
ổ cắm:
'peername': địa chỉ từ xa mà ổ cắm được kết nối, kết quả củasocket.socket.getpeername()(Nonebị lỗi)'socket': phiên bảnsocket.socket'sockname': địa chỉ riêng của socket, kết quả củasocket.socket.getsockname()
ổ cắm SSL:
'compression': thuật toán nén đang được sử dụng dưới dạng chuỗi hoặcNonenếu kết nối không được nén; kết quả củassl.SSLSocket.compression()'cipher': một bộ ba giá trị chứa tên của mật mã đang được sử dụng, phiên bản của giao thức SSL xác định cách sử dụng nó và số lượng bit bí mật đang được sử dụng; kết quả củassl.SSLSocket.cipher()'peercert': chứng chỉ ngang hàng; kết quả củassl.SSLSocket.getpeercert()'sslcontext': phiên bảnssl.SSLContext'ssl_object': phiên bảnssl.SSLObjecthoặcssl.SSLSocket
ống:
'pipe': đối tượng ống
quy trình con:
'subprocess': phiên bảnsubprocess.Popen
- BaseTransport.set_protocol(protocol)¶
Đặt một giao thức mới.
Giao thức chuyển đổi chỉ nên được thực hiện khi cả hai giao thức đều được ghi lại để hỗ trợ chuyển đổi.
- BaseTransport.get_protocol()¶
Trả về giao thức hiện tại.
Vận chuyển chỉ đọc¶
- ReadTransport.is_reading()¶
Trả về
Truenếu phương tiện đang nhận dữ liệu mới.Added in version 3.7.
- ReadTransport.pause_reading()¶
Tạm dừng kết thúc nhận của việc vận chuyển. Sẽ không có dữ liệu nào được chuyển đến phương thức
protocol.data_received()của giao thức cho đến khiresume_reading()được gọi.Thay đổi trong phiên bản 3.7: Phương thức này là bình thường, tức là nó có thể được gọi khi quá trình vận chuyển đã bị tạm dừng hoặc đóng.
- ReadTransport.resume_reading()¶
Tiếp tục kết thúc nhận. Phương thức
protocol.data_received()của giao thức sẽ được gọi lại một lần nữa nếu có sẵn một số dữ liệu để đọc.Thay đổi trong phiên bản 3.7: Phương thức này là bình thường, tức là nó có thể được gọi khi phương tiện vận chuyển đã đọc.
Vận chuyển chỉ ghi¶
- WriteTransport.abort()¶
Đóng quá trình vận chuyển ngay lập tức mà không cần đợi các hoạt động đang chờ xử lý hoàn tất. Dữ liệu được đệm sẽ bị mất. Sẽ không có thêm dữ liệu nào được nhận. Phương thức
protocol.connection_lost()của giao thức cuối cùng sẽ được gọi vớiNonelàm đối số.
- WriteTransport.can_write_eof()¶
Trả về
Truenếu phương tiện vận chuyển hỗ trợwrite_eof(),Falsenếu không.
- WriteTransport.get_write_buffer_size()¶
Trả về kích thước hiện tại của bộ đệm đầu ra được phương tiện vận chuyển sử dụng.
- WriteTransport.get_write_buffer_limits()¶
Nhận hình mờ high và low để kiểm soát luồng ghi. Trả về một bộ
(low, high)trong đó low và high là số byte dương.Sử dụng
set_write_buffer_limits()để đặt giới hạn.Added in version 3.4.2.
- WriteTransport.set_write_buffer_limits(high=None, low=None)¶
Đặt hình mờ high và low để kiểm soát luồng ghi.
Hai giá trị này (được đo bằng số byte) kiểm soát khi các phương thức
protocol.pause_writing()vàprotocol.resume_writing()của giao thức được gọi. Nếu được chỉ định, hình mờ thấp phải nhỏ hơn hoặc bằng hình mờ cao. Cả high và low đều không thể âm.pause_writing()được gọi khi kích thước bộ đệm lớn hơn hoặc bằng giá trị high. Nếu việc ghi đã bị tạm dừng,resume_writing()được gọi khi kích thước bộ đệm nhỏ hơn hoặc bằng giá trị low.Các giá trị mặc định là dành riêng cho việc triển khai. Nếu chỉ có hình mờ cao được cung cấp thì hình mờ thấp sẽ mặc định có giá trị dành riêng cho việc triển khai nhỏ hơn hoặc bằng hình mờ cao. Việc đặt high về 0 cũng sẽ buộc low về 0 và khiến
pause_writing()được gọi bất cứ khi nào bộ đệm không trống. Đặt low về 0 khiếnresume_writing()chỉ được gọi khi bộ đệm trống. Việc sử dụng số 0 cho một trong hai giới hạn thường là dưới mức tối ưu vì nó làm giảm cơ hội thực hiện I/O và tính toán đồng thời.Sử dụng
get_write_buffer_limits()để nhận giới hạn.
- WriteTransport.write(data)¶
Viết một số byte data vào phương tiện vận chuyển.
Phương pháp này không chặn; nó đệm dữ liệu và sắp xếp để dữ liệu được gửi đi một cách không đồng bộ.
- WriteTransport.writelines(list_of_data)¶
Viết một danh sách (hoặc bất kỳ lần lặp nào) các byte dữ liệu vào phương tiện vận chuyển. Điều này có chức năng tương đương với việc gọi
write()trên mỗi phần tử do iterable mang lại, nhưng có thể được triển khai hiệu quả hơn.
- WriteTransport.write_eof()¶
Đóng phần cuối ghi của quá trình vận chuyển sau khi xóa tất cả dữ liệu được lưu vào bộ đệm. Dữ liệu vẫn có thể được nhận.
Phương pháp này có thể tăng
NotImplementedErrornếu phương tiện truyền tải (ví dụ: SSL) không hỗ trợ các kết nối nửa kín.
Vận chuyển gói dữ liệu¶
- DatagramTransport.sendto(data, addr=None)¶
Gửi byte data đến thiết bị ngang hàng từ xa được cung cấp bởi addr (địa chỉ mục tiêu phụ thuộc vào vận chuyển). Nếu addr là
None, dữ liệu sẽ được gửi đến địa chỉ đích được cung cấp khi tạo phương tiện giao thông.Phương pháp này không chặn; nó đệm dữ liệu và sắp xếp để dữ liệu được gửi đi một cách không đồng bộ.
Thay đổi trong phiên bản 3.13: Phương thức này có thể được gọi với một đối tượng byte trống để gửi một datagram có độ dài bằng 0. Tính toán kích thước bộ đệm được sử dụng để kiểm soát luồng cũng được cập nhật để tính đến tiêu đề datagram.
- DatagramTransport.abort()¶
Đóng quá trình vận chuyển ngay lập tức mà không cần đợi các hoạt động đang chờ xử lý hoàn tất. Dữ liệu được đệm sẽ bị mất. Sẽ không có thêm dữ liệu nào được nhận. Phương thức
protocol.connection_lost()của giao thức cuối cùng sẽ được gọi vớiNonelàm đối số.
Vận chuyển quy trình con¶
- SubprocessTransport.get_pid()¶
Trả về id tiến trình con dưới dạng số nguyên.
- SubprocessTransport.get_pipe_transport(fd)¶
Trả về phương thức vận chuyển cho đường ống truyền thông tương ứng với bộ mô tả tệp số nguyên fd:
0: truyền phát trực tuyến có thể ghi của đầu vào tiêu chuẩn (stdin) hoặcNonenếu quy trình con không được tạo bằngstdin=PIPE1: truyền phát trực tuyến có thể đọc được của đầu ra tiêu chuẩn (stdout) hoặcNonenếu quy trình con không được tạo bằngstdout=PIPE2: truyền phát trực tuyến có thể đọc được lỗi tiêu chuẩn (stderr) hoặcNonenếu quy trình con không được tạo bằngstderr=PIPEfd khác:
None
- SubprocessTransport.get_returncode()¶
Trả về mã trả về của quy trình con dưới dạng số nguyên hoặc
Nonenếu nó chưa được trả về, tương tự như thuộc tínhsubprocess.Popen.returncode.
- SubprocessTransport.kill()¶
Giết quá trình con.
Trên hệ thống POSIX, hàm sẽ gửi SIGKILL đến quy trình con. Trên Windows, phương pháp này là bí danh cho
terminate().Xem thêm
subprocess.Popen.kill().
- SubprocessTransport.send_signal(signal)¶
Gửi số signal tới quy trình con, như trong
subprocess.Popen.send_signal().
- SubprocessTransport.terminate()¶
Dừng quá trình con.
Trên hệ thống POSIX, phương thức này gửi
SIGTERMđến quy trình con. Trên Windows, hàm Windows APITerminateProcess()được gọi để dừng tiến trình con.Xem thêm
subprocess.Popen.terminate().
Giao thức¶
Source code: Lib/asyncio/protocols.py
asyncio cung cấp một tập hợp các lớp cơ sở trừu tượng nên được sử dụng để triển khai các giao thức mạng. Những lớp đó được sử dụng cùng với transports.
Các lớp con của các lớp giao thức cơ sở trừu tượng có thể triển khai một số hoặc tất cả các phương thức. Tất cả các phương thức này đều là lệnh gọi lại: chúng được gọi bằng phương thức vận chuyển trong một số sự kiện nhất định, chẳng hạn như khi nhận được một số dữ liệu. Một phương thức giao thức cơ sở sẽ được gọi bằng phương thức vận chuyển tương ứng.
Giao thức cơ bản¶
- class asyncio.BaseProtocol¶
Giao thức cơ sở với các phương thức mà tất cả các giao thức đều chia sẻ.
- class asyncio.Protocol(BaseProtocol)¶
Lớp cơ sở để triển khai các giao thức phát trực tuyến (TCP, ổ cắm Unix, v.v.).
- class asyncio.BufferedProtocol(BaseProtocol)¶
Một lớp cơ sở để triển khai các giao thức truyền phát với điều khiển thủ công bộ đệm nhận.
- class asyncio.DatagramProtocol(BaseProtocol)¶
Lớp cơ sở để triển khai các giao thức datagram (UDP).
- class asyncio.SubprocessProtocol(BaseProtocol)¶
Lớp cơ sở để triển khai các giao thức giao tiếp với các tiến trình con (ống đơn hướng).
Giao thức cơ sở¶
Tất cả các giao thức asyncio đều có thể triển khai các lệnh gọi lại Giao thức cơ sở.
Cuộc gọi lại kết nối
Lệnh gọi lại kết nối được gọi trên tất cả các giao thức, chính xác một lần cho mỗi kết nối thành công. Tất cả các lệnh gọi lại giao thức khác chỉ có thể được gọi giữa hai phương thức đó.
- BaseProtocol.connection_made(transport)¶
Được gọi khi kết nối được thực hiện.
Đối số transport là phương tiện vận chuyển đại diện cho kết nối. Giao thức có trách nhiệm lưu trữ tham chiếu đến quá trình vận chuyển của nó.
- BaseProtocol.connection_lost(exc)¶
Được gọi khi kết nối bị mất hoặc đóng.
Đối số là một đối tượng ngoại lệ hoặc
None. Điều thứ hai có nghĩa là nhận được EOF thông thường hoặc kết nối bị hủy bỏ hoặc đóng bởi phía kết nối này.
Lệnh gọi lại kiểm soát luồng
Các cuộc gọi lại điều khiển luồng có thể được gọi bằng các phương tiện vận chuyển để tạm dừng hoặc tiếp tục ghi do giao thức thực hiện.
Xem tài liệu về phương pháp set_write_buffer_limits() để biết thêm chi tiết.
- BaseProtocol.pause_writing()¶
Được gọi khi bộ đệm của phương tiện vận chuyển vượt quá hình mờ cao.
- BaseProtocol.resume_writing()¶
Được gọi khi bộ đệm của phương tiện truyền tải thoát xuống dưới hình mờ thấp.
Nếu kích thước bộ đệm bằng hình mờ cao thì pause_writing() không được gọi: kích thước bộ đệm phải vượt quá giới hạn.
Ngược lại, resume_writing() được gọi khi kích thước bộ đệm bằng hoặc thấp hơn hình mờ thấp. Những điều kiện cuối này rất quan trọng để đảm bảo mọi thứ diễn ra như mong đợi khi một trong hai điểm bằng 0.
Giao thức truyền phát¶
Các phương thức sự kiện, chẳng hạn như loop.create_server(), loop.create_unix_server(), loop.create_connection(), loop.create_unix_connection(), loop.connect_accepted_socket(), loop.connect_read_pipe() và loop.connect_write_pipe() chấp nhận các nhà máy trả về giao thức phát trực tuyến.
- Protocol.data_received(data)¶
Được gọi khi nhận được một số dữ liệu. data là một đối tượng byte không trống chứa dữ liệu đến.
Việc dữ liệu được đệm, phân đoạn hay tập hợp lại đều phụ thuộc vào việc vận chuyển. Nói chung, bạn không nên dựa vào ngữ nghĩa cụ thể mà thay vào đó hãy thực hiện phân tích cú pháp chung chung và linh hoạt. Tuy nhiên, dữ liệu luôn được nhận theo đúng thứ tự.
Phương thức này có thể được gọi với số lần tùy ý trong khi kết nối được mở.
Tuy nhiên,
protocol.eof_received()được gọi nhiều nhất một lần. Khieof_received()được gọi,data_received()sẽ không được gọi nữa.
- Protocol.eof_received()¶
Được gọi khi đầu bên kia báo hiệu nó sẽ không gửi thêm dữ liệu nào nữa (ví dụ bằng cách gọi
transport.write_eof(), nếu đầu bên kia cũng sử dụng asyncio).Phương thức này có thể trả về giá trị sai (bao gồm
None), trong trường hợp đó quá trình vận chuyển sẽ tự đóng. Ngược lại, nếu phương thức này trả về giá trị thực, giao thức được sử dụng sẽ xác định xem có đóng quá trình vận chuyển hay không. Vì việc triển khai mặc định trả vềNonenên nó ngầm đóng kết nối.Một số phương tiện vận chuyển, bao gồm SSL, không hỗ trợ các kết nối nửa kín, trong trường hợp trả về true từ phương thức này sẽ dẫn đến kết nối bị đóng.
Máy trạng thái:
bắt đầu -> kết nối_made
[-> data_received]*
[-> eof_received]?
-> kết nối_lost -> kết thúc
Giao thức truyền phát được đệm¶
Added in version 3.7.
Giao thức đệm có thể được sử dụng với bất kỳ phương thức vòng lặp sự kiện nào hỗ trợ Streaming Protocols.
Việc triển khai BufferedProtocol cho phép phân bổ và kiểm soát thủ công rõ ràng bộ đệm nhận. Sau đó, các vòng lặp sự kiện có thể sử dụng bộ đệm do giao thức cung cấp để tránh các bản sao dữ liệu không cần thiết. Điều này có thể dẫn đến cải thiện hiệu suất đáng chú ý cho các giao thức nhận được lượng dữ liệu lớn. Việc triển khai giao thức phức tạp có thể làm giảm đáng kể số lượng phân bổ bộ đệm.
Các lệnh gọi lại sau đây được gọi trên các phiên bản BufferedProtocol:
- BufferedProtocol.get_buffer(sizehint)¶
Được gọi để phân bổ bộ đệm nhận mới.
sizehint là kích thước tối thiểu được đề xuất cho bộ đệm được trả về. Có thể chấp nhận trả lại bộ đệm nhỏ hơn hoặc lớn hơn những gì sizehint gợi ý. Khi được đặt thành -1, kích thước bộ đệm có thể tùy ý. Đó là một lỗi khi trả về bộ đệm có kích thước bằng 0.
get_buffer()phải trả về một đối tượng đang triển khai buffer protocol.
- BufferedProtocol.buffer_updated(nbytes)¶
Được gọi khi bộ đệm được cập nhật với dữ liệu nhận được.
nbytes là tổng số byte được ghi vào bộ đệm.
- BufferedProtocol.eof_received()¶
Xem tài liệu về phương pháp
protocol.eof_received().
get_buffer() có thể được gọi với số lần tùy ý trong quá trình kết nối. Tuy nhiên, protocol.eof_received() chỉ được gọi một lần và nếu được gọi, get_buffer() và buffer_updated() sẽ không được gọi sau nó.
Máy trạng thái:
bắt đầu -> kết nối_made
[-> get_buffer
[-> buffer_updated]?
]*
[-> eof_received]?
-> kết nối_lost -> kết thúc
Giao thức gói dữ liệu¶
Các phiên bản Giao thức gói dữ liệu phải được xây dựng bởi các nhà máy giao thức được truyền cho phương thức loop.create_datagram_endpoint().
- DatagramProtocol.datagram_received(data, addr)¶
Được gọi khi nhận được datagram. data là đối tượng byte chứa dữ liệu đến. addr là địa chỉ của thiết bị ngang hàng gửi dữ liệu; định dạng chính xác phụ thuộc vào việc vận chuyển.
- DatagramProtocol.error_received(exc)¶
Được gọi khi thao tác gửi hoặc nhận trước đó tạo ra
OSError. exc là phiên bảnOSError.Phương thức này được gọi trong các điều kiện hiếm gặp, khi phương thức vận chuyển (ví dụ: UDP) phát hiện rằng một datagram không thể được gửi đến người nhận. Tuy nhiên, trong nhiều điều kiện, các datagram không thể gửi được sẽ bị loại bỏ một cách âm thầm.
Ghi chú
Trên các hệ thống BSD (macOS, FreeBSD, v.v.), điều khiển luồng không được hỗ trợ cho các giao thức datagram vì không có cách nào đáng tin cậy để phát hiện lỗi gửi do ghi quá nhiều gói.
Ổ cắm luôn xuất hiện ở trạng thái 'sẵn sàng' và các gói dư thừa sẽ bị loại bỏ. Một OSError với errno được đặt thành errno.ENOBUFS có thể được nâng lên hoặc không; nếu nó được nâng lên, nó sẽ được báo cáo tới DatagramProtocol.error_received() nhưng nếu không thì sẽ bị bỏ qua.
Giao thức quy trình con¶
Các phiên bản Giao thức quy trình con phải được xây dựng bởi các nhà máy giao thức được truyền cho các phương thức loop.subprocess_exec() và loop.subprocess_shell().
- SubprocessProtocol.pipe_data_received(fd, data)¶
Được gọi khi tiến trình con ghi dữ liệu vào ống stdout hoặc stderr của nó.
fd là bộ mô tả tệp số nguyên của đường ống.
data là một đối tượng byte không trống chứa dữ liệu đã nhận.
- SubprocessProtocol.pipe_connection_lost(fd, exc)¶
Được gọi khi một trong các đường ống giao tiếp với tiến trình con bị đóng.
fd là bộ mô tả tệp số nguyên đã bị đóng.
- SubprocessProtocol.process_exited()¶
Được gọi khi tiến trình con đã thoát.
Nó có thể được gọi trước các phương thức
pipe_data_received()vàpipe_connection_lost().
Ví dụ¶
Máy chủ Echo TCP¶
Tạo máy chủ echo TCP bằng phương thức loop.create_server(), gửi lại dữ liệu đã nhận và đóng kết nối
nhập asyncio
lớp EchoServerProtocol(asyncio.Protocol):
def Connection_made(tự, vận chuyển):
tên ngang hàng = Transport.get_extra_info('tên ngang hàng')
print('Kết nối từ {}'.format(peername))
self.transport = vận chuyển
def data_received(self, data):
tin nhắn = data.decode()
print('Dữ liệu đã nhận: {!r}'.format(message))
print('Gửi: {!r}'.format(tin nhắn))
self.transport.write(dữ liệu)
print('Đóng client socket')
self.transport.close()
async def main():
# Get tham chiếu đến vòng lặp sự kiện mà chúng tôi dự định sử dụng
API # low-level.
vòng lặp = asyncio.get_running_loop()
máy chủ = đang chờ loop.create_server(
Giao thức EchoServer,
'127.0.0.1', 8888)
không đồng bộ với máy chủ:
đang chờ server.serve_forever()
asyncio.run(chính())
Xem thêm
Ví dụ TCP echo server using streams sử dụng hàm asyncio.start_server() cấp cao.
TCP Echo khách hàng¶
Máy khách TCP echo sử dụng phương thức loop.create_connection(), gửi dữ liệu và đợi cho đến khi kết nối được đóng
nhập asyncio
lớp EchoClientProtocol(asyncio.Protocol):
def __init__(self, message, on_con_lost):
self.message = tin nhắn
self.on_con_lost = on_con_lost
def Connection_made(tự, vận chuyển):
Transport.write(self.message.encode())
print('Dữ liệu đã gửi: {!r}'.format(self.message))
def data_received(self, data):
print('Dữ liệu đã nhận: {!r}'.format(data.decode()))
def Connection_lost(self, exc):
print('Máy chủ đã đóng kết nối')
self.on_con_lost.set_result(Đúng)
async def main():
# Get tham chiếu đến vòng lặp sự kiện mà chúng tôi dự định sử dụng
API # low-level.
vòng lặp = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = 'Xin chào thế giới!'
vận chuyển, giao thức = đang chờ loop.create_connection(
lambda: EchoClientProtocol(tin nhắn, on_con_lost),
'127.0.0.1', 8888)
# Wait cho đến khi giao thức báo hiệu rằng kết nối
# is bị mất và đóng vận chuyển.
thử:
đang chờ on_con_lost
cuối cùng:
vận chuyển.close()
asyncio.run(chính())
Xem thêm
Ví dụ TCP echo client using streams sử dụng hàm asyncio.open_connection() cấp cao.
Máy chủ Echo UDP¶
Máy chủ echo UDP, sử dụng phương thức loop.create_datagram_endpoint(), gửi lại dữ liệu đã nhận:
nhập asyncio
lớp EchoServerGiao thức:
def Connection_made(tự, vận chuyển):
self.transport = vận chuyển
def datagram_received(self, data, addr):
tin nhắn = data.decode()
print('Đã nhận %r từ %s' % (tin nhắn, địa chỉ))
print('Gửi %r tới %s' % (tin nhắn, địa chỉ))
self.transport.sendto(dữ liệu, địa chỉ)
async def main():
print("Khởi động máy chủ UDP")
# Get tham chiếu đến vòng lặp sự kiện mà chúng tôi dự định sử dụng
API # low-level.
vòng lặp = asyncio.get_running_loop()
Phiên bản giao thức # One sẽ được tạo để phục vụ tất cả
yêu cầu # client.
vận chuyển, giao thức = chờ loop.create_datagram_endpoint(
Giao thức EchoServer,
local_addr=('127.0.0.1', 9999))
thử:
chờ asyncio.sleep(3600) # Serve trong 1 giờ.
cuối cùng:
vận chuyển.close()
asyncio.run(chính())
UDP Echo khách hàng¶
Máy khách UDP echo, sử dụng phương thức loop.create_datagram_endpoint(), gửi dữ liệu và đóng quá trình vận chuyển khi nhận được câu trả lời:
nhập asyncio
lớp EchoClientProtocol:
def __init__(self, message, on_con_lost):
self.message = tin nhắn
self.on_con_lost = on_con_lost
self.transport = Không
def Connection_made(tự, vận chuyển):
self.transport = vận chuyển
print('Gửi:', self.message)
self.transport.sendto(self.message.encode())
def datagram_received(self, data, addr):
print("Đã nhận:", data.decode())
print("Đóng ổ cắm")
self.transport.close()
def error_received(self, exc):
print('Đã nhận được lỗi:', ex)
def Connection_lost(self, exc):
print("Đã đóng kết nối")
self.on_con_lost.set_result(Đúng)
async def main():
# Get tham chiếu đến vòng lặp sự kiện mà chúng tôi dự định sử dụng
API # low-level.
vòng lặp = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = "Xin chào thế giới!"
vận chuyển, giao thức = chờ loop.create_datagram_endpoint(
lambda: EchoClientProtocol(tin nhắn, on_con_lost),
remote_addr=('127.0.0.1', 9999))
thử:
đang chờ on_con_lost
cuối cùng:
vận chuyển.close()
asyncio.run(chính())
Kết nối các ổ cắm hiện có¶
Đợi cho đến khi ổ cắm nhận dữ liệu bằng phương thức loop.create_connection() với giao thức
nhập asyncio
ổ cắm nhập khẩu
lớp MyProtocol(asyncio.Protocol):
def __init__(self, on_con_lost):
self.transport = Không
self.on_con_lost = on_con_lost
def Connection_made(tự, vận chuyển):
self.transport = vận chuyển
def data_received(self, data):
print("Đã nhận:", data.decode())
# We đã xong: đóng phương tiện giao thông;
# connection_lost() sẽ được gọi tự động.
self.transport.close()
def Connection_lost(self, exc):
Ổ cắm # The đã bị đóng
self.on_con_lost.set_result(Đúng)
async def main():
# Get tham chiếu đến vòng lặp sự kiện mà chúng tôi dự định sử dụng
API # low-level.
vòng lặp = asyncio.get_running_loop()
on_con_lost = loop.create_future()
# Create một cặp ổ cắm được kết nối
rsock, wsock = socket.socketpair()
# Register ổ cắm để chờ dữ liệu.
vận chuyển, giao thức = đang chờ loop.create_connection(
lambda: MyProtocol(on_con_lost), sock=rsock)
# Simulate việc tiếp nhận dữ liệu từ mạng.
loop.call_soon(wsock.send, 'abc'.encode())
thử:
đang chờ giao thức.on_con_lost
cuối cùng:
vận chuyển.close()
wsock.close()
asyncio.run(chính())
Xem thêm
Ví dụ watch a file descriptor for read events sử dụng phương thức loop.add_reader() cấp thấp để đăng ký FD.
Ví dụ register an open socket to wait for data using streams sử dụng các luồng cấp cao được tạo bởi hàm open_connection() trong coroutine.
loop.subprocess_exec() và SubprocessProtocol¶
Một ví dụ về giao thức quy trình con được sử dụng để lấy đầu ra của quy trình con và chờ thoát khỏi quy trình con.
Quy trình con được tạo bằng phương thức loop.subprocess_exec()
nhập asyncio
hệ thống nhập khẩu
lớp DateProtocol(asyncio.SubprocessProtocol):
def __init__(self, exit_future):
self.exit_future = exit_future
self.output = bytearray()
self.pipe_closed = Sai
self.exit = Sai
def pipe_connection_lost(self, fd, exc):
self.pipe_closed = Đúng
self.check_for_exit()
def pipe_data_received(self, fd, data):
self.output.extend(dữ liệu)
def process_exit(tự):
self.exit = Đúng
Phương thức # process_exited() có thể được gọi trước
Phương thức # pipe_connection_lost(): đợi cho đến khi cả hai phương thức đều được
# called.
self.check_for_exit()
def check_for_exit(tự):
nếu self.pipe_closed và self.exit:
self.exit_future.set_result(Đúng)
không đồng bộ get_date():
# Get tham chiếu đến vòng lặp sự kiện mà chúng tôi dự định sử dụng
API # low-level.
vòng lặp = asyncio.get_running_loop()
code = 'nhập ngày giờ dưới dạng dt; print(dt.datetime.now())'
exit_future = asyncio.Future(loop=loop)
# Create quy trình con được điều khiển bởi DateProtocol;
# redirect đầu ra tiêu chuẩn thành một đường ống.
vận chuyển, giao thức = đang chờ loop.subprocess_exec(
lambda: DateProtocol(exit_future),
sys.executable, '-c', mã,
stdin=Không, stderr=Không)
# Wait để thoát khỏi quy trình con bằng cách sử dụng process_exited()
# method của giao thức.
đang chờ exit_future
# Close ống xuất chuẩn.
vận chuyển.close()
# Read đầu ra được thu thập bởi
phương thức # pipe_data_received() của giao thức.
dữ liệu = byte(protocol.output)
trả về data.decode('ascii').rstrip()
ngày = asyncio.run(get_date())
print(f"Ngày hiện tại: {date}")
Xem thêm same example được viết bằng API cấp cao.