Reading Time: 6 minutes

Trong hướng dẫn này, chúng ta sẽ tìm hiểu chi tiết về Multiprocessing trong Python thông qua các ví dụ.

Multiprocessing trong Python

Multiprocessing trong Python

Ngày nay, xử lý song song (Parallel processing) đang nhận được nhiều sự chú ý hơn. Nếu bạn vẫn chưa biết về xử lý song song, hãy tìm hiểu từ Wikipedia. Khi các nhà sản xuất CPU bắt đầu thêm ngày càng nhiều số lượng lõi (core) vào bộ xử lý của họ, việc tạo ra mã song song (parallel code) là một cách tuyệt vời để cải thiện hiệu suất.

Python đã giới thiệu module multiprocessing để cho phép chúng ta viết mã song song. Để hiểu được động lực chính của module này, chúng ta cần biết một số kiến thức cơ bản về lập trình song song. Hy vọng sau khi đọc bài viết này, bạn sẽ có thêm kiến thức nền tảng về chủ đề này.

Python Multiprocessing: Process, Queue và Locks

Trong module multiprocessing của Python có rất nhiều lớp (class) để xây dựng một chương trình song song. Trong số đó, ba lớp cơ bản là Process, QueueLock. Các lớp này sẽ giúp bạn xây dựng một chương trình song song.

Nhưng trước khi mô tả về chúng, hãy bắt đầu chủ đề này với một đoạn mã đơn giản. Để làm cho một chương trình song song trở nên hữu ích, bạn phải biết máy tính của mình có bao nhiêu lõi xử lý. Module Multiprocessing của Python cho phép bạn biết điều đó. Đoạn mã đơn giản sau đây sẽ in ra số lượng lõi trên máy tính của bạn:

import multiprocessing

print("Number of cpu : ", multiprocessing.cpu_count())

Output sau đây có thể khác nhau tùy thuộc vào máy tính của bạn. Với tôi, số lượng core là 8.

python multiprocessing cpu count

Lớp Process trong Python Multiprocessing

Lớp Process trong module multiprocessing của Python là một sự trừu tượng (abstraction) giúp thiết lập một tiến trình Python khác, cung cấp cho nó khả năng chạy mã và một cách để ứng dụng cha kiểm soát việc thực thi. Có hai hàm quan trọng thuộc về lớp Process là hàm start()join().

Đầu tiên, chúng ta cần viết một hàm mà tiến trình sẽ chạy. Sau đó, chúng ta cần khởi tạo một đối tượng tiến trình. Nếu chúng ta chỉ tạo một đối tượng tiến trình, sẽ không có gì xảy ra cho đến khi chúng ta yêu cầu nó bắt đầu xử lý thông qua hàm start(). Khi đó, tiến trình sẽ chạy và trả về kết quả của nó. Sau đó, chúng ta yêu cầu tiến trình hoàn thành thông qua hàm join(). Nếu không có lời gọi hàm join(), tiến trình sẽ vẫn ở trạng thái idle(nhàn rỗi) và sẽ không kết thúc.

Vì vậy, nếu bạn tạo nhiều tiến trình mà không kết thúc chúng, bạn có thể gặp phải tình trạng thiếu tài nguyên. Khi đó, bạn có thể cần phải kết thúc chúng theo cách thủ công. Một điều quan trọng là, nếu bạn muốn truyền bất kỳ đối số nào thông qua tiến trình, bạn cần sử dụng đối số từ khóa args. Đoạn mã sau đây sẽ hữu ích để hiểu cách sử dụng lớp Process.

from multiprocessing import Process

def print_func(continent='Asia'):
    print('The name of continent is : ', continent)

if __name__ == "__main__":  # confirms that the code is under main function
    names = ['America', 'Europe', 'Africa']
    procs = []
    proc = Process(target=print_func)  # instantiating without any argument
    procs.append(proc)
    proc.start()

    # instantiating process with arguments
    for name in names:
        # print(name)
        proc = Process(target=print_func, args=(name,))
        procs.append(proc)
        proc.start()

    # complete the processes
    for proc in procs:
        proc.join()

Kết quả đầu ra của đoạn mã sau sẽ là:

python multiprocessing process là gì

Lớp Queue trong Python Multiprocessing

Nếu bạn đã có kiến thức cơ bản về cấu trúc dữ liệu máy tính, có lẽ bạn đã biết về Queue. Module Multiprocessing của Python cung cấp lớp Queue – là một cấu trúc dữ liệu chính xác là First-In-First-Out (FIFO) (Vào trước Ra trước).

Chúng có thể lưu trữ bất kỳ đối tượng Python nào, có thể pickle nào và cực kỳ hữu ích cho việc chia sẻ dữ liệu giữa các tiến trình. Queues đặc biệt hữu ích khi được truyền làm tham số cho hàm target của một Process để cho phép Process đó tiêu thụ dữ liệu. Bằng cách sử dụng hàm put(), chúng ta có thể chèn dữ liệu vào hàng đợi và sử dụng get() để lấy các item từ hàng đợi. Hãy xem đoạn mã sau đây để có một ví dụ nhanh.

from multiprocessing import Queue

colors = ['red', 'green', 'blue', 'black']
cnt = 1
# instantiating a queue object
queue = Queue()
print('pushing items to queue:')
for color in colors:
    print('item no: ', cnt, ' ', color)
    queue.put(color)
    cnt += 1

print('\\npopping items from queue:')
cnt = 0
while not queue.empty():
    print('item no: ', cnt, ' ', queue.get())
    cnt += 1

Tìm hiểu về python multiprocessing

Lớp Lock trong Python Multiprocessing

Nhiệm vụ của lớp Lock khá đơn giản. Nó cho phép mã khóa để không có tiến trình nào khác có thể thực thi đoạn mã tương tự cho đến khi khóa đã được release. Vì vậy, nhiệm vụ của lớp Lock chủ yếu có hai phần: một là để yêu cầu khóa và hai là để nhả khóa. Để yêu cầu khóa, hàm acquire() được sử dụng, và để nhả khóa, hàm release() được sử dụng.

Ví dụ về Python Multiprocessing

Trong ví dụ về Python multiprocessing này, chúng ta sẽ kết hợp tất cả kiến thức đã học. Giả sử chúng ta có một số nhiệm vụ cần hoàn thành. Để thực hiện nhiệm vụ đó, chúng ta sẽ sử dụng một số tiến trình.

Vì vậy, chúng ta sẽ duy trì hai queue. Một sẽ chứa các nhiệm vụ và hàng đợi còn lại sẽ chứa nhật ký (log) của các nhiệm vụ đã hoàn thành. Sau đó, chúng ta sẽ khởi tạo các tiến trình để hoàn thành nhiệm vụ.

Lưu ý rằng lớp Queue của Python đã được đồng bộ hóa. Điều đó có nghĩa là chúng ta không cần sử dụng lớp Lock để chặn nhiều tiến trình truy cập cùng một đối tượng Queue. Đó là lý do tại sao chúng ta không cần sử dụng lớp Lock trong trường hợp này.

Dưới đây là phần triển khai nơi chúng ta đang thêm các nhiệm vụ vào hàng đợi, sau đó tạo và khởi động các tiến trình, sau đó sử dụng join() để hoàn thành các tiến trình. Cuối cùng, chúng ta đang in nhật ký từ hàng đợi thứ hai.

from multiprocessing import Lock, Process, Queue, current_process
import time
import queue # imported for using queue.Empty exception

def do_job(tasks_to_accomplish, tasks_that_are_done):
    while True:
        try:
            '''
                try to get task from the queue. get_nowait() function will 
                raise queue.Empty exception if the queue is empty. 
                queue(False) function would do the same task also.
            '''
            task = tasks_to_accomplish.get_nowait()
        except queue.Empty:

            break
        else:
            '''
                if no exception has been raised, add the task completion 
                message to task_that_are_done queue
            '''
            print(task)
            tasks_that_are_done.put(task + ' is done by ' + current_process().name)
            time.sleep(.5)
    return True

def main():
    number_of_task = 10
    number_of_processes = 4
    tasks_to_accomplish = Queue()
    tasks_that_are_done = Queue()
    processes = []

    for i in range(number_of_task):
        tasks_to_accomplish.put("Task no " + str(i))

    # creating processes
    for w in range(number_of_processes):
        p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done))
        processes.append(p)
        p.start()

    # completing process
    for p in processes:
        p.join()

    # print the output
    while not tasks_that_are_done.empty():
        print(tasks_that_are_done.get())

    return True

if __name__ == '__main__':
    main()

Tùy thuộc vào số lượng tác vụ, đoạn mã sẽ mất một khoảng thời gian để hiển thị kết quả. Kết quả đầu ra của đoạn mã sau sẽ thay đổi theo thời gian.

python multiprocessing example

Python multiprocessing Pool

Python multiprocessing Pool có thể được sử dụng để thực thi song song một hàm trên nhiều giá trị đầu vào, phân phối dữ liệu đầu vào trên các tiến trình (hay còn gọi là song song dữ liệu – data parallelism). Dưới đây là một ví dụ đơn giản về Python multiprocessing Pool.

from multiprocessing import Pool

import time

work = (["A", 5], ["B", 2], ["C", 1], ["D", 3])

def work_log(work_data):
    print(" Process %s waiting %s seconds" % (work_data[0], work_data[1]))
    time.sleep(int(work_data[1]))
    print(" Process %s Finished." % work_data[0])

def pool_handler():
    p = Pool(2)
    p.map(work_log, work)

if __name__ == '__main__':
    pool_handler()

Hình ảnh dưới đây cho thấy kết quả đầu ra của chương trình trên. Lưu ý rằng kích thước pool là 2, nên hai lần thực thi hàm work_log đang diễn ra song song. Khi một trong các hàm xử lý xong, nó sẽ chọn đối số tiếp theo và cứ thế tiếp tục

python multiprocessing pool

Vậy là chúng ta đã hoàn thành việc tìm hiểu tổng quan về module multiprocessing của Python.

 

0 Bình luận

Đăng nhập để thảo luận

Chuyên mục Hướng dẫn

Tổng hợp các bài viết hướng dẫn, nghiên cứu và phân tích chi tiết về kỹ thuật, các xu hướng công nghệ mới nhất dành cho lập trình viên.

Đăng ký nhận bản tin của chúng tôi

Hãy trở thành người nhận được các nội dung hữu ích của CyStack sớm nhất

Xem chính sách của chúng tôi Chính sách bảo mật.

Đăng ký nhận Newsletter

Nhận các nội dung hữu ích mới nhất