Is it possible in pyspark to use the parallelize function over Python objects? I want to run in parallel over a list of objects, modify them using a function, and then print those objects.

from pyspark.sql import SparkSession

def init_spark(appname):
  spark = SparkSession.builder.appName(appname).getOrCreate()
  sc = spark.sparkContext
  return spark, sc

def run_on_configs_spark(object_list):
  spark, sc = init_spark(appname="analysis")
  p_configs_RDD = sc.parallelize(object_list)
  p_configs_RDD = p_configs_RDD.map(func)
  p_configs_RDD.foreach(print)

def func(obj):
  return do_something(obj)  # do_something is a placeholder for the real work

When I run the above code, I get the error "AttributeError: Can't get attribute 'Object' on <module 'pyspark.daemon' from...>". How can I solve it?
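From what I understand, this error usually means that pickle cannot resolve the class on the worker processes, e.g. because the class is defined only in the driver's __main__. A minimal sketch of one common mitigation, assuming a hypothetical module my_objects.py that is made available to the workers (e.g. via spark-submit --py-files my_objects.py):

# my_objects.py -- importable on the workers, so pickle can resolve
# the class by its qualified name during deserialization
class Object:
  def __init__(self, value):
    self.value = value

# driver script: import the class instead of defining it inline
from my_objects import Object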

I did the following workaround. But I don't think it is a good solution in general, and it assumes I can change the constructor of the object.

I converted each object into a dictionary, and then reconstructed the object from that dictionary.

from pyspark.sql import SparkSession

def init_spark(appname):
  spark = SparkSession.builder.appName(appname).getOrCreate()
  sc = spark.sparkContext
  return spark, sc

def run_on_configs_spark(object_list):
  spark, sc = init_spark(appname="analysis")
  # __dict__ is an attribute, not a method, so use vars(x) (or x.__dict__)
  p_configs_RDD = sc.parallelize([vars(x) for x in object_list])
  p_configs_RDD = p_configs_RDD.map(func)
  p_configs_RDD.foreach(print)

def func(d):
  obj = Object(create_from_dict=True, dictionary=d)
  return do_something(obj)  # do_something is a placeholder for the real work

In the constructor of the Object:

class Object:
  def __init__(self, create_from_dict=False, dictionary=None, **other_params):
    if create_from_dict:
      self.__dict__.update(dictionary)
      return
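For reference, the round trip with this constructor looks roughly like this (the attribute names are made up):

d = {"name": "cfg1", "value": 42}  # what vars(x) might produce on the driver
obj = Object(create_from_dict=True, dictionary=d)
print(obj.name, obj.value)  # attributes restored from the dictionary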

Are there any better solutions?

  • What improvement are you looking for when you ask if there are better solutions? Also, when you say Python object, do you mean a dictionary or some other custom object? Commented Aug 3, 2020 at 21:11
  • A modular way to run over generic, already-created objects (see the Object class above) without modifying the object's constructor. Commented Aug 4, 2020 at 8:58

1 Answer


For a better answer, I suggest you post a sample of object_list and your desired output so we can test with real code.

According to the pyspark docs, the parallelize function should accept any collection, so I think the problem might be the object_list itself. I can see why the workaround works, since the input becomes a list of dictionaries (or other mapping objects).

As for a modular way to run over generic objects, it depends on how you want the RDD to look, but the general approach is to convert whatever you want to distribute into a collection-type object. One option that does not modify the constructor or class structure is:

sc.parallelize([object_list])

The key point is to ensure that the input is a collection type.
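A minimal end-to-end sketch of that idea, using plain dictionaries (which are always picklable) as the collection elements; the names here are illustrative:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("analysis").getOrCreate()
sc = spark.sparkContext

configs = [{"id": i} for i in range(4)]   # plain dicts survive pickling
rdd = sc.parallelize(configs)             # any Python collection is accepted
print(rdd.map(lambda d: {**d, "processed": True}).collect())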
