Reading a SequenceFile with Spark

First, create a table stored as SequenceFile in Hive

create table test(id string,time string) row format delimited fields terminated by '\t';

load data local inpath '/home/hadoop/data/order_seq.txt' into table test;
-- A temporary table is needed, because data cannot be loaded directly into a table stored as sequencefile
create table order_seq stored as sequencefile as select * from test;

hive (g6)> select  * from order_seq ;
OK
id      time
107030111111    2014-05-01 06:01:01.334+01
107030222222    2014-05-01 07:01:01.334+01
107030333333    2014-05-01 08:01:01.334+01
107030444444    2014-05-01 09:01:01.334+01
107030555555    2014-05-01 10:01:01.334+01
Time taken: 0.082 seconds, Fetched: 5 row(s)

Start spark-shell and read the SequenceFile: fails

// Start with ./spark-shell --master local[2], then run the following:

scala> val b = sc.sequenceFile[org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.Text]("hdfs://hadoop001:9000//user/hive/warehouse/g6.db/test_seq/000000_0")
b: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.Text)] = MapPartitionsRDD[7] at sequenceFile at <console>:26

scala> b.collect
19/06/05 17:23:08 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 5)
java.io.NotSerializableException: org.apache.hadoop.io.BytesWritable
Serialization stack:
        - object not serializable (class: org.apache.hadoop.io.BytesWritable, value: )
        - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
        - object (class scala.Tuple2, (,22))
        - element of array (index: 0)
        - array (class [Lscala.Tuple2;, size 2)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:456)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

The error above is java.io.NotSerializableException: org.apache.hadoop.io.BytesWritable: the keys Hive actually wrote are BytesWritable (regardless of the IntWritable type parameter), and Hadoop Writables do not implement java.io.Serializable, so the default Java serializer cannot ship the collected results back to the driver. A workaround found online:
Start spark-shell with a serializer setting and read the SequenceFile: works

// ./spark-shell --master local[2] --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
// Starting spark-shell this way switches to the Kryo serializer
scala> val b = sc.sequenceFile[org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.Text]("hdfs://hadoop001:9000//user/hive/warehouse/g6.db/order_seq")
b: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.Text)] = MapPartitionsRDD[3] at sequenceFile at <console>:24

scala> b.collect
res1: Array[(org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.Text)] = Array((,107030555555?2014-05-01 10:01:01.334+01), (,107030555555?2014-05-01 10:01:01.334+01), (,107030555555?2014-05-01 10:01:01.334+01), (,107030555555?2014-05-01 10:01:01.334+01), (,107030555555?2014-05-01 10:01:01.334+01))
// There is a question mark between the two fields; not resolved yet.
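
Two things about this output are worth noting (these are likely explanations on my part, not verified fixes from the original run). First, the "?" between the two fields is probably Hive's default field delimiter '\u0001': the CTAS statement above did not specify a ROW FORMAT, so order_seq does not inherit the '\t' delimiter from test, and the control character renders as "?" in the console. Second, every element of the array shows the last record (107030555555): Hadoop's RecordReader reuses the same Writable object for each record, so collecting the Writables directly yields many references to one object. Copying the data out of the Writables before collect() avoids both surprises, and also removes the need for the Kryo setting in this particular case. A minimal sketch, assuming the same warehouse path as above and BytesWritable keys (which is what the earlier stack trace suggests Hive actually wrote):

val rows = sc.sequenceFile[org.apache.hadoop.io.BytesWritable, org.apache.hadoop.io.Text](
  "hdfs://hadoop001:9000/user/hive/warehouse/g6.db/order_seq")
// toString copies each record out of the reused Text object on the executor side
val lines = rows.map(_._2.toString)
// split id and time on Hive's default field delimiter (assumption: '\u0001')
val fields = lines.map(_.split("\u0001"))
fields.map(_.mkString("\t")).collect().foreach(println)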

Using IDEA: reading the SequenceFile with Spark
----fails, unresolved

import org.apache.spark.{SparkConf, SparkContext}

object SparkContextApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkContextApp").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    // Read the SequenceFile; with [Int, String] Spark's built-in WritableConverters
    // convert the Writables to plain values. Note: the earlier stack trace suggests
    // Hive wrote BytesWritable keys, so this may still fail with a ClassCastException
    // once the HDFS connectivity problem below is resolved.
    val a = sc.sequenceFile[Int, String]("hdfs://hadoop001:9000//user/hive/warehouse/g6.db/test_seq/000000_0")
    a.collect().foreach(println)
    sc.stop()
  }
}
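
One note on the code above (an aside, not part of the original run): with [Int, String] type parameters Spark converts the Writables to plain Int/String values, so the NotSerializableException seen in spark-shell would not be expected here. If the Writable-typed form (for example sequenceFile[org.apache.hadoop.io.BytesWritable, org.apache.hadoop.io.Text]) were used instead, the same Kryo switch used on the spark-shell command line can be set directly on the SparkConf, roughly like this:

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("SparkContextApp")
  .setMaster("local[2]")
  // same property that --conf spark.serializer=... sets on the command line
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")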

Running the SparkContextApp code above to read the SequenceFile with Spark fails as follows:

......
 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
......
INFO HadoopRDD: Input split: hdfs://hadoop001:9000/user/hive/warehouse/g6.db/test_seq/000000_0:0+119
.......
WARN BlockReaderFactory: I/O error constructing remote block reader.
java.net.ConnectException: Connection timed out: no further information
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
......
 WARN DFSClient: Failed to connect to /10.9.140.90:50010 for block, add to deadNodes and continue. java.net.ConnectException: Connection timed out: no further information
java.net.ConnectException: Connection timed out: no further information
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
.......
INFO DFSClient: Could not obtain BP-319425841-10.9.140.90-1559687031867:blk_1073741864_1040 from any node: java.io.IOException: No live nodes contain block BP-319425841-10.9.140.90-1559687031867:blk_1073741864_1040 after checking nodes = [10.9.140.90:50010], ignoredNodes = null No live nodes contain current block Block locations: 10.9.140.90:50010 Dead nodes:  10.9.140.90:50010. Will get new block locations from namenode and retry...
.....
WARN DFSClient: Failed to connect to /10.9.140.90:50010 for block, add to deadNodes and continue. java.net.ConnectException: Connection timed out: no further information
......
WARN DFSClient: DFS Read
org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-319425841-10.9.140.90-1559687031867:blk_1073741864_1040 file=/user/hive/warehouse/g6.db/test_seq/000000_0
.....

The code itself should be fine; the problem looks like Spark, when run from IDEA, simply cannot reach HDFS. None of the causes found online seemed to apply. Still unresolved.
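
For what it's worth, two adjustments are commonly suggested for this combination of symptoms; neither has been verified against this cluster, so treat them as assumptions. The winutils error is usually non-fatal in local mode and only needs HADOOP_HOME (or the hadoop.home.dir system property) to point at a directory whose bin contains winutils.exe. The actual read failure (Connection timed out to 10.9.140.90:50010) typically happens because the NameNode hands the client the DataNode's internal IP, which is not reachable from the development machine; telling the HDFS client to connect by hostname instead often helps, provided hadoop001 resolves from that machine (for example via the hosts file). A sketch:

import org.apache.spark.{SparkConf, SparkContext}

object SparkContextApp {
  def main(args: Array[String]): Unit = {
    // Windows only: directory whose bin\ contains winutils.exe ("D:\\hadoop" is a
    // hypothetical path, adjust as needed)
    System.setProperty("hadoop.home.dir", "D:\\hadoop")

    val sparkConf = new SparkConf().setAppName("SparkContextApp").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    // Connect to DataNodes by hostname instead of the internal IP reported by the
    // NameNode; requires hadoop001 to resolve from this machine.
    sc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")

    val a = sc.sequenceFile[Int, String]("hdfs://hadoop001:9000/user/hive/warehouse/g6.db/test_seq/000000_0")
    a.collect().foreach(println)
    sc.stop()
  }
}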