Hàng đợi¶
Source code: Lib/asyncio/queues.py
Hàng đợi asyncio được thiết kế tương tự như các lớp của mô-đun queue. Mặc dù hàng đợi asyncio không an toàn theo luồng nhưng chúng được thiết kế để sử dụng cụ thể trong mã async/await.
Lưu ý rằng các phương thức của hàng đợi asyncio không có tham số timeout; sử dụng chức năng asyncio.wait_for() để thực hiện các thao tác xếp hàng có thời gian chờ.
Xem thêm phần Examples bên dưới.
Hàng đợi¶
- class asyncio.Queue(maxsize=0)¶
Hàng đợi vào trước ra trước (FIFO).
Nếu maxsize nhỏ hơn hoặc bằng 0 thì kích thước hàng đợi là vô hạn. Nếu đó là số nguyên lớn hơn
0thìawait put()sẽ chặn khi hàng đợi đạt tới maxsize cho đến khi một mục bịget()xóa.Không giống như luồng thư viện tiêu chuẩn
queue, kích thước của hàng đợi luôn được biết trước và có thể được trả về bằng cách gọi phương thứcqsize().Thay đổi trong phiên bản 3.10: Đã xóa tham số loop.
Lớp này là not thread safe.
- maxsize¶
Số lượng mục được phép trong hàng đợi.
- empty()¶
Trả về
Truenếu hàng đợi trống,Falsenếu không.
- full()¶
Trả về
Truenếu có các mụcmaxsizetrong hàng đợi.Nếu hàng đợi được khởi tạo bằng
maxsize=0(mặc định) thìfull()không bao giờ trả vềTrue.
- async get()¶
Xóa và trả lại một mục từ hàng đợi. Nếu hàng đợi trống, hãy đợi cho đến khi có mặt hàng.
Tăng
QueueShutDownnếu hàng đợi đã bị tắt và trống hoặc nếu hàng đợi đã bị tắt ngay lập tức.
- get_nowait()¶
Trả lại một vật phẩm nếu có sẵn một vật phẩm, nếu không thì tăng
QueueEmpty.
- async join()¶
Chặn cho đến khi tất cả các mục trong hàng đợi đã được nhận và xử lý.
Số lượng nhiệm vụ chưa hoàn thành sẽ tăng lên bất cứ khi nào một mục được thêm vào hàng đợi. Số lượng giảm xuống bất cứ khi nào một coroutine tiêu dùng gọi
task_done()để cho biết rằng mục đó đã được truy xuất và mọi công việc trên đó đã hoàn tất. Khi số lượng nhiệm vụ chưa hoàn thành giảm xuống 0,join()sẽ mở khóa.
- async put(item)¶
Đặt một mục vào hàng đợi. Nếu hàng đợi đã đầy, hãy đợi cho đến khi còn chỗ trống trước khi thêm vật phẩm.
Tăng
QueueShutDownnếu hàng đợi đã bị tắt.
- put_nowait(item)¶
Đặt một mục vào hàng đợi mà không bị chặn.
Nếu không có slot trống ngay lập tức, hãy raise
QueueFull.
- qsize()¶
Trả về số lượng mục trong hàng đợi.
- shutdown(immediate=False)¶
Đặt phiên bản
Queuevào chế độ tắt máy.Hàng đợi không thể phát triển được nữa. Các cuộc gọi trong tương lai tới
put()sẽ tăngQueueShutDown. Những người gọiput()hiện bị chặn sẽ được bỏ chặn và sẽ gọiQueueShutDowntrong nhiệm vụ đang chờ trước đó.Nếu immediate là sai (mặc định), hàng đợi có thể được kết thúc bình thường bằng các lệnh gọi
get()để trích xuất các tác vụ đã được tải.Và nếu
task_done()được gọi cho mỗi nhiệm vụ còn lại,join()đang chờ xử lý sẽ được bỏ chặn bình thường.Khi hàng đợi trống, các lệnh gọi tới
get()trong tương lai sẽ tăngQueueShutDown.Nếu immediate đúng, hàng đợi sẽ bị chấm dứt ngay lập tức. Hàng đợi được rút hết hoàn toàn trống và số lượng nhiệm vụ chưa hoàn thành sẽ giảm đi theo số lượng nhiệm vụ đã được rút hết. Nếu nhiệm vụ chưa hoàn thành bằng 0, người gọi
join()sẽ được bỏ chặn. Ngoài ra, những người gọi bị chặn củaget()sẽ được bỏ chặn và sẽ gọiQueueShutDownvì hàng đợi trống.Hãy thận trọng khi sử dụng
join()với immediate được đặt thành true. Điều này sẽ bỏ chặn sự tham gia ngay cả khi không có công việc nào được thực hiện đối với các tác vụ, vi phạm tính bất biến thông thường khi tham gia hàng đợi.Added in version 3.13.
- task_done()¶
Cho biết rằng một mục công việc được xếp hàng trước đây đã hoàn tất.
Được sử dụng bởi người tiêu dùng xếp hàng. Đối với mỗi
get()được sử dụng để tìm nạp một mục công việc, một lệnh gọi tiếp theo tớitask_done()sẽ thông báo cho hàng đợi rằng quá trình xử lý mục công việc đó đã hoàn tất.Nếu một
join()hiện đang bị chặn thì nó sẽ tiếp tục lại khi tất cả các mục đã được xử lý (có nghĩa là đã nhận được lệnh gọitask_done()cho mọi mục đã đượcput()đưa vào hàng đợi).Tăng
ValueErrornếu được gọi nhiều lần hơn số mục được đặt trong hàng đợi.
Hàng đợi ưu tiên¶
hàng đợi LIFO¶
Ngoại lệ¶
- exception asyncio.QueueEmpty¶
Ngoại lệ này được đưa ra khi phương thức
get_nowait()được gọi trên hàng đợi trống.
- exception asyncio.QueueFull¶
Ngoại lệ nảy sinh khi phương thức
put_nowait()được gọi trên hàng đợi đã đạt đến maxsize.
Ví dụ¶
Hàng đợi có thể được sử dụng để phân phối khối lượng công việc giữa một số tác vụ đồng thời:
nhập asyncio
nhập khẩu ngẫu nhiên
thời gian nhập khẩu
async def worker(tên, hàng đợi):
trong khi Đúng:
# Get một "mục công việc" ra khỏi hàng đợi.
sleep_for = đang chờ queue.get()
# Sleep trong giây "sleep_for".
đang chờ asyncio.sleep(sleep_for)
# Notify hàng đợi "mục công việc" đã được xử lý.
hàng đợi.task_done()
print(f'{name} đã ngủ được {sleep_for:.2f} giây')
async def main():
# Create một hàng đợi mà chúng ta sẽ sử dụng để lưu trữ "khối lượng công việc" của mình.
hàng đợi = asyncio.Queue()
# Generate thời gian ngẫu nhiên và đưa chúng vào hàng đợi.
tổng_thời gian ngủ = 0
cho _ trong phạm vi (20):
sleep_for = ngẫu nhiên.uniform(0,05, 1,0)
tổng_thời gian ngủ += ngủ_cho
queue.put_nowait(sleep_for)
# Create ba tác vụ công nhân để xử lý hàng đợi đồng thời.
nhiệm vụ = []
cho tôi trong phạm vi (3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
nhiệm vụ.append(nhiệm vụ)
# Wait cho đến khi hàng đợi được xử lý hoàn toàn.
started_at = time.monotonic()
đang chờ hàng đợi.join()
Total_slept_for = time.monotonic() - started_at
# Cancel nhiệm vụ công nhân của chúng tôi.
cho nhiệm vụ trong nhiệm vụ:
nhiệm vụ.cancel()
# Wait cho đến khi tất cả nhiệm vụ của nhân viên bị hủy.
đang chờ asyncio.gather(*tasks, return_Exceptions=True)
in('====')
print(f'3 công nhân ngủ song song trong {total_slept_for:.2f} giây')
print(f'tổng thời gian ngủ dự kiến: {total_sleep_time:.2f} giây')
asyncio.run(chính())