
I am using Spark 1.5.2 to create a DataFrame from Scala objects using the following code. My purpose is to create data for unit testing.

import org.apache.spark.sql.SQLContext

class Address(first: String = null, second: String = null, zip: String = null)
class Person(id: String = null, name: String = null, address: Seq[Address] = null)

def test () = {

  val sqlContext = new SQLContext(sc)

  import sqlContext.implicits._

  val persons = Seq(
    new Person(id = "1", name = "Salim", 
      address = Seq(new Address(first = "1st street"))),
    new Person(name = "Sana",
      address = Seq(new Address(zip = "60088")))
  )

  // The code can't infer schema automatically
  val claimDF = sqlContext.createDataFrame(sc.parallelize(persons, 2), classOf[Person])

  claimDF.printSchema() // This prints only "root", not the schema of Person.
}

Instead, if I convert Person and Address to case classes, Spark can infer the schema automatically using the syntax above, using sc.parallelize(persons, 2).toDF, or using sqlContext.createDataFrame with an explicit StructType.
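For illustration, here is a minimal sketch of the case-class version that does infer the schema (AddressCC and PersonCC are hypothetical names, used only to avoid clashing with the classes above; import sqlContext.implicits._ is assumed to be in scope):

// Hypothetical case-class equivalents; Spark's reflection infers the full schema.
case class AddressCC(first: String = null, second: String = null, zip: String = null)
case class PersonCC(id: String = null, name: String = null, address: Seq[AddressCC] = null)

val personsCC = Seq(
  PersonCC(id = "1", name = "Salim", address = Seq(AddressCC(first = "1st street"))),
  PersonCC(name = "Sana", address = Seq(AddressCC(zip = "60088")))
)

sc.parallelize(personsCC, 2).toDF.printSchema() // prints the full nested schema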

I can't use case classes because in Scala 2.10 they can't hold more than 22 fields, and my classes have many more. Using StructType directly causes a lot of inconvenience; case classes would be the most convenient option, but they can't hold enough properties.

Please help, thanks in advance.

  • I think that if your classes extend the Product trait and implement its abstract methods it might work (because of this signature: createDataFrame[A <: Product](data: Seq[A])). Commented Aug 11, 2016 at 20:27

2 Answers


Thanks a lot for the input.

We ultimately migrated to Spark 2.1 with Scala 2.11, which removed the 22-field limit on case classes, so this issue was resolved.

For Spark 1.6 and Scala 2.10, I ended up building Row objects and a StructType to construct the DataFrame:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

def getSchema(): StructType =
  StructType(Array(
    StructField("jobNumber", StringType, nullable = true)
  ))

// Rows must match the schema positionally.
val rows = Seq(Row("data"))
val aRDD = sc.parallelize(rows)
val aDF = sqlContext.createDataFrame(aRDD, getSchema())
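The same approach extends to the nested Person/Address structure from the question. Here is a sketch (assuming the original field names) that nests an ArrayType of StructType and builds matching nested Rows:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Schema mirroring Person(id, name, address: Seq[Address]).
val addressType = StructType(Array(
  StructField("first", StringType, nullable = true),
  StructField("second", StringType, nullable = true),
  StructField("zip", StringType, nullable = true)
))
val personType = StructType(Array(
  StructField("id", StringType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("address", ArrayType(addressType, containsNull = true), nullable = true)
))

// Nested structs are expressed as nested Rows, arrays as Seqs.
val personRows = Seq(
  Row("1", "Salim", Seq(Row("1st street", null, null))),
  Row(null, "Sana", Seq(Row(null, null, "60088")))
)
val personDF = sqlContext.createDataFrame(sc.parallelize(personRows, 2), personType)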


Two changes to your code will make printSchema() emit the full structure of your DataFrame without using case classes.

Firstly, as Daniel suggested, you need to have your classes extend the scala.Product trait (painful, but required for the .toDF method below):

class Address(first: String = null, second: String = null, zip: String = null)
    extends Product with Serializable {
  override def canEqual(that: Any) = that.isInstanceOf[Address]
  override def productArity: Int = 3
  override def productElement(n: Int): Any = n match {
    case 0 => first
    case 1 => second
    case 2 => zip
  }
}

class Person(id: String = null, name: String = null, address: Seq[Address] = null)
    extends Product with Serializable {
  override def canEqual(that: Any) = that.isInstanceOf[Person]
  override def productArity: Int = 3
  override def productElement(n: Int): Any = n match {
    case 0 => id
    case 1 => name
    case 2 => address
  }
}

Secondly, you should create your DataFrame using the .toDF implicit method that is brought into scope with import sqlContext.implicits._, rather than using sqlContext.createDataFrame(..), like so:

val claimDF = sc.parallelize(persons, 2).toDF

then claimDF.printSchema() will print:

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- first: string (nullable = true)
 |    |    |-- second: string (nullable = true)
 |    |    |-- zip: string (nullable = true)

Alternatively, you could use Scala 2.11.0-M3, which removes the 22-field limit on case classes.

