38

I am using Python 2.7, and I have some code that looks like this:

task1()
task2()
task3()
dependent1()

task4()
task5()
task6()
dependent2()

dependent3()

The only dependencies are as follows: dependent1 needs to wait for tasks 1-3, dependent2 needs to wait for tasks 4-6, and dependent3 needs to wait for dependents 1-2. The following would be okay: running all 6 tasks in parallel first, then the first two dependents in parallel, then the final dependent.

I would prefer to have as many tasks as possible running in parallel. I've googled for some modules, but I was hoping to avoid external libraries, and I'm not sure how the Queue/Thread technique can solve my problem (maybe someone can recommend a good resource?).

3
  • I recommend using the standard library's Queue.task_done and Queue.join methods to synchronize the threads. At the bottom of the page in the Queue docs, you'll find an example of how to wait for other threads to finish their tasks: docs.python.org/library/queue.html#Queue.Queue.join Commented Nov 23, 2011 at 12:30
  • If your code gets more complex, it's worth looking at external libraries, because there are already things to deal with running tasks in parallel while making sure dependencies run in order. Commented Nov 23, 2011 at 13:25
  • Because of the GIL, the threads will only run one at a time in standard Python. Future versions of PyPy using STM may get round this though. Commented Nov 30, 2015 at 10:39

3 Answers

43

The built-in threading.Thread class offers all you need: start() to start a new thread and join() to wait for a thread to finish.

import threading

def task1():
    pass
def task2():
    pass
def task3():
    pass
def task4():
    pass
def task5():
    pass
def task6():
    pass

def dep1():
    t1 = threading.Thread(target=task1)
    t2 = threading.Thread(target=task2)
    t3 = threading.Thread(target=task3)

    t1.start()
    t2.start()
    t3.start()

    t1.join()
    t2.join()
    t3.join()

def dep2():
    t4 = threading.Thread(target=task4)
    t5 = threading.Thread(target=task5)
    t6 = threading.Thread(target=task6)

    t4.start()
    t5.start()
    t6.start()

    t4.join()
    t5.join()
    t6.join()

def dep3():
    d1 = threading.Thread(target=dep1)
    d2 = threading.Thread(target=dep2)

    d1.start()
    d2.start()

    d1.join()
    d2.join()

d3 = threading.Thread(target=dep3)
d3.start()
d3.join()

As an alternative to Thread.join, you can use Queue.join to wait until the worker threads have processed every queued task.
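The Queue.join approach can be sketched roughly as follows. This is only an illustration, not part of the original answer: run_all, make_task, and the done list are hypothetical names, and the import fallback is there so the same sketch runs on Python 2.7 (Queue) and Python 3 (queue).

```python
import threading

try:
    import queue              # Python 3 module name
except ImportError:
    import Queue as queue     # Python 2.7 module name


def run_all(tasks, num_workers=3):
    """Run every callable in `tasks` on worker threads; return once all are done."""
    q = queue.Queue()

    def worker():
        while True:
            task = q.get()
            try:
                task()
            finally:
                q.task_done()  # exactly one task_done() per get()

    for _ in range(num_workers):
        t = threading.Thread(target=worker)
        t.daemon = True        # idle workers block in q.get(); don't keep the process alive
        t.start()

    for task in tasks:
        q.put(task)
    q.join()                   # blocks until task_done() has been called for every item


# Hypothetical placeholder tasks that just record that they ran.
done = []

def make_task(name):
    def task():
        done.append(name)
    return task

run_all([make_task("task1"), make_task("task2"), make_task("task3")])
# All three tasks have finished here, so dependent1 could be called now.
```

Note that in this sketch a task that raises an exception ends its worker thread, so real code would probably want to catch and record errors inside worker.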

4 Comments

This is great! but my task functions return values that I use in the dep functions, how do I get the returned values from t1,t2,t3 etc.. ?
What if I want to pass some arguments into the function?
You can pass args and kwargs to the threading.Thread. Please have a look at the documentation: docs.python.org/3.4/library/threading.html#threading.Thread
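One possible way to address both comments (passing arguments in and getting return values back) using only the standard library is a small helper that stores each result in its own list slot. This is a sketch under assumptions: run_parallel, task, dependent, and branch are hypothetical names standing in for the question's functions, not anything from the threading module.

```python
import threading


def run_parallel(calls):
    """Run (func, args) pairs on separate threads; return results in input order."""
    results = [None] * len(calls)

    def runner(index, func, args):
        results[index] = func(*args)   # each thread writes only its own slot

    threads = [threading.Thread(target=runner, args=(i, func, args))
               for i, (func, args) in enumerate(calls)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()                       # after this loop every result is in place
    return results


# Hypothetical stand-ins for the question's task and dependent functions.
def task(n):
    return n * 10

def dependent(values):
    return sum(values)

def branch(numbers):
    # Run this branch's tasks in parallel, then feed their results to its dependent.
    return dependent(run_parallel([(task, (n,)) for n in numbers]))


# The two branches run in parallel; the final dependent combines their results.
d1, d2 = run_parallel([(branch, ([1, 2, 3],)), (branch, ([4, 5, 6],))])
d3 = dependent([d1, d2])
```

If a function raises inside its thread, its slot simply stays None in this sketch, so production code would likely want to capture exceptions as well.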
5

If you are willing to give external libraries a shot, you can express tasks and their dependencies elegantly with Ray. This works well on a single machine. The advantage is that parallelism and dependencies can be easier to express with Ray than with Python's multiprocessing module, and because Ray runs tasks in separate worker processes, it avoids the GIL (global interpreter lock) problem that often prevents multithreading from working efficiently. In addition, it is easy to scale the workload up to a cluster if you need to in the future.

The solution looks like this:

import ray

ray.init()

@ray.remote
def task1():
    pass

@ray.remote
def task2():
    pass

@ray.remote
def task3():
    pass

@ray.remote
def dependent1(x1, x2, x3):
    pass

@ray.remote
def task4():
    pass

@ray.remote
def task5():
    pass

@ray.remote
def task6():
    pass

@ray.remote
def dependent2(x1, x2, x3):
    pass

@ray.remote
def dependent3(x, y):
    pass

id1 = task1.remote()
id2 = task2.remote()
id3 = task3.remote()

dependent_id1 = dependent1.remote(id1, id2, id3)

id4 = task4.remote()
id5 = task5.remote()
id6 = task6.remote()

dependent_id2 = dependent2.remote(id4, id5, id6)

dependent_id3 = dependent3.remote(dependent_id1, dependent_id2)

ray.get(dependent_id3) # This is optional, you can get the results if the tasks return an object

You can also pass actual python objects between the tasks by using the arguments inside of the tasks and returning the results (for example saying "return value" instead of the "pass" above).

After "pip install ray", the above code works out of the box on a single machine. It is also easy to parallelize applications on a cluster, either in the cloud or on your own custom cluster (see https://ray.readthedocs.io/en/latest/autoscaling.html and https://ray.readthedocs.io/en/latest/using-ray-on-a-cluster.html). That might come in handy if your workload grows later on.

Disclaimer: I'm one of the developers of Ray.

0

Look at Gevent.

Example Usage:

import gevent
from gevent import socket

def destination(jobs):
    gevent.joinall(jobs, timeout=2)
    print [job.value for job in jobs]

def task1():
    return gevent.spawn(socket.gethostbyname, 'www.google.com')

def task2():
    return gevent.spawn(socket.gethostbyname, 'www.example.com')

def task3():
    return gevent.spawn(socket.gethostbyname, 'www.python.org')

jobs = []
jobs.append(task1())
jobs.append(task2())
jobs.append(task3())
destination(jobs)

Hope this is what you have been looking for.

1 Comment

Really? The OP asked for a multithreading solution using a Queue/Thread technique and wanted to avoid external libraries. But you point him to a nest of external dependencies and ignore basic solutions provided by the standard library.
