I wrote two programs in the Eclipse Oxygen IDE. In the first one, the Spark master is local:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class JavaClientLocal {

    public static void main(String[] args) {
        SparkConf sc = new SparkConf().setAppName("SparkTest").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sc);
        System.out.println(jsc.master() + " : " + jsc.version());

        // Build an RDD from an in-memory list of lines.
        JavaRDD<String> textFile = jsc.parallelize(Arrays.asList("spark rdd example", "sample example", "sit on spark"));
        System.out.println(textFile.collect());

        // Split each line into words.
        JavaRDD<String> words = textFile.flatMap((String str) -> Arrays.asList(str.split(" ")).iterator());

        // Map each word to a (word, 1) pair and sum the counts per word.
        JavaPairRDD<String, Integer> wcPair = words.mapToPair((String t) -> new Tuple2<>(t, 1));
        JavaPairRDD<String, Integer> result = wcPair.reduceByKey((Integer c1, Integer c2) -> c1 + c2);
        System.out.println(result.collect());

        jsc.stop();
    }
}
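For reference, the two collect() calls print something like the following (the ordering of the pairs can vary between runs):

[spark rdd example, sample example, sit on spark]
[(spark,2), (rdd,1), (example,2), (sample,1), (sit,1), (on,1)]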

The above code runs without exception, and System.out.println(result.collect()) prints the expected values. However, the code below throws an exception.

import java.net.URI;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class JavaClientYarn {

    private static final String srcDir = "/home/m_usr/sparkData/";
    private static final String srcFile = "data.txt";
    private static final String dstSrc = "hdfs://master:9000/user/m_usr/data.txt";
    // note: dstFile is declared but not used below
    private static final String dstFile = "hdfs://master:9000/user/m_usr/result.txt";

    public static void main(String[] args) throws Exception {
        SparkConf sc = new SparkConf().setAppName("SparkTest").setMaster("yarn-client");
        JavaSparkContext jsc = new JavaSparkContext(sc);
        System.out.println(jsc.master() + " : " + jsc.version());

        // Copy the local input file to HDFS.
        Path srcPath = new Path(srcDir + srcFile);
        Path dstPath = new Path(dstSrc);
        FileSystem fs = FileSystem.get(URI.create(dstSrc), new Configuration());
        fs.copyFromLocalFile(srcPath, dstPath);

        // Read the file back from HDFS as an RDD of lines.
        JavaRDD<String> textFile = jsc.textFile(dstSrc);
        System.out.println(textFile.collect());

        // Same word count as the local version.
        JavaRDD<String> words = textFile.flatMap((String str) -> Arrays.asList(str.split(" ")).iterator());
        JavaPairRDD<String, Integer> wcPair = words.mapToPair((String t) -> new Tuple2<>(t, 1));
        JavaPairRDD<String, Integer> result = wcPair.reduceByKey((Integer c1, Integer c2) -> c1 + c2);
        System.out.println(result.collect());

        jsc.stop();
    }
}

The following exception is thrown:

Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.f$3 of type org.apache.spark.api.java.function.FlatMapFunction in instance of org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

The only difference between the two programs is the master: the first uses local and the second uses yarn-client. But the latter fails as if the executors cannot handle the Java 8 lambda expressions. I have no idea which part of the Spark-on-YARN configuration causes the error.
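One thing I am not sure about is whether the jar containing these lambda classes has to be shipped to the remote executors explicitly. A minimal sketch of setting it on the SparkConf (the jar path below is hypothetical):

SparkConf sc = new SparkConf()
        .setAppName("SparkTest")
        .setMaster("yarn-client")
        // hypothetical location of the jar built from this project
        .setJars(new String[] { "/home/m_usr/sparkData/spark-test.jar" });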

Update

Here is my configuration of Hadoop 2.7.4 and Spark 2.2.

$ vi .bashrc

############ Eclipse PATH ###########
export ECLIPSE_HOME=./eclipse
export PATH=$PATH:$ECLIPSE_HOME

######### JDK8 PATH ############
JAVA_HOME=/usr/java/jdk1.8.0_131
CLASSPATH=.:$JAVA_HOME/lib/tools.jar
PATH=$PATH:$HOME/bin:$JAVA_HOME/bin
export JAVA_HOME CLASSPATH
export PATH

############ Hadoop PATH ###########
export HADOOP_HOME=/home/m_usr/hadoop-2.7.4
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export PATH=$PATH:/usr/bin:$JAVA_HOME/bin:$HADOOP_HOME/bin
export HADOOP_PID_DIR=/home/m_usr/hadoop-2.7.4/pids
export HADOOP_CLASSPATH=$JAVA_HOME/lib/tools.jar
export HADOOP_INSTALL=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native:$JAVA_LIBRARY_PATH
export YARN_HOME=$HADOOP_HOME
export PATH=$PATH:$YARN_HOME

############ Spark Path ############
export SPARK_HOME=/home/m_usr/spark-2.2.0-bin-hadoop2.7
export SPARK_SUBMIT=/home/m_usr/spark-2.2.0-bin-hadoop2.7/bin/spark-submit
export PATH=$PATH:$SPARK_HOME/bin
export PATH=$PATH:$SPARK_HOME/sbin

$ vi spark-env.sh

export JAVA_HOME=/usr/java/jdk1.8.0_131
export HADOOP_HOME=/home/m_usr/hadoop-2.7.4
export SPARK_HOME=/home/m_usr/spark-2.2.0-bin-hadoop2.7
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop

$ vi spark-defaults.conf

spark.master                     spark://master:7077

spark.eventLog.enabled           true
spark.eventLog.dir               file:///home/m_usr/spark-2.2.0-bin-hadoop2.7/sparkeventlogs
spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.driver.memory              5g
spark.yarn.am.memory             1g
spark.executor.instances         2

spark.executor.extraJavaOptions  -Dlog4j.configuration=file:/home/m_usr/spark-2.2.0-bin-hadoop2.7/conf/log4j.properties
spark.driver.extraJavaOptions    -Dlog4j.configuration=file:/home/m_usr/spark-2.2.0-bin-hadoop2.7/conf/log4j.properties

$ pwd
/home/jhwang/hadoop-2.7.4/etc/hadoop

$ vi yarn-env.sh

export JAVA_HOME=/usr/java/jdk1.8.0_131
export HADOOP_HOME=/home/m_usr/hadoop-2.7.4
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop

$ vi core-site.xml

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://master:9000</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/home/m_usr/hadoop-2.7.4/tmp</value>
  </property>
</configuration>

$ vi hdfs-site.xml

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
  <property>
    <name>dfs.permissions</name>
    <value>false</value>
  </property>
</configuration>

$ vi mapred-site.xml

<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

$ vi yarn-site.xml

<configuration>
<!-- Site specific YARN configuration properties -->
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
   <property>
     <name>yarn.log-aggregation-enable</name>
     <value>true</value>
  </property>
  <property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
  </property>
</configuration>

The Eclipse IDE configuration of Hadoop and Spark follows an online guide. Am I missing any step of the Hadoop and Spark setup on CentOS 7?

1 Answer


I copy-pasted your code and was able to execute it without any issues, which points to a setup problem rather than a problem in the lambda code. I inspected your lambda code, and it is fine.

Below is a quick explanation of yarn-client mode.

yarn-client mode is much like Spark standalone client mode, except that the executors run in YARN containers managed by the YARN ResourceManager instead of as workers managed by the Spark master.

The driver runs as a local client in your local JVM and communicates with the executors on the cluster.

Transformations are scheduled on the cluster by the driver's logic. Actions involve communication between the local driver and the remote executors, so there is some additional network overhead, especially if the driver is not co-located with the cluster.
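For example, a job would typically be launched in this mode with spark-submit along the following lines (the class and jar names are placeholders; in Spark 2.x, --master yarn --deploy-mode client replaces the deprecated yarn-client master string):

spark-submit \
  --class JavaClientYarn \
  --master yarn \
  --deploy-mode client \
  /path/to/app.jar

spark-submit ships the application jar to the executors automatically, which avoids the kind of classpath gaps that can appear when a job is launched directly from an IDE.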

In yarn-cluster mode, by contrast, the driver runs as a thread inside the YARN ApplicationMaster on the cluster.
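The corresponding cluster-mode launch only changes the deploy mode (again, placeholder names):

spark-submit \
  --class JavaClientYarn \
  --master yarn \
  --deploy-mode cluster \
  /path/to/app.jar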

Hope that helps!
