Hàng đợi

Source code: Lib/asyncio/queues.py


Hàng đợi asyncio được thiết kế tương tự như các lớp của mô-đun queue. Mặc dù hàng đợi asyncio không an toàn theo luồng nhưng chúng được thiết kế để sử dụng cụ thể trong mã async/await.

Lưu ý rằng các phương thức của hàng đợi asyncio không có tham số timeout; sử dụng chức năng asyncio.wait_for() để thực hiện các thao tác xếp hàng có thời gian chờ.

Xem thêm phần Examples bên dưới.

Hàng đợi

class asyncio.Queue(maxsize=0)

Hàng đợi vào trước ra trước (FIFO).

Nếu maxsize nhỏ hơn hoặc bằng 0 thì kích thước hàng đợi là vô hạn. Nếu đó là số nguyên lớn hơn 0 thì await put() sẽ chặn khi hàng đợi đạt tới maxsize cho đến khi một mục bị get() xóa.

Không giống như luồng thư viện tiêu chuẩn queue, kích thước của hàng đợi luôn được biết trước và có thể được trả về bằng cách gọi phương thức qsize().

Thay đổi trong phiên bản 3.10: Đã xóa tham số loop.

Lớp này là not thread safe.

maxsize

Số lượng mục được phép trong hàng đợi.

empty()

Trả về True nếu hàng đợi trống, False nếu không.

full()

Trả về True nếu có các mục maxsize trong hàng đợi.

Nếu hàng đợi được khởi tạo bằng maxsize=0 (mặc định) thì full() không bao giờ trả về True.

async get()

Xóa và trả lại một mục từ hàng đợi. Nếu hàng đợi trống, hãy đợi cho đến khi có mặt hàng.

Tăng QueueShutDown nếu hàng đợi đã bị tắt và trống hoặc nếu hàng đợi đã bị tắt ngay lập tức.

get_nowait()

Trả lại một vật phẩm nếu có sẵn một vật phẩm, nếu không thì tăng QueueEmpty.

async join()

Chặn cho đến khi tất cả các mục trong hàng đợi đã được nhận và xử lý.

Số lượng nhiệm vụ chưa hoàn thành sẽ tăng lên bất cứ khi nào một mục được thêm vào hàng đợi. Số lượng giảm xuống bất cứ khi nào một coroutine tiêu dùng gọi task_done() để cho biết rằng mục đó đã được truy xuất và mọi công việc trên đó đã hoàn tất. Khi số lượng nhiệm vụ chưa hoàn thành giảm xuống 0, join() sẽ mở khóa.

async put(item)

Đặt một mục vào hàng đợi. Nếu hàng đợi đã đầy, hãy đợi cho đến khi còn chỗ trống trước khi thêm vật phẩm.

Tăng QueueShutDown nếu hàng đợi đã bị tắt.

put_nowait(item)

Đặt một mục vào hàng đợi mà không bị chặn.

Nếu không có slot trống ngay lập tức, hãy raise QueueFull.

qsize()

Trả về số lượng mục trong hàng đợi.

shutdown(immediate=False)

Đặt phiên bản Queue vào chế độ tắt máy.

Hàng đợi không thể phát triển được nữa. Các cuộc gọi trong tương lai tới put() sẽ tăng QueueShutDown. Những người gọi put() hiện bị chặn sẽ được bỏ chặn và sẽ gọi QueueShutDown trong nhiệm vụ đang chờ trước đó.

Nếu immediate là sai (mặc định), hàng đợi có thể được kết thúc bình thường bằng các lệnh gọi get() để trích xuất các tác vụ đã được tải.

Và nếu task_done() được gọi cho mỗi nhiệm vụ còn lại, join() đang chờ xử lý sẽ được bỏ chặn bình thường.

Khi hàng đợi trống, các lệnh gọi tới get() trong tương lai sẽ tăng QueueShutDown.

Nếu immediate đúng, hàng đợi sẽ bị chấm dứt ngay lập tức. Hàng đợi được rút hết hoàn toàn trống và số lượng nhiệm vụ chưa hoàn thành sẽ giảm đi theo số lượng nhiệm vụ đã được rút hết. Nếu nhiệm vụ chưa hoàn thành bằng 0, người gọi join() sẽ được bỏ chặn. Ngoài ra, những người gọi bị chặn của get() sẽ được bỏ chặn và sẽ gọi QueueShutDown vì hàng đợi trống.

Hãy thận trọng khi sử dụng join() với immediate được đặt thành true. Điều này sẽ bỏ chặn sự tham gia ngay cả khi không có công việc nào được thực hiện đối với các tác vụ, vi phạm tính bất biến thông thường khi tham gia hàng đợi.

Added in version 3.13.

task_done()

Cho biết rằng một mục công việc được xếp hàng trước đây đã hoàn tất.

Được sử dụng bởi người tiêu dùng xếp hàng. Đối với mỗi get() được sử dụng để tìm nạp một mục công việc, một lệnh gọi tiếp theo tới task_done() sẽ thông báo cho hàng đợi rằng quá trình xử lý mục công việc đó đã hoàn tất.

Nếu một join() hiện đang bị chặn thì nó sẽ tiếp tục lại khi tất cả các mục đã được xử lý (có nghĩa là đã nhận được lệnh gọi task_done() cho mọi mục đã được put() đưa vào hàng đợi).

Tăng ValueError nếu được gọi nhiều lần hơn số mục được đặt trong hàng đợi.

Hàng đợi ưu tiên

class asyncio.PriorityQueue

Một biến thể của Queue; truy xuất các mục theo thứ tự ưu tiên (thấp nhất trước).

Các mục nhập thường là các bộ dữ liệu có dạng (priority_number, data).

hàng đợi LIFO

class asyncio.LifoQueue

Một biến thể của Queue truy xuất các mục được thêm gần đây nhất trước (vào sau, ra trước).

Ngoại lệ

exception asyncio.QueueEmpty

Ngoại lệ này được đưa ra khi phương thức get_nowait() được gọi trên hàng đợi trống.

exception asyncio.QueueFull

Ngoại lệ nảy sinh khi phương thức put_nowait() được gọi trên hàng đợi đã đạt đến maxsize.

exception asyncio.QueueShutDown

Ngoại lệ nảy sinh khi put() hoặc get() được gọi trên hàng đợi đã bị tắt.

Added in version 3.13.

Ví dụ

Hàng đợi có thể được sử dụng để phân phối khối lượng công việc giữa một số tác vụ đồng thời:

nhập asyncio
nhập khẩu ngẫu nhiên
thời gian nhập khẩu


async def worker(tên, hàng đợi):
    trong khi Đúng:
        # Get một "mục công việc" ra khỏi hàng đợi.
        sleep_for = đang chờ queue.get()

        # Sleep trong giây "sleep_for".
        đang chờ asyncio.sleep(sleep_for)

        # Notify hàng đợi "mục công việc" đã được xử lý.
        hàng đợi.task_done()

        print(f'{name} đã ngủ được {sleep_for:.2f} giây')


async def main():
    # Create một hàng đợi mà chúng ta sẽ sử dụng để lưu trữ "khối lượng công việc" của mình.
    hàng đợi = asyncio.Queue()

    # Generate thời gian ngẫu nhiên và đưa chúng vào hàng đợi.
    tổng_thời gian ngủ = 0
    cho _ trong phạm vi (20):
        sleep_for = ngẫu nhiên.uniform(0,05, 1,0)
        tổng_thời gian ngủ += ngủ_cho
        queue.put_nowait(sleep_for)

    # Create ba tác vụ công nhân để xử lý hàng đợi đồng thời.
    nhiệm vụ = []
    cho tôi trong phạm vi (3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        nhiệm vụ.append(nhiệm vụ)

    # Wait cho đến khi hàng đợi được xử lý hoàn toàn.
    started_at = time.monotonic()
    đang chờ hàng đợi.join()
    Total_slept_for = time.monotonic() - started_at

    # Cancel nhiệm vụ công nhân của chúng tôi.
    cho nhiệm vụ trong nhiệm vụ:
        nhiệm vụ.cancel()
    # Wait cho đến khi tất cả nhiệm vụ của nhân viên bị hủy.
    đang chờ asyncio.gather(*tasks, return_Exceptions=True)

    in('====')
    print(f'3 công nhân ngủ song song trong {total_slept_for:.2f} giây')
    print(f'tổng thời gian ngủ dự kiến: {total_sleep_time:.2f} giây')


asyncio.run(chính())