用java写的一个简单的spark程序,经过本地运行和集群运行例子。java
1 在eclipse下建一个maven工程sql
配置pom.xmlapache
配置文件参考下面:api
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.spark</groupId> <artifactId>SparkTest</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>SparkTest</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.3.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.3.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.10</artifactId> <version>1.3.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.3.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.4.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.3.0</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/java</sourceDirectory> <testSourceDirectory>src/main/test</testSourceDirectory> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass></mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> <version>1.2.1</version> <executions> <execution> <goals> <goal>exec</goal> </goals> </execution> </executions> <configuration> <executable>java</executable> <includeProjectDependencies>true</includeProjectDependencies> <includePluginDependencies>false</includePluginDependencies> <classpathScope>compile</classpathScope> <mainClass>cn.spark.sparktest.App</mainClass> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> </plugins> </build> </project>
配置好后eclipse会自动从远端资源库中进行下载app
2 编写spark程序eclipse
程序详细以下:maven
package org.spark.study.core; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; /** * 用java语言开发spark程序 * 第一个学习程序 wordcount * @author 18521 * */ public class wordCountLocal { public static void main(String[] args) { // TODO Auto-generated method stub // 1 建立一个sparkconf 对象并配置 // 使用setMaster 能够设置spark集群能够连接集群的URL,若是设置local 表明在本地运行而不是在集群运行 SparkConf conf = new SparkConf() .setAppName("wordCountLocal") .setMaster("local"); // 2 建立javasparkContext对象 // sparkcontext 是一个入口,主要做用就是初始化spark应用程序所需的一些核心组件,例如调度器,task, // 还会注册spark,sparkMaster结点上注册。反正就是spake应用中最重要的对象 JavaSparkContext sc = new JavaSparkContext(conf); // 3 对输入源建立一个出事RDD // 元素就是输入源文件中的一行 JavaRDD<String> lines = sc.textFile("D://worksoft//testdata//spark.txt"); // 4 把输入源拆分红一个一个的单词 // 引用一个RDD 都会建立一个function 类(比较简单的话就是一个匿名内部类) // FlatMapFunction 有连个参数输入和输出 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; public Iterable<String> call(String arg0) throws Exception { // TODO Auto-generated method stub return Arrays.asList(arg0.split(" ")); } }); // 5 须要将每个单词映射为(单词,1) 后面才能够更具单词key 对后面value 1 进行累加从而达到计数的功能 JavaPairRDD<String, Integer> parirs = words.mapToPair(new PairFunction<String, String, Integer>() { /** * 每个单词都映射成(单词,1) */ private static final long serialVersionUID = 1L; public Tuple2<String, Integer> call(String arg0) throws Exception { // TODO Auto-generated method stub return new Tuple2<String, Integer>(arg0, 1); } }); // 6 以单词作为key 统计单词出现的次数,用reducebykey 算子,对每个key对于的value进行操做 JavaPairRDD<String,Integer> wordcount = parirs.reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer arg0, Integer arg1) throws Exception { // TODO Auto-generated method stub return arg0+arg1; } }); // 7 已经经过spark 的几个算子 flatMap,mapToPair,reduceByKey 已经统计出每个结点中的单词出现的次数 // 这中操做叫作transformation,可是在一开始的RDD是把文件拆分打散到不一样的结点中的,因此后面还须要操做action 进行集合 // 9 action 操做经过foreach 来遍历全部最后一个RDD生成的元素 wordcount.foreach(new VoidFunction<Tuple2<String,Integer>>() { @Override public void call(Tuple2<String, Integer> arg0) throws Exception { // TODO Auto-generated method stub System.out.println(arg0._1+" 出现了:"+arg0._2+"次"); } }); sc.close(); } }
3 本地测试ide
4 集群运行oop
4.1 spark程序修改学习
4.2 测试文件上传到hdfs
[root@spark1 opt]# hadoop fs -put spark.txt /spark.txt
[root@spark1 opt]# hadoop fs -ls / 17/05/27 11:51:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Found 2 items -rw-r--r-- 3 root supergroup 171073915 2017-05-27 10:32 /spark.txt drwxr-xr-x - root supergroup 0 2017-05-23 15:40 /user
4.3 程序打包
4.4 上传打包程序并写启动脚本
编写启动脚本
[root@spark1 java]# cat wordcount.sh /opt/spark/bin/spark-submit \ # 用这个命令启动 --class org.spark.study.core.wordCountSpark \ # 配置类名 --num-executors 3 \ # 配置在三个结点上运行 --driver-memory 100m \ # drive内存 --executor-memory 100m \ # 配置execute内存 --executor-cores 3 \ # 内核运行单元数 /opt/spark-study/java/study-0.0.1-SNAPSHOT-jar-with-dependencies.jar \ # 运行的jar包
4.5 运行启动脚本进行测试
[root@spark1 java]# ./wordcount.sh >> spark.log [root@spark1 java]# cat spark.log integration 出现了:89100次 Hadoop��s 出现了:89100次 general 出现了:89100次 have 出现了:267300次 Million 出现了:89100次 here 出现了:89100次 big 出现了:89100次 stack. 出现了:89100次 modification 出现了:89100次 meili 出现了:267300次 conference. 出现了:89100次 we 出现了:178200次 requiring 出现了:89100次 conv 出现了:297次 simple 出现了:89100次 This 出现了:89100次 Joel 出现了:89118次 send 出现了:89118次 (HDFS) 出现了:89100次 without 出现了:178200次 ……