http://spark.apache.org/docs/latest/quick-start.html
➜ spark-1.4.0-bin-hadoop2.6 ./bin/spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_25)
15/06/28 10:36:07 INFO ui.SparkUI: Started SparkUI at http://127.0.0.1:4040
15/06/28 10:36:07 INFO repl.SparkILoop: Created spark context.. Spark context available as sc.
15/06/28 10:36:08 INFO hive.HiveContext: Initializing execution hive, version 0.13.1
15/06/28 10:36:23 INFO repl.SparkILoop: Created sql context (with Hive support).. SQL context available as sqlContext.
First example: counting the words in a text file.
Calling sc.textFile(fileName) creates a MapPartitionsRDD:
scala> val textFile = sc.textFile("README.md") 15/06/28 10:36:45 INFO storage.MemoryStore: ensureFreeSpace(63424) called with curMem=0, maxMem=278019440 15/06/28 10:36:45 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 61.9 KB, free 265.1 MB) 15/06/28 10:36:45 INFO storage.MemoryStore: ensureFreeSpace(20061) called with curMem=63424, maxMem=278019440 15/06/28 10:36:45 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.6 KB, free 265.1 MB) 15/06/28 10:36:45 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58638 (size: 19.6 KB, free: 265.1 MB) 15/06/28 10:36:45 INFO spark.SparkContext: Created broadcast 0 from textFile at <console>:21 textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
Calling count() on the textFile RDD created above triggers an action:
scala> textFile.count()
java.net.ConnectException: Call From hadoop/127.0.0.1 to localhost:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    ...
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    ...
Hadoop is already installed on this machine in pseudo-distributed mode, so Spark picks up the Hadoop configuration and tries to read README.md from HDFS (localhost:9000), which is not running.
For now we leave Hadoop stopped and work locally, which means the text file has to be read with an explicit file:/// prefix and an absolute path.
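For comparison, once HDFS is up the same file could be read with an explicit HDFS URI; the namenode address below is the one from the error above, and the path is only an illustration:

val hdfsFile = sc.textFile("hdfs://localhost:9000/user/hadoop/README.md")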
scala> textFile
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
Rebuild the textFile RDD so that it reads the local text file:
scala> val textFile = sc.textFile("file:///home/hadoop/soft/spark-1.4.0-bin-hadoop2.6/README.md") textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21
Trigger an RDD action: count.
scala> textFile.count()
15/06/28 10:44:07 INFO scheduler.DAGScheduler: Job 0 finished: count at <console>:24, took 0.275609 s
res2: Long = 98
Another action: return the first line of the text file.
scala> textFile.first()
15/06/28 10:44:27 INFO scheduler.DAGScheduler: Job 1 finished: first at <console>:24, took 0.017917 s
res3: String = # Apache Spark
1. Count how many lines contain the word "Spark":
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at filter at <console>:23

scala> textFile.filter(line => line.contains("Spark")).count()
2. How many words are on the longest line of the file?
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
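Equivalently, the comparison can be written with Math.max instead of the if expression:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))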
3. A MapReduce-style WordCount:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:23

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (locally.,1), (locally,2), (changed,1), (sc.parallelize(1,1), (only,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (YARN,,1), (graph,1), (Hive,2), (first,1), (["Specifying,1), ("yarn-client",1), (page](http://spark.apache.org/documentation.html),1), ([params]`.,1), (application,1), ([project,2), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (distribution.,1), (are,1), (params,1), (scala>,1), (systems.,1...
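Since wordCounts is an RDD of (word, count) pairs, the most frequent words can be pulled out by sorting on the count; a small sketch on top of the RDD defined above:

scala> wordCounts.sortBy(_._2, ascending = false).take(10).foreach(println)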
4. Cache:
scala> linesWithSpark.cache()
res7: linesWithSpark.type = MapPartitionsRDD[4] at filter at <console>:23

scala> linesWithSpark.count()
15/06/28 10:47:11 INFO scheduler.DAGScheduler: Job 5 finished: count at <console>:26, took 0.054036 s
res8: Long = 19

scala> linesWithSpark.count()
15/06/28 10:47:14 INFO scheduler.DAGScheduler: Job 6 finished: count at <console>:26, took 0.016638 s
res9: Long = 19
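cache() is shorthand for persisting the RDD at the MEMORY_ONLY storage level; as a side note, the level can be inspected, and the cached partitions released once they are no longer needed:

scala> linesWithSpark.getStorageLevel   // MEMORY_ONLY after cache()
scala> linesWithSpark.unpersist()       // drop the cached partitions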
Summary of the commands above; the circled numbers mark the actions:

val textFile = sc.textFile("file:///home/hadoop/soft/spark-1.4.0-bin-hadoop2.6/README.md")
textFile.count()                                                                     〇
textFile.first()                                                                     ①
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
textFile.filter(line => line.contains("Spark")).count()                              ②
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)     ③
val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts.collect()                                                                 ④
linesWithSpark.cache()
linesWithSpark.count()                                                               ⑤
linesWithSpark.count()                                                               ⑥
linesWithSpark.count()                                                               ⑦
http://127.0.0.1:4040
Jobs: the circled number next to each action above corresponds to a Job Id on the Jobs page of the web UI.
Stages: there are 8 jobs above, but one more stage than jobs, because the collect in ④ consists of two stages (the reduceByKey introduces a shuffle, which splits the job at the shuffle boundary).
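That stage boundary is also visible in the RDD lineage itself; printing the lineage of the wordCounts RDD from ④ shows the ShuffledRDD produced by reduceByKey sitting on top of the MapPartitionsRDDs that feed it:

scala> wordCounts.toDebugString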
Storage: only shows entries once something has been cached.
On the Jobs page, click the collect job with Job Id = 4 (the one that produces the WordCount result). The list below it shows that it has 2 stages.
Look closely at the last two columns of that list: Shuffle Read and Shuffle Write.
The map stage performs the Shuffle Write and the collect stage performs the Shuffle Read.
Click the map stage with Stage Id = 4. Its DAG visualization is the same as the left half of the overview diagram above.
The Spark web UI also provides an Event Timeline, which makes it easy to see how much time each phase of a stage takes.
Go back and click the collect stage with Stage Id = 5.
Preparation for deploying the Spark standalone cluster:
1. Passwordless SSH from the master to the slaves (append the master's public key to authorized_keys on every slave)
2. Turn off the firewall on every node (chkconfig iptables off)
3. Install scala-2.10 and configure it in ~/.bashrc
cd $SPARK_HOME
vi conf/spark-env.sh
export JAVA_HOME=/usr/java/jdk1.7.0_51
export SCALA_HOME=/usr/install/scala-2.10.5
export HADOOP_HOME=/usr/install/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_MASTER_IP=dp0652
export MASTER=spark://dp0652:7077
#export SPARK_LOCAL_IP=dp0652
export SPARK_LOCAL_DIRS=/usr/install/spark-1.4.0-bin-hadoop2.6
export SPARK_MASTER_WEBUI_PORT=8082
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=8g
vi conf/slaves
dp0652
dp0653
dp0655
dp0656
dp0657
Distribute the Spark directory to the other nodes in the cluster:
cd ..
scp -r $SPARK_HOME dp0653:/usr/install
scp -r $SPARK_HOME dp0655:/usr/install
scp -r $SPARK_HOME dp0656:/usr/install
scp -r $SPARK_HOME dp0657:/usr/install
Because dp0652 and dp0653 have more memory than the other nodes, spark-env.sh on those two nodes was changed to:
export SPARK_WORKER_INSTANCES=2
export SPARK_WORKER_MEMORY=20g
Start the cluster; running the start script on the master is sufficient.
[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.master.Master-1-dp0652.out
dp0656: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0656.out
dp0655: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0655.out
dp0657: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0657.out
dp0652: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0652.out
dp0653: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0653.out
dp0652: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-2-dp0652.out
dp0653: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-2-dp0653.out
Check the Spark processes on the master and the slaves:
[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ jps -lm
40708 org.apache.spark.deploy.master.Master --ip dp0652 --port 7077 --webui-port 8082
41095 org.apache.spark.deploy.worker.Worker --webui-port 8082 spark://dp0652:7077
40926 org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077
[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ ssh dp0653
Last login: Thu Jul 2 09:07:17 2015 from 192.168.6.140
[qihuang.zheng@dp0653 ~]$ jps -lm
27153 org.apache.spark.deploy.worker.Worker --webui-port 8082 spark://dp0652:7077
27029 org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077
[qihuang.zheng@dp0653 ~]$ exit
logout
Connection to dp0653 closed.
[qihuang.zheng@dp0652 logs]$ ssh dp0655
Last login: Thu Jul 2 08:55:05 2015 from 192.168.6.140
[qihuang.zheng@dp0655 ~]$ jps -lm
8766 org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077
[qihuang.zheng@dp0655 ~]$
View the web UI on the master: http://dp0652:8082/
1. If SPARK_LOCAL_IP is configured but not changed to each slave's own IP on the slaves, the following error appears:
15/07/02 09:04:08 ERROR netty.NettyTransport: failed to bind to /192.168.6.52:0, shutting down Netty transport
Exception in thread "main" java.net.BindException: Failed to bind to: /192.168.6.52:0: Service 'sparkWorker' failed after 16 retries!
    at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
    at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393)
    at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389)
    at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
    at scala.util.Try$.apply(Try.scala:161)
    at scala.util.Success.map(Try.scala:206)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/07/02 09:04:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/07/02 09:04:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/07/02 09:04:09 INFO util.Utils: Shutdown hook called
Analysis: SPARK_LOCAL_IP is the IP address of the local machine, so after the configuration is distributed to the different nodes of the cluster, it must be edited on each node to that node's own IP.
With many nodes in the cluster this becomes tedious, so here SPARK_LOCAL_IP is left commented out and SPARK_LOCAL_DIRS is used instead (as in the spark-env.sh above).
2. If export MASTER is not configured, the workers report errors like this:
15/07/02 08:40:51 INFO worker.Worker: Retrying connection to master (attempt # 12)
15/07/02 08:40:51 INFO worker.Worker: Connecting to master akka.tcp://sparkMaster@dp0652:7077/user/Master...
15/07/02 08:40:51 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@dp0652:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: dp0652/192.168.6.52:7077
15/07/02 08:41:23 ERROR worker.Worker: RECEIVED SIGNAL 15: SIGTERM
15/07/02 08:41:23 INFO util.Utils: Shutdown hook called
The consequence: the Worker process does start on every slave (visible with jps), but no workers show up on the master. When that happens, check the logs on the master.
When the master starts successfully its log shows spark://dp0652:7077, but the worker above is trying to reach sparkMaster@dp0652:7077, so MASTER should be exported manually.
3. Finally the cluster starts successfully. The log on the master:
Spark Command: /usr/java/jdk1.7.0_51/bin/java -cp /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../conf/:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/spark-assembly-1.4.0-hadoop2.6.0.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/install/hadoop/etc/hadoop/:/usr/install/hadoop/etc/hadoop/ -Xms512m -Xmx512m -XX:MaxPermSize=128m org.apache.spark.deploy.master.Master --ip dp0652 --port 7077 --webui-port 8082
========================================
15/07/02 09:27:49 INFO master.Master: Registered signal handlers for [TERM, HUP, INT]
15/07/02 09:27:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/07/02 09:27:50 INFO spark.SecurityManager: Changing view acls to: qihuang.zheng
15/07/02 09:27:50 INFO spark.SecurityManager: Changing modify acls to: qihuang.zheng
15/07/02 09:27:50 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(qihuang.zheng); users with modify permissions: Set(qihuang.zheng)
15/07/02 09:27:51 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/07/02 09:27:51 INFO Remoting: Starting remoting
15/07/02 09:27:51 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@dp0652:7077]
15/07/02 09:27:51 INFO util.Utils: Successfully started service 'sparkMaster' on port 7077.
15/07/02 09:27:51 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/07/02 09:27:51 INFO server.AbstractConnector: Started SelectChannelConnector@dp0652:6066
15/07/02 09:27:51 INFO util.Utils: Successfully started service on port 6066.
15/07/02 09:27:51 INFO rest.StandaloneRestServer: Started REST server for submitting applications on port 6066
15/07/02 09:27:51 INFO master.Master: Starting Spark master at spark://dp0652:7077
15/07/02 09:27:51 INFO master.Master: Running Spark version 1.4.0
15/07/02 09:27:51 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/07/02 09:27:51 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8082
15/07/02 09:27:51 INFO util.Utils: Successfully started service 'MasterUI' on port 8082.
15/07/02 09:27:51 INFO ui.MasterWebUI: Started MasterWebUI at http://192.168.6.52:8082
15/07/02 09:27:52 INFO master.Master: I have been elected leader! New state: ALIVE
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.52:35398 with 1 cores, 20.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.56:60106 with 1 cores, 8.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.55:50995 with 1 cores, 8.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.53:55994 with 1 cores, 20.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.57:34020 with 1 cores, 8.0 GB RAM
15/07/02 09:27:56 INFO master.Master: Registering worker 192.168.6.52:55912 with 1 cores, 20.0 GB RAM
15/07/02 09:27:56 INFO master.Master: Registering worker 192.168.6.53:35846 with 1 cores, 20.0 GB RAM
The log from one of the workers on dp0653:
Spark Command: /usr/java/jdk1.7.0_51/bin/java -cp /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../conf/:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/spark-assembly-1.4.0-hadoop2.6.0.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/usr/install/hadoop/etc/hadoop/:/usr/install/hadoop/etc/hadoop/ -Xms512m -Xmx512m -XX:MaxPermSize=128m org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077
========================================
15/07/02 09:27:52 INFO worker.Worker: Registered signal handlers for [TERM, HUP, INT]
15/07/02 09:27:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/07/02 09:27:52 INFO spark.SecurityManager: Changing view acls to: qihuang.zheng
15/07/02 09:27:52 INFO spark.SecurityManager: Changing modify acls to: qihuang.zheng
15/07/02 09:27:52 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(qihuang.zheng); users with modify permissions: Set(qihuang.zheng)
15/07/02 09:27:53 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/07/02 09:27:53 INFO Remoting: Starting remoting
15/07/02 09:27:54 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkWorker@192.168.6.53:55994]
15/07/02 09:27:54 INFO util.Utils: Successfully started service 'sparkWorker' on port 55994.
15/07/02 09:27:54 INFO worker.Worker: Starting Spark worker 192.168.6.53:55994 with 1 cores, 20.0 GB RAM
15/07/02 09:27:54 INFO worker.Worker: Running Spark version 1.4.0
15/07/02 09:27:54 INFO worker.Worker: Spark home: /usr/install/spark-1.4.0-bin-hadoop2.6
15/07/02 09:27:54 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/07/02 09:27:54 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081
15/07/02 09:27:54 INFO util.Utils: Successfully started service 'WorkerUI' on port 8081.
15/07/02 09:27:54 INFO ui.WorkerWebUI: Started WorkerWebUI at http://192.168.6.53:8081
15/07/02 09:27:54 INFO worker.Worker: Connecting to master akka.tcp://sparkMaster@dp0652:7077/user/Master...
15/07/02 09:27:54 INFO worker.Worker: Successfully registered with master spark://dp0652:7077
Test the cluster by attaching a spark-shell to the master, or by submitting the SparkPi example:

[qihuang.zheng@dp0653 spark-1.4.0-bin-hadoop2.6]$ bin/spark-shell --master spark://dp0652:7077 --executor-memory 4g
[qihuang.zheng@dp0653 spark-1.4.0-bin-hadoop2.6]$ bin/spark-submit --master spark://dp0652:7077 --class org.apache.spark.examples.SparkPi --executor-memory 4g --total-executor-cores 2 lib/spark-examples-1.4.0-hadoop2.6.0.jar 1000
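Inside the connected shell, a quick sanity check (a small sketch; the expected master URL follows from the configuration above):

scala> sc.master              // spark://dp0652:7077
scala> sc.defaultParallelism  // parallelism available to this application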
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("/user/qihuang.zheng/sparktest/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.saveAsParquetFile("/user/qihuang.zheng/sparktest/people.parquet")
people.write.parquet("/user/qihuang.zheng/sparktest/people2.parquet")

// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("/user/qihuang.zheng/sparktest/people.parquet")

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// JOIN TABLE
val jointbls = sqlContext.sql("SELECT people.name FROM people join parquetFile where people.name=parquetFile.name")
jointbls.map(t => "Name: " + t(0)).collect().foreach(println)
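The same filter can also be expressed through the DataFrame API instead of SQL text; a small sketch using the people DataFrame defined above (the name teens is made up for illustration):

// Column expressions instead of an SQL string
val teens = people.filter(people("age") >= 13 && people("age") <= 19).select("name")
teens.show()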
If memory runs out while executing cache, the current shell exits; the workaround is to prefix the spark-shell command with SPARK_SUBMIT_OPTS="-XX:MaxPermSize=1g".
If Spark has not been built with Hive support (hive on spark) and hive-site.xml is simply copied into the conf directory of the Spark cluster, starting spark-sql directly fails:
[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ bin/spark-sql
Exception in thread "main" java.lang.RuntimeException: java.io.IOException: Permission denied
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:330)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:109)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: Permission denied
    at java.io.UnixFileSystem.createFileExclusively(Native Method)
    at java.io.File.createNewFile(File.java:1006)
    at java.io.File.createTempFile(File.java:1989)
    at org.apache.hadoop.hive.ql.session.SessionState.createTempFile(SessionState.java:432)
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:328)
    ... 11 more
15/07/03 08:42:33 INFO util.Utils: Shutdown hook called
15/07/03 08:42:33 INFO util.Utils: Deleting directory /tmp/spark-831ff199-cf80-4d49-a22f-824736065289
This happens because every Spark Worker needs Hive support, yet Hive is not installed on all of the worker nodes, and Spark itself has to be built with the Hive support package.
But rebuilding Spark with Hive support takes a long time. Can we instead use the Hive that is already installed in the cluster? YES!
http://lxw1234.com/archives/2015/06/294.htm
http://shiyanjun.cn/archives/1113.html
http://www.cnblogs.com/hseagle/p/3758922.html
1. Add to spark-env.sh:
export HIVE_HOME=/usr/install/apache-hive-0.13.1-bin
export SPARK_CLASSPATH=$HIVE_HOME/lib/mysql-connector-java-5.1.34.jar:$SPARK_CLASSPATH
2. Distribute apache-hive-0.13.1-bin to every node in the cluster (every node that runs a Spark Worker):
cd install
scp -r apache-hive-0.13.1-bin dp0653:/usr/install/
3. Copy apache-hive-0.13.1-bin/conf/hive-site.xml into $SPARK_HOME/conf:
scp apache-hive-0.13.1-bin/conf/hive-site.xml dp0653:/usr/install/spark-1.4.0-bin-hadoop2.6/conf
4. Restart the Spark cluster:
sbin/stop-all.sh
sbin/start-all.sh
5. Test spark-sql:
SPARK_CLASSPATH was detected (set to '/usr/install/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.34.jar:').
This is deprecated in Spark 1.0+.
Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath
15/07/03 10:00:56 WARN spark.SparkConf: Setting 'spark.executor.extraClassPath' to '/usr/install/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.34.jar:' as a work-around.
15/07/03 10:00:56 WARN spark.SparkConf: Setting 'spark.driver.extraClassPath' to '/usr/install/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.34.jar:' as a work-around.
15/07/03 10:01:00 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.6.53:9083
15/07/03 10:01:00 INFO hive.metastore: Connected to metastore.
15/07/03 10:01:00 INFO session.SessionState: No Tez session required at this point. hive.execution.engine=mr.
SET spark.sql.hive.version=0.13.1
SET spark.sql.hive.version=0.13.1
spark-sql> show databases;
default
test
spark-sql> use test;
Time taken: 2.045 seconds
spark-sql> show tables;
koudai  false
...
spark-sql> select count(*) from koudai;
311839
Time taken: 12.443 seconds, Fetched 1 row(s)
spark-sql>
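The same tables can also be queried from spark-shell through the sqlContext created at startup (it was built with Hive support, as the shell banner at the top shows); a small sketch, assuming the shell is started on a node with the same hive-site.xml in place:

scala> sqlContext.sql("use test")
scala> sqlContext.sql("select count(*) from koudai").collect().foreach(println)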
When computing metrics on a daily or weekly schedule, the analysis results can be saved as a persistent table, or saved to HDFS, so that other people can use them.
Production data volumes are usually large, so partition the data and take advantage of the partitions when querying. A minimal sketch of both options follows.
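Both the DataFrame name dailyStats and its dt (date) column below are made up for illustration:

// save as a persistent table that other users and jobs can query
dailyStats.write.saveAsTable("daily_stats")
// or write to HDFS partitioned by date, so queries filtering on dt only read the matching partitions
dailyStats.write.partitionBy("dt").parquet("/user/qihuang.zheng/sparktest/daily_stats")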