
My question is not new, but I want to understand how to do it step by step.

In my Spark application I create a DataFrame; let's call it df. Spark version: 2.4.0.

val df: DataFrame = Seq(
    ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
    ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
    ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
    ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
    ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
).toDF("NAME", "START_DATE", "END_DATE", "STATUS")

How do I create a .csv file from this DataFrame and put the csv file into a specific folder on the server?

For example, is the following code correct? I noticed that some people use coalesce or repartition for this task, but I don't understand which one would be better in my case.

df.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/home/reports/")

When I try to run that code it raises this error:

org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/home/reports/_temporary/0":hdfs:hdfs:drwxr-xr-x 

I run the Spark application as the root user. The reports folder was created by the root user with this command:

mkdir -m 777 reports

It seems like only the hdfs user can write files there.

  • Possible duplicate of Write single CSV file using spark-csv. Commented Jan 19, 2019 at 19:49
  • In Spark 2.* you can use the syntax dataFrame.write.option("header", "true").csv("/home/reports/"). But you should not mix up the local file system and HDFS: the path you specified is a path on HDFS, and your user doesn't have permission to write to that location. If you want to write to a local directory on a server, you should collect the data first using val dataToSave = dataFrame.collect(). If you do that, all the data in the DataFrame will go to the Spark master node, so make sure you have enough memory for it. After that you will be able to save your data using the standard Scala/Java IO API. Commented Jan 20, 2019 at 8:34
  • No, that is not what I mean. You can use dataFrame.write.csv(path) for saving data to HDFS or S3. If you want to save your DataFrame to some location on a server, you should do val dataToSave = dataFrame.collect(). After that the data is contained in memory on your Spark master node, and you can use PrintWriter or FileWriter to save it. Commented Jan 20, 2019 at 9:31
  • If you decide to use the collect method on the DataFrame, you will get an Array[Row]. After that you can do val linesToSave: Array[String] = dataToSave.map(_.toSeq.mkString(";")). You can also check au.com.bytecode.opencsv.CSVWriter for creating csv files from an Array[Row]. Commented Jan 20, 2019 at 10:32
  • Actually, the simplest way is the approach described in the page from the first comment: write your file to a temporary location on HDFS (e.g. /tmp) and, after the job completes, copy or move it to a local directory (see the sketch after this list). Commented Jan 20, 2019 at 10:36
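
To make that last suggestion concrete, here is a minimal sketch: write to a temporary HDFS folder with coalesce(1) so that exactly one part file is produced, then copy that part file to the local filesystem with the Hadoop FileSystem API. The helper name saveCsvLocally and both example paths are made up for illustration, not taken from the thread.

import org.apache.spark.sql.DataFrame
import org.apache.hadoop.fs.{FileSystem, Path}

// Write df to a temporary HDFS folder as a single CSV part file,
// then copy that part file to a local path.
def saveCsvLocally(df: DataFrame, hdfsTmpDir: String, localFile: String): Unit = {
  df.coalesce(1)
    .write
    .option("header", "true")
    .mode("overwrite")
    .csv(hdfsTmpDir)

  val fs = FileSystem.get(df.sparkSession.sparkContext.hadoopConfiguration)
  // coalesce(1) above guarantees there is exactly one part file in the folder.
  val partFile = fs.globStatus(new Path(s"$hdfsTmpDir/part-*.csv"))(0).getPath
  fs.copyToLocalFile(partFile, new Path(localFile))
}

// Example call (hypothetical paths):
// saveCsvLocally(df, "/tmp/report", "/home/reports/report.csv")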

1 Answer


I believe you are confused about the way Spark behaves; I would recommend you read the official documentation and/or a tutorial first.
Nevertheless, I hope this answers your question.

This code will save a DataFrame as a single CSV file on a local filesystem.
It was tested with Spark 2.4.0 and Scala 2.12.8 on an Ubuntu 18.04 laptop.

import org.apache.spark.sql.SparkSession

val spark =
  SparkSession
    .builder
    .master("local[*]")
    .appName("CSV Writer Test")
    .getOrCreate()
import spark.implicits._

val df =
  Seq(
    ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
    ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
    ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
    ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
    ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
  ).toDF("NAME", "START_DATE", "END_DATE", "STATUS")

df.printSchema
// root
//  |-- NAME: string (nullable = true)
//  |-- START_DATE: string (nullable = true)
//  |-- END_DATE: string (nullable = true)
//  |-- STATUS: string (nullable = true)

df.coalesce(numPartitions = 1)
  .write
  .option(key = "header", value = "true")
  .option(key = "sep", value = ",")
  .option(key = "encoding", value = "UTF-8")
  .option(key = "compresion", value = "none")
  .mode(saveMode = "OVERWRITE")
  .csv(path = "file:///home/balmungsan/dailyReport/") // Change the path. Note there are 3 /, the first two are for the file protocol, the third one is for the root folder.

spark.stop()

Now, let's check the saved file.

balmungsan@BalmungSan:dailyReport $ pwd
/home/balmungsan/dailyReport

balmungsan@BalmungSan:dailyReport $ ls
part-00000-53a11fca-7112-497c-bee4-984d4ea8bbdd-c000.csv  _SUCCESS

balmungsan@BalmungSan:dailyReport $ cat part-00000-53a11fca-7112-497c-bee4-984d4ea8bbdd-c000.csv 
NAME,START_DATE,END_DATE,STATUS
Alex,2018-01-01 00:00:00,2018-02-01 00:00:00,OUT
Bob,2018-02-01 00:00:00,2018-02-05 00:00:00,IN
Mark,2018-02-01 00:00:00,2018-03-01 00:00:00,IN
Mark,2018-05-01 00:00:00,2018-08-01 00:00:00,OUT
Meggy,2018-02-01 00:00:00,2018-02-01 00:00:00,OUT

The _SUCCESS file exists to signal that the write succeeded.

Important notes:

  • You need to specify the file:// protocol to save to the local filesystem instead of HDFS.
  • The path specifies the name of the folder that will hold the partitions of the file, not the name of the file itself; inside that folder there will be one file per partition. If you want to read such a file again with Spark, you only need to specify the folder and Spark will understand the partition files. If not, I would recommend renaming the file afterwards - as far as I know, there is no way to control the name from Spark (a sketch of the rename follows these notes).
  • If the df is too big to fit in the memory of a single node, the job will fail.
  • If you run this in a distributed way (e.g. with master yarn), then the file will not be saved on the master node but on one of the slave nodes. If you really need it on the master node, you may collect it and write it with normal Scala as Dmitry suggested (see the second sketch below).
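
A minimal sketch of that renaming step, assuming the write above finished and coalesce(1) produced exactly one part file; the target name dailyReport.csv is just an example:

import java.io.File
import java.nio.file.{Files, Paths, StandardCopyOption}

// Locate the single part file Spark wrote and give it a predictable name.
val dir = new File("/home/balmungsan/dailyReport")
val partFile = dir.listFiles.filter(_.getName.startsWith("part-")).head
Files.move(
  partFile.toPath,
  Paths.get(dir.getPath, "dailyReport.csv"),
  StandardCopyOption.REPLACE_EXISTING
)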
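
And a sketch of the collect-and-write approach from the comments. It only works when the DataFrame fits in the driver's memory, and the output path is hypothetical:

import java.io.PrintWriter

// Bring all rows to the driver and write them with plain Java IO.
val lines: Array[String] = df.collect().map(_.toSeq.mkString(","))

val writer = new PrintWriter("/home/reports/report.csv") // hypothetical path
try {
  writer.println(df.columns.mkString(",")) // header row
  lines.foreach(line => writer.println(line))
} finally {
  writer.close()
}

Note that this naive mkString does not quote fields that themselves contain commas; a real CSV writer such as opencsv (mentioned in the comments) takes care of that.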

2 Comments

Hello! Thanks for your detailed answer. It's really a problem that Spark can't change the name of the csv file. In the future the DataFrame can be big, so you advise me to use more memory or more nodes, right? Could you describe your last point about collect in more detail, please? Dmitry advised using the au.com.bytecode.opencsv.CSVWriter library to create a csv file from an Array[Row], but I still can't find any good example.
If the DataFrame is really big I would suggest using more nodes (horizontal scaling is usually better than vertical). Again, if the DataFrame is big, then collect may just crash the app, and it does not even make sense to write to a single file (unless the final result is just a small report). The reason Spark cannot change the name is that, as stated above, there will usually be multiple part files, so the folder becomes the file identifier in the distributed filesystem. If you want, you can invite me to a SO chat to discuss your use case further.
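
Since the comments mention au.com.bytecode.opencsv.CSVWriter but no example, here is a hedged sketch of how it could be used on a collected DataFrame; the output path is made up, the data must fit in driver memory, and fields are assumed non-null:

import java.io.FileWriter
import au.com.bytecode.opencsv.CSVWriter

// Collect the DataFrame to the driver and write it with opencsv,
// which takes care of quoting and escaping.
val writer = new CSVWriter(new FileWriter("/home/reports/report.csv"))
try {
  writer.writeNext(df.columns) // header
  df.collect().foreach { row =>
    writer.writeNext(row.toSeq.map(_.toString).toArray) // assumes no null fields
  }
} finally {
  writer.close()
}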
