Trong hướng dẫn này, chúng ta sẽ tìm hiểu chi tiết về Multiprocessing trong Python thông qua các ví dụ.
Multiprocessing trong Python
Ngày nay, xử lý song song (Parallel processing) đang nhận được nhiều sự chú ý hơn. Nếu bạn vẫn chưa biết về xử lý song song, hãy tìm hiểu từ Wikipedia. Khi các nhà sản xuất CPU bắt đầu thêm ngày càng nhiều số lượng lõi (core) vào bộ xử lý của họ, việc tạo ra mã song song (parallel code) là một cách tuyệt vời để cải thiện hiệu suất.
Python đã giới thiệu module multiprocessing
để cho phép chúng ta viết mã song song. Để hiểu được động lực chính của module này, chúng ta cần biết một số kiến thức cơ bản về lập trình song song. Hy vọng sau khi đọc bài viết này, bạn sẽ có thêm kiến thức nền tảng về chủ đề này.
Python Multiprocessing: Process, Queue và Locks
Trong module multiprocessing
của Python có rất nhiều lớp (class) để xây dựng một chương trình song song. Trong số đó, ba lớp cơ bản là Process
, Queue
và Lock
. Các lớp này sẽ giúp bạn xây dựng một chương trình song song.
Nhưng trước khi mô tả về chúng, hãy bắt đầu chủ đề này với một đoạn mã đơn giản. Để làm cho một chương trình song song trở nên hữu ích, bạn phải biết máy tính của mình có bao nhiêu lõi xử lý. Module Multiprocessing
của Python cho phép bạn biết điều đó. Đoạn mã đơn giản sau đây sẽ in ra số lượng lõi trên máy tính của bạn:
import multiprocessing
print("Number of cpu : ", multiprocessing.cpu_count())
Output sau đây có thể khác nhau tùy thuộc vào máy tính của bạn. Với tôi, số lượng core là 8.
Lớp Process trong Python Multiprocessing
Lớp Process
trong module multiprocessing
của Python là một sự trừu tượng (abstraction) giúp thiết lập một tiến trình Python khác, cung cấp cho nó khả năng chạy mã và một cách để ứng dụng cha kiểm soát việc thực thi. Có hai hàm quan trọng thuộc về lớp Process
là hàm start()
và join()
.
Đầu tiên, chúng ta cần viết một hàm mà tiến trình sẽ chạy. Sau đó, chúng ta cần khởi tạo một đối tượng tiến trình. Nếu chúng ta chỉ tạo một đối tượng tiến trình, sẽ không có gì xảy ra cho đến khi chúng ta yêu cầu nó bắt đầu xử lý thông qua hàm start(). Khi đó, tiến trình sẽ chạy và trả về kết quả của nó. Sau đó, chúng ta yêu cầu tiến trình hoàn thành thông qua hàm join()
. Nếu không có lời gọi hàm join()
, tiến trình sẽ vẫn ở trạng thái idle(nhàn rỗi) và sẽ không kết thúc.
Vì vậy, nếu bạn tạo nhiều tiến trình mà không kết thúc chúng, bạn có thể gặp phải tình trạng thiếu tài nguyên. Khi đó, bạn có thể cần phải kết thúc chúng theo cách thủ công. Một điều quan trọng là, nếu bạn muốn truyền bất kỳ đối số nào thông qua tiến trình, bạn cần sử dụng đối số từ khóa args
. Đoạn mã sau đây sẽ hữu ích để hiểu cách sử dụng lớp Process.
from multiprocessing import Process
def print_func(continent='Asia'):
print('The name of continent is : ', continent)
if __name__ == "__main__": # confirms that the code is under main function
names = ['America', 'Europe', 'Africa']
procs = []
proc = Process(target=print_func) # instantiating without any argument
procs.append(proc)
proc.start()
# instantiating process with arguments
for name in names:
# print(name)
proc = Process(target=print_func, args=(name,))
procs.append(proc)
proc.start()
# complete the processes
for proc in procs:
proc.join()
Kết quả đầu ra của đoạn mã sau sẽ là:
Lớp Queue trong Python Multiprocessing
Nếu bạn đã có kiến thức cơ bản về cấu trúc dữ liệu máy tính, có lẽ bạn đã biết về Queue
. Module Multiprocessing
của Python cung cấp lớp Queue
– là một cấu trúc dữ liệu chính xác là First-In-First-Out (FIFO) (Vào trước Ra trước).
Chúng có thể lưu trữ bất kỳ đối tượng Python nào, có thể pickle nào và cực kỳ hữu ích cho việc chia sẻ dữ liệu giữa các tiến trình. Queues đặc biệt hữu ích khi được truyền làm tham số cho hàm target
của một Process để cho phép Process đó tiêu thụ dữ liệu. Bằng cách sử dụng hàm put()
, chúng ta có thể chèn dữ liệu vào hàng đợi và sử dụng get()
để lấy các item từ hàng đợi. Hãy xem đoạn mã sau đây để có một ví dụ nhanh.
from multiprocessing import Queue
colors = ['red', 'green', 'blue', 'black']
cnt = 1
# instantiating a queue object
queue = Queue()
print('pushing items to queue:')
for color in colors:
print('item no: ', cnt, ' ', color)
queue.put(color)
cnt += 1
print('\\npopping items from queue:')
cnt = 0
while not queue.empty():
print('item no: ', cnt, ' ', queue.get())
cnt += 1
Lớp Lock trong Python Multiprocessing
Nhiệm vụ của lớp Lock khá đơn giản. Nó cho phép mã khóa để không có tiến trình nào khác có thể thực thi đoạn mã tương tự cho đến khi khóa đã được release. Vì vậy, nhiệm vụ của lớp Lock chủ yếu có hai phần: một là để yêu cầu khóa và hai là để nhả khóa. Để yêu cầu khóa, hàm acquire()
được sử dụng, và để nhả khóa, hàm release()
được sử dụng.
Ví dụ về Python Multiprocessing
Trong ví dụ về Python multiprocessing này, chúng ta sẽ kết hợp tất cả kiến thức đã học. Giả sử chúng ta có một số nhiệm vụ cần hoàn thành. Để thực hiện nhiệm vụ đó, chúng ta sẽ sử dụng một số tiến trình.
Vì vậy, chúng ta sẽ duy trì hai queue. Một sẽ chứa các nhiệm vụ và hàng đợi còn lại sẽ chứa nhật ký (log) của các nhiệm vụ đã hoàn thành. Sau đó, chúng ta sẽ khởi tạo các tiến trình để hoàn thành nhiệm vụ.
Lưu ý rằng lớp Queue
của Python đã được đồng bộ hóa. Điều đó có nghĩa là chúng ta không cần sử dụng lớp Lock
để chặn nhiều tiến trình truy cập cùng một đối tượng Queue
. Đó là lý do tại sao chúng ta không cần sử dụng lớp Lock
trong trường hợp này.
Dưới đây là phần triển khai nơi chúng ta đang thêm các nhiệm vụ vào hàng đợi, sau đó tạo và khởi động các tiến trình, sau đó sử dụng join()
để hoàn thành các tiến trình. Cuối cùng, chúng ta đang in nhật ký từ hàng đợi thứ hai.
from multiprocessing import Lock, Process, Queue, current_process
import time
import queue # imported for using queue.Empty exception
def do_job(tasks_to_accomplish, tasks_that_are_done):
while True:
try:
'''
try to get task from the queue. get_nowait() function will
raise queue.Empty exception if the queue is empty.
queue(False) function would do the same task also.
'''
task = tasks_to_accomplish.get_nowait()
except queue.Empty:
break
else:
'''
if no exception has been raised, add the task completion
message to task_that_are_done queue
'''
print(task)
tasks_that_are_done.put(task + ' is done by ' + current_process().name)
time.sleep(.5)
return True
def main():
number_of_task = 10
number_of_processes = 4
tasks_to_accomplish = Queue()
tasks_that_are_done = Queue()
processes = []
for i in range(number_of_task):
tasks_to_accomplish.put("Task no " + str(i))
# creating processes
for w in range(number_of_processes):
p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done))
processes.append(p)
p.start()
# completing process
for p in processes:
p.join()
# print the output
while not tasks_that_are_done.empty():
print(tasks_that_are_done.get())
return True
if __name__ == '__main__':
main()
Tùy thuộc vào số lượng tác vụ, đoạn mã sẽ mất một khoảng thời gian để hiển thị kết quả. Kết quả đầu ra của đoạn mã sau sẽ thay đổi theo thời gian.
Python multiprocessing Pool
Python multiprocessing Pool có thể được sử dụng để thực thi song song một hàm trên nhiều giá trị đầu vào, phân phối dữ liệu đầu vào trên các tiến trình (hay còn gọi là song song dữ liệu – data parallelism). Dưới đây là một ví dụ đơn giản về Python multiprocessing Pool.
from multiprocessing import Pool
import time
work = (["A", 5], ["B", 2], ["C", 1], ["D", 3])
def work_log(work_data):
print(" Process %s waiting %s seconds" % (work_data[0], work_data[1]))
time.sleep(int(work_data[1]))
print(" Process %s Finished." % work_data[0])
def pool_handler():
p = Pool(2)
p.map(work_log, work)
if __name__ == '__main__':
pool_handler()
Hình ảnh dưới đây cho thấy kết quả đầu ra của chương trình trên. Lưu ý rằng kích thước pool là 2, nên hai lần thực thi hàm work_log
đang diễn ra song song. Khi một trong các hàm xử lý xong, nó sẽ chọn đối số tiếp theo và cứ thế tiếp tục
Vậy là chúng ta đã hoàn thành việc tìm hiểu tổng quan về module multiprocessing
của Python.