
I have a TSV file, where the first line is the header. I want to create a JavaPairRDD from this file. Currently, I'm doing so with the following code:

TsvParser tsvParser = new TsvParser(new TsvParserSettings());
List<String[]> allRows;
List<String> headerRow;
try (BufferedReader reader = new BufferedReader(new FileReader(myFile))) {
    allRows = tsvParser.parseAll(reader);
    // Removes the header row
    headerRow = Arrays.asList(allRows.remove(0));
}
JavaPairRDD<String, MyObject> myObjectRDD = javaSparkContext
        .parallelize(allRows)
        .mapToPair(row -> new Tuple2<>(row[0], myObjectFromArray(row)));

I was wondering if there was a way to have the javaSparkContext read and process the file directly instead of splitting the operation into two parts.

EDIT: This is not a duplicate of How do I convert csv file to rdd, because I'm looking for an answer in Java, not Scala.


4 Answers

3

Use https://github.com/databricks/spark-csv:

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .option("delimiter","\t")
    .load("cars.csv");

df.select("year", "model").write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("newcars.csv");
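
If you then need the JavaPairRDD from the question rather than a DataFrame, a minimal sketch of the conversion (myObjectFromRow here is a hypothetical helper, with the key taken from column 0) could be:

// DataFrame -> JavaRDD<Row> -> JavaPairRDD<String, MyObject>
// myObjectFromRow(Row) is a hypothetical helper; adapt it to your MyObject.
JavaPairRDD<String, MyObject> myObjectRDD = df
        .javaRDD()
        .mapToPair(r -> new Tuple2<>(r.getString(0), myObjectFromRow(r)));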

4 Comments

I looked into it, and I don't like that there's no simple way to convert a Dataframe to a JavaRDD of custom objects. You have to parse each row manually.
I'm curious why you would want an RDD over a DataFrame, given the custom-object requirement. @alexgbelov
The rest of my logic uses RDD's; I just needed a way to read in the data as cleanly as possible.
Spark 2.x implements this natively; you don't need spark-csv anymore.
1

Try the code below to read the CSV file and create a JavaPairRDD.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SparkCSVReader {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("CSV Reader");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> allRows = sc.textFile("c:\\temp\\test.csv"); // read csv file
        String header = allRows.first(); // take out header
        JavaRDD<String> filteredRows = allRows.filter(row -> !row.equals(header)); // filter header
        JavaPairRDD<String, MyCSVFile> filteredRowsPairRDD = filteredRows.mapToPair(parseCSVFile); // create pair
        filteredRowsPairRDD.foreach(data -> {
            System.out.println(data._1() + " ### " + data._2().toString()); // print row and object
        });
        sc.stop();
        sc.close();
    }

    private static PairFunction<String, String, MyCSVFile> parseCSVFile = (row) -> {
        String[] fields = row.split(",");
        return new Tuple2<String, MyCSVFile>(row, new MyCSVFile(fields[0], fields[1], fields[2]));
    };

}

You can also use Databricks spark-csv (https://github.com/databricks/spark-csv); its functionality is built into Spark 2.0.0 and later.
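
One caveat with the filter-based header removal above: it compares every record against the header string, so a data row that happens to equal the header would be dropped as well. A sketch of an alternative that drops only the first line of the first partition, where textFile places the header:

// Drop the header once, instead of filtering every record against it.
JavaRDD<String> withoutHeader = allRows.mapPartitionsWithIndex(
        (idx, iter) -> {
            if (idx == 0 && iter.hasNext()) {
                iter.next(); // skip the header line
            }
            return iter;
        },
        false);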

1 Comment

Thank you. I looked into Databricks spark-csv, but I didn't like it since you'd have to manually parse each row, which I'm already doing anyway.
1

Apache Spark 2.x has a built-in CSV reader, so you don't have to use https://github.com/databricks/spark-csv

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 *
 * @author cpu11453local
 */
public class Main {
    public static void main(String[] args) {


        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("meowingful")
                .getOrCreate();

        Dataset<Row> df = spark.read()
                    .option("header", "true")
                    .option("delimiter","\t")
                    .csv("hdfs://127.0.0.1:9000/data/meow_data.csv");

        df.show();
    }
}
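
To get the JavaPairRDD the question asks for rather than just showing the Dataset, a similar sketch (again with a hypothetical myObjectFromRow helper and the key in column 0):

// Dataset<Row> -> JavaPairRDD<String, MyObject>; myObjectFromRow is hypothetical.
JavaPairRDD<String, MyObject> pairs = df
        .toJavaRDD()
        .mapToPair(r -> new Tuple2<>(r.getString(0), myObjectFromRow(r)));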

And the Maven pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.meow.meowingful</groupId>
    <artifactId>meowingful</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>


        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>

</project>


0

I'm the author of uniVocity-parsers and can't help you much with Spark, but I believe something like this can work for you:

parserSettings.setHeaderExtractionEnabled(true); //captures the header row

parserSettings.setProcessor(new AbstractRowProcessor() {
        @Override
        public void rowProcessed(String[] row, ParsingContext context) {
            String[] headers = context.headers(); //not sure if you need them
            JavaPairRDD<String, MyObject> myObjectRDD = javaSparkContext
                    .mapToPair(r -> new Tuple2<>(r[0], myObjectFromArray(r)));
            //process your stuff.
        }
    });

If you want to parallelize the processing of each row, you can wrap it in a ConcurrentRowProcessor:

parserSettings.setProcessor(new ConcurrentRowProcessor(new AbstractRowProcessor() {
        @Override
        public void rowProcessed(String[] row, ParsingContext context) {
            String[] headers = context.headers(); //not sure if you need them
            JavaPairRDD<String, MyObject> myObjectRDD = javaSparkContext
                    .mapToPair(r -> new Tuple2<>(r[0], myObjectFromArray(r)));
            //process your stuff.
        }
    }, 1000)); //1000 rows loaded in memory.

Then just call parse:

new TsvParser(parserSettings).parse(myFile);
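
Note that JavaSparkContext has no mapToPair of its own, so a rough alternative sketch (I can't vouch for the Spark side) is to let a RowListProcessor collect the rows and build the pair RDD once parsing finishes:

// Collect rows with uniVocity, then hand them to Spark afterwards.
TsvParserSettings parserSettings = new TsvParserSettings();
parserSettings.setHeaderExtractionEnabled(true);        // strips the header row
RowListProcessor rowProcessor = new RowListProcessor(); // accumulates String[] rows
parserSettings.setProcessor(rowProcessor);

new TsvParser(parserSettings).parse(myFile);

JavaPairRDD<String, MyObject> myObjectRDD = javaSparkContext
        .parallelize(rowProcessor.getRows())
        .mapToPair(r -> new Tuple2<>(r[0], myObjectFromArray(r)));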

Hope this helps!

