Nguyên thủy đồng bộ hóa

Source code: Lib/asyncio/locks.py


Các nguyên hàm đồng bộ hóa asyncio được thiết kế tương tự như mô-đun threading với hai lưu ý quan trọng:

  • các nguyên hàm asyncio không an toàn cho luồng, do đó chúng không nên được sử dụng để đồng bộ hóa luồng của hệ điều hành (sử dụng threading cho việc đó);

  • các phương thức đồng bộ hóa nguyên thủy này không chấp nhận đối số timeout; sử dụng chức năng asyncio.wait_for() để thực hiện các thao tác khi hết thời gian chờ.

asyncio có các nguyên tắc đồng bộ hóa cơ bản sau:


Khóa

class asyncio.Lock

Triển khai khóa mutex cho các tác vụ không đồng bộ. Không an toàn chủ đề.

Khóa asyncio có thể được sử dụng để đảm bảo quyền truy cập độc quyền vào tài nguyên được chia sẻ.

Cách ưa thích để sử dụng Khóa là câu lệnh async with

khóa = asyncio.Lock()

# ... sau
không đồng bộ với khóa:
    trạng thái chia sẻ # access

tương đương với:

khóa = asyncio.Lock()

# ... sau
đang chờ khóa.acquire()
thử:
    trạng thái chia sẻ # access
cuối cùng:
    lock.release()

Thay đổi trong phiên bản 3.10: Đã xóa tham số loop.

async acquire()

Lấy ổ khóa.

Phương thức này đợi cho đến khi khóa là unlocked, đặt thành locked và trả về True.

Khi có nhiều hơn một coroutine bị chặn trong acquire() đang chờ khóa được mở khóa thì cuối cùng chỉ có một coroutine tiếp tục.

Việc nhận được khóa là fair: coroutine tiến hành sẽ là coroutine đầu tiên bắt đầu chờ trên khóa.

release()

Mở khóa.

Khi khóa là locked thì reset về unlocked rồi quay lại.

Nếu khóa là unlocked, RuntimeError sẽ được nâng lên.

locked()

Trả về True nếu khóa là locked.

Sự kiện

class asyncio.Event

Một đối tượng sự kiện Không an toàn chủ đề.

Một sự kiện asyncio có thể được sử dụng để thông báo cho nhiều tác vụ asyncio rằng một số sự kiện đã xảy ra.

Một đối tượng Sự kiện quản lý một cờ nội bộ có thể được đặt thành true bằng phương thức set() và đặt lại thành false bằng phương thức clear(). Phương thức wait() chặn cho đến khi cờ được đặt thành true. Cờ ban đầu được đặt thành false.

Thay đổi trong phiên bản 3.10: Đã xóa tham số loop.

Ví dụ:

người phục vụ không đồng bộ (sự kiện):
    print('đang chờ nó ...')
    đang chờ sự kiện.wait()
    print('... hiểu rồi!')

async def main():
    # Create một đối tượng Sự kiện.
    sự kiện = asyncio.Event()

    # Spawn một Nhiệm vụ để đợi cho đến khi 'sự kiện' được đặt.
    bồi bàn_task = asyncio.create_task(người phục vụ (sự kiện))

    # Sleep trong 1 giây và thiết lập sự kiện.
    đang chờ asyncio.sleep(1)
    sự kiện.set()

    # Wait cho đến khi hoàn thành nhiệm vụ bồi bàn.
    chờ đợi bồi bàn_task

asyncio.run(chính())
async wait()

Đợi cho đến khi sự kiện được thiết lập.

Nếu sự kiện được đặt, hãy trả lại True ngay lập tức. Nếu không thì chặn cho đến khi tác vụ khác gọi set().

set()

Đặt sự kiện.

Tất cả các nhiệm vụ đang chờ sự kiện được thiết lập sẽ được đánh thức ngay lập tức.

clear()

Xóa (bỏ đặt) sự kiện.

Các tác vụ tiếp theo đang chờ trên wait() giờ đây sẽ bị chặn cho đến khi phương thức set() được gọi lại.

is_set()

Trả về True nếu sự kiện được đặt.

tình trạng

class asyncio.Condition(lock=None)

Một đối tượng Điều kiện. Không an toàn chủ đề.

Một tác vụ có thể sử dụng điều kiện asyncio nguyên thủy để chờ một số sự kiện xảy ra và sau đó có quyền truy cập độc quyền vào tài nguyên được chia sẻ.

Về bản chất, đối tượng Điều kiện kết hợp chức năng của EventLock. Có thể có nhiều đối tượng Điều kiện chia sẻ một Khóa, điều này cho phép phối hợp quyền truy cập độc quyền vào tài nguyên được chia sẻ giữa các tác vụ khác nhau quan tâm đến trạng thái cụ thể của tài nguyên được chia sẻ đó.

Đối số lock tùy chọn phải là đối tượng Lock hoặc None. Trong trường hợp sau, một đối tượng Lock mới sẽ được tạo tự động.

Thay đổi trong phiên bản 3.10: Đã xóa tham số loop.

Cách ưa thích để sử dụng Điều kiện là câu lệnh async with

cond = asyncio.Condition()

# ... sau
không đồng bộ với cond:
    đang chờ cond.wait()

tương đương với:

cond = asyncio.Condition()

# ... sau
đang chờ cond.acquire()
thử:
    đang chờ cond.wait()
cuối cùng:
    cond.release()
async acquire()

Có được khóa cơ bản.

Phương thức này đợi cho đến khi khóa cơ bản là unlocked, đặt nó thành locked và trả về True.

notify(n=1)

Đánh thức các tác vụ n (1 theo mặc định) đang chờ điều kiện này. Nếu có ít hơn nhiệm vụ n đang chờ thì tất cả chúng đều được đánh thức.

Khóa phải được lấy trước khi phương thức này được gọi và giải phóng ngay sau đó. Nếu được gọi bằng khóa unlocked, lỗi RuntimeError sẽ xuất hiện.

locked()

Trả về True nếu có được khóa cơ bản.

notify_all()

Đánh thức tất cả các nhiệm vụ đang chờ với điều kiện này.

Phương pháp này hoạt động giống như notify(), nhưng đánh thức tất cả các tác vụ đang chờ.

Khóa phải được lấy trước khi phương thức này được gọi và giải phóng ngay sau đó. Nếu được gọi bằng khóa unlocked, lỗi RuntimeError sẽ xuất hiện.

release()

Nhả khóa bên dưới.

Khi được gọi trên ổ khóa đã mở khóa, RuntimeError sẽ xuất hiện.

async wait()

Chờ cho đến khi được thông báo.

Nếu tác vụ gọi không nhận được khóa khi phương thức này được gọi, thì RuntimeError sẽ được đưa ra.

Phương thức này giải phóng khóa cơ bản, sau đó chặn cho đến khi nó được đánh thức bởi lệnh gọi notify() hoặc notify_all(). Sau khi được đánh thức, Điều kiện lấy lại khóa của nó và phương thức này trả về True.

Lưu ý rằng tác vụ may quay trở lại từ cuộc gọi này một cách giả mạo, đó là lý do tại sao người gọi phải luôn kiểm tra lại trạng thái và chuẩn bị thực hiện lại wait(). Vì lý do này, bạn có thể thích sử dụng wait_for() hơn.

async wait_for(predicate)

Đợi cho đến khi vị từ trở thành true.

Vị từ phải là một giá trị có thể gọi được và kết quả sẽ được hiểu là giá trị boolean. Phương thức này sẽ lặp lại wait() cho đến khi vị từ có giá trị là true. Giá trị cuối cùng là giá trị trả về.

Semaphore

class asyncio.Semaphore(value=1)

Một đối tượng Semaphore. Không an toàn chủ đề.

Một semaphore quản lý một bộ đếm nội bộ được giảm dần theo mỗi lệnh gọi acquire() và tăng lên theo mỗi lệnh gọi release(). Bộ đếm không bao giờ có thể xuống dưới 0; khi acquire() thấy nó bằng 0, nó sẽ chặn, đợi cho đến khi tác vụ nào đó gọi release().

Đối số value tùy chọn cung cấp giá trị ban đầu cho bộ đếm bên trong (1 theo mặc định). Nếu giá trị đã cho nhỏ hơn 0 thì ValueError sẽ được tăng lên.

Thay đổi trong phiên bản 3.10: Đã xóa tham số loop.

Cách ưa thích để sử dụng Semaphore là câu lệnh async with

sem = asyncio.Semaphore(10)

# ... sau
không đồng bộ với sem:
    # work với tài nguyên được chia sẻ

tương đương với:

sem = asyncio.Semaphore(10)

# ... sau
đang chờ sem.acquire()
thử:
    # work với tài nguyên được chia sẻ
cuối cùng:
    sem.release()
async acquire()

Có được một semaphore.

Nếu bộ đếm bên trong lớn hơn 0, hãy giảm nó đi một và trả về True ngay lập tức. Nếu nó bằng 0, hãy đợi cho đến khi release() được gọi và trả về True.

locked()

Trả về True nếu không thể lấy được semaphore ngay lập tức.

release()

Giải phóng một đèn hiệu, tăng bộ đếm bên trong lên một. Có thể đánh thức một tác vụ đang chờ lấy semaphore.

Không giống như BoundedSemaphore, Semaphore cho phép thực hiện nhiều cuộc gọi release() hơn cuộc gọi acquire().

Giới hạnSemaphore

class asyncio.BoundedSemaphore(value=1)

Một đối tượng semaphore giới hạn. Không an toàn chủ đề.

Semaphore giới hạn là một phiên bản của Semaphore tăng ValueError trong release() nếu nó tăng bộ đếm bên trong lên trên value ban đầu.

Thay đổi trong phiên bản 3.10: Đã xóa tham số loop.

rào cản

class asyncio.Barrier(parties)

Một vật cản. Không an toàn chủ đề.

Rào cản là một nguyên tắc đồng bộ hóa đơn giản cho phép chặn cho đến khi số lượng nhiệm vụ parties đang chờ trên đó. Các tác vụ có thể chờ trên phương thức wait() và sẽ bị chặn cho đến khi số lượng tác vụ được chỉ định kết thúc ở trạng thái chờ trên wait(). Tại thời điểm đó, tất cả các tác vụ đang chờ sẽ được mở khóa đồng thời.

async with có thể được sử dụng thay thế cho việc chờ trên wait().

Rào chắn có thể được tái sử dụng bất kỳ số lần nào.

Ví dụ:

async def example_barrier():
   # barrier cùng 3 bên
   b = asyncio.Barrier(3)

   # create 2 nhiệm vụ chờ mới
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   đang chờ asyncio.sleep(0)
   in(b)

   Cuộc gọi .wait() thứ ba # The vượt qua rào cản
   đang chờ b.wait()
   in(b)
   print("đã vượt qua rào cản")

   đang chờ asyncio.sleep(0)
   in(b)

asyncio.run(example_barrier())

Kết quả của ví dụ này là:

<asyncio.locks.Barrier đối tượng  0x... [điền, bồi bàn:2/3]>
<asyncio.locks.Barrier đối tượng  0x... [thoát nước, bồi bàn:0/3]>
rào cản đã vượt qua
<asyncio.locks.Barrier đối tượng  0x... [điền, bồi bàn:0/3]>

Added in version 3.11.

async wait()

Vượt qua rào cản. Khi tất cả các bên nhiệm vụ của rào chắn đã gọi chức năng này, tất cả chúng đều được bỏ chặn đồng thời.

Khi một nhiệm vụ đang chờ hoặc bị chặn trong rào cản bị hủy, nhiệm vụ này sẽ thoát khỏi rào cản và vẫn ở trạng thái tương tự. Nếu trạng thái của rào cản là "lấp đầy", số lượng nhiệm vụ chờ sẽ giảm đi 1.

Giá trị trả về là một số nguyên trong khoảng từ 0 đến parties-1, khác nhau đối với từng tác vụ. Điều này có thể được sử dụng để chọn một nhiệm vụ thực hiện một số công việc dọn phòng đặc biệt, ví dụ:

...
không đồng bộ với rào cản  vị trí:
   nếu vị trí == 0:
      # Only một tác vụ in cái này
      print('Kết thúc *draining phase*')

Phương pháp này có thể đưa ra ngoại lệ BrokenBarrierError nếu rào cản bị hỏng hoặc được đặt lại trong khi tác vụ đang chờ. Nó có thể tăng CancelledError nếu một tác vụ bị hủy.

async reset()

Đưa rào cản về trạng thái trống, mặc định. Bất kỳ nhiệm vụ nào đang chờ nó sẽ nhận được ngoại lệ BrokenBarrierError.

Nếu một rào cản bị phá vỡ, tốt hơn hết là bạn nên bỏ nó đi và tạo một rào cản mới.

async abort()

Đặt rào cản vào trạng thái bị phá vỡ. Điều này khiến mọi lệnh gọi đang hoạt động hoặc trong tương lai tới wait() đều không thành công với BrokenBarrierError. Ví dụ: sử dụng điều này nếu một trong các tác vụ cần hủy bỏ để tránh các tác vụ phải chờ vô hạn.

parties

Số lượng nhiệm vụ cần thiết để vượt qua rào cản.

n_waiting

Số lượng nhiệm vụ hiện đang chờ trong rào chắn trong khi lấp đầy.

broken

Một boolean có giá trị True nếu rào cản ở trạng thái bị hỏng.

exception asyncio.BrokenBarrierError

Ngoại lệ này, một lớp con của RuntimeError, được đưa ra khi đối tượng Barrier bị đặt lại hoặc bị hỏng.


Thay đổi trong phiên bản 3.9: Việc lấy khóa bằng câu lệnh await lock hoặc yield from lock và/hoặc with (with await lock, with (yield from lock)) đã bị xóa. Thay vào đó hãy sử dụng async with lock.