
I have been trying a hands-on with Spark DataFrames. I come with prior knowledge of the Cascading framework, which has a trap mechanism to divert faulty rows (rows with null values) into a separate tap called a trap. For those unfamiliar with it: when a faulty row is read from a text file, the framework either drops the bad row from the data or stops the execution. In Apache Spark, I observed that bad rows do not hinder the execution. That is good, but when it comes to getting business insights from data, the quality of the data does matter!

So, I have a text file with a bunch of rows in it (you may pick any dataset you like), in which a few records contain null values. I load the text file into a DataFrame with spark.read.csv. What I want to do is analyze the DataFrame and dynamically create a column named "isMyRowBad": the logic examines one row at a time, and if a row contains a null value it flags the isMyRowBad column as true for that row; rows without any null values get false in isMyRowBad for that particular clean row.
Here is an overview of the incoming and outgoing datasets:
INCOMING DATAFRAME

fname,lname,age
will,smith,40
Dwayne,Nunn,36
Aniruddha,Sinha,
Maria,,22

OUTGOING DATAFRAME

fname,lname,age,isMyRowBad
 will,smith,40,false
 Dwayne,Nunn,36,false
 Aniruddha,Sinha,,true
 Maria,,22,true

The above method of classifying good and bad rows might seem a little odd, but it does make sense, since I will not need to run the filter operation multiple times. Let us take a look at how.

Suppose I have an input DataFrame named inputDf, and the output is a tuple analysedDf: (DataFrame, DataFrame).

Now, I did try this piece of code:

val analyzedDf: (DataFrame, DataFrame) = (inputDf.filter(_.anyNull),inputDf.filter(!_.anyNull))

This code segregates good and bad rows, I agree! But it has a performance drawback: filter runs twice, which means filter iterates over the entire dataset twice! (You may counter this point if you feel that running filter twice makes sense when considering 50 fields and at least 584,000 rows, that is about 250 MB of data.)
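One thing I could try (just a sketch, I have not benchmarked it) is to cache the DataFrame, so the second filter at least reads from memory instead of re-reading the file:

// Sketch only: caching avoids re-reading the source file, though both filters
// still scan the cached rows once each.
val cachedDf = inputDf.cache()
val analyzedDf: (DataFrame, DataFrame) =
  (cachedDf.filter(row => row.anyNull), cachedDf.filter(row => !row.anyNull))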

and this as well

val analyzedDf: DataFrame = inputDf.select("*").withColumn("isMyRowBad", <at this point, I am not able to analyze the row>)

The above snippet shows where I get stuck: I am not able to figure out how to sweep the entire row and mark it as bad with a boolean value.
I hope you all understand what I am aiming to achieve. Please ignore any syntactical errors you find in the snippets, since I typed them here right away (I will correct them in future edits).

Please give me a hint (a little code snippet or pseudocode will be enough) on how to proceed with this challenge. Please reach out to me if you did not understand what I intend to do.

Any help will be greatly appreciated. Thanks in advance!

P.S.: There are brilliant people out here working on big data/Spark/Hadoop/Scala etc. I request you to kindly correct me on any point that I might have written wrongly (conceptually).


The below code gives me a solution, by the way. Please have a look.

package aniruddha.data.quality

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.functions._
/**
  * Created by aniruddha on 8/4/17.
  */
object DataQualityCheck extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  val schema: StructType = StructType(List(
    StructField("fname", StringType, nullable = true),
    StructField("lname", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true),
    StructField("pan", StringType, nullable = true),
    StructField("married", StringType, nullable = true)
  ))

  val inputDataFrame: DataFrame = spark
    .read
    .schema(schema)
    .option("header",true)
    .option("delimiter",",")
    .csv("inputData/infile")

  //inputDataFrame.show()

  val analysedDataFrame: DataFrame = inputDataFrame
    .select("*")
    .withColumn("isRowBad",
      when($"pan".isNull || $"lname".isNull || $"married".isNull, true).otherwise(false))

  analysedDataFrame.show()
}

input

fname,lname,age,pan,married
aniruddha,sinha,23,0AA22,no
balajee,venkatesh,23,0b96,no
warren,shannon,72,,
wes,borland,63,0b22,yes
Rohan,,32,0a96,no
james,bond,66,007,no

output

+---------+---------+---+-----+-------+--------+
|    fname|    lname|age|  pan|married|isRowBad|
+---------+---------+---+-----+-------+--------+
|aniruddha|    sinha| 23|0AA22|     no|   false|
|  balajee|venkatesh| 23| 0b96|     no|   false|
|   warren|  shannon| 72| null|   null|    true|
|      wes|  borland| 63| 0b22|    yes|   false|
|    Rohan|     null| 32| 0a96|     no|    true|
|    james|     bond| 66|  007|     no|   false|
+---------+---------+---+-----+-------+--------+

The code works fine, but I have a problem with the when function: can't we just check all the columns without hardcoding them?
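One idea I have not tried yet (just a sketch) would be to build the condition from the schema itself instead of naming the columns, something like:

// Sketch: build an "any column is null" condition dynamically from the column list,
// instead of hardcoding $"pan".isNull || $"lname".isNull || ...
val anyColumnNull = inputDataFrame.columns
  .map(c => col(c).isNull)
  .reduce(_ || _)

val analysedDataFrame: DataFrame = inputDataFrame
  .withColumn("isRowBad", when(anyColumnNull, true).otherwise(false))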


2 Answers


As far as I know, you can't do this with the built-in CSV parser. You can get the parser to stop if it hits an error (failFast mode), but not annotate the rows.
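For reference, the failFast behaviour I mean looks roughly like this (just a sketch, assuming a spark-shell SparkSession named spark and a placeholder path):

// Sketch: with mode=FAILFAST the reader throws on the first malformed record
// instead of silently nulling fields out; it cannot flag individual rows.
val strictDf = spark.read
  .option("header", "true")
  .option("mode", "FAILFAST")
  .csv("inputData/infile")   // placeholder path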

However, you could do this with a custom CSV parser that processes the data in a single pass. Unless we want to do some clever type introspection, it is easiest to create a helper class to annotate the structure of the file:

case class CSVColumnDef(colPos: Int, colName: String, colType: String)

val columns = List(CSVColumnDef(0,"fname","String"),CSVColumnDef(1,"lname","String"),CSVColumnDef(2,"age", "Int"))

Next, we need some functions to a) split the input, b) extract data from split data, c) check if row is bad:

import org.apache.spark.sql.functions.{col, udf}
import scala.util.Try

def splitToSeq(delimiter: String) = udf[Seq[String], String](_.split(delimiter))

def extractColumnStr(i: Int) = udf[Option[String],Seq[String]](s => Try(Some(s(i))).getOrElse(None))
def extractColumnInt(i: Int) = udf[Option[Int],Seq[String]](s => Try(Some(s(i).toInt)).getOrElse(None))

def isRowBad(delimiter: String) = udf[Boolean,String](s => {
   (s.split(delimiter).length != columns.length) || (s.split(delimiter).exists(_.length==0))
})

To use these, we first need to read in the text file (since I don't have it, and to allow people to replicate this answer, I will create an rdd):

val input = sc.parallelize(List(("will,smith,40"),("Dwayne,Nunn,36"),("Aniruddha,Sinha,"),("Maria,,22")))
input.take(5).foreach(println)

Given this input, we can create a dataframe with a single column, the raw line, and add our split column to it:

val delimiter = ","
val raw = "raw"
val delimited = "delimited"
val compDF = input.toDF(raw).withColumn(delimited, splitToSeq(delimiter)(col(raw)))

Finally, we can extract all the columns we previously defined, and check if the rows are bad:

val df = columns.foldLeft(compDF) { case (acc, column) =>
  column.colType match {
    case "Int" => acc.withColumn(column.colName, extractColumnInt(column.colPos)(col(delimited)))
    case _     => acc.withColumn(column.colName, extractColumnStr(column.colPos)(col(delimited)))
  }
}
  .withColumn("isMyRowBad", isRowBad(delimiter)(col(raw)))
  .drop(raw)
  .drop(delimited)

df.show
df.printSchema

The nice thing about this solution is that the Spark execution planner is smart enough to build all of those .withColumn operations into a single pass (map) over the data, with zero shuffling. The annoying thing is that it is a lot more dev work than using a nice shiny CSV library, and we need to define the columns somehow. If you wanted to be a bit more clever, you could get the column names from the first line of the file (hint: .mapPartitionsWithIndex), and just parse everything as a string. We also can't define a case class to describe the entire DF, since you have too many columns to approach the solution that way. Hope this helps...
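To make that header hint a little more concrete, here is a rough sketch (assuming the header really is the first line of the file, and using a placeholder path):

// Sketch: grab the header for column names, then drop it from the first partition
val rawLines = sc.textFile("inputData/infile")   // placeholder path
val header = rawLines.first()
val colNames = header.split(",").toList          // e.g. List("fname", "lname", "age")

val dataLines = rawLines.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) iter.drop(1) else iter           // the header only lives in partition 0
}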


Comments

+1, as this solution works exactly as I aim to achieve. I will be debugging it and trying to understand the way it works. The explanation you gave is great, but I still need to work through it. I will reach out to you when I have doubts. Please bear with me for a while :) Thank you, by the way.
"If you wanted to be a bit more clever, you could get the column names from the first line of the file (hint: .mapPartitionsWithIndex), and just parse everything as a string." This could have been one approach, but I was thinking about datasets that may not have a header. We do get such datasets, right?
"We also can't define a case class to describe the entire DF, since you have too many columns to approach the solution that way." That's actually true, which leaves us with no choice but to compromise type safety by using a DataFrame.
The solution was quite clever in terms of approach. I was not able to think along these lines since I have not dug much into RDDs. You have motivated me now. :D Thanks again. Accepted!
Regarding val columns = List(CSVColumnDef(0,"fname","String"), CSVColumnDef(1,"lname","String"), CSVColumnDef(2,"age","Int")): can't I plug in a strict schema with StructType(Array(StructField(...))) sort of syntax instead?

This can be done using a UDF. Although the answer given by Ben Horsburgh is definitely brilliant, we can do this without getting much into the internal architecture behind DataFrames.
The following code can give you an idea:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
  * Created by vaijnath on 10/4/17.
  */
object DataQualityCheck extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  val schema: StructType = StructType(List(
    StructField("fname", StringType, nullable = true),
    StructField("lname", StringType, nullable = true),
    StructField("married", StringType, nullable = true)
  ))

  val inputDataFrame: DataFrame = spark
    .read
    .schema(schema)
    .option("header",false)
    .option("delimiter",",")
    .csv("hydrograph.engine.spark/testData/inputFiles/delimitedInputFile.txt")

  //inputDataFrame.show()

  // A row is bad if any of its columns is null
  def isBad(row: Row): Boolean = row.anyNull

  val simplefun = udf(isBad(_: Row))
  // Pack all columns into a single struct so the UDF can inspect the whole row
  val cols = struct(inputDataFrame.schema.fieldNames.map(e => col(e)): _*)
  // println(cols + "******************") // for debugging

  val analysedDataFrame: DataFrame = inputDataFrame.withColumn("isRowBad", simplefun(cols))

  analysedDataFrame.show
}
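As a follow-up sketch (not part of the code above), once the flag column exists you can cache the annotated DataFrame and split it into good and bad rows without re-reading the source:

// Sketch: split on the precomputed flag; the cache keeps this to one pass over the input
val annotated = analysedDataFrame.cache()
val goodRows = annotated.filter(!col("isRowBad"))
val badRows = annotated.filter(col("isRowBad"))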

Please get back to me if you face any issues. I believe this solution is appropriate, since you seem to be looking for code that works at the DataFrame level.

Thanks.

