I'm trying to merge two columns of different datatypes. In the code snippet below, I pick the columns from the same data frame for simplicity.

from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from datetime import datetime


a = sc.parallelize([
    ('ship1', datetime(2015, 1, 1), 2, 3., 4.),
    ('ship1', datetime(2015, 1, 2), 4, 8., 9.),
    ('ship1', datetime(2015, 1, 3), 5, 39., 49.),
    ('ship2', datetime(2015, 1, 4), 2, 3., 4.),
    ('ship2', datetime(2015, 1, 5), 4, 4., 6.),
    ('ship3', datetime(2015, 1, 15), 33, 56., 6.),
    ('ship3', datetime(2015, 1, 12), 3, 566., 64.),
    ('ship4', datetime(2015, 1, 5), 3, 3., None),
])


schemaString = "name time ROT SOG COG"
strtype=[StringType(),TimestampType(),IntegerType(),FloatType(),FloatType()]
fields = [StructField(schemaString.split()[i], strtype[i],True) for i in range(0,len(strtype))]
schema=StructType(fields)
df=sqlContext.createDataFrame(a,schema)

df.show()

+-----+--------------------+---+-----+----+
| name|                time|ROT|  SOG| COG|
+-----+--------------------+---+-----+----+
|ship1|2015-01-01 00:00:...|  2|  3.0| 4.0|
|ship1|2015-01-02 00:00:...|  4|  8.0| 9.0|
|ship1|2015-01-03 00:00:...|  5| 39.0|49.0|
|ship2|2015-01-04 00:00:...|  2|  3.0| 4.0|
|ship2|2015-01-05 00:00:...|  4|  4.0| 6.0|
|ship3|2015-01-15 00:00:...| 33| 56.0| 6.0|
|ship3|2015-01-12 00:00:...|  3|566.0|64.0|
|ship4|2015-01-05 00:00:...|  3|  3.0|null|
+-----+--------------------+---+-----+----+

When I extract two columns from df into new DataFrames and try to merge them with df.withColumn():

b=df.select("time")
c=df.select("SOG")
d=b.withColumn("SOG",c.SOG)

I get this:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-48-4845b5dc1c80> in <module>()
      4 c=aa.select("SOG")
      5 
----> 6 d=b.withColumn("SOG",c.SOG)

/hadoop-dist/spark/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
   1166         [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
   1167         """
-> 1168         return self.select('*', col.alias(colName))
   1169 
   1170     @ignore_unicode_prefix

/hadoop-dist/spark/python/pyspark/sql/dataframe.pyc in select(self, *cols)
    719         [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
    720         """
--> 721         jdf = self._jdf.select(self._jcols(*cols))
    722         return DataFrame(jdf, self.sql_ctx)
    723 

/hadoop-dist/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/hadoop-dist/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o231.select.
: org.apache.spark.sql.AnalysisException: resolved attribute(s) SOG#3 missing from time#1 in operator !Project [time#1,SOG#3 AS SOG#34];
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
    at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
    at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

Am I doing something stupid, or is this some kind of bug? (I am able to merge columns of a similar datatype.)

1
  • Could you explain what you mean by "I am able to merge columns of a similar datatype"? Commented Oct 28, 2015 at 14:31

1 Answer

What you are trying to achieve here is simply not supported. When you use DataFrame.withColumn, the column expression can reference only columns from that same data frame. It is not possible to add a column based on data from another table.

If you want to merge columns from multiple data frames, you have to use a join. That means you need either a natural key that can be used to match corresponding rows, or you have to perform the join based on row numbers. The latter is possible only if you can ensure that the order of rows in both data frames is exactly the same.

3 Comments

Thanks a lot for clearing that up for me! Turns out I was attempting something stupid ;)
I was able to use withColumn in the manner described above when b=c, and I was misled into thinking it was a datatype issue.
Joining based on row numbers is clever.
