I wrote two different programs in the Eclipse Oxygen IDE. In the first one, the Spark master is set to local.
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class JavaClientLocal {
    public static void main(String[] args) {
        SparkConf sc = new SparkConf().setAppName("SparkTest").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sc);
        System.out.println(jsc.master() + " : " + jsc.version());

        JavaRDD<String> textFile = jsc.parallelize(Arrays.asList("spark rdd example", "sample example", "sit on spark"));
        System.out.println(textFile.collect());

        JavaRDD<String> words = textFile.flatMap((String str) -> Arrays.asList(str.split(" ")).iterator());
        JavaPairRDD<String, Integer> wcPair = words.mapToPair((String t) -> new Tuple2<>(t, 1));
        JavaPairRDD<String, Integer> result = wcPair.reduceByKey((Integer c1, Integer c2) -> c1 + c2);
        System.out.println(result.collect());
    }
}
The code above runs without any exception, and System.out.println(result.collect()) prints the correct values. However, the code below throws an exception.
import java.net.URI;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class JavaClientYarn {
    private static final String srcDir = "/home/m_usr/sparkData/";
    private static final String srcFile = "data.txt";
    private static final String dstSrc = "hdfs://master:9000/user/m_usr/data.txt";
    private static final String dstFile = "hdfs://master:9000/user/m_usr/result.txt";

    public static void main(String[] args) throws Exception {
        SparkConf sc = new SparkConf().setAppName("SparkTest").setMaster("yarn-client");
        JavaSparkContext jsc = new JavaSparkContext(sc);
        System.out.println(jsc.master() + " : " + jsc.version());

        // Copy the local input file to HDFS before reading it as an RDD.
        Path srcPath = new Path(srcDir + srcFile);
        Path dstPath = new Path(dstSrc);
        FileSystem fs = FileSystem.get(URI.create(dstSrc), new Configuration());
        fs.copyFromLocalFile(srcPath, dstPath);

        JavaRDD<String> textFile = jsc.textFile(dstSrc);
        System.out.println(textFile.collect());

        JavaRDD<String> words = textFile.flatMap((String str) -> Arrays.asList(str.split(" ")).iterator());
        JavaPairRDD<String, Integer> wcPair = words.mapToPair((String t) -> new Tuple2<>(t, 1));
        JavaPairRDD<String, Integer> result = wcPair.reduceByKey((Integer c1, Integer c2) -> c1 + c2);
        System.out.println(result.collect());

        jsc.stop();
    }
}
The following exception is thrown:
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.f$3 of type org.apache.spark.api.java.function.FlatMapFunction in instance of org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
The only difference between the two programs is the master: the former uses local and the latter uses yarn-client. But the latter does not seem to recognize the Java 8 lambda expression and fails with the error above. I have no idea which part of the Spark-on-YARN configuration causes this error.
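One thing I am not sure about: since the driver is started from Eclipse rather than through spark-submit, the executors launched by YARN may not receive the project's compiled classes at all, which would explain why the generated lambda class cannot be deserialized on the executor side. Below is only a minimal sketch of registering the application jar on the SparkConf; the jar path is an assumed example and would have to point to a jar actually built from this project.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class JavaClientYarnWithJars {
    public static void main(String[] args) {
        // Sketch only: ship the application jar to the YARN executors so that
        // the classes backing the lambdas can be deserialized there.
        SparkConf sc = new SparkConf()
                .setAppName("SparkTest")
                .setMaster("yarn-client")
                // Assumed example path: a jar exported from this Eclipse project.
                .setJars(new String[] { "/home/m_usr/sparkData/spark-test.jar" });

        JavaSparkContext jsc = new JavaSparkContext(sc);
        System.out.println(jsc.master() + " : " + jsc.version());
        // ... the rest of the word count is unchanged ...
        jsc.stop();
    }
}

As far as I understand, the same could also be done through the spark.jars property, or by packaging the class and launching it with spark-submit --master yarn --deploy-mode client instead of running it directly from the IDE.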
Update
Here is my configuration of Hadoop 2.7.4 and Spark 2.2.0.
$ vi .bashrc
############ Eclipse PATH ###########
export ECLIPSE_HOME=./eclipse
export PATH=$PATH:$ECLIPSE_HOME
######### JDK8 PATH ############
JAVA_HOME=/usr/java/jdk1.8.0_131
CLASSPATH=.:$JAVA_HOME/lib/tools.jar
PATH=$PATH:$HOME/bin:$JAVA_HOME/bin
export JAVA_HOME CLASSPATH
export PATH
############ Hadoop PATH ###########
export HADOOP_HOME=/home/m_usr/hadoop-2.7.4
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export PATH=$PATH:/usr/bin:$JAVA_HOME/bin:$HADOOP_HOME/bin
export HADOOP_PID_DIR=/home/m_usr/hadoop-2.7.4/pids
export HADOOP_CLASSPATH=$JAVA_HOME/lib/tools.jar
export HADOOP_INSTALL=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native:$JAVA_LIBRARY_PATH
export YARN_HOME=$HADOOP_HOME
export PATH=$PATH:$YARN_HOME
############ Spark Path ############
export SPARK_HOME=/home/m_usr/spark-2.2.0-bin-hadoop2.7
export SPARK_SUBMIT=/home/m_usr/spark-2.2.0-bin-hadoop2.7/bin/spark-submit
export PATH=$PATH:$SPARK_HOME/bin
export PATH=$PATH:$SPARK_HOME/sbin
$ vi spark-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_131
export HADOOP_HOME=/home/m_usr/hadoop-2.7.4
export SPARK_HOME=/home/m_usr/spark-2.2.0-bin-hadoop2.7
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
$ vi spark-defaults.conf
spark.master spark://master:7077
spark.eventLog.enabled true
spark.eventLog.dir file:///home/m_usr/spark-2.2.0-bin-hadoop2.7/sparkeventlogs
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 5g
spark.yarn.am.memory 1g
spark.executor.instances 2
spark.executor.extraJavaOptions -Dlog4j.configuration=file:/home/m_usr/spark-2.2.0-bin-hadoop2.7/conf/log4j.properties
spark.driver.extraJavaOptions -Dlog4j.configuration=file:/home/m_usr/spark-2.2.0-bin-hadoop2.7/conf/log4j.properties
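As far as I understand, spark-defaults.conf is normally applied by spark-submit, so these properties may not take effect when the driver is launched directly from Eclipse. A rough sketch of setting the equivalent options programmatically is below; the values are copied from the file above and the class name is just an example.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkConfFromDefaults {
    public static void main(String[] args) {
        // Sketch only: mirror the spark-defaults.conf entries on the SparkConf,
        // since a plain IDE launch does not read that file.
        SparkConf conf = new SparkConf()
                .setAppName("SparkTest")
                .setMaster("yarn-client")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.yarn.am.memory", "1g")
                .set("spark.executor.instances", "2");
        // spark.driver.memory is left out here because it only takes effect
        // if it is set before the driver JVM starts.

        JavaSparkContext jsc = new JavaSparkContext(conf);
        System.out.println(jsc.master() + " : " + jsc.version());
        jsc.stop();
    }
}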
$ pwd
/home/jhwang/hadoop-2.7.4/etc/hadoop
$ vi yarn-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_131
export HADOOP_HOME=/home/m_usr/hadoop-2.7.4
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
$ vi core-site.xml
<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://master:9000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/home/m_usr/hadoop-2.7.4/tmp</value>
    </property>
</configuration>
$ vi hdfs-site.xml
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>2</value>
    </property>
    <property>
        <name>dfs.permissions</name>
        <value>false</value>
    </property>
</configuration>
$ vi mapred-site.xml
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>
$ vi yarn-site.xml
<configuration>
    <!-- Site specific YARN configuration properties -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>
    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property>
    <property>
        <name>yarn.nodemanager.pmem-check-enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>
</configuration>
The Eclipse IDE configuration of Hadoop and Spark follows this site. Am I missing any step of the Hadoop and Spark setup on CentOS 7?