I have been getting hands-on with Spark DataFrames. I come from the Cascading framework, which has a trap mechanism to divert faulty rows (rows with null values) into a separate tap called a trap. For those unfamiliar with it: when a faulty row is read from a text file, the framework either discards the bad row from the dataset or stops execution. Now, in Apache Spark, I observed that bad rows did not hinder execution. That is good, but when it comes to getting business insights from data, the quality of the data does matter!
So, I have a text file with a bunch of rows in it (you may pick any dataset you like), in which a few records contain null values. I load the text file into a DataFrame with spark.read.csv. What I want to do is analyze the DataFrame and dynamically create a column named "isMyRowBad": the logic examines one row at a time, and if it finds a null value in the row, it flags isMyRowBad as true for that row; for clean rows with no null values, isMyRowBad should be false.
Here is an overview of the incoming and outgoing datasets:
INCOMING DATAFRAME
fname,lname,age
will,smith,40
Dwayne,Nunn,36
Aniruddha,Sinha,
Maria,,22
OUTGOING DATAFRAME
fname,lname,age,isMyRowBad
will,smith,40,false
Dwayne,Nunn,36,false
Aniruddha,Sinha,,true
Maria,,22,true
The above way of classifying good and bad rows might seem a little foolish, but it does make sense, since I will not need to run the filter operation multiple times. Let us take a look at how.
Suppose I have a DataFrame named inputDf as the input, and analyzedDf: (DataFrame, DataFrame) as the output tuple.
Now, I did try this piece of code:
val analyzedDf: (DataFrame, DataFrame) = (inputDf.filter(_.anyNull), inputDf.filter(!_.anyNull))
This code segregates good and bad rows, I agree, but it has a performance setback: filter runs twice, which means the entire dataset is iterated over twice! (You may counter this point if you feel two filter passes are acceptable; consider 50 fields and at least 584,000 rows, that is, about 250 MB of data!)
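If the double scan is the main worry, one mitigation I am aware of, assuming the data fits comfortably in memory, is to cache the input once so the second filter pass reads the in-memory copy instead of rescanning the source file. A minimal sketch, reusing the names from the snippet above:

// Materialize inputDf in memory; the cache fills on the first action,
// so the source file is read only once across both filter jobs
inputDf.cache()

val analyzedDf: (DataFrame, DataFrame) =
  (inputDf.filter(_.anyNull), inputDf.filter(!_.anyNull))

Calling inputDf.unpersist() afterwards can release the memory once both halves are materialized.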
And I also tried this:
val analyzedDf: DataFrame = inputDf.select("*").withColumn("isMyRowBad", <at this point, I am not able to analyze the row>)
The above snippet shows where I am stuck: I cannot figure out how to sweep the entire row and mark it as bad with a boolean value.
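For context, one pattern I came across might fill that gap, though I am not sure it is correct or idiomatic, hence this question. It assumes Spark 2.x, where a UDF applied over struct(...) receives the whole Row:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct, udf}

// Assumption: packing every column into a struct lets the UDF inspect the full Row
val rowHasNull = udf((row: Row) => row.anyNull)

val analyzedDf: DataFrame = inputDf
  .withColumn("isMyRowBad", rowHasNull(struct(inputDf.columns.map(col): _*)))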
I hope you all understand what I am aiming to achieve. Please ignore any syntactical errors you find in the snippets, since I typed them here right away (I will correct them in future edits).
Please give me a hint (a little code snippet or pseudocode will be enough) on how to proceed with this challenge. Please reach out to me if you did not understand what I intend to do.
Any help will be greatly appreciated. Thanks in advance!
P.S.: There are brilliant people out here on BigData/Spark/Hadoop/Scala etc. I request you to kindly correct me on any point which I might have written wrongly (conceptually).
By the way, the below code gives me a solution. Please have a look.
package aniruddha.data.quality

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.functions._

/**
  * Created by aniruddha on 8/4/17.
  */
object DataQualityCheck extends App {

  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  // Explicit schema: age parses as an integer, and missing fields stay null
  val schema: StructType = StructType(List(
    StructField("fname", StringType, nullable = true),
    StructField("lname", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true),
    StructField("pan", StringType, nullable = true),
    StructField("married", StringType, nullable = true)
  ))

  val inputDataFrame: DataFrame = spark
    .read
    .schema(schema)
    .option("header", true)
    .option("delimiter", ",")
    .csv("inputData/infile")

  // inputDataFrame.show()

  // Flag a row as bad when any of the checked columns is null
  val analysedDataFrame: DataFrame = inputDataFrame
    .withColumn("isRowBad",
      when($"pan".isNull || $"lname".isNull || $"married".isNull, true).otherwise(false))

  analysedDataFrame.show()
}
Input:
fname,lname,age,pan,married
aniruddha,sinha,23,0AA22,no
balajee,venkatesh,23,0b96,no
warren,shannon,72,,
wes,borland,63,0b22,yes
Rohan,,32,0a96,no
james,bond,66,007,no
Output:
+---------+---------+---+-----+-------+--------+
| fname| lname|age| pan|married|isRowBad|
+---------+---------+---+-----+-------+--------+
|aniruddha| sinha| 23|0AA22| no| false|
| balajee|venkatesh| 23| 0b96| no| false|
| warren| shannon| 72| null| null| true|
| wes| borland| 63| 0b22| yes| false|
| Rohan| null| 32| 0a96| no| true|
| james| bond| 66| 007| no| false|
+---------+---------+---+-----+-------+--------+
The code works fine, but I have a problem with the when function: can't we just check all the columns without hardcoding them?
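One direction that should avoid the hardcoding, sketched as an assumption rather than a verified answer: fold an isNull check over inputDataFrame.columns, so the condition is derived from whatever columns the DataFrame happens to have.

import org.apache.spark.sql.functions.col

// Builds "fname IS NULL OR lname IS NULL OR ..." from the DataFrame's own column list
val anyColumnNull = inputDataFrame.columns
  .map(c => col(c).isNull)
  .reduce(_ || _)

val analysedDataFrame: DataFrame = inputDataFrame.withColumn("isRowBad", anyColumnNull)

Note that this sweeps every column, including fname and age, whereas the hardcoded when(...) above only checks pan, lname, and married; for the sample input both versions happen to produce the same output.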