0

Hello I'm new here and I want to ask some question.Now I'm using python multiprocessing to process data in queue. Example I have 3 functions to calculate data from queue and in queue I have 3 datas. Is it possible to use pipeline technique with multiprocessing to make it more faster?

In this code I tried to use multiprocessing queue to communicate between multiprocessing process and use Lock to prevent other process to use data in queue before it done from previous function. but it


from multiprocessing import Process, current_process, cpu_count, Queue, Pool, Lock, Array
from threading import Thread, current_thread
import time
import os

def a(pid, q1, q2, lock):
    while not q1.empty():
        data = q1.get()
        print("data from q1 is %s" % data)
        # for i in range(1000000):
        new_data = data*2
        lock.acquire()
        q2.put(new_data)
        print(pid)
        lock.release()

def b(pid, q2, q3, lock):
    while not q2.empty():
        data = q2.get()
        print("data from q2 is %s" % data)
        # for i in range(1000000):
        lock.acquire()
        new_data = data*3
        q3.put(new_data)
        print(pid)
        lock.release()

def c(pid, q3, q4, lock):
    while not q3.empty():
        data = q3.get()
        print("data from q3 is %s" % data)
        # for i in range(1000000):
        lock.acquire()
        new_data = data*4
        q4.put(new_data)
        print(pid)
        lock.release()

if __name__ == "__main__":

    number = [1,2,3]

    lock = Lock()

    q1 = Queue()
    q2 = Queue()
    q3 = Queue()
    q4 = Queue()

    for data in number:
        q1.put(data)

    p1 = Process(target=a,args=(1, q1, q2, lock))
    p2 = Process(target=b,args=(2, q2, q3, lock))
    p3 = Process(target=c,args=(3, q3, q4, lock))

    p1.start()
    p2.start()
    p3.start()

    p1.join()   
    p2.join()
    p3.join()

    for i in range(q4.qsize()):
        print(q4.get())

I expect that the sequence of pipeline should be execute like this f1 | f1 f2 | f1 f2 f3 | f2 f3 | f3 and solution in queue is 24, 48, 72 if my information is correct. I try my best to explain how the things should work because this is my first time to ask something in stackoverflow and my english skill is not good and also I really need help.

1
  • Please tell us what you have already tried and what results you have achieved. Commented Apr 15, 2019 at 19:08

1 Answer 1

1

Your problem is that you are using q.empty() to terminate your loops. Some of those Queues will be empty at the start, and those Process will terminate too early. You need a different technique to let the p2 and p3 processes know when to quit.

Here is a modification of your code that uses None as a flag in the queues to signal when done:

from multiprocessing import Process, current_process, cpu_count, Queue, Pool, Lock, Array
from threading import Thread, current_thread
import time
import os

def a(pid, q1, q2, lock):
    while not q1.empty():
        data = q1.get()
        print("data from q1 is %s" % data)
        # for i in range(1000000):
        new_data = data*2
        lock.acquire()
        q2.put(new_data)
        print(pid)
        lock.release()
    q2.put(None)

def b(pid, q2, q3, lock):
    while True:
        data = q2.get()
        if data is None:
            q3.put(None)
            return
        print("data from q2 is %s" % data)
        # for i in range(1000000):
        lock.acquire()
        new_data = data*3
        q3.put(new_data)
        print(pid)
        lock.release()

def c(pid, q3, q4, lock):
    while True:
        data = q3.get()
        if data is None:
            return
        print("data from q3 is %s" % data)
        # for i in range(1000000):
        lock.acquire()
        new_data = data*4
        q4.put(new_data)
        print(pid)
        lock.release()

if __name__ == "__main__":

    number = [1,2,3]

    lock = Lock()

    q1 = Queue()
    q2 = Queue()
    q3 = Queue()
    q4 = Queue()

    for data in number:
        q1.put(data)

    p1 = Process(target=a,args=(1, q1, q2, lock))
    p2 = Process(target=b,args=(2, q2, q3, lock))
    p3 = Process(target=c,args=(3, q3, q4, lock))

    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()

    for i in range(q4.qsize()):
        print(q4.get())

Also, you don't actually need the Lock. According to the documentation:

The queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implements all the required locking semantics.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.