
I am writing a Spark job and trying to read a text file using Scala. The following works fine on my local machine:

  import scala.io.Source

  // myHashMap is a mutable Map[String, Double] defined elsewhere
  val myFile = "myLocalPath/myFile.csv"
  for (line <- Source.fromFile(myFile).getLines()) {
    val data = line.split(",")
    myHashMap.put(data(0), data(1).toDouble)
  }

Then I tried to make it work on AWS. I did the following, but it doesn't seem to read the entire file properly. What is the proper way to read such a text file from S3? Thanks a lot!

import java.io.{BufferedReader, InputStreamReader}
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.GetObjectRequest

val credentials = new BasicAWSCredentials("myKey", "mySecretKey")
val s3Client = new AmazonS3Client(credentials)
val s3Object = s3Client.getObject(new GetObjectRequest("myBucket", "myFile.csv"))

val reader = new BufferedReader(new InputStreamReader(s3Object.getObjectContent()))

var line = ""
while ((line = reader.readLine()) != null) {
  val data = line.split(",")
  myHashMap.put(data(0), data(1).toDouble)
  println(line)
}

3 Answers


I think I got it to work like below:

    import scala.io.Source

    // s3Client is constructed as in the question (AWS SDK v1)
    val s3Object = s3Client.getObject(new GetObjectRequest("myBucket", "myPath/myFile.csv"))

    val myData = Source.fromInputStream(s3Object.getObjectContent()).getLines()
    for (line <- myData) {
        val data = line.split(",")
        myMap.put(data(0), data(1).toDouble)
    }

    println(" my map : " + myMap.toString())


Read in the CSV file with sc.textFile("s3://myBucket/myFile.csv"). That will give you an RDD[String]. Get that into a map:

val data = sc.textFile("s3://myBucket/myFile.csv")

val myHashMap = data.collect()
                    .map(line => {
                      val substrings = line.split(",")
                      (substrings(0), substrings(1).toDouble)})
                    .toMap

You can then use sc.broadcast to broadcast your map, so that it is readily available on all your worker nodes.
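For illustration, a minimal sketch of that broadcast step (here `input` is a hypothetical RDD[String] of the rows you want to enrich):

    // Broadcast the map once from the driver; tasks read it via .value.
    val broadcastMap = sc.broadcast(myHashMap)

    val output = input.map { line =>
      val key = line.split(",")(0)
      broadcastMap.value.getOrElse(key, 0.0)   // look up without reshipping the map per task
    }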

(Note that you can of course also use the Databricks "spark-csv" package to read in the CSV file if you prefer.)
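If you go the spark-csv route, usage looks roughly like this (Spark 1.x API; the com.databricks:spark-csv artifact must be on the classpath):

    // Reads the CSV into a DataFrame instead of an RDD[String].
    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "false")
      .option("inferSchema", "true")
      .load("s3://myBucket/myFile.csv")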

5 Comments

My utility function needs myHashMap, so my code looks like output = input.map { t => myUtilityFunction(myHashMap, t) }. Is it possible to avoid passing myHashMap to myUtilityFunction each time? Is there a way to broadcast myHashMap and let myUtilityFunction access it directly? Thanks a lot!
Also, I didn't want to use sc.textFile("s3://myBucket/myFile.csv") because I want to keep the code generic so it also works without a Spark context. Thanks.
You do realize that if you let your utility function read the map directly, and you use the utility function like you describe (output = input.map { t => myUtilityFunction(...) }), the map will be read and created for every single row of your input RDD. I really don't think you want that. If you broadcast the variable (using sc.broadcast) on the other hand, you read and create the map only once on your driver, and then all your workers have direct access to it. Why do you not want to pass the map to the utility function? That seems odd to me.
Are you sure the map is created per single row of the input RDD, not per task? The reasons I don't want to pass the HashMap are: 1. cleanliness of the code; 2. I want the same code to be usable in other scenarios where reading the input data inside the utility function is trivial.
If you use map and not mapPartitions, the utility function is applied to each row, and thus if your utility function is in charge of creating the map, it will be created for each row. If you use mapPartitions it will only create the map once per partition, but (depending on the size of your data, of course) that could still easily add significant overhead (I/O is never cheap). IMO, you should focus on writing code that is optimal for parallel processing (Spark) and be less concerned with other trivial (non-parallel) uses of the code.
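To make the trade-off discussed above concrete, a rough sketch (buildMapFromS3 and myUtilityFunction are placeholder names, not part of the original code):

    // Per partition: the map is rebuilt once per partition -- better than per row, still redundant I/O.
    val perPartition = input.mapPartitions { rows =>
      val lookup = buildMapFromS3()                     // runs once per partition
      rows.map(t => myUtilityFunction(lookup, t))
    }

    // Broadcast: the map is built once on the driver and shared with every executor.
    val lookupBroadcast = sc.broadcast(buildMapFromS3())
    val viaBroadcast = input.map(t => myUtilityFunction(lookupBroadcast.value, t))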

This can be achieved even without importing the Amazon S3 libraries, by using SparkContext's textFile. Use the code below:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration

val s3Login = "s3://AccessKey:Securitykey@Externalbucket"
val filePath = s3Login + "/Myfolder/myscv.csv"

for (line <- sc.textFile(filePath).collect()) {
  val data = line.split(",")
  val value1 = data(0)
  val value2 = data(1).toDouble
  // e.g. put the pair into a map here: myHashMap.put(value1, value2)
}

In the above code, sc.textFile reads the file into an RDD of lines, and collect() brings those lines back to the driver. Inside the loop each line is split on , into a local array, and you can then access the individual values by index.
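If embedding the keys in the URL causes trouble (for example when the secret key contains a / character), an alternative sketch is to set the credentials on the Hadoop configuration instead (property names below assume the s3n connector; use the fs.s3a.* equivalents for s3a):

    // Assumes the s3n (or s3a) Hadoop connector is available on the classpath.
    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "AccessKey")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "Securitykey")

    val lines = sc.textFile("s3n://Externalbucket/Myfolder/myscv.csv")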

3 Comments

This code returns the error "java.io.IOException: No FileSystem for scheme: s3"
Can you explain the answer? I am also getting a java.io.FileNotFoundException.
Mitkash and ibaalf, please share your code so I can debug it. There could be a typo somewhere, because this is working perfectly for me.
