multiprocessing --- Song song dựa trên quy trình¶
Source code: Lib/multiprocessing/
sẵn có: not Android, not iOS, not WASI.
Mô-đun này không được hỗ trợ trên mobile platforms hoặc WebAssembly platforms.
Giới thiệu¶
multiprocessing là gói hỗ trợ quá trình sinh sản bằng API tương tự như mô-đun threading. Gói multiprocessing cung cấp cả khả năng đồng thời cục bộ và từ xa, vượt qua Global Interpreter Lock một cách hiệu quả bằng cách sử dụng các quy trình con thay vì các luồng. Do đó, mô-đun multiprocessing cho phép lập trình viên tận dụng tối đa nhiều bộ xử lý trên một máy nhất định. Nó chạy trên cả POSIX và Windows.
Mô-đun multiprocessing cũng giới thiệu đối tượng Pool cung cấp một phương tiện thuận tiện để song song hóa việc thực thi một hàm trên nhiều giá trị đầu vào, phân phối dữ liệu đầu vào qua các quy trình (song song dữ liệu). Ví dụ sau đây minh họa cách thực hành phổ biến trong việc xác định các hàm như vậy trong một mô-đun để các tiến trình con có thể nhập thành công mô-đun đó. Ví dụ cơ bản này về song song dữ liệu bằng Pool,
từ nhóm nhập đa xử lý
định nghĩa f(x):
trả lại x*x
nếu __name__ == '__main__':
với Pool(5) là p:
print(p.map(f, [1, 2, 3]))
sẽ in ra đầu ra tiêu chuẩn
[1, 4, 9]
Mô-đun multiprocessing cũng giới thiệu các API không có các API tương tự trong mô-đun threading, như khả năng terminate, interrupt hoặc kill một quy trình đang chạy.
Xem thêm
concurrent.futures.ProcessPoolExecutor cung cấp giao diện cấp cao hơn để đẩy các tác vụ sang quy trình nền mà không chặn việc thực thi quy trình gọi. So với việc sử dụng trực tiếp giao diện Pool, concurrent.futures API dễ dàng hơn cho phép việc gửi công việc đến nhóm quy trình cơ bản được tách biệt khỏi việc chờ kết quả.
Lớp Process¶
Trong multiprocessing, các tiến trình được sinh ra bằng cách tạo một đối tượng Process và sau đó gọi phương thức start() của nó. Process theo sau API của threading.Thread. Một ví dụ tầm thường của chương trình đa xử lý là
từ quá trình nhập đa xử lý
định nghĩa f(tên):
in('xin chào', tên)
nếu __name__ == '__main__':
p = Quá trình(target=f, args=('bob',))
p.start()
p.join()
Để hiển thị ID tiến trình riêng lẻ có liên quan, đây là ví dụ mở rộng:
từ quá trình nhập đa xử lý
hệ điều hành nhập khẩu
thông tin def (tiêu đề):
in (tiêu đề)
print('tên mô-đun:', __name__)
print('quy trình cha:', os.getppid())
print('process id:', os.getpid())
định nghĩa f(tên):
thông tin('hàm f')
in('xin chào', tên)
nếu __name__ == '__main__':
thông tin('dòng chính')
p = Quá trình(target=f, args=('bob',))
p.start()
p.join()
Để biết lý do tại sao phần if __name__ == '__main__' lại cần thiết, hãy xem Hướng dẫn lập trình.
Các đối số của Process thường không thể được chọn từ bên trong tiến trình con. Nếu bạn thử nhập trực tiếp ví dụ trên vào REPL, điều đó có thể dẫn đến việc AttributeError trong tiến trình con đang cố gắng định vị hàm f trong mô-đun __main__.
Bối cảnh và phương pháp bắt đầu¶
Tùy thuộc vào nền tảng, multiprocessing hỗ trợ ba cách để bắt đầu một quy trình. Những start methods này là
- spawn
Tiến trình gốc bắt đầu một tiến trình thông dịch Python mới. Tiến trình con sẽ chỉ kế thừa những tài nguyên cần thiết để chạy phương thức
run()của đối tượng tiến trình. Đặc biệt, các bộ mô tả và xử lý tệp không cần thiết từ tiến trình gốc sẽ không được kế thừa. Bắt đầu một quá trình bằng phương pháp này khá chậm so với việc sử dụng fork hoặc forkserver.Có sẵn trên nền tảng POSIX và Windows. Mặc định trên Windows và macOS.
- fork
Quá trình gốc sử dụng
os.fork()để phân nhánh trình thông dịch Python. Tiến trình con khi bắt đầu thực sự giống với tiến trình cha. Tất cả các tài nguyên của tiến trình cha đều được kế thừa bởi tiến trình con. Lưu ý rằng việc phân nhánh một cách an toàn một quy trình đa luồng là có vấn đề.Có sẵn trên hệ thống POSIX.
Thay đổi trong phiên bản 3.14: Đây không còn là phương thức bắt đầu mặc định trên bất kỳ nền tảng nào. Mã yêu cầu fork phải chỉ định rõ ràng thông qua
get_context()hoặcset_start_method().Thay đổi trong phiên bản 3.12: Nếu Python có thể phát hiện rằng quy trình của bạn có nhiều luồng, thì hàm
os.fork()mà phương thức bắt đầu này gọi nội bộ sẽ đưa raDeprecationWarning. Sử dụng một phương pháp bắt đầu khác. Xem tài liệuos.fork()để được giải thích thêm.
- forkserver
Khi chương trình khởi động và chọn phương thức khởi động forkserver, một quy trình máy chủ sẽ được sinh ra. Từ đó trở đi, bất cứ khi nào cần một quy trình mới, quy trình gốc sẽ kết nối với máy chủ và yêu cầu nó phân nhánh một quy trình mới. Quá trình máy chủ ngã ba là một luồng trừ khi các thư viện hệ thống hoặc các luồng xuất hiện nhập được tải sẵn dưới dạng tác dụng phụ nên nhìn chung nó sẽ an toàn khi sử dụng
os.fork(). Không có tài nguyên không cần thiết được kế thừa.Có sẵn trên nền tảng POSIX hỗ trợ truyền bộ mô tả tệp qua hệ điều hành Unix như Linux. Mặc định trên đó.
Thay đổi trong phiên bản 3.14: Đây đã trở thành phương thức bắt đầu mặc định trên nền tảng POSIX.
Thay đổi trong phiên bản 3.4: spawn được thêm vào tất cả các nền tảng POSIX và forkserver được thêm vào một số nền tảng POSIX. Các tiến trình con không còn kế thừa tất cả các thẻ điều khiển có thể kế thừa của cha mẹ trên Windows.
Thay đổi trong phiên bản 3.8: Trên macOS, phương thức khởi động spawn hiện là mặc định. Phương thức khởi động fork được coi là không an toàn vì nó có thể dẫn đến sự cố của quy trình con vì các thư viện hệ thống macOS có thể khởi động các luồng. Xem bpo-33725.
Thay đổi trong phiên bản 3.14: Trên nền tảng POSIX, phương thức khởi động mặc định đã được thay đổi từ fork thành forkserver để duy trì hiệu suất nhưng tránh tình trạng không tương thích chung của quy trình đa luồng. Xem gh-84559.
Trên POSIX sử dụng các phương thức khởi động spawn hoặc forkserver cũng sẽ bắt đầu một quy trình resource tracker để theo dõi các tài nguyên hệ thống được đặt tên chưa được liên kết (chẳng hạn như các ngữ nghĩa có tên hoặc các đối tượng SharedMemory) được tạo bởi các quy trình của chương trình. Khi tất cả các quy trình đã thoát, trình theo dõi tài nguyên sẽ hủy liên kết mọi đối tượng được theo dõi còn lại. Thông thường sẽ không có, nhưng nếu một quá trình bị giết bởi tín hiệu thì có thể có một số tài nguyên "bị rò rỉ". (Cả các đèn hiệu bị rò rỉ lẫn các phân đoạn bộ nhớ dùng chung sẽ không tự động bị hủy liên kết cho đến lần khởi động lại tiếp theo. Đây là vấn đề đối với cả hai đối tượng vì hệ thống chỉ cho phép một số lượng hạn chế các đèn hiệu được đặt tên và các phân đoạn bộ nhớ dùng chung sẽ chiếm một số dung lượng trong bộ nhớ chính.)
Để chọn phương thức bắt đầu, bạn sử dụng set_start_method() trong mệnh đề if __name__ == '__main__' của mô-đun chính. Ví dụ:
nhập đa xử lý dưới dạng mp
def foo(q):
q.put('xin chào')
nếu __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
in(q.get())
p.join()
set_start_method() không nên được sử dụng nhiều lần trong chương trình.
Ngoài ra, bạn có thể sử dụng get_context() để lấy đối tượng ngữ cảnh. Các đối tượng bối cảnh có cùng API với mô-đun đa xử lý và cho phép một người sử dụng nhiều phương thức bắt đầu trong cùng một chương trình.
nhập đa xử lý dưới dạng mp
def foo(q):
q.put('xin chào')
nếu __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
in(q.get())
p.join()
Lưu ý rằng các đối tượng liên quan đến một ngữ cảnh có thể không tương thích với các quy trình cho một ngữ cảnh khác. Đặc biệt, các khóa được tạo bằng ngữ cảnh fork không thể được chuyển tới các tiến trình được bắt đầu bằng phương thức bắt đầu spawn hoặc forkserver.
Các thư viện sử dụng multiprocessing hoặc ProcessPoolExecutor phải được thiết kế để cho phép người dùng cung cấp bối cảnh đa xử lý của riêng họ. Việc sử dụng ngữ cảnh cụ thể của riêng bạn trong thư viện có thể dẫn đến sự không tương thích với phần còn lại của ứng dụng của người dùng thư viện. Luôn ghi lại nếu thư viện của bạn yêu cầu một phương pháp bắt đầu cụ thể.
Cảnh báo
Các phương thức khởi động 'spawn' và 'forkserver' thường không thể được sử dụng với các tệp thực thi "đóng băng" (tức là các tệp nhị phân được tạo bởi các gói như PyInstaller và cx_Freeze) trên hệ thống POSIX. Phương thức bắt đầu 'fork' có thể hoạt động nếu mã không sử dụng luồng.
Trao đổi đối tượng giữa các tiến trình¶
multiprocessing hỗ trợ hai loại kênh liên lạc giữa các tiến trình:
Queues
Lớp
Queuegần như là một bản sao củaqueue.Queue. Ví dụ:từ quá trình nhập đa xử lý, hàng đợi định nghĩa f(q): q.put([42, Không, 'xin chào']) nếu __name__ == '__main__': q = Hàng đợi() p = Quá trình(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, Không có, 'xin chào']" p.join()Hàng đợi là luồng và quy trình an toàn. Bất kỳ đối tượng nào được đưa vào hàng đợi
multiprocessingsẽ được tuần tự hóa.
Pipes
Hàm
Pipe()trả về một cặp đối tượng kết nối được kết nối bằng một đường ống theo mặc định là song công (hai chiều). Ví dụ:từ quy trình nhập đa xử lý, đường ống định nghĩa f(conn): conn.send([42, Không có, 'xin chào']) conn.close() nếu __name__ == '__main__': parent_conn, child_conn = Pipe() p = Quá trình(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, Không có, 'xin chào']" p.join()Hai đối tượng kết nối được trả về bởi
Pipe()đại diện cho hai đầu của đường ống. Mỗi đối tượng kết nối có các phương thứcsend()vàrecv()(trong số các phương thức khác). Lưu ý rằng dữ liệu trong một đường ống có thể bị hỏng nếu hai tiến trình (hoặc luồng) cố gắng đọc hoặc ghi vào đầu same của đường ống cùng một lúc. Tất nhiên là không có nguy cơ hỏng hóc từ các quy trình sử dụng các đầu ống khác nhau cùng một lúc.Phương thức
send()tuần tự hóa đối tượng vàrecv()tạo lại đối tượng.
Đồng bộ hóa giữa các tiến trình¶
multiprocessing chứa các phần tương đương của tất cả các nguyên hàm đồng bộ hóa từ threading. Chẳng hạn, người ta có thể sử dụng khóa để đảm bảo rằng mỗi lần chỉ có một quy trình được in ra đầu ra tiêu chuẩn
từ quá trình nhập đa xử lý, Khóa
def f(l, i):
l.acquire()
thử:
print('xin chào thế giới', i)
cuối cùng:
l.release()
nếu __name__ == '__main__':
khóa = Khóa()
cho số trong phạm vi (10):
Process(target=f, args=(lock, num)).start()
Nếu không sử dụng đầu ra khóa từ các quy trình khác nhau thì mọi thứ có thể bị lẫn lộn.
Sử dụng đội ngũ công nhân¶
Lớp Pool đại diện cho một nhóm các quy trình công nhân. Nó có các phương thức cho phép chuyển các tác vụ sang quy trình công nhân theo một số cách khác nhau.
Ví dụ:
từ nhóm nhập đa xử lý, TimeoutError
thời gian nhập khẩu
hệ điều hành nhập khẩu
định nghĩa f(x):
trả lại x*x
nếu __name__ == '__main__':
# start 4 quy trình công nhân
với Pool(processes=4) là nhóm:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print các số giống nhau theo thứ tự tùy ý
cho tôi trong pool.imap_unordered(f, range(10)):
in(i)
# evaluate "f(20)" không đồng bộ
res = pool.apply_async(f, (20,)) # runs trong *only* một quy trình
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" không đồng bộ
res = pool.apply_async(os.getpid, ()) # runs trong *only* một quy trình
print(res.get(timeout=1)) # prints PID của quá trình đó
# launching nhiều đánh giá không đồng bộ *may* sử dụng nhiều quy trình hơn
multiple_results = [pool.apply_async(os.getpid, ()) cho tôi trong phạm vi (4)]
print([res.get(timeout=1) cho độ phân giải trong multiple_results])
# make một công nhân ngủ trong 10 giây
res = pool.apply_async(time.sleep, (10,))
thử:
print(res.get(timeout=1))
ngoại trừ Hết thời gianLỗi:
print("Chúng tôi thiếu kiên nhẫn và gặp phải lỗi multiprocessing.TimeoutError")
print("Hiện tại, nhóm vẫn sẵn sàng để làm việc thêm")
# exiting khối 'with' đã dừng nhóm
print("Bây giờ nhóm đã đóng và không còn tồn tại nữa")
Lưu ý rằng các phương thức của một nhóm chỉ được sử dụng bởi quá trình đã tạo ra nó.
Ghi chú
Chức năng trong gói này yêu cầu trẻ em có thể nhập mô-đun __main__. Điều này được đề cập trong Hướng dẫn lập trình tuy nhiên nó đáng được chỉ ra ở đây. Điều này có nghĩa là một số ví dụ, chẳng hạn như ví dụ multiprocessing.pool.Pool sẽ không hoạt động trong trình thông dịch tương tác. Ví dụ:
>>> từ nhóm nhập đa xử lý
>>> p = Nhóm(5)
>>> định nghĩa f(x):
... trả về x*x
...
>>> với p:
... p.map(f, [1,2,3])
Xử lý PoolWorker-1:
Xử lý PoolWorker-2:
Xử lý PoolWorker-3:
Traceback (cuộc gọi gần đây nhất):
Traceback (cuộc gọi gần đây nhất):
Traceback (cuộc gọi gần đây nhất):
AttributionError: Không thể lấy thuộc tính 'f' trên <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributionError: Không thể lấy thuộc tính 'f' trên <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributionError: Không thể lấy thuộc tính 'f' trên <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
(Nếu bạn thử cách này, nó thực sự sẽ xuất ra ba dấu vết đầy đủ được xen kẽ theo kiểu bán ngẫu nhiên và sau đó bạn có thể phải dừng quá trình gốc bằng cách nào đó.)
Tài liệu tham khảo¶
Gói multiprocessing chủ yếu sao chép API của mô-đun threading.
Phương pháp bắt đầu toàn cầu¶
Python hỗ trợ một số cách để tạo và khởi tạo một tiến trình. Phương thức bắt đầu toàn cục đặt cơ chế mặc định để tạo một quy trình.
Một số hàm và phương thức đa xử lý cũng có thể khởi tạo một số đối tượng nhất định sẽ ngầm đặt phương thức khởi động chung thành mặc định của hệ thống, nếu nó chưa được đặt. Phương thức bắt đầu chung chỉ có thể được đặt một lần. Nếu cần thay đổi phương thức bắt đầu từ mặc định của hệ thống, bạn phải chủ động đặt phương thức bắt đầu chung trước khi gọi các hàm hoặc phương thức hoặc tạo các đối tượng này.
Process và ngoại lệ¶
- class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶
Các đối tượng quy trình đại diện cho hoạt động được chạy trong một quy trình riêng biệt. Lớp
Processcó các phương thức tương đương với tất cả các phương thức củathreading.Thread.Hàm tạo phải luôn được gọi với các đối số từ khóa. group phải luôn là
None; nó tồn tại chỉ để tương thích vớithreading.Thread. target là đối tượng có thể gọi được bằng phương thứcrun(). Nó mặc định làNone, nghĩa là không có gì được gọi. name là tên quy trình (xemnameđể biết thêm chi tiết). args là bộ đối số cho lệnh gọi mục tiêu. kwargs là từ điển các đối số từ khóa cho lệnh gọi mục tiêu. Nếu được cung cấp, đối số daemon chỉ từ khóa sẽ đặt cờdaemoncủa quá trình thànhTruehoặcFalse. NếuNone(mặc định), cờ này sẽ được kế thừa từ quá trình tạo.Theo mặc định, không có đối số nào được truyền tới target. Đối số args, mặc định là
(), có thể được sử dụng để chỉ định một danh sách hoặc bộ đối số cần chuyển tới target.Nếu một lớp con ghi đè hàm tạo, nó phải đảm bảo rằng nó gọi hàm tạo của lớp cơ sở (
super().__init__()) trước khi thực hiện bất kỳ điều gì khác với quy trình.Ghi chú
Nói chung, tất cả các đối số cho
Processđều phải có thể chọn được. Điều này thường được quan sát thấy khi cố gắng tạoProcesshoặc sử dụngconcurrent.futures.ProcessPoolExecutortừ REPL với hàm target được xác định cục bộ.Việc truyền một đối tượng có thể gọi được xác định trong phiên REPL hiện tại sẽ khiến tiến trình con ngừng hoạt động thông qua một ngoại lệ
AttributeErrorchưa được nắm bắt khi bắt đầu vì target phải được xác định trong một mô-đun có thể nhập để được tải trong quá trình giải nén.Ví dụ về lỗi không thể khắc phục này của trẻ:
>>> nhập đa xử lý dưới dạng mp >>> def knigit(): ... print("Ni!") ... >>> tiến trình = mp.Process(target=knigit) >>> quá trình.start() >>> Traceback (cuộc gọi gần đây nhất): Tệp ".../multiprocessing/spawn.py", dòng ..., trong spawn_main Tệp ".../multiprocessing/spawn.py", dòng ..., trong _main AttributionError: mô-đun '__main__' không có thuộc tính 'knigit' >>> quá trình <SpawnProcess name='SpawnProcess-1' pid=379473 parent=378707 đã dừng exitcode=1>
Xem Phương thức khởi động spawn và forkserver. Mặc dù hạn chế này không đúng nếu sử dụng phương thức khởi động
"fork", nhưng đối với Python3.14, nó không còn là mặc định trên bất kỳ nền tảng nào. Xem Bối cảnh và phương pháp bắt đầu. Xem thêm gh-132898.Thay đổi trong phiên bản 3.3: Đã thêm tham số daemon.
- run()¶
Phương thức biểu diễn hoạt động của tiến trình.
Bạn có thể ghi đè phương thức này trong một lớp con. Phương thức
run()tiêu chuẩn gọi đối tượng có thể gọi được truyền tới hàm tạo của đối tượng làm đối số đích, nếu có, với các đối số tuần tự và từ khóa được lấy từ các đối số args và kwargs tương ứng.Sử dụng danh sách hoặc bộ dữ liệu làm đối số args được truyền cho
Processsẽ đạt được hiệu quả tương tự.Ví dụ:
>>> từ quá trình nhập khẩu đa xử lý >>> p = Quá trình(target=print, args=[1]) >>> p.run() 1 >>> p = Quá trình(target=print, args=(1,)) >>> p.run() 1
- start()¶
Bắt đầu hoạt động của quá trình.
Điều này phải được gọi nhiều nhất một lần cho mỗi đối tượng tiến trình. Nó sắp xếp để phương thức
run()của đối tượng được gọi trong một quy trình riêng biệt.
- join([timeout])¶
Nếu đối số tùy chọn timeout là
None(mặc định), phương thức sẽ chặn cho đến khi quá trình có phương thứcjoin()được gọi kết thúc. Nếu timeout là số dương thì nó sẽ chặn tối đa timeout giây. Lưu ý rằng phương thức trả vềNonenếu quá trình của nó kết thúc hoặc nếu phương thức hết thời gian chờ. Kiểm traexitcodecủa tiến trình để xác định xem nó có kết thúc hay không.Một tiến trình có thể được nối nhiều lần.
Một tiến trình không thể tự tham gia vì điều này sẽ gây ra bế tắc. Sẽ là lỗi khi cố gắng tham gia một tiến trình trước khi nó được bắt đầu.
- name¶
Tên của quá trình. Tên là một chuỗi chỉ được sử dụng cho mục đích nhận dạng. Nó không có ngữ nghĩa. Nhiều quy trình có thể được đặt cùng tên.
Tên ban đầu được đặt bởi hàm tạo. Nếu không có tên rõ ràng nào được cung cấp cho hàm tạo, thì tên có dạng 'Process-N1:N2:...:Nk' sẽ được tạo, trong đó mỗi Nk là con thứ N của cha mẹ nó.
- is_alive()¶
Trả về liệu quá trình có còn hoạt động hay không.
Đại khái, một đối tượng tiến trình vẫn tồn tại kể từ thời điểm phương thức
start()trả về cho đến khi tiến trình con kết thúc.
- daemon¶
Cờ daemon của tiến trình, một giá trị Boolean. Điều này phải được đặt trước khi
start()được gọi.Giá trị ban đầu được kế thừa từ quá trình tạo.
Khi một tiến trình thoát ra, nó sẽ cố gắng chấm dứt tất cả các tiến trình con daemon của nó.
Lưu ý rằng quy trình daemon không được phép tạo quy trình con. Nếu không, một tiến trình daemon sẽ mồ côi các tiến trình con của nó nếu nó bị chấm dứt khi tiến trình cha của nó thoát ra. Ngoài ra, đây là các daemon hoặc dịch vụ Unix của not, chúng là các tiến trình thông thường sẽ bị chấm dứt (và không được tham gia) nếu các tiến trình không phải daemon đã thoát.
Ngoài
threading.ThreadAPI, các đối tượngProcesscòn hỗ trợ các thuộc tính và phương thức sau:- pid¶
Trả về ID tiến trình. Trước khi quá trình được sinh ra, đây sẽ là
None.
- exitcode¶
Mã thoát của trẻ. Đây sẽ là
Nonenếu quá trình này chưa kết thúc.Nếu phương thức
run()của trẻ trả về bình thường thì mã thoát sẽ là 0. Nếu nó kết thúc thông quasys.exit()với đối số nguyên N thì mã thoát sẽ là N.Nếu phần tử con bị chấm dứt do ngoại lệ không nằm trong
run(), mã thoát sẽ là 1. Nếu nó bị chấm dứt bởi tín hiệu N, mã thoát sẽ có giá trị âm -N.
- authkey¶
Khóa xác thực của quy trình (một chuỗi byte).
Khi
multiprocessingđược khởi tạo, quy trình chính được gán một chuỗi ngẫu nhiên bằngos.urandom().Khi một đối tượng
Processđược tạo, nó sẽ kế thừa khóa xác thực của tiến trình gốc, mặc dù điều này có thể được thay đổi bằng cách đặtauthkeythành một chuỗi byte khác.Xem Khóa xác thực.
- sentinel¶
Một bộ điều khiển số của một đối tượng hệ thống sẽ trở nên "sẵn sàng" khi quá trình kết thúc.
Bạn có thể sử dụng giá trị này nếu bạn muốn đợi nhiều sự kiện cùng lúc bằng
multiprocessing.connection.wait(). Nếu không thì gọijoin()sẽ đơn giản hơn.Trên Windows, đây là một hệ điều hành có thể sử dụng được với dòng lệnh gọi
WaitForSingleObjectvàWaitForMultipleObjectsAPI. Trên POSIX, đây là bộ mô tả tệp có thể sử dụng được với các phần gốc từ mô-đunselect.Added in version 3.3.
- interrupt()¶
Chấm dứt quá trình. Hoạt động trên POSIX bằng tín hiệu
SIGINT. Hành vi trên Windows không được xác định.Theo mặc định, điều này chấm dứt tiến trình con bằng cách tăng
KeyboardInterrupt. Hành vi này có thể được thay đổi bằng cách đặt trình xử lý tín hiệu tương ứng trong tiến trình consignal.signal()choSIGINT.Lưu ý: nếu tiến trình con bắt và loại bỏ
KeyboardInterrupt, tiến trình sẽ không bị chấm dứt.Lưu ý: hành vi mặc định cũng sẽ đặt
exitcodethành1như thể một ngoại lệ chưa được phát hiện đã được đưa ra trong tiến trình con. Để có mộtexitcodekhác, bạn có thể chỉ cần bắtKeyboardInterruptvà gọiexit(your_code).Added in version 3.14.
- terminate()¶
Chấm dứt quá trình. Trên POSIX, việc này được thực hiện bằng tín hiệu
SIGTERM; trên WindowsTerminateProcess()được sử dụng. Lưu ý rằng các trình xử lý thoát và các mệnh đề cuối cùng, v.v., sẽ không được thực thi.Lưu ý rằng các tiến trình con cháu của tiến trình này sẽ not bị chấm dứt -- chúng sẽ đơn giản trở thành mồ côi.
Cảnh báo
Nếu phương pháp này được sử dụng khi quy trình liên quan đang sử dụng một đường ống hoặc hàng đợi thì đường ống hoặc hàng đợi đó có thể bị hỏng và có thể trở nên không thể sử dụng được bởi quy trình khác. Tương tự, nếu tiến trình đã nhận được khóa hoặc đèn hiệu, v.v. thì việc chấm dứt nó có thể khiến các tiến trình khác bị bế tắc.
- kill()¶
Tương tự như
terminate()nhưng sử dụng tín hiệuSIGKILLtrên POSIX.Added in version 3.7.
- close()¶
Đóng đối tượng
Process, giải phóng tất cả tài nguyên liên quan đến nó.ValueErrorđược nâng lên nếu tiến trình cơ bản vẫn đang chạy. Khiclose()trả về thành công, hầu hết các phương thức và thuộc tính khác của đối tượngProcesssẽ tăngValueError.Added in version 3.7.
Lưu ý rằng các phương thức
start(),join(),is_alive(),terminate()vàexitcodechỉ nên được gọi bởi tiến trình đã tạo đối tượng tiến trình.Ví dụ sử dụng một số phương thức của
Process:>>> nhập đa xử lý, thời gian, tín hiệu >>> mp_context = multiprocessing.get_context('spawn') >>> p = mp_context.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <...Quy trình ... ban đầu> Sai >>> p.start() >>> print(p, p.is_alive()) <...Quá trình ... đã bắt đầu> Đúng >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <...Quá trình ... đã dừng exitcode=-SIGTERM> Sai >>> p.exitcode == -signal.SIGTERM đúng
- exception multiprocessing.ProcessError¶
Lớp cơ sở của tất cả các ngoại lệ
multiprocessing.
- exception multiprocessing.BufferTooShort¶
Ngoại lệ do
Connection.recv_bytes_into()đưa ra khi đối tượng bộ đệm được cung cấp quá nhỏ để đọc được thông báo.Nếu
elà một phiên bản củaBufferTooShortthìe.args[0]sẽ đưa ra thông báo dưới dạng chuỗi byte.
- exception multiprocessing.AuthenticationError¶
Xảy ra khi có lỗi xác thực.
- exception multiprocessing.TimeoutError¶
Được tăng lên bằng các phương thức có thời gian chờ khi hết thời gian chờ.
Đường ống và hàng đợi¶
Khi sử dụng nhiều quy trình, người ta thường sử dụng thông báo truyền đi để liên lạc giữa các quy trình và tránh phải sử dụng bất kỳ nguyên tắc đồng bộ hóa nào như khóa.
Để truyền tin nhắn, người ta có thể sử dụng Pipe() (để kết nối giữa hai quy trình) hoặc hàng đợi (cho phép nhiều nhà sản xuất và người tiêu dùng).
Các loại Queue, SimpleQueue và JoinableQueue là các hàng đợi FIFO đa nhà sản xuất, nhiều người tiêu dùng được mô hình hóa trên lớp queue.Queue trong thư viện chuẩn. Chúng khác nhau ở chỗ Queue thiếu các phương thức task_done() và join() được giới thiệu trong lớp queue.Queue của Python 2.5.
Nếu bạn sử dụng JoinableQueue thì must gọi JoinableQueue.task_done() cho mỗi tác vụ được xóa khỏi hàng đợi, nếu không, semaphore được sử dụng để đếm số lượng tác vụ chưa hoàn thành cuối cùng có thể bị tràn, gây ra ngoại lệ.
Một điểm khác biệt so với các triển khai hàng đợi Python khác là hàng đợi multiprocessing tuần tự hóa tất cả các đối tượng được đưa vào chúng bằng pickle. Đối tượng được trả về bởi phương thức get là đối tượng được tạo lại không chia sẻ bộ nhớ với đối tượng ban đầu.
Lưu ý rằng người ta cũng có thể tạo hàng đợi được chia sẻ bằng cách sử dụng đối tượng người quản lý -- xem Người quản lý.
Ghi chú
multiprocessing sử dụng các ngoại lệ queue.Empty và queue.Full thông thường để báo hiệu thời gian chờ. Chúng không có sẵn trong không gian tên multiprocessing nên bạn cần nhập chúng từ queue.
Ghi chú
Khi một đối tượng được đặt vào hàng đợi, đối tượng đó sẽ được chọn và một luồng nền sau đó sẽ chuyển dữ liệu đã được chọn vào một đường ống bên dưới. Điều này gây ra một số hậu quả hơi đáng ngạc nhiên nhưng sẽ không gây ra bất kỳ khó khăn thực tế nào -- nếu chúng thực sự làm phiền bạn thì thay vào đó bạn có thể sử dụng hàng đợi được tạo bằng manager.
Sau khi đặt một đối tượng vào hàng đợi trống, có thể có độ trễ vô cùng nhỏ trước khi phương thức
empty()của hàng đợi trả vềFalsevàget_nowait()có thể trả về mà không cần tăngqueue.Empty.Nếu có nhiều tiến trình đang xếp các đối tượng vào hàng đợi thì các đối tượng có thể được nhận ở đầu bên kia không theo thứ tự. Tuy nhiên, các đối tượng được sắp xếp theo cùng một quy trình sẽ luôn ở thứ tự mong đợi đối với nhau.
Cảnh báo
Nếu một tiến trình bị hủy khi sử dụng Process.terminate() hoặc os.kill() trong khi nó đang cố gắng sử dụng Queue thì dữ liệu trong hàng đợi có thể bị hỏng. Điều này có thể khiến bất kỳ tiến trình nào khác gặp ngoại lệ khi nó cố gắng sử dụng hàng đợi sau này.
Cảnh báo
Như đã đề cập ở trên, nếu một tiến trình con đã đặt các mục vào hàng đợi (và nó chưa sử dụng JoinableQueue.cancel_join_thread), thì quá trình đó sẽ không kết thúc cho đến khi tất cả các mục được lưu vào bộ đệm đã được chuyển vào đường ống.
Điều này có nghĩa là nếu bạn thử tham gia quá trình đó, bạn có thể gặp bế tắc trừ khi bạn chắc chắn rằng tất cả các mục được đưa vào hàng đợi đã được tiêu thụ. Tương tự, nếu tiến trình con không phải daemon thì tiến trình cha có thể bị treo khi thoát khi nó cố gắng tham gia tất cả các tiến trình con không phải daemon.
Lưu ý rằng hàng đợi được tạo bằng trình quản lý không gặp phải vấn đề này. Xem Hướng dẫn lập trình.
Để biết ví dụ về cách sử dụng hàng đợi để liên lạc giữa các quá trình, hãy xem Ví dụ.
- multiprocessing.Pipe(duplex=True)¶
Trả về một cặp
(conn1, conn2)gồm các đối tượngConnectionđại diện cho các đầu của một đường ống.Nếu duplex là
True(mặc định) thì đường ống là hai chiều. Nếu duplex làFalsethì đường ống là một chiều:conn1chỉ có thể được sử dụng để nhận tin nhắn vàconn2chỉ có thể được sử dụng để gửi tin nhắn.Phương thức
send()tuần tự hóa đối tượng bằngpicklevàrecv()tạo lại đối tượng.
- class multiprocessing.Queue([maxsize])¶
Trả về một hàng đợi chia sẻ quy trình được triển khai bằng cách sử dụng một ống dẫn và một vài khóa/semaphores. Khi một tiến trình lần đầu tiên đặt một mục vào hàng đợi, một luồng cấp dữ liệu sẽ được khởi động để chuyển các đối tượng từ bộ đệm vào đường ống.
Việc khởi tạo lớp này có thể thiết lập phương thức bắt đầu chung. Xem Phương pháp bắt đầu toàn cầu để biết thêm chi tiết.
Các ngoại lệ
queue.Emptyvàqueue.Fullthông thường từ mô-đunqueuecủa thư viện chuẩn được nâng lên để báo hiệu thời gian chờ.Queuethực hiện tất cả các phương thức củaqueue.Queuengoại trừtask_done(),join()vàshutdown().- qsize()¶
Trả về kích thước gần đúng của hàng đợi. Do ngữ nghĩa đa luồng/đa xử lý nên con số này không đáng tin cậy.
Lưu ý rằng điều này có thể tăng
NotImplementedErrortrên các nền tảng như macOS nơisem_getvalue()không được triển khai.
- empty()¶
Trả về
Truenếu hàng đợi trống,Falsenếu không. Do ngữ nghĩa đa luồng/đa xử lý, điều này không đáng tin cậy.Có thể tăng
OSErrortrên hàng đợi đã đóng. (không được đảm bảo)
- full()¶
Trả về
Truenếu hàng đợi đầy, nếu không thì trả vềFalse. Do ngữ nghĩa đa luồng/đa xử lý, điều này không đáng tin cậy.
- put(obj[, block[, timeout]])¶
Đặt obj vào hàng đợi. Nếu đối số tùy chọn block là
True(mặc định) và timeout làNone(mặc định), hãy chặn nếu cần cho đến khi có chỗ trống. Nếu timeout là số dương, nó sẽ chặn tối đa timeout giây và tăng ngoại lệqueue.Fullnếu không có chỗ trống nào trong thời gian đó. Ngược lại (block làFalse), hãy đặt một mục vào hàng đợi nếu ngay lập tức có một vị trí trống, nếu không thì sẽ đưa ra ngoại lệqueue.Full(timeout bị bỏ qua trong trường hợp đó).Thay đổi trong phiên bản 3.8: Nếu hàng đợi bị đóng,
ValueErrorsẽ được nâng lên thay vìAssertionError.
- put_nowait(obj)¶
Tương đương với
put(obj, False).
- get([block[, timeout]])¶
Xóa và trả lại một mục từ hàng đợi. Nếu đối số tùy chọn block là
True(mặc định) và timeout làNone(mặc định), hãy chặn nếu cần thiết cho đến khi có một mục. Nếu timeout là số dương, nó sẽ chặn tối đa timeout giây và tăng ngoại lệqueue.Emptynếu không có mục nào trong thời gian đó. Ngược lại (khối làFalse), trả lại một mục nếu có sẵn ngay lập tức, nếu không thì đưa ra ngoại lệqueue.Empty(timeout bị bỏ qua trong trường hợp đó).Thay đổi trong phiên bản 3.8: Nếu hàng đợi bị đóng,
ValueErrorsẽ được nâng lên thay vìOSError.
- get_nowait()¶
Tương đương với
get(False).
multiprocessing.Queuecó một số phương thức bổ sung không có trongqueue.Queue. Những phương pháp này thường không cần thiết đối với hầu hết các mã:- close()¶
Đóng hàng đợi: giải phóng tài nguyên nội bộ.
Hàng đợi không được sử dụng nữa sau khi nó bị đóng. Ví dụ: các phương thức
get(),put()vàempty()không còn được gọi nữa.Chuỗi nền sẽ thoát khi nó đã xóa tất cả dữ liệu được lưu vào bộ đệm vào đường ống. Điều này được gọi tự động khi hàng đợi được thu gom rác.
- join_thread()¶
Tham gia chủ đề nền. Điều này chỉ có thể được sử dụng sau khi
close()được gọi. Nó chặn cho đến khi luồng nền thoát ra, đảm bảo rằng tất cả dữ liệu trong bộ đệm đã được chuyển vào đường ống.Theo mặc định, nếu một tiến trình không phải là người tạo hàng đợi thì khi thoát, nó sẽ cố gắng tham gia luồng nền của hàng đợi. Quá trình này có thể gọi
cancel_join_thread()để khiếnjoin_thread()không làm gì cả.
- cancel_join_thread()¶
Ngăn chặn
join_thread()chặn. Đặc biệt, điều này ngăn không cho luồng nền tự động được nối khi quá trình thoát - xemjoin_thread().Tên hay hơn cho phương pháp này có thể là
allow_exit_without_flush(). Nó có thể khiến dữ liệu trong hàng đợi bị mất và bạn gần như chắc chắn sẽ không cần sử dụng đến nó. Nó thực sự chỉ ở đó nếu bạn cần quy trình hiện tại thoát ngay lập tức mà không cần chờ chuyển dữ liệu đã xếp hàng vào đường ống bên dưới và bạn không quan tâm đến dữ liệu bị mất.
Ghi chú
Chức năng của lớp này yêu cầu triển khai semaphore được chia sẻ hoạt động trên hệ điều hành máy chủ. Nếu không có, chức năng trong lớp này sẽ bị vô hiệu hóa và các nỗ lực khởi tạo
Queuesẽ dẫn đếnImportError. Xem bpo-3770 để biết thêm thông tin. Điều tương tự cũng đúng với bất kỳ loại hàng đợi chuyên biệt nào được liệt kê bên dưới.
- class multiprocessing.SimpleQueue¶
Nó là loại
Queueđược đơn giản hóa, rất gần với loạiPipebị khóa.Việc khởi tạo lớp này có thể thiết lập phương thức bắt đầu chung. Xem Phương pháp bắt đầu toàn cầu để biết thêm chi tiết.
- close()¶
Đóng hàng đợi: giải phóng tài nguyên nội bộ.
Hàng đợi không được sử dụng nữa sau khi nó bị đóng. Ví dụ: các phương thức
get(),put()vàempty()không còn được gọi nữa.Added in version 3.9.
- empty()¶
Trả về
Truenếu hàng đợi trống,Falsenếu không.Luôn tăng
OSErrornếu SimpleQueue bị đóng.
- get()¶
Xóa và trả lại một mục từ hàng đợi.
- put(item)¶
Đặt item vào hàng đợi.
- class multiprocessing.JoinableQueue([maxsize])¶
JoinableQueue, một lớp conQueue, là một hàng đợi có thêm các phương thứctask_done()vàjoin().Việc khởi tạo lớp này có thể thiết lập phương thức bắt đầu chung. Xem Phương pháp bắt đầu toàn cầu để biết thêm chi tiết.
- task_done()¶
Cho biết rằng một nhiệm vụ được xếp hàng trước đó đã hoàn thành. Được sử dụng bởi người tiêu dùng xếp hàng. Đối với mỗi
get()được sử dụng để tìm nạp một tác vụ, một lệnh gọi tiếp theo tớitask_done()sẽ thông báo cho hàng đợi rằng quá trình xử lý tác vụ đã hoàn tất.Nếu một
join()hiện đang bị chặn thì nó sẽ tiếp tục lại khi tất cả các mục đã được xử lý (có nghĩa là đã nhận được lệnh gọitask_done()cho mọi mục đã đượcput()đưa vào hàng đợi).Tăng
ValueErrornếu được gọi nhiều lần hơn số mục được đặt trong hàng đợi.
- join()¶
Chặn cho đến khi tất cả các mục trong hàng đợi đã được nhận và xử lý.
Số lượng nhiệm vụ chưa hoàn thành sẽ tăng lên bất cứ khi nào một mục được thêm vào hàng đợi. Số lượng giảm xuống bất cứ khi nào người tiêu dùng gọi
task_done()để cho biết rằng mục đó đã được truy xuất và mọi công việc trên đó đã hoàn tất. Khi số lượng nhiệm vụ chưa hoàn thành giảm xuống 0,join()sẽ mở khóa.
Linh tinh¶
- multiprocessing.active_children()¶
Trả về danh sách tất cả các tiến trình con còn sống của tiến trình hiện tại.
Việc gọi điều này có tác dụng phụ là "tham gia" bất kỳ quy trình nào đã kết thúc.
- multiprocessing.cpu_count()¶
Trả về số lượng CPU trong hệ thống.
Con số này không tương đương với số lượng CPU mà tiến trình hiện tại có thể sử dụng. Số lượng CPU có thể sử dụng có thể được lấy bằng
os.process_cpu_count()(hoặclen(os.sched_getaffinity(0))).Khi không thể xác định được số lượng CPU,
NotImplementedErrorsẽ được nâng lên.Xem thêm
Thay đổi trong phiên bản 3.13: Giá trị trả về cũng có thể được ghi đè bằng cách sử dụng cờ
-X cpu_counthoặcPYTHON_CPU_COUNTvì đây chỉ là một trình bao bọc xung quanh các API đếm CPUos.
- multiprocessing.current_process()¶
Trả về đối tượng
Processtương ứng với tiến trình hiện tại.Một sự tương tự của
threading.current_thread().
- multiprocessing.parent_process()¶
Trả về đối tượng
Processtương ứng với tiến trình gốc củacurrent_process(). Đối với quy trình chính,parent_processsẽ làNone.Added in version 3.8.
- multiprocessing.freeze_support()¶
Thêm hỗ trợ khi chương trình sử dụng
multiprocessingbị đóng băng để tạo tệp thực thi. (Đã được thử nghiệm với py2exe, PyInstaller và cx_Freeze.)Người ta cần gọi hàm này ngay sau dòng
if __name__ == '__main__'của mô-đun chính. Ví dụ:từ Quá trình nhập đa xử lý, đóng băng_support chắc chắn f(): print('xin chào thế giới!') nếu __name__ == '__main__': đóng băng_support() Quá trình(target=f).start()
Nếu dòng
freeze_support()bị bỏ qua thì việc cố gắng chạy tệp thực thi đã cố định sẽ tạo raRuntimeError.Gọi
freeze_support()không có hiệu lực khi phương thức bắt đầu không phải là spawn. Ngoài ra, nếu mô-đun đang được trình thông dịch Python chạy bình thường (chương trình chưa bị đóng băng) thìfreeze_support()không có hiệu lực.
- multiprocessing.get_all_start_methods()¶
Trả về danh sách các phương thức khởi động được hỗ trợ, phương thức đầu tiên là mặc định. Các phương thức bắt đầu có thể là
'fork','spawn'và'forkserver'. Không phải tất cả các nền tảng đều hỗ trợ tất cả các phương pháp. Xem Bối cảnh và phương pháp bắt đầu.Added in version 3.4.
- multiprocessing.get_context(method=None)¶
Trả về một đối tượng ngữ cảnh có cùng thuộc tính với mô-đun
multiprocessing.Nếu method là
Nonethì ngữ cảnh mặc định sẽ được trả về. Lưu ý rằng nếu phương thức khởi động chung chưa được đặt thì phương thức này sẽ đặt thành mặc định của hệ thống. Xem Phương pháp bắt đầu toàn cầu để biết thêm chi tiết. Nếu không thì method phải là'fork','spawn','forkserver'.ValueErrorđược nâng lên nếu phương thức bắt đầu được chỉ định không khả dụng. Xem Bối cảnh và phương pháp bắt đầu.Added in version 3.4.
- multiprocessing.get_start_method(allow_none=False)¶
Trả về tên của phương thức bắt đầu được sử dụng để bắt đầu các tiến trình.
Nếu phương thức bắt đầu toàn cục không được đặt và allow_none là
Falsethì phương thức bắt đầu toàn cục được đặt thành mặc định và tên của nó sẽ được trả về. Xem Phương pháp bắt đầu toàn cầu để biết thêm chi tiết.Giá trị trả về có thể là
'fork','spawn','forkserver'hoặcNone. Xem Bối cảnh và phương pháp bắt đầu.Added in version 3.4.
Thay đổi trong phiên bản 3.8: Trên macOS, phương thức khởi động spawn hiện là mặc định. Phương thức khởi động fork được coi là không an toàn vì nó có thể dẫn đến sự cố của quy trình con. Xem bpo-33725.
- multiprocessing.set_executable(executable)¶
Đặt đường dẫn của trình thông dịch Python để sử dụng khi bắt đầu một tiến trình con. (Theo mặc định
sys.executableđược sử dụng). Trình nhúng có thể sẽ cần phải làm một cái gì đó nhưset_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
trước khi họ có thể tạo các tiến trình con.
Thay đổi trong phiên bản 3.4: Hiện được hỗ trợ trên POSIX khi sử dụng phương thức khởi động
'spawn'.Thay đổi trong phiên bản 3.11: Chấp nhận path-like object.
- multiprocessing.set_forkserver_preload(module_names)¶
Đặt danh sách tên mô-đun cho quy trình chính của máy chủ ngã ba để cố gắng nhập sao cho trạng thái đã nhập của chúng được kế thừa bởi các quy trình phân nhánh. Bất kỳ
ImportErrornào khi làm như vậy đều bị âm thầm bỏ qua. Điều này có thể được sử dụng như một biện pháp nâng cao hiệu suất để tránh công việc lặp lại trong mọi quy trình.Để tính năng này hoạt động, nó phải được gọi trước khi quá trình máy chủ phân nhánh được khởi chạy (trước khi tạo
Poolhoặc khởi độngProcess).Chỉ có ý nghĩa khi sử dụng phương thức khởi động
'forkserver'. Xem Bối cảnh và phương pháp bắt đầu.Added in version 3.4.
- multiprocessing.set_start_method(method, force=False)¶
Đặt phương thức sẽ được sử dụng để bắt đầu các tiến trình con. Đối số method có thể là
'fork','spawn'hoặc'forkserver'. TăngRuntimeErrornếu phương thức bắt đầu đã được đặt và force không phải làTrue. Nếu method làNonevà force làTruethì phương thức bắt đầu được đặt thànhNone. Nếu method làNonevà force làFalsethì ngữ cảnh được đặt thành ngữ cảnh mặc định.Lưu ý rằng điều này chỉ được gọi tối đa một lần và nó phải được bảo vệ bên trong mệnh đề
if __name__ == '__main__'của mô-đun chính.Xem Bối cảnh và phương pháp bắt đầu.
Added in version 3.4.
Ghi chú
multiprocessing không chứa các từ tương tự như threading.active_count(), threading.enumerate(), threading.settrace(), threading.setprofile(), threading.Timer hoặc threading.local.
Đối tượng kết nối¶
Các đối tượng kết nối cho phép gửi và nhận các đối tượng hoặc chuỗi có thể chọn được. Chúng có thể được coi là các ổ cắm kết nối hướng thông điệp.
Các đối tượng kết nối thường được tạo bằng Pipe -- xem thêm Người nghe và khách hàng.
- class multiprocessing.connection.Connection¶
- send(obj)¶
Gửi một đối tượng đến đầu bên kia của kết nối sẽ được đọc bằng
recv().Đối tượng phải có thể ngâm được. Dưa chua rất lớn (khoảng 32 MiB +, mặc dù tùy thuộc vào hệ điều hành) có thể gây ra ngoại lệ
ValueError.
- recv()¶
Trả về một đối tượng được gửi từ đầu bên kia của kết nối bằng
send(). Chặn cho đến khi có thứ gì đó để nhận. TăngEOFErrornếu không còn gì để nhận và đầu bên kia đã bị đóng.
- fileno()¶
Trả về bộ mô tả tệp hoặc bộ xử lý được kết nối sử dụng.
- close()¶
Đóng kết nối.
Điều này được gọi tự động khi kết nối được thu thập rác.
- poll([timeout])¶
Trả về xem có dữ liệu nào có sẵn để đọc hay không.
Nếu timeout không được chỉ định thì nó sẽ trả về ngay lập tức. Nếu timeout là một số thì số này chỉ định thời gian tối đa tính bằng giây để chặn. Nếu timeout là
Nonethì thời gian chờ vô hạn sẽ được sử dụng.Lưu ý rằng nhiều đối tượng kết nối có thể được thăm dò cùng một lúc bằng cách sử dụng
multiprocessing.connection.wait().
- send_bytes(buffer[, offset[, size]])¶
Gửi dữ liệu byte từ bytes-like object dưới dạng tin nhắn hoàn chỉnh.
Nếu offset được cung cấp thì dữ liệu sẽ được đọc từ vị trí đó trong buffer. Nếu size được cung cấp thì nhiều byte đó sẽ được đọc từ bộ đệm. Bộ đệm rất lớn (khoảng 32 MiB+, mặc dù tùy thuộc vào hệ điều hành) có thể tạo ra ngoại lệ
ValueError
- recv_bytes([maxlength])¶
Trả về thông báo đầy đủ về dữ liệu byte được gửi từ đầu bên kia của kết nối dưới dạng chuỗi. Chặn cho đến khi có thứ gì đó để nhận. Tăng
EOFErrornếu không còn gì để nhận và đầu bên kia đã đóng.Nếu maxlength được chỉ định và tin nhắn dài hơn maxlength thì
OSErrorsẽ được nâng lên và kết nối sẽ không thể đọc được nữa.
- recv_bytes_into(buffer[, offset])¶
Đọc vào buffer một thông báo đầy đủ về dữ liệu byte được gửi từ đầu bên kia của kết nối và trả về số byte trong tin nhắn. Chặn cho đến khi có thứ gì đó để nhận. Tăng
EOFErrornếu không còn gì để nhận và đầu bên kia đã bị đóng.buffer phải là bytes-like object có thể ghi được. Nếu offset được đưa ra thì thông báo sẽ được ghi vào bộ đệm từ vị trí đó. Offset phải là số nguyên không âm nhỏ hơn độ dài của buffer (tính bằng byte).
Nếu bộ đệm quá ngắn thì ngoại lệ
BufferTooShortsẽ xuất hiện và thông báo hoàn chỉnh sẽ có sẵn dưới dạnge.args[0]trong đóelà phiên bản ngoại lệ.
Thay đổi trong phiên bản 3.3: Bản thân các đối tượng kết nối giờ đây có thể được chuyển giữa các tiến trình bằng
Connection.send()vàConnection.recv().Các đối tượng kết nối hiện cũng hỗ trợ giao thức quản lý ngữ cảnh - xem Các loại trình quản lý bối cảnh.
__enter__()trả về đối tượng kết nối và__exit__()gọiclose().
Ví dụ:
>>> từ ống nhập khẩu đa xử lý
>>> a, b = Ống()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'xin chào', Không có]
>>> b.send_bytes(b'cảm ơn')
>>> a.recv_bytes()
b'cảm ơn'
>>> nhập mảng
>>> Array1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> đếm = b.recv_bytes_into(arr2)
>>> khẳng định số lượng == len(arr1) * arr1.itemsize
>>> mảng2
mảng('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Cảnh báo
Phương thức Connection.recv() tự động bỏ chọn dữ liệu mà nó nhận được. Đây có thể là một rủi ro bảo mật trừ khi bạn có thể tin tưởng vào quy trình đã gửi tin nhắn.
Do đó, trừ khi đối tượng kết nối được tạo bằng Pipe(), bạn chỉ nên sử dụng các phương thức recv() và send() sau khi thực hiện một số loại xác thực. Xem Khóa xác thực.
Cảnh báo
Nếu một quá trình bị hủy trong khi nó đang cố đọc hoặc ghi vào một đường ống thì dữ liệu trong đường ống có thể bị hỏng vì không thể chắc chắn ranh giới thông báo nằm ở đâu.
Đồng bộ hóa nguyên thủy¶
Nói chung, các nguyên hàm đồng bộ hóa không cần thiết trong chương trình đa xử lý như trong chương trình đa luồng. Xem tài liệu về mô-đun threading.
Lưu ý rằng người ta cũng có thể tạo các nguyên hàm đồng bộ hóa bằng cách sử dụng đối tượng trình quản lý -- xem Người quản lý.
- class multiprocessing.Barrier(parties[, action[, timeout]])¶
Đối tượng rào cản: bản sao của
threading.Barrier.Việc khởi tạo lớp này có thể thiết lập phương thức bắt đầu chung. Xem Phương pháp bắt đầu toàn cầu để biết thêm chi tiết.
Added in version 3.3.
- class multiprocessing.BoundedSemaphore([value])¶
Đối tượng semaphore giới hạn: tương tự như
threading.BoundedSemaphore.Việc khởi tạo lớp này có thể thiết lập phương thức bắt đầu chung. Xem Phương pháp bắt đầu toàn cầu để biết thêm chi tiết.
Có một điểm khác biệt duy nhất so với phương thức tương tự gần giống của nó: đối số đầu tiên của phương thức
acquiređược đặt tên là block, giống vớiLock.acquire().- locked()¶
Trả về một boolean cho biết hiện tại đối tượng này có bị khóa hay không.
Added in version 3.14.
Ghi chú
Trên macOS, điều này không thể phân biệt được với
Semaphorevìsem_getvalue()không được triển khai trên nền tảng đó.
- class multiprocessing.Condition([lock])¶
Biến điều kiện: bí danh cho
threading.Condition.Nếu lock được chỉ định thì nó phải là đối tượng
LockhoặcRLocktừmultiprocessing.Việc khởi tạo lớp này có thể thiết lập phương thức bắt đầu chung. Xem Phương pháp bắt đầu toàn cầu để biết thêm chi tiết.
Thay đổi trong phiên bản 3.3: Phương thức
wait_for()đã được thêm vào.
- class multiprocessing.Event¶
Một bản sao của
threading.Event.Việc khởi tạo lớp này có thể thiết lập phương thức bắt đầu chung. Xem Phương pháp bắt đầu toàn cầu để biết thêm chi tiết.
- class multiprocessing.Lock¶
Đối tượng khóa không đệ quy: tương tự gần với
threading.Lock. Khi một tiến trình hoặc luồng đã có được khóa, các lần thử tiếp theo để lấy nó từ bất kỳ tiến trình hoặc luồng nào sẽ bị chặn cho đến khi nó được giải phóng; bất kỳ quá trình hoặc chủ đề có thể giải phóng nó. Các khái niệm và hành vi củathreading.Lockkhi áp dụng cho các luồng được sao chép ở đây trongmultiprocessing.Lockvì nó áp dụng cho các tiến trình hoặc luồng, ngoại trừ như đã lưu ý.Lưu ý rằng
Lockthực sự là một hàm xuất xưởng trả về một phiên bảnmultiprocessing.synchronize.Lockđược khởi tạo với ngữ cảnh mặc định.Việc khởi tạo lớp này có thể thiết lập phương thức bắt đầu chung. Xem Phương pháp bắt đầu toàn cầu để biết thêm chi tiết.
Lockhỗ trợ giao thức context manager và do đó có thể được sử dụng trong các câu lệnhwith.- acquire(block=True, timeout=None)¶
Có được một khóa, chặn hoặc không chặn.
Với đối số block được đặt thành
True(mặc định), lệnh gọi phương thức sẽ chặn cho đến khi khóa ở trạng thái mở khóa, sau đó đặt thành khóa và trả vềTrue. Lưu ý rằng tên của đối số đầu tiên này khác với tên trongthreading.Lock.acquire().Với đối số block được đặt thành
False, lệnh gọi phương thức sẽ không bị chặn. Nếu khóa hiện ở trạng thái khóa, hãy trả vềFalse; nếu không thì đặt khóa về trạng thái khóa và trả vềTrue.Khi được gọi với giá trị dấu phẩy động dương cho timeout, hãy chặn tối đa số giây được chỉ định bởi timeout miễn là không thể lấy được khóa. Các lệnh gọi có giá trị âm cho timeout tương đương với timeout bằng 0. Các lệnh gọi có giá trị timeout là
None(mặc định) sẽ đặt khoảng thời gian chờ thành vô hạn. Lưu ý rằng việc xử lý các giá trị âm hoặcNonecho timeout khác với hành vi được triển khai trongthreading.Lock.acquire(). Đối số timeout không có ý nghĩa thực tế nếu đối số block được đặt thànhFalsevà do đó bị bỏ qua. Trả vềTruenếu khóa đã được lấy hoặcFalsenếu hết thời gian chờ.
- release()¶
Mở khóa. Điều này có thể được gọi từ bất kỳ tiến trình hoặc luồng nào, không chỉ tiến trình hoặc luồng ban đầu có được khóa.
Hành vi tương tự như trong
threading.Lock.release()ngoại trừ việc khi được gọi trên một khóa đã mở khóa,ValueErrorsẽ được nâng lên.
- locked()¶
Trả về một boolean cho biết hiện tại đối tượng này có bị khóa hay không.
Added in version 3.14.
- class multiprocessing.RLock¶
Đối tượng khóa đệ quy: tương tự như
threading.RLock. Khóa đệ quy phải được giải phóng bởi tiến trình hoặc luồng đã thu được nó. Khi một tiến trình hoặc luồng đã có được khóa đệ quy, thì tiến trình hoặc luồng tương tự có thể lấy lại khóa đó mà không bị chặn; quá trình hoặc luồng đó phải giải phóng nó một lần cho mỗi lần nó được mua lại.Lưu ý rằng
RLockthực sự là một hàm xuất xưởng trả về một phiên bảnmultiprocessing.synchronize.RLockđược khởi tạo với ngữ cảnh mặc định.Việc khởi tạo lớp này có thể thiết lập phương thức bắt đầu chung. Xem Phương pháp bắt đầu toàn cầu để biết thêm chi tiết.
RLockhỗ trợ giao thức context manager và do đó có thể được sử dụng trong các câu lệnhwith.- acquire(block=True, timeout=None)¶
Có được một khóa, chặn hoặc không chặn.
Khi được gọi với đối số block được đặt thành
True, hãy chặn cho đến khi khóa ở trạng thái mở khóa (không thuộc sở hữu của bất kỳ quy trình hoặc luồng nào) trừ khi khóa đã được sở hữu bởi quy trình hoặc luồng hiện tại. Sau đó, quy trình hoặc luồng hiện tại sẽ có quyền sở hữu khóa (nếu nó chưa có quyền sở hữu) và mức đệ quy bên trong khóa sẽ tăng lên một, dẫn đến giá trị trả về làTrue. Lưu ý rằng có một số điểm khác biệt trong hành vi của đối số đầu tiên này so với việc triển khaithreading.RLock.acquire(), bắt đầu bằng tên của chính đối số đó.Khi được gọi với đối số block được đặt thành
False, đừng chặn. Nếu khóa đã được một tiến trình hoặc luồng khác mua lại (và do đó được sở hữu), thì tiến trình hoặc luồng hiện tại sẽ không có quyền sở hữu và mức đệ quy trong khóa không thay đổi, dẫn đến giá trị trả về làFalse. Nếu khóa ở trạng thái mở khóa, quy trình hoặc luồng hiện tại sẽ có quyền sở hữu và mức đệ quy sẽ tăng lên, dẫn đến giá trị trả về làTrue.Cách sử dụng và hành vi của đối số timeout cũng giống như trong
Lock.acquire(). Lưu ý rằng một số hành vi này của timeout khác với các hành vi được triển khai trongthreading.RLock.acquire().
- release()¶
Nhả khóa, giảm mức đệ quy. Nếu sau khi giảm mức đệ quy bằng 0, hãy đặt lại khóa thành mở khóa (không thuộc sở hữu của bất kỳ quy trình hoặc luồng nào) và nếu bất kỳ quy trình hoặc luồng nào khác bị chặn khi chờ khóa được mở khóa, hãy cho phép chính xác một trong số chúng tiếp tục. Nếu sau khi giảm, mức đệ quy vẫn khác 0, khóa vẫn bị khóa và thuộc quyền sở hữu của tiến trình hoặc luồng gọi.
Chỉ gọi phương thức này khi tiến trình gọi hoặc luồng sở hữu khóa. Một
AssertionErrorđược nâng lên nếu phương thức này được gọi bởi một tiến trình hoặc luồng không phải là chủ sở hữu hoặc nếu khóa ở trạng thái mở khóa (chưa sở hữu). Lưu ý rằng loại ngoại lệ được nêu ra trong tình huống này khác với hành vi được triển khai trongthreading.RLock.release().
- locked()¶
Trả về một boolean cho biết hiện tại đối tượng này có bị khóa hay không.
Added in version 3.14.
- class multiprocessing.Semaphore([value])¶
Đối tượng semaphore: tương tự như
threading.Semaphore.Việc khởi tạo lớp này có thể thiết lập phương thức bắt đầu chung. Xem Phương pháp bắt đầu toàn cầu để biết thêm chi tiết.
Có một điểm khác biệt duy nhất so với phương thức tương tự gần giống của nó: đối số đầu tiên của phương thức
acquiređược đặt tên là block, giống vớiLock.acquire().- get_value()¶
Trả về giá trị hiện tại của semaphore.
Lưu ý rằng điều này có thể tăng
NotImplementedErrortrên các nền tảng như macOS nơisem_getvalue()không được triển khai.
- locked()¶
Trả về một boolean cho biết hiện tại đối tượng này có bị khóa hay không.
Added in version 3.14.
Ghi chú
Trên macOS, sem_timedwait không được hỗ trợ, do đó, việc gọi acquire() có thời gian chờ sẽ mô phỏng hành vi của hàm đó bằng cách sử dụng vòng lặp ngủ.
Ghi chú
Một số chức năng của gói này yêu cầu triển khai semaphore được chia sẻ hoạt động trên hệ điều hành máy chủ. Nếu không có, mô-đun multiprocessing.synchronize sẽ bị vô hiệu hóa và các nỗ lực nhập nó sẽ dẫn đến ImportError. Xem bpo-3770 để biết thêm thông tin.
Người quản lý¶
Người quản lý cung cấp cách tạo dữ liệu có thể được chia sẻ giữa các quy trình khác nhau, bao gồm chia sẻ qua mạng giữa các quy trình chạy trên các máy khác nhau. Đối tượng người quản lý điều khiển quy trình máy chủ quản lý shared objects. Các quy trình khác có thể truy cập các đối tượng được chia sẻ bằng cách sử dụng proxy.
- multiprocessing.Manager()¶
Trả về một đối tượng
SyncManagerđã bắt đầu có thể được sử dụng để chia sẻ các đối tượng giữa các tiến trình. Đối tượng người quản lý được trả về tương ứng với một tiến trình con được sinh ra và có các phương thức sẽ tạo các đối tượng dùng chung và trả về các proxy tương ứng.
Các quy trình của trình quản lý sẽ bị tắt ngay khi chúng được thu gom rác hoặc quy trình mẹ của chúng thoát ra. Các lớp trình quản lý được định nghĩa trong mô-đun multiprocessing.managers:
- class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)¶
Tạo một đối tượng BaseManager.
Sau khi tạo, bạn nên gọi
start()hoặcget_server().serve_forever()để đảm bảo rằng đối tượng người quản lý đề cập đến một quy trình quản lý đã bắt đầu.address là địa chỉ mà quy trình quản lý lắng nghe các kết nối mới. Nếu address là
Nonethì một cái tùy ý sẽ được chọn.authkey là khóa xác thực sẽ được sử dụng để kiểm tra tính hợp lệ của các kết nối đến quy trình máy chủ. Nếu authkey là
Nonethìcurrent_process().authkeyđược sử dụng. Nếu không thì authkey được sử dụng và nó phải là một chuỗi byte.serializer phải là
'pickle'(sử dụng tuần tự hóapickle) hoặc'xmlrpclib'(sử dụng tuần tự hóaxmlrpc.client).ctx là một đối tượng ngữ cảnh hoặc
None(sử dụng ngữ cảnh hiện tại). NếuNone, việc gọi này có thể đặt phương thức bắt đầu chung. Xem Phương pháp bắt đầu toàn cầu để biết thêm chi tiết.shutdown_timeout là thời gian chờ tính bằng giây được sử dụng để đợi cho đến khi quá trình được người quản lý sử dụng hoàn tất trong phương thức
shutdown(). Nếu hết thời gian tắt máy, quá trình này sẽ kết thúc. Nếu việc chấm dứt quá trình cũng hết thời gian, quá trình đó sẽ bị hủy.Thay đổi trong phiên bản 3.11: Đã thêm tham số shutdown_timeout.
- start([initializer[, initargs]])¶
Bắt đầu một quy trình con để khởi động trình quản lý. Nếu initializer không phải là
Nonethì tiến trình con sẽ gọiinitializer(*initargs)khi nó khởi động.
- get_server()¶
Trả về đối tượng
Serverđại diện cho máy chủ thực tế dưới sự kiểm soát của Người quản lý. Đối tượngServerhỗ trợ phương thứcserve_forever()>>> từ multiprocessing.managers nhập BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> máy chủ = manager.get_server() >>> server.serve_forever()
Servercũng có thuộc tínhaddress.
- connect()¶
Kết nối đối tượng người quản lý cục bộ với quy trình quản lý từ xa:
>>> từ multiprocessing.managers nhập BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
- shutdown()¶
Dừng quá trình được sử dụng bởi người quản lý. Điều này chỉ khả dụng nếu
start()đã được sử dụng để khởi động quá trình máy chủ.Điều này có thể được gọi nhiều lần.
- register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶
Một phương thức lớp có thể được sử dụng để đăng ký một loại hoặc có thể gọi được với lớp người quản lý.
typeid là "mã định danh loại" được sử dụng để xác định một loại đối tượng dùng chung cụ thể. Đây phải là một chuỗi.
callable là một lệnh gọi được sử dụng để tạo đối tượng cho mã định danh loại này. Nếu một phiên bản trình quản lý sẽ được kết nối với máy chủ bằng phương thức
connect()hoặc nếu đối số create_method làFalsethì đối số này có thể được để lại làNone.proxytype là một lớp con của
BaseProxyđược sử dụng để tạo proxy cho các đối tượng được chia sẻ với typeid này. NếuNonethì một lớp proxy sẽ được tạo tự động.exposed được sử dụng để chỉ định một chuỗi các tên phương thức mà proxy cho typeid này sẽ được phép truy cập bằng
BaseProxy._callmethod(). (Nếu exposed làNonethìproxytype._exposed_sẽ được sử dụng thay thế nếu nó tồn tại.) Trong trường hợp không có danh sách nào được chỉ định, tất cả "phương thức công khai" của đối tượng được chia sẻ sẽ có thể truy cập được. (Ở đây "phương thức công khai" có nghĩa là bất kỳ thuộc tính nào có phương thức__call__()và có tên không bắt đầu bằng'_'.)method_to_typeid là một ánh xạ được sử dụng để chỉ định kiểu trả về của các phương thức được hiển thị sẽ trả về proxy. Nó ánh xạ tên phương thức thành chuỗi typeid. (Nếu method_to_typeid là
Nonethìproxytype._method_to_typeid_sẽ được sử dụng thay thế nếu nó tồn tại.) Nếu tên của phương thức không phải là khóa của ánh xạ này hoặc nếu ánh xạ làNonethì đối tượng được phương thức trả về sẽ được sao chép theo giá trị.create_method xác định xem có nên tạo một phương thức với tên typeid hay không. Phương thức này có thể được sử dụng để yêu cầu quá trình máy chủ tạo một đối tượng chia sẻ mới và trả về proxy cho nó. Theo mặc định là
True.
Các phiên bản
BaseManagercũng có một thuộc tính chỉ đọc:- address¶
Địa chỉ được người quản lý sử dụng.
Thay đổi trong phiên bản 3.3: Các đối tượng trình quản lý hỗ trợ giao thức quản lý ngữ cảnh - xem Các loại trình quản lý bối cảnh.
__enter__()bắt đầu quá trình máy chủ (nếu nó chưa bắt đầu) và sau đó trả về đối tượng người quản lý.__exit__()gọishutdown().Trong các phiên bản trước,
__enter__()không khởi động quy trình máy chủ của người quản lý nếu nó chưa được khởi động.
- class multiprocessing.managers.SyncManager¶
Một lớp con của
BaseManagercó thể được sử dụng để đồng bộ hóa các quy trình. Các đối tượng thuộc loại này được trả về bởimultiprocessing.Manager().Các phương thức của nó tạo và trả về Đối tượng ủy quyền cho một số loại dữ liệu thường được sử dụng để đồng bộ hóa giữa các quy trình. Điều này đặc biệt bao gồm các danh sách và từ điển được chia sẻ.
- Barrier(parties[, action[, timeout]])¶
Tạo một đối tượng
threading.Barrierđược chia sẻ và trả lại proxy cho nó.Added in version 3.3.
- BoundedSemaphore([value])¶
Tạo một đối tượng
threading.BoundedSemaphoređược chia sẻ và trả lại proxy cho nó.
- Condition([lock])¶
Tạo một đối tượng
threading.Conditionđược chia sẻ và trả lại proxy cho nó.Nếu lock được cung cấp thì nó sẽ là proxy cho đối tượng
threading.Lockhoặcthreading.RLock.Thay đổi trong phiên bản 3.3: Phương thức
wait_for()đã được thêm vào.
- Event()¶
Tạo một đối tượng
threading.Eventđược chia sẻ và trả lại proxy cho nó.
- Lock()¶
Tạo một đối tượng
threading.Lockđược chia sẻ và trả lại proxy cho nó.
- Queue([maxsize])¶
Tạo một đối tượng
queue.Queueđược chia sẻ và trả lại proxy cho nó.
- RLock()¶
Tạo một đối tượng
threading.RLockđược chia sẻ và trả lại proxy cho nó.
- Semaphore([value])¶
Tạo một đối tượng
threading.Semaphoređược chia sẻ và trả lại proxy cho nó.
- Array(typecode, sequence)¶
Tạo một mảng và trả về proxy cho nó.
- Value(typecode, value)¶
Tạo một đối tượng có thuộc tính
valuecó thể ghi và trả về proxy cho nó.
- set()¶
- set(sequence)
- set(mapping)
Tạo một đối tượng
setđược chia sẻ và trả lại proxy cho nó.Added in version 3.14: hỗ trợ
setđã được thêm vào.
Thay đổi trong phiên bản 3.6: Các đối tượng được chia sẻ có khả năng được lồng vào nhau. Ví dụ: một đối tượng vùng chứa dùng chung chẳng hạn như danh sách dùng chung có thể chứa các đối tượng dùng chung khác, tất cả sẽ được
SyncManagerquản lý và đồng bộ hóa.
- class multiprocessing.managers.Namespace¶
Một loại có thể đăng ký với
SyncManager.Một đối tượng không gian tên không có phương thức công khai nhưng có các thuộc tính có thể ghi. Biểu diễn của nó cho thấy các giá trị của các thuộc tính của nó.
Tuy nhiên, khi sử dụng proxy cho đối tượng không gian tên, thuộc tính bắt đầu bằng
'_'sẽ là thuộc tính của proxy chứ không phải thuộc tính của tham chiếu:>>> mp_context = multiprocessing.get_context('spawn') >>> người quản lý = mp_context.Manager() >>> Toàn cầu = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'xin chào' >>> Global._z = 12.3 # this là thuộc tính của proxy >>> in (Toàn cầu) Không gian tên(x=10, y='xin chào')
Người quản lý tùy chỉnh¶
Để tạo người quản lý của riêng mình, người ta tạo một lớp con của BaseManager và sử dụng phương thức lớp register() để đăng ký các loại hoặc khả năng gọi mới với lớp người quản lý. Ví dụ:
từ multiprocessing.managers nhập BaseManager
lớp ToánLớp:
def add(self, x, y):
trả về x + y
def mul(self, x, y):
trả về x * y
lớp MyManager(BaseManager):
vượt qua
MyManager.register('Toán', MathsClass)
nếu __name__ == '__main__':
với MyManager() làm người quản lý:
toán học = người quản lý.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
Sử dụng trình quản lý từ xa¶
Có thể chạy máy chủ quản lý trên một máy và yêu cầu khách hàng sử dụng nó từ các máy khác (giả sử rằng tường lửa có liên quan cho phép điều đó).
Việc chạy các lệnh sau sẽ tạo một máy chủ cho một hàng đợi chia sẻ duy nhất mà máy khách từ xa có thể truy cập
>>> từ multiprocessing.managers nhập BaseManager
>>> từ hàng đợi nhập hàng đợi
>>> hàng đợi = Hàng đợi()
>>> lớp QueueManager(BaseManager): vượt qua
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Một khách hàng có thể truy cập vào máy chủ như sau:
>>> từ multiprocessing.managers nhập BaseManager
>>> lớp QueueManager(BaseManager): vượt qua
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> xếp hàng = m.get_queue()
>>> queue.put('xin chào')
Một khách hàng khác cũng có thể sử dụng nó:
>>> từ multiprocessing.managers nhập BaseManager
>>> lớp QueueManager(BaseManager): vượt qua
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> xếp hàng = m.get_queue()
>>> hàng đợi.get()
'xin chào'
Các quy trình cục bộ cũng có thể truy cập hàng đợi đó, sử dụng mã từ phía trên trên máy khách để truy cập từ xa
>>> từ quá trình nhập đa xử lý, hàng đợi
>>> từ multiprocessing.managers nhập BaseManager
>>> Lớp Worker(Process):
... def __init__(self, q):
... self.q = q
... siêu().__init__()
... def run(self):
... self.q.put('xin chào địa phương')
...
>>> hàng đợi = Hàng đợi()
>>> w = Công nhân(hàng đợi)
>>> w.start()
>>> lớp QueueManager(BaseManager): vượt qua
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Đối tượng ủy quyền¶
Proxy là một đối tượng refers đến một đối tượng được chia sẻ tồn tại (có lẽ) trong một quy trình khác. Đối tượng được chia sẻ được cho là referent của proxy. Nhiều đối tượng proxy có thể có cùng một tham chiếu.
Một đối tượng proxy có các phương thức gọi các phương thức tương ứng của tham chiếu của nó (mặc dù không phải mọi phương thức của tham chiếu đều nhất thiết phải có sẵn thông qua proxy). Bằng cách này, một proxy có thể được sử dụng giống như người giới thiệu nó có thể:
>>> mp_context = multiprocessing.get_context('spawn')
>>> người quản lý = mp_context.Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> in(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> in(repr(l))
<Đối tượng ListProxy, gõ 'danh sách' ở 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
Lưu ý rằng việc áp dụng str() cho proxy sẽ trả về đại diện của người được giới thiệu, trong khi áp dụng repr() sẽ trả về đại diện của proxy.
Một tính năng quan trọng của các đối tượng proxy là chúng có thể chọn được để chúng có thể được chuyển giữa các tiến trình. Như vậy, một tham chiếu có thể chứa Đối tượng ủy quyền. Điều này cho phép lồng các danh sách, ký tự được quản lý này và Đối tượng ủy quyền khác:
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent của a hiện chứa tham chiếu của b
>>> in(a, b)
[<Đối tượng ListProxy, typeid 'list' tại ...>] []
>>> b.append('xin chào')
>>> print(a[0], b)
['xin chào'] ['xin chào']
Tương tự, các proxy chính tả và danh sách có thể được lồng vào nhau
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> in(l_outer[1])
{'c': 3, 'z': 26}
Nếu các đối tượng list hoặc dict tiêu chuẩn (không phải proxy) được chứa trong một tham chiếu, thì các sửa đổi đối với các giá trị có thể thay đổi đó sẽ không được truyền qua trình quản lý vì proxy không có cách nào để biết khi nào các giá trị chứa bên trong được sửa đổi. Tuy nhiên, việc lưu trữ một giá trị trong proxy vùng chứa (kích hoạt __setitem__ trên đối tượng proxy) sẽ truyền qua trình quản lý và do đó, để sửa đổi mục đó một cách hiệu quả, người ta có thể gán lại giá trị đã sửa đổi cho proxy vùng chứa:
# create một proxy danh sách và nối thêm một đối tượng có thể thay đổi (từ điển)
lproxy = người quản lý.list()
lproxy.append({})
# now biến đổi từ điển
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at điểm này, những thay đổi đối với d vẫn chưa được đồng bộ hóa, nhưng bằng cách
# updating từ điển, proxy được thông báo về sự thay đổi
lproxy[0] = d
Cách tiếp cận này có lẽ kém thuận tiện hơn so với việc sử dụng Đối tượng ủy quyền lồng nhau trong hầu hết các trường hợp sử dụng nhưng cũng thể hiện mức độ kiểm soát đồng bộ hóa.
Ghi chú
Các loại proxy trong multiprocessing không hỗ trợ việc so sánh theo giá trị. Vì vậy, ví dụ, chúng ta có:
>>> manager.list([1,2,3]) == [1,2,3]
sai
Thay vào đó, người ta chỉ nên sử dụng một bản sao của người giới thiệu khi so sánh.
- class multiprocessing.managers.BaseProxy¶
Đối tượng proxy là phiên bản của các lớp con của
BaseProxy.- _callmethod(methodname[, args[, kwds]])¶
Gọi và trả về kết quả của một phương thức được tham chiếu của proxy.
Nếu
proxylà proxy có tham chiếu làobjthì biểu thứcproxy._callmethod(tên phương thức, args, kwds)
sẽ đánh giá biểu thức
getattr(obj, tên phương thức)(*args, **kwds)
trong quá trình của người quản lý.
Giá trị được trả về sẽ là bản sao kết quả của cuộc gọi hoặc proxy tới một đối tượng được chia sẻ mới -- xem tài liệu về đối số method_to_typeid của
BaseManager.register().Nếu một ngoại lệ được đưa ra trong cuộc gọi thì
_callmethod()sẽ được đưa ra lại. Nếu một số ngoại lệ khác được đưa ra trong quy trình của người quản lý thì ngoại lệ này sẽ được chuyển thành ngoại lệRemoteErrorvà được đưa ra bởi_callmethod().Đặc biệt lưu ý rằng một ngoại lệ sẽ được đưa ra nếu methodname chưa phải là exposed.
Một ví dụ về cách sử dụng
_callmethod():>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent tới l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent tới l[20] Traceback (cuộc gọi gần đây nhất): ... IndexError: liệt kê chỉ mục ngoài phạm vi
- _getvalue()¶
Trả lại một bản sao của người giới thiệu.
Nếu người giới thiệu không thể chọn được thì điều này sẽ đưa ra một ngoại lệ.
- __repr__()¶
Trả về một đại diện của đối tượng proxy.
- __str__()¶
Trả lại đại diện của người giới thiệu.
Dọn dẹp¶
Một đối tượng proxy sử dụng một lệnh gọi lại yếu để khi nó được thu thập rác, nó sẽ tự hủy đăng ký khỏi người quản lý sở hữu người tham chiếu của nó.
Một đối tượng được chia sẻ sẽ bị xóa khỏi quy trình quản lý khi không còn bất kỳ proxy nào đề cập đến nó nữa.
Nhóm xử lý¶
Người ta có thể tạo một nhóm các quy trình sẽ thực hiện các nhiệm vụ được gửi tới nó với lớp Pool.
- class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶
Một đối tượng nhóm quy trình kiểm soát một nhóm các quy trình công nhân mà công việc có thể được gửi tới. Nó hỗ trợ các kết quả không đồng bộ với thời gian chờ và lệnh gọi lại, đồng thời triển khai bản đồ song song.
processes là số lượng quy trình công nhân sẽ sử dụng. Nếu processes là
Nonethì số được trả về bởios.process_cpu_count()sẽ được sử dụng.Nếu initializer không phải là
Nonethì mỗi tiến trình công nhân sẽ gọiinitializer(*initargs)khi nó bắt đầu.maxtasksperchild là số lượng nhiệm vụ mà một quy trình công nhân có thể hoàn thành trước khi nó thoát ra và được thay thế bằng một quy trình công nhân mới, để cho phép giải phóng các tài nguyên không sử dụng. Zz002zz mặc định là
None, có nghĩa là các quy trình công nhân sẽ tồn tại lâu như nhóm.context có thể được sử dụng để chỉ định bối cảnh được sử dụng để bắt đầu các quy trình công nhân. Thông thường, một nhóm được tạo bằng hàm
multiprocessing.Pool()hoặc phương thứcPool()của đối tượng ngữ cảnh. Trong cả hai trường hợp, context đều được đặt phù hợp. NếuNone, việc gọi hàm này sẽ có tác dụng phụ là thiết lập phương thức khởi động chung hiện tại nếu nó chưa được thiết lập. Xem chức năngget_context().Lưu ý rằng các phương thức của đối tượng nhóm chỉ nên được gọi bởi quá trình đã tạo nhóm.
Cảnh báo
Các đối tượng
multiprocessing.poolcó các tài nguyên nội bộ cần được quản lý đúng cách (giống như bất kỳ tài nguyên nào khác) bằng cách sử dụng nhóm làm trình quản lý bối cảnh hoặc bằng cách gọiclose()vàterminate()theo cách thủ công. Việc không thực hiện điều này có thể dẫn đến quá trình bị treo khi quyết toán.Lưu ý rằng not correct phải dựa vào trình thu gom rác để phá hủy nhóm vì CPython không đảm bảo rằng trình hoàn thiện của nhóm sẽ được gọi (xem
object.__del__()để biết thêm thông tin).Thay đổi trong phiên bản 3.2: Đã thêm tham số maxtasksperchild.
Thay đổi trong phiên bản 3.4: Đã thêm tham số context.
Thay đổi trong phiên bản 3.13: processes sử dụng
os.process_cpu_count()theo mặc định, thay vìos.cpu_count().Ghi chú
Các quy trình công nhân trong
Poolthường tồn tại trong toàn bộ thời gian của hàng đợi công việc của Nhóm. Một mô hình thường thấy trong các hệ thống khác (chẳng hạn như Apache, mod_wsgi, v.v.) để giải phóng tài nguyên do công nhân nắm giữ là cho phép một công nhân trong nhóm chỉ hoàn thành một lượng công việc nhất định trước khi thoát ra, được dọn dẹp và một quy trình mới được sinh ra để thay thế quy trình cũ. Đối số maxtasksperchild choPoolcho thấy khả năng này đối với người dùng cuối.- apply(func[, args[, kwds]])¶
Gọi func với các đối số args và các đối số từ khóa kwds. Nó chặn cho đến khi kết quả đã sẵn sàng. Với các khối này,
apply_async()phù hợp hơn để thực hiện công việc song song. Ngoài ra, func chỉ được thực thi ở một trong những công nhân của nhóm.
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])¶
Một biến thể của phương thức
apply()trả về đối tượngAsyncResult.Nếu callback được chỉ định thì nó sẽ là một lệnh có thể gọi được và chấp nhận một đối số duy nhất. Khi kết quả đã sẵn sàng, callback được áp dụng cho nó, trừ khi lệnh gọi không thành công, trong trường hợp đó, error_callback được áp dụng thay thế.
Nếu error_callback được chỉ định thì nó sẽ là một lệnh có thể gọi được và chấp nhận một đối số duy nhất. Nếu hàm đích bị lỗi thì error_callback sẽ được gọi với phiên bản ngoại lệ.
Cuộc gọi lại phải hoàn tất ngay lập tức vì nếu không, chuỗi xử lý kết quả sẽ bị chặn.
- map(func, iterable[, chunksize])¶
Một hàm tương đương song song với hàm tích hợp
map()(tuy nhiên, nó chỉ hỗ trợ một đối số iterable, đối với nhiều lần lặp, hãy xemstarmap()). Nó chặn cho đến khi kết quả đã sẵn sàng.Phương thức này chia iterable thành một số khối mà nó gửi đến nhóm quy trình dưới dạng các tác vụ riêng biệt. Kích thước (gần đúng) của các khối này có thể được chỉ định bằng cách đặt chunksize thành số nguyên dương.
Lưu ý rằng nó có thể gây ra mức sử dụng bộ nhớ cao trong các lần lặp rất dài. Hãy cân nhắc sử dụng
imap()hoặcimap_unordered()với tùy chọn chunksize rõ ràng để có hiệu quả tốt hơn.
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
Một biến thể của phương thức
map()trả về đối tượngAsyncResult.Nếu callback được chỉ định thì nó sẽ là một lệnh có thể gọi được và chấp nhận một đối số duy nhất. Khi kết quả đã sẵn sàng, callback được áp dụng cho nó, trừ khi lệnh gọi không thành công, trong trường hợp đó, error_callback được áp dụng thay thế.
Nếu error_callback được chỉ định thì nó sẽ là một lệnh có thể gọi được và chấp nhận một đối số duy nhất. Nếu hàm đích bị lỗi thì error_callback sẽ được gọi với phiên bản ngoại lệ.
Cuộc gọi lại phải hoàn tất ngay lập tức vì nếu không, chuỗi xử lý kết quả sẽ bị chặn.
- imap(func, iterable[, chunksize])¶
Một phiên bản lười biếng hơn của
map().Đối số chunksize giống với đối số được sử dụng bởi phương thức
map(). Đối với các lần lặp rất dài, việc sử dụng giá trị lớn cho chunksize có thể khiến công việc hoàn thành much nhanh hơn so với việc sử dụng giá trị mặc định là1.Ngoài ra, nếu chunksize là
1thì phương thứcnext()của trình lặp được trả về bởi phương thứcimap()có một tham số timeout tùy chọn:next(timeout)sẽ tăngmultiprocessing.TimeoutErrornếu không thể trả về kết quả trong vòng timeout giây.
- imap_unordered(func, iterable[, chunksize])¶
Tương tự như
imap()ngoại trừ việc thứ tự các kết quả từ trình lặp được trả về phải được coi là tùy ý. (Chỉ khi chỉ có một quy trình công nhân thì thứ tự mới được đảm bảo là "chính xác".)
- starmap(func, iterable[, chunksize])¶
Giống như
map()ngoại trừ các phần tử của iterable được mong đợi là các phần tử có thể lặp lại và được giải nén dưới dạng đối số.Do đó, iterable của
[(1,2), (3, 4)]sẽ tạo ra[func(1,2), func(3,4)].Added in version 3.3.
- starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
Một sự kết hợp của
starmap()vàmap_async()lặp lại trên iterable của các lần lặp và gọi func với các lần lặp được giải nén. Trả về một đối tượng kết quả.Added in version 3.3.
- close()¶
Ngăn chặn việc gửi thêm bất kỳ nhiệm vụ nào vào nhóm. Khi tất cả các nhiệm vụ đã được hoàn thành, các quy trình công nhân sẽ thoát.
- terminate()¶
Dừng quy trình của công nhân ngay lập tức mà không hoàn thành công việc còn tồn đọng. Khi đối tượng pool được thu gom rác
terminate()sẽ được gọi ngay lập tức.
- join()¶
Đợi quá trình công nhân thoát ra. Người ta phải gọi
close()hoặcterminate()trước khi sử dụngjoin().
Thay đổi trong phiên bản 3.3: Các đối tượng nhóm hiện hỗ trợ giao thức quản lý ngữ cảnh - xem Các loại trình quản lý bối cảnh.
__enter__()trả về đối tượng nhóm và__exit__()gọiterminate().
- class multiprocessing.pool.AsyncResult¶
Lớp kết quả được trả về bởi
Pool.apply_async()vàPool.map_async().- get([timeout])¶
Trả lại kết quả khi nó đến. Nếu timeout không phải là
Nonevà kết quả không đến trong vòng timeout giây thìmultiprocessing.TimeoutErrorsẽ được nâng lên. Nếu cuộc gọi từ xa đưa ra một ngoại lệ thì ngoại lệ đó sẽ đượcget()đưa ra lại.
- wait([timeout])¶
Đợi cho đến khi có kết quả hoặc cho đến khi timeout trôi qua.
- ready()¶
Trả về xem cuộc gọi đã hoàn thành chưa.
- successful()¶
Trả về liệu cuộc gọi có hoàn thành mà không đưa ra ngoại lệ hay không. Sẽ tăng
ValueErrornếu kết quả chưa sẵn sàng.Thay đổi trong phiên bản 3.7: Nếu kết quả chưa sẵn sàng,
ValueErrorsẽ được nâng lên thay vìAssertionError.
Ví dụ sau đây minh họa cách sử dụng nhóm:
từ nhóm nhập đa xử lý
thời gian nhập khẩu
định nghĩa f(x):
trả lại x*x
nếu __name__ == '__main__':
với Pool(processes=4) là pool: # start 4 quy trình công nhân
result = pool.apply_async(f, (10,)) # evaluate "f(10)" không đồng bộ trong một quy trình
print(result.get(timeout=1)) # prints "100" trừ khi máy tính của bạn chậm *very*
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" trừ khi máy tính của bạn chậm *very*
kết quả = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
Người nghe và khách hàng¶
Thông thường việc truyền thông điệp giữa các tiến trình được thực hiện bằng cách sử dụng hàng đợi hoặc bằng cách sử dụng các đối tượng Connection được trả về bởi Pipe().
Tuy nhiên, mô-đun multiprocessing.connection cho phép linh hoạt hơn. Về cơ bản, nó cung cấp một API định hướng thông báo cấp cao để xử lý các ổ cắm hoặc đường ống có tên Windows. Nó cũng hỗ trợ digest authentication bằng cách sử dụng mô-đun hmac và để thăm dò nhiều kết nối cùng một lúc.
- multiprocessing.connection.deliver_challenge(connection, authkey)¶
Gửi tin nhắn được tạo ngẫu nhiên đến đầu bên kia của kết nối và chờ phản hồi.
Nếu câu trả lời khớp với thông báo tóm tắt của tin nhắn sử dụng authkey làm khóa thì tin nhắn chào mừng sẽ được gửi đến đầu bên kia của kết nối. Nếu không thì
AuthenticationErrorsẽ được nâng lên.
- multiprocessing.connection.answer_challenge(connection, authkey)¶
Nhận tin nhắn, tính toán thông báo của tin nhắn bằng cách sử dụng authkey làm khóa, sau đó gửi lại thông báo.
Nếu không nhận được tin nhắn chào mừng thì
AuthenticationErrorsẽ được nâng lên.
- multiprocessing.connection.Client(address[, family[, authkey]])¶
Cố gắng thiết lập kết nối với trình nghe đang sử dụng địa chỉ address, trả về
Connection.Loại kết nối được xác định bởi đối số family, nhưng điều này thường có thể được bỏ qua vì nó thường có thể được suy ra từ định dạng của address. (Xem Định dạng địa chỉ)
Nếu authkey được cung cấp chứ không phải
Nonethì đó phải là một chuỗi byte và sẽ được sử dụng làm khóa bí mật cho thử thách xác thực dựa trên HMAC. Không có xác thực nào được thực hiện nếu authkey làNone.AuthenticationErrorđược nâng lên nếu xác thực không thành công. Xem Khóa xác thực.
- class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])¶
Một trình bao bọc cho một ổ cắm bị ràng buộc hoặc đường ống có tên Windows đang 'lắng nghe' các kết nối.
address là địa chỉ được sử dụng bởi ổ cắm bị ràng buộc hoặc đường ống có tên của đối tượng người nghe.
Ghi chú
Nếu địa chỉ '0.0.0.0' được sử dụng, địa chỉ đó sẽ không phải là điểm cuối có thể kết nối trên Windows. Nếu bạn yêu cầu điểm cuối có thể kết nối, bạn nên sử dụng '127.0.0.1'.
family là loại socket (hoặc ống có tên) để sử dụng. Đây có thể là một trong các chuỗi
'AF_INET'(đối với ổ cắm TCP),'AF_UNIX'(đối với ổ cắm miền Unix) hoặc'AF_PIPE'(đối với ống có tên Windows). Trong số này chỉ có cái đầu tiên được đảm bảo có sẵn. Nếu family làNonethì họ được suy ra từ định dạng của address. Nếu address cũng làNonethì giá trị mặc định sẽ được chọn. Mặc định này là họ được coi là có sẵn nhanh nhất. Xem Định dạng địa chỉ. Lưu ý rằng nếu family là'AF_UNIX'và địa chỉ làNonethì ổ cắm sẽ được tạo trong một thư mục tạm thời riêng được tạo bằngtempfile.mkstemp().Nếu đối tượng người nghe sử dụng một ổ cắm thì backlog (1 theo mặc định) sẽ được chuyển tới phương thức
listen()của ổ cắm sau khi nó được liên kết.Nếu authkey được cung cấp chứ không phải
Nonethì đó phải là một chuỗi byte và sẽ được sử dụng làm khóa bí mật cho thử thách xác thực dựa trên HMAC. Không có xác thực nào được thực hiện nếu authkey làNone.AuthenticationErrorđược nâng lên nếu xác thực không thành công. Xem Khóa xác thực.- accept()¶
Chấp nhận kết nối trên ổ cắm bị ràng buộc hoặc đường ống có tên của đối tượng người nghe và trả về đối tượng
Connection. Nếu xác thực được thử và không thành công thìAuthenticationErrorsẽ được nâng lên.
- close()¶
Đóng ổ cắm bị ràng buộc hoặc đường ống có tên của đối tượng người nghe. Điều này được gọi tự động khi người nghe được thu thập rác. Tuy nhiên, nên gọi nó một cách rõ ràng.
Đối tượng Listener có các thuộc tính chỉ đọc sau:
- address¶
Địa chỉ đang được sử dụng bởi đối tượng Listener.
- last_accepted¶
Địa chỉ mà kết nối được chấp nhận cuối cùng đến. Nếu không có thì đó là
None.
Thay đổi trong phiên bản 3.3: Các đối tượng Listener hiện hỗ trợ giao thức quản lý ngữ cảnh -- xem Các loại trình quản lý bối cảnh.
__enter__()trả về đối tượng người nghe và__exit__()gọiclose().
- multiprocessing.connection.wait(object_list, timeout=None)¶
Đợi cho đến khi một đối tượng trong object_list sẵn sàng. Trả về danh sách các đối tượng đã sẵn sàng trong object_list. Nếu timeout là float thì cuộc gọi sẽ chặn tối đa khoảng thời gian đó. Nếu timeout là
Nonethì nó sẽ chặn trong thời gian không giới hạn. Thời gian chờ âm tương đương với thời gian chờ bằng 0.Đối với cả POSIX và Windows, một đối tượng có thể xuất hiện trong object_list nếu nó
một đối tượng
Connectioncó thể đọc được;một đối tượng
socket.socketđược kết nối và có thể đọc được; hoặc
Một đối tượng kết nối hoặc ổ cắm sẵn sàng khi có sẵn dữ liệu để đọc từ nó hoặc đầu kia đã bị đóng.
POSIX:
wait(object_list, timeout)gần như tương đương vớiselect.select(object_list, [], [], timeout). Sự khác biệt là, nếuselect.select()bị gián đoạn bởi một tín hiệu, nó có thể đưa raOSErrorvới số lỗi làEINTR, trong khiwait()thì không.Windows: Một mục trong object_list phải là một bộ điều khiển số nguyên có thể chờ được (theo định nghĩa được sử dụng trong tài liệu của hàm Win32
WaitForMultipleObjects()) hoặc nó có thể là một đối tượng có phương thứcfileno()trả về một bộ điều khiển ổ cắm hoặc bộ điều khiển ống. (Lưu ý rằng tay cầm ống và tay cầm ổ cắm là tay cầm có thể chờ not.)Added in version 3.3.
Examples
Mã máy chủ sau đây tạo trình nghe sử dụng 'secret password' làm khóa xác thực. Sau đó, nó chờ kết nối và gửi một số dữ liệu đến máy khách
từ trình nghe nhập multiprocessing.connection
từ mảng nhập mảng
address = ('localhost', 6000) # family được suy ra là 'AF_INET'
với Listener(address, authkey=b'secret pass') là người nghe:
với listen.accept() là kết nối:
print('kết nối được chấp nhận từ', listen.last_accepted)
conn.send([2.25, None, 'rác', float])
conn.send_bytes(b'hello')
conn.send_bytes(mảng('i', [42, 1729]))
Đoạn mã sau kết nối với máy chủ và nhận một số dữ liệu từ máy chủ
từ khách hàng nhập multiprocessing.connection
từ mảng nhập mảng
địa chỉ = ('localhost', 6000)
với Client(address, authkey=b'secret pass') là conn:
print(conn.recv()) # => [2.25, Không có, 'rác', float]
print(conn.recv_bytes()) # => 'xin chào'
mảng = mảng('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => mảng('i', [42, 1729, 0, 0, 0])
Đoạn mã sau sử dụng wait() để chờ tin nhắn từ nhiều quy trình cùng một lúc
từ quá trình nhập đa xử lý, đường ống, current_process
từ quá trình nhập multiprocessing.connection chờ
def foo(w):
cho tôi trong phạm vi (10):
w.send((i, current_process().name))
w.close()
nếu __name__ == '__main__':
độc giả = []
cho tôi trong phạm vi (4):
r, w = Ống(song công=Sai)
độc giả.append(r)
p = Quá trình(target=foo, args=(w,))
p.start()
# We hãy đóng đầu ống có thể ghi ngay bây giờ để đảm bảo rằng
# p là quá trình duy nhất sở hữu một tay cầm cho nó. Cái này
# ensures rằng khi p đóng tay cầm của nó ở đầu có thể ghi,
# wait() sẽ nhanh chóng báo cáo phần cuối có thể đọc được là đã sẵn sàng.
w.close()
trong khi độc giả:
for r in wait(độc giả):
thử:
tin nhắn = r.recv()
ngoại trừ EOFError:
độc giả.remove (r)
khác:
in (tin nhắn)
Định dạng địa chỉ¶
Địa chỉ
'AF_INET'là một bộ dữ liệu có dạng(hostname, port)trong đó hostname là một chuỗi và port là một số nguyên.Địa chỉ
'AF_UNIX'là một chuỗi biểu thị tên tệp trên hệ thống tệp.Địa chỉ
'AF_PIPE'là một chuỗi có dạngr'\\.\pipe\PipeName'. Để sử dụngClient()để kết nối với một đường dẫn có tên trên máy tính từ xa có tên ServerName, thay vào đó, bạn nên sử dụng địa chỉ có dạngr'\\ServerName\pipe\PipeName'.
Lưu ý rằng bất kỳ chuỗi nào bắt đầu bằng hai dấu gạch chéo ngược theo mặc định đều được coi là địa chỉ 'AF_PIPE' chứ không phải địa chỉ 'AF_UNIX'.
Khóa xác thực¶
Khi một người sử dụng Connection.recv, dữ liệu nhận được sẽ tự động được giải nén. Thật không may, việc giải nén dữ liệu từ một nguồn không đáng tin cậy là một rủi ro bảo mật. Do đó, Listener và Client() sử dụng mô-đun hmac để cung cấp xác thực thông báo.
Khóa xác thực là một chuỗi byte có thể được coi là mật khẩu: sau khi kết nối được thiết lập, cả hai đầu sẽ yêu cầu bằng chứng rằng bên kia biết khóa xác thực. (Việc chứng minh rằng cả hai đầu đều sử dụng cùng một khóa thì not có liên quan đến việc gửi khóa qua kết nối.)
Nếu yêu cầu xác thực nhưng không có khóa xác thực nào được chỉ định thì giá trị trả về của current_process().authkey sẽ được sử dụng (xem Process). Giá trị này sẽ được tự động kế thừa bởi bất kỳ đối tượng Process nào mà quy trình hiện tại tạo ra. Điều này có nghĩa là (theo mặc định) tất cả các quy trình của chương trình nhiều quy trình sẽ chia sẻ một khóa xác thực duy nhất có thể được sử dụng khi thiết lập kết nối giữa chúng.
Khóa xác thực phù hợp cũng có thể được tạo bằng cách sử dụng os.urandom().
Ghi nhật ký¶
Một số hỗ trợ cho việc ghi nhật ký có sẵn. Tuy nhiên, lưu ý rằng gói logging không sử dụng khóa chia sẻ quy trình nên có thể (tùy thuộc vào loại trình xử lý) các tin nhắn từ các quy trình khác nhau có thể bị lẫn lộn.
- multiprocessing.get_logger()¶
Trả về trình ghi nhật ký được
multiprocessingsử dụng. Nếu cần thiết, một cái mới sẽ được tạo ra.Khi tạo lần đầu tiên, trình ghi nhật ký có cấp độ
logging.NOTSETvà không có trình xử lý mặc định. Các tin nhắn được gửi tới trình ghi nhật ký này theo mặc định sẽ không được truyền tới trình ghi nhật ký gốc.Lưu ý rằng trên các tiến trình con của Windows sẽ chỉ kế thừa cấp độ của trình ghi nhật ký của tiến trình cha -- mọi tùy chỉnh khác của trình ghi nhật ký sẽ không được kế thừa.
- multiprocessing.log_to_stderr(level=None)¶
Hàm này thực hiện lệnh gọi tới
get_logger()nhưng ngoài việc trả về trình ghi nhật ký do get_logger tạo, nó còn thêm một trình xử lý gửi đầu ra tớisys.stderrbằng định dạng'[%(levelname)s/%(processName)s] %(message)s'. Bạn có thể sửa đổilevelnamecủa trình ghi nhật ký bằng cách chuyển đối sốlevel.
Dưới đây là phiên ví dụ khi bật tính năng ghi nhật ký:
>>> nhập đa xử lý, ghi nhật ký
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] thất bại
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] tiến trình con gọi self.run()
[INFO/SyncManager-...] đã tạo thư mục tạm thời /.../pymp-...
[INFO/SyncManager-...] người quản lý phục vụ tại '/.../listener-...'
>>> del m
[INFO/MainProcess] gửi thông báo tắt máy cho người quản lý
Trình quản lý [INFO/SyncManager-...] thoát với mã thoát 0
Để biết bảng đầy đủ về các cấp độ ghi nhật ký, hãy xem mô-đun logging.
Mô-đun multiprocessing.dummy¶
multiprocessing.dummy sao chép API của multiprocessing nhưng không khác gì một trình bao bọc xung quanh mô-đun threading.
Cụ thể, hàm Pool do multiprocessing.dummy cung cấp trả về một phiên bản của ThreadPool, là một lớp con của Pool hỗ trợ tất cả các lệnh gọi phương thức giống nhau nhưng sử dụng một nhóm các luồng công việc thay vì các tiến trình công nhân.
- class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])¶
Một đối tượng nhóm luồng điều khiển một nhóm các luồng công việc mà công việc có thể được gửi tới. Các phiên bản
ThreadPoolhoàn toàn tương thích về mặt giao diện với các phiên bảnPoolvà tài nguyên của chúng cũng phải được quản lý hợp lý bằng cách sử dụng nhóm làm trình quản lý bối cảnh hoặc bằng cách gọiclose()vàterminate()theo cách thủ công.processes là số lượng luồng công việc sẽ sử dụng. Nếu processes là
Nonethì số được trả về bởios.process_cpu_count()sẽ được sử dụng.Nếu initializer không phải là
Nonethì mỗi tiến trình công nhân sẽ gọiinitializer(*initargs)khi nó bắt đầu.Không giống như
Pool, maxtasksperchild và context không thể được cung cấp.Ghi chú
ThreadPoolcó cùng giao diện vớiPool, được thiết kế xung quanh một nhóm quy trình và có trước sự ra đời của mô-đunconcurrent.futures. Do đó, nó kế thừa một số thao tác không có ý nghĩa đối với một nhóm được hỗ trợ bởi các luồng và nó có loại riêng để biểu thị trạng thái của các công việc không đồng bộ,AsyncResult, mà bất kỳ thư viện nào khác không hiểu được.Người dùng thường thích sử dụng
concurrent.futures.ThreadPoolExecutorhơn, giao diện này có giao diện đơn giản hơn được thiết kế xoay quanh các luồng ngay từ đầu và trả về các phiên bảnconcurrent.futures.Futuretương thích với nhiều thư viện khác, bao gồm cảasyncio.
Hướng dẫn lập trình¶
Có một số nguyên tắc và thành ngữ nhất định cần được tuân thủ khi sử dụng multiprocessing.
Tất cả các phương pháp bắt đầu¶
Những điều sau đây áp dụng cho tất cả các phương pháp bắt đầu.
Tránh trạng thái chia sẻ
Trong chừng mực có thể, người ta nên cố gắng tránh chuyển lượng lớn dữ liệu giữa các quy trình.
Có lẽ tốt nhất nên sử dụng hàng đợi hoặc đường ống để liên lạc giữa các tiến trình thay vì sử dụng các nguyên hàm đồng bộ hóa cấp thấp hơn.
Khả năng chọn lọc
Đảm bảo rằng các đối số cho các phương thức proxy có thể chọn được.
An toàn chủ đề của proxy
Không sử dụng một đối tượng proxy từ nhiều hơn một luồng trừ khi bạn bảo vệ nó bằng khóa.
(Không bao giờ có vấn đề với các quy trình khác nhau bằng proxy same.)
Tham gia quá trình zombie
Trên POSIX khi một quá trình kết thúc nhưng chưa được tham gia, nó sẽ trở thành zombie. Không bao giờ nên có nhiều quá vì mỗi khi một quy trình mới bắt đầu (hoặc
active_children()được gọi), tất cả các quy trình đã hoàn thành chưa được tham gia sẽ được tham gia. Đồng thời, việc gọiProcess.is_alivecủa một tiến trình đã hoàn tất sẽ tham gia vào tiến trình đó. Mặc dù vậy, có lẽ bạn nên tham gia rõ ràng tất cả các quy trình mà bạn bắt đầu.
Tốt hơn để kế thừa hơn là dưa chua/bỏ dưa chua
Khi sử dụng phương thức khởi động spawn hoặc forkserver, nhiều loại từ
multiprocessingcần phải có thể chọn được để các tiến trình con có thể sử dụng chúng. Tuy nhiên, nói chung nên tránh gửi các đối tượng dùng chung đến các quy trình khác bằng cách sử dụng đường ống hoặc hàng đợi. Thay vào đó, bạn nên sắp xếp chương trình sao cho một quy trình cần quyền truy cập vào tài nguyên được chia sẻ được tạo ở nơi khác có thể kế thừa nó từ quy trình tổ tiên.
Tránh kết thúc quá trình
Việc sử dụng phương pháp
Process.terminateđể dừng một quá trình có thể khiến mọi tài nguyên được chia sẻ (chẳng hạn như khóa, ngữ nghĩa, đường ống và hàng đợi) hiện đang được quá trình sử dụng bị hỏng hoặc không khả dụng đối với các quy trình khác.Vì vậy, tốt nhất bạn chỉ nên xem xét sử dụng
Process.terminatetrên các quy trình không bao giờ sử dụng bất kỳ tài nguyên được chia sẻ nào.
Tham gia các tiến trình sử dụng hàng đợi
Hãy nhớ rằng một quá trình đã đặt các mục vào hàng đợi sẽ đợi trước khi kết thúc cho đến khi tất cả các mục được lưu vào bộ đệm được luồng "feeder" đưa vào đường ống bên dưới. (Tiến trình con có thể gọi phương thức
Queue.cancel_join_threadcủa hàng đợi để tránh hành vi này.)Điều này có nghĩa là bất cứ khi nào bạn sử dụng hàng đợi, bạn cần đảm bảo rằng tất cả các mục đã được đưa vào hàng đợi cuối cùng sẽ bị xóa trước khi quá trình được tham gia. Nếu không, bạn không thể chắc chắn rằng các tiến trình đã đưa các mục vào hàng đợi sẽ chấm dứt. Cũng hãy nhớ rằng các tiến trình không phải daemon sẽ được tham gia tự động.
Một ví dụ sẽ bế tắc là như sau
từ quá trình nhập đa xử lý, hàng đợi định nghĩa f(q): q.put('X' * 1000000) nếu __name__ == '__main__': hàng đợi = Hàng đợi() p = Quá trình(target=f, args=(queue,)) p.start() p.join() # this bế tắc obj = queue.get()Cách khắc phục ở đây là hoán đổi hai dòng cuối cùng (hoặc đơn giản là xóa dòng
p.join()).
Truyền rõ ràng tài nguyên cho các tiến trình con
Trên POSIX sử dụng phương thức bắt đầu fork, một tiến trình con có thể sử dụng tài nguyên dùng chung được tạo trong tiến trình cha bằng cách sử dụng tài nguyên chung. Tuy nhiên, tốt hơn là chuyển đối tượng làm đối số cho hàm tạo của tiến trình con.
Ngoài việc làm cho mã (có khả năng) tương thích với Windows và các phương thức khởi động khác, điều này còn đảm bảo rằng miễn là tiến trình con vẫn còn hoạt động thì đối tượng sẽ không bị thu gom rác trong tiến trình gốc. Điều này có thể quan trọng nếu một số tài nguyên được giải phóng khi đối tượng là rác được thu thập trong quy trình gốc.
Vì vậy, ví dụ
từ quá trình nhập đa xử lý, Khóa chắc chắn f(): ... làm điều gì đó bằng cách sử dụng "khóa" ... nếu __name__ == '__main__': khóa = Khóa() cho tôi trong phạm vi (10): Quá trình(target=f).start()nên viết lại thành
từ quá trình nhập đa xử lý, Khóa định nghĩa f(l): ... làm điều gì đó bằng cách sử dụng "l" ... nếu __name__ == '__main__': khóa = Khóa() cho tôi trong phạm vi (10): Process(target=f, args=(lock,)).start()
Cẩn thận khi thay thế sys.stdin bằng "tập tin giống như đối tượng"
multiprocessingban đầu được gọi vô điều kiện:os.close(sys.stdin.fileno())trong phương pháp
multiprocessing.Process._bootstrap()--- điều này dẫn đến các vấn đề với các tiến trình trong tiến trình. Điều này đã được thay đổi thành:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)Điều này giải quyết vấn đề cơ bản về các quá trình va chạm với nhau dẫn đến lỗi mô tả tệp không hợp lệ, nhưng gây ra mối nguy hiểm tiềm tàng cho các ứng dụng thay thế
sys.stdin()bằng "đối tượng giống như tệp" với bộ đệm đầu ra. Mối nguy hiểm này là nếu nhiều quá trình gọiclose()trên đối tượng giống như tệp này, điều đó có thể dẫn đến việc cùng một dữ liệu bị chuyển sang đối tượng nhiều lần, dẫn đến hỏng.Nếu bạn viết một đối tượng giống như tệp và triển khai bộ nhớ đệm của riêng mình, bạn có thể làm cho nó an toàn bằng cách lưu trữ pid bất cứ khi nào bạn thêm vào bộ đệm và loại bỏ bộ đệm khi pid thay đổi. Ví dụ:
@property bộ đệm def (tự): pid = os.getpid() nếu pid != self._pid: self._pid = pid self._cache = [] tự trả về._cacheĐể biết thêm thông tin, hãy xem bpo-5155, bpo-5313 và bpo-5331
Phương thức khởi động spawn và forkserver¶
Có một số hạn chế bổ sung không áp dụng cho phương thức khởi động fork.
Khả năng ngâm nhiều hơn
Đảm bảo rằng tất cả các đối số cho
Processđều có thể chọn được. Ngoài ra, nếu bạn phân lớpProcess.__init__, bạn phải đảm bảo rằng các phiên bản đó sẽ có thể được chọn khi phương thứcProcess.startđược gọi.
Biến toàn cục
Hãy nhớ rằng nếu mã chạy trong tiến trình con cố gắng truy cập vào một biến toàn cục thì giá trị mà nó nhìn thấy (nếu có) có thể không giống với giá trị trong tiến trình mẹ tại thời điểm
Process.startđược gọi.Tuy nhiên, các biến toàn cục chỉ là hằng số cấp mô-đun không gây ra vấn đề gì.
Nhập mô-đun chính một cách an toàn
Đảm bảo rằng mô-đun chính có thể được nhập một cách an toàn bởi trình thông dịch Python mới mà không gây ra tác dụng phụ ngoài ý muốn (chẳng hạn như bắt đầu một quy trình mới).
Ví dụ: sử dụng phương thức khởi động spawn hoặc forkserver chạy mô-đun sau sẽ không thành công với
RuntimeError:từ quá trình nhập đa xử lý def foo(): in('xin chào') p = Quá trình (đích=foo) p.start()Thay vào đó, người ta nên bảo vệ "điểm vào" của chương trình bằng cách sử dụng
if __name__ == '__main__':như sautừ Quá trình nhập đa xử lý, đóng băng_support, set_start_method def foo(): in('xin chào') nếu __name__ == '__main__': đóng băng_support() set_start_method('spawn') p = Quá trình (đích=foo) p.start()(Có thể bỏ qua dòng
freeze_support()nếu chương trình chạy bình thường thay vì bị treo.)Điều này cho phép trình thông dịch Python mới được sinh ra nhập mô-đun một cách an toàn và sau đó chạy chức năng
foo()của mô-đun.Các hạn chế tương tự được áp dụng nếu nhóm hoặc người quản lý được tạo trong mô-đun chính.
Ví dụ¶
Trình diễn cách tạo và sử dụng trình quản lý và proxy tùy chỉnh:
từ nhập đa xử lý đóng băng_support
từ multiprocessing.managers nhập BaseManager, BaseProxy
nhà điều hành nhập khẩu
##
class Foo:
def f(tự):
print('bạn đã gọi Foo.f()')
def g(tự):
print('bạn đã gọi Foo.g()')
def _h(tự):
print('bạn đã gọi Foo._h()')
# A chức năng tạo đơn giản
def baz():
cho tôi trong phạm vi (10):
nhường tôi*i
loại # Proxy cho các đối tượng máy phát điện
lớp GeneratorProxy(BaseProxy):
_expose_ = ['__tiếp theo__']
chắc chắn __iter__(tự):
tự trở về
chắc chắn __next__(tự):
trả về self._callmethod('__next__')
# Function để trả về mô-đun vận hành
chắc chắn get_operator_module():
toán tử trả về
##
class MyManager(BaseManager):
vượt qua
# register lớp Foo; làm cho `f()` và `g()` có thể truy cập được qua proxy
MyManager.register('Foo1', Foo)
# register lớp Foo; làm cho `g()zz000zzh()` có thể truy cập được qua proxy
MyManager.register('Foo2', Foo, Exposure=('g', '_h'))
# register chức năng tạo baz; sử dụng `GeneratorProxy` để tạo proxy
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); làm cho các chức năng công cộng có thể truy cập được thông qua proxy
MyManager.register('operator', get_operator_module)
##
def kiểm tra():
người quản lý = MyManager()
người quản lý.start()
in('-' * 20)
f1 = người quản lý.Foo1()
f1.f()
f1.g()
khẳng định không phải hasattr(f1, '_h')
khẳng định đã sắp xếp(f1._expose_) == đã sắp xếp(['f', 'g'])
in('-' * 20)
f2 = người quản lý.Foo2()
f2.g()
f2._h()
khẳng định không phải hasattr(f2, 'f')
khẳng định đã sắp xếp(f2._expose_) == đã sắp xếp(['g', '_h'])
in('-' * 20)
it = manager.baz()
vì tôi ở trong đó:
print('<%d>' % i, end=' ')
in()
in('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._expose_ =', op._expose_)
##
if __name__ == '__main__':
đóng băng_support()
kiểm tra()
Sử dụng Pool:
nhập đa xử lý
thời gian nhập khẩu
nhập khẩu ngẫu nhiên
hệ thống nhập khẩu
#
# Functions được sử dụng bởi mã kiểm tra
#
def tính toán (func, args):
kết quả = func(*args)
return '%s nói rằng %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, đối số, kết quả
)
def tính sao(args):
trả về tính toán(*args)
def mul(a, b):
time.sleep(0,5 * ngẫu nhiên.random())
trả về a * b
def cộng (a, b):
time.sleep(0,5 * ngẫu nhiên.random())
trả về a + b
định nghĩa f(x):
trả về 1,0 / (x - 5,0)
def pow3(x):
trả lại x ** 3
def noop(x):
vượt qua
#
mã # Test
kiểm tra #
def():
PROCESSES = 4
print('Tạo nhóm với %d tiến trình\n' % PROCESSES)
với multiprocessing.Pool(PROCESSES) là nhóm:
#
# Tests
#
TASKS = [(mul, (i, 7)) cho i trong phạm vi (10)] + \
[(cộng, (i, 8)) cho i trong phạm vi (10)]
kết quả = [pool.apply_async(tính toán, t) cho t trong TASKS]
imap_it = pool.imap(calatestar, TASKS)
imap_unordered_it = pool.imap_unordered(toán sao, TASKS)
print('Kết quả được sắp xếp bằng pool.apply_async():')
cho r trong kết quả:
print('\t', r.get())
in()
print('Sắp xếp kết quả bằng pool.imap():')
cho x trong imap_it:
in('\t', x)
in()
print('Kết quả không có thứ tự khi sử dụng pool.imap_unordered():')
cho x trong imap_unordered_it:
in('\t', x)
in()
print('Kết quả được sắp xếp bằng pool.map() --- sẽ chặn cho đến khi hoàn tất:')
cho x trong pool.map(calatestar, TASKS):
in('\t', x)
in()
#
xử lý lỗi # Test
#
print('Xử lý lỗi kiểm tra:')
thử:
print(pool.apply(f, (5,)))
ngoại trừ ZeroDivisionError:
print('\tGot ZeroDivisionError như mong đợi từ pool.apply()')
khác:
raise AssertionError('expected ZeroDivisionError')
thử:
print(pool.map(f, list(range(10))))
ngoại trừ ZeroDivisionError:
print('\tGot ZeroDivisionError như mong đợi từ pool.map()')
khác:
raise AssertionError('expected ZeroDivisionError')
thử:
print(list(pool.imap(f, list(range(10)))))
ngoại trừ ZeroDivisionError:
print('\tGot ZeroDivisionError như mong đợi từ list(pool.imap())')
khác:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
cho tôi trong phạm vi (10):
thử:
x = tiếp theo(nó)
ngoại trừ ZeroDivisionError:
nếu tôi == 5:
vượt qua
ngoại trừ StopIteration:
phá vỡ
khác:
nếu tôi == 5:
raise AssertionError('expected ZeroDivisionError')
khẳng định tôi == 9
print('\tGot ZeroDivisionError như mong đợi từ IMapIterator.next()')
in()
#
hết thời gian chờ # Testing
#
print('Đang kiểm tra ApplyResult.get() khi hết thời gian chờ:', end=' ')
res = pool.apply_async(tính toán, TASKS[0])
trong khi 1:
sys.stdout.flush()
thử:
sys.stdout.write('\n\t%s' % res.get(0.02))
phá vỡ
ngoại trừ đa xử lý.TimeoutError:
sys.stdout.write('.')
in()
in()
print('Đang kiểm tra IMapIterator.next() khi hết thời gian chờ:', end=' ')
it = pool.imap(calatestar, TASKS)
trong khi 1:
sys.stdout.flush()
thử:
sys.stdout.write('\n\t%s' % it.next(0.02))
ngoại trừ StopIteration:
phá vỡ
ngoại trừ đa xử lý.TimeoutError:
sys.stdout.write('.')
in()
in()
nếu __name__ == '__main__':
đa xử lý.freeze_support()
kiểm tra()
Một ví dụ cho thấy cách sử dụng hàng đợi để cung cấp tác vụ cho một tập hợp các quy trình công nhân và thu thập kết quả:
thời gian nhập khẩu
nhập khẩu ngẫu nhiên
từ quá trình nhập đa xử lý, Hàng đợi, current_process, đóng băng_support
#
# Function được điều hành bởi quy trình công nhân
Công nhân #
def (đầu vào, đầu ra):
đối với func, lập luận trong iter(input.get, 'STOP'):
kết quả = tính toán(func, args)
đầu ra.put(kết quả)
#
# Function dùng để tính kết quả
#
def tính toán (func, args):
kết quả = func(*args)
return '%s nói rằng %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions được tham chiếu bởi các tác vụ
#
def mul(a, b):
time.sleep(0.5*random.random())
trả về a * b
def cộng (a, b):
time.sleep(0.5*random.random())
trả về a + b
#
#
kiểm tra #
def():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) cho i trong phạm vi (20)]
TASKS2 = [(cộng, (i, 8)) cho i trong phạm vi (10)]
hàng đợi # Create
task_queue = Hàng đợi()
done_queue=Xếp hàng()
nhiệm vụ # Submit
cho nhiệm vụ trong TASKS1:
task_queue.put(task)
quy trình công nhân # Start
cho tôi trong phạm vi (NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get và in kết quả
print('Kết quả không có thứ tự:')
cho tôi trong phạm vi(len(TASKS1)):
print('\t', done_queue.get())
# Add nhiều tác vụ hơn bằng cách sử dụng `put()`
cho nhiệm vụ trong TASKS2:
task_queue.put(task)
# Get và in thêm một số kết quả
cho tôi trong phạm vi(len(TASKS2)):
print('\t', done_queue.get())
Quá trình con # Tell dừng lại
cho tôi trong phạm vi (NUMBER_OF_PROCESSES):
task_queue.put('STOP')
nếu __name__ == '__main__':
đóng băng_support()
kiểm tra()