
I've been playing with Spark and Python in this online Jupyter notebook https://tmpnb.org/ and tried 3 ways to pass Python functions:

1) using map

import numpy as np
def my_sqrt(x):
    return np.sqrt(x)

sc.parallelize(range(10)).map(my_sqrt).collect()

2) parallelizing my_sqrt and calling it

sc.parallelize([(my_sqrt, i) for i in range(10)]).map(lambda x : x[0](x[1])).collect()

3) parallelizing np.sqrt and calling it

sc.parallelize([(np.sqrt, i) for i in range(10)]).map(lambda x : x[0](x[1])).collect()

(1) and (3) work, but (2) doesn't. First, I would like to understand why/how (1) and (3) work. Second, I would like to understand why (2) doesn't and what could be done to make it work.

1 Answer

The first approach works because Spark uses a special serialization strategy to process the closures required for transformations. This strategy is significantly slower, but more powerful, than standard pickle (otherwise we couldn't use .map(lambda x: ...)).
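For example, Spark's vendored copy of cloudpickle serializes a function's bytecode by value, so the function survives even when the name that defined it is gone. A minimal sketch, assuming the standalone cloudpickle package is installed (the name my_sqrt_local is hypothetical):

import cloudpickle

def my_sqrt_local(x):   # hypothetical stand-in for my_sqrt
    return x ** 0.5

blob = cloudpickle.dumps(my_sqrt_local)   # the bytecode is captured by value
del my_sqrt_local                         # the defining name is gone...
cloudpickle.loads(blob)(4)                # ...but the function still runs: 2.0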

The last approach works because there is no need to serialize the function's code at all. The pickle merely references sqrt from the numpy module, so as long as NumPy is importable on each worker there is no problem at all.
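You can see this directly by pickling np.sqrt with the standard library; this is a minimal sketch, not Spark-specific code:

import pickle
import numpy as np

# A ufunc like np.sqrt is pickled as a reference that is rebuilt by
# importing NumPy on the receiving side; no function code travels at all.
blob = pickle.dumps(np.sqrt)
pickle.loads(blob)(4.0)   # 2.0 on any machine where NumPy is importable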

The second approach doesn't work because standard pickling doesn't serialize the function's code:

import pickle

pickle.dumps(my_sqrt)
## b'\x80\x03c__main__\nmy_sqrt\nq\x00.'

All it does is state: please give me the object bound to the name my_sqrt (my_sqrt.__name__) in the top-level script environment (a.k.a. __main__). When this pickle is deserialized on the workers, they don't share the driver's environment and there is no such object in scope anymore, hence the exception. To be clear, it is neither a bug nor something specific to Spark. You can easily reproduce the same behavior locally:

In [1]: import pickle

In [2]: def foo(): ...

In [3]: foo_ = pickle.dumps(foo)

In [4]: pickle.loads(foo_)
Out[4]: <function __main__.foo>

In [5]: del foo

In [6]: pickle.loads(foo_)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
...

AttributeError: Can't get attribute 'foo' on <module '__main__'>

Since the pickle doesn't concern itself with the actual value, you can even reassign the name:

In [7]: foo = "foo"

In [8]: pickle.loads(foo_)
Out[8]: 'foo'

The take-away message here is: if you want to use a function this way, put it in a separate module and distribute that module to the workers the same way you distribute other dependencies, including custom class definitions.
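A minimal sketch of that setup, using a hypothetical my_module.py that contains the my_sqrt definition and SparkContext.addPyFile to ship it:

# my_module.py (hypothetical), containing:
#     import numpy as np
#     def my_sqrt(x):
#         return np.sqrt(x)

sc.addPyFile("my_module.py")   # distribute the module to every worker

from my_module import my_sqrt
# my_sqrt now pickles as a reference to my_module.my_sqrt, so even the
# second approach from the question works:
sc.parallelize([(my_sqrt, i) for i in range(10)]).map(lambda x: x[0](x[1])).collect()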


2 Comments

Can you please post a link to some material that expands on the "special serialization strategy" that Spark uses instead of pickle?
@x89a10 Spark uses a modified version of cloudpickle. There are some other pieces scattered around, but this should be enough to understand the difference.
