Graduation Project (3): Inserting Data via Spark and Phoenix Integration / Parsing JSON Arrays

Requirement: store the comments collected a few days ago in HBase.

Approach:

First parse the comments with fastjson, then build an RDD, and finally use Spark's Phoenix integration to store the data in HBase.

Sample data:

[
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-04-08 01:13:42",
    "content": "此用户没有填写评价内容",
    "label": []
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-03-29 11:49:36",
    "content": "不错",
    "label": []
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-03-21 21:13:07",
    "content": "正品没什么毛病。信号好像是照别的差一点,可是还能够,不是特别差。分辨率不是那么好,可是也不是特别差。通常般。手机不卡。打游戏很顺畅。官方正品没有翻车。",
    "label": [
      {
        "labelName": "功能齐全"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-03-22 09:56:22",
    "content": "不错是正品",
    "label": [
      {
        "labelName": "系统流畅"
      },
      {
        "labelName": "声音大"
      },
      {
        "labelName": "作工精致"
      },
      {
        "labelName": "待机时间长"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-03-13 07:27:56",
    "content": "性价比很高的手机,用习惯了ios转安卓7个月,从新回归,只能说,用苹果省心。苏宁质量有保障,送货快,价格优惠,推荐购买!",
    "label": [
      {
        "labelName": "系统流畅"
      },
      {
        "labelName": "功能齐全"
      },
      {
        "labelName": "包装通常"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
    "creationTime": "2019-02-25 22:03:18",
    "content": "弟弟就想要一个苹果手机。原本打算买8p的。而后我,推荐他这款xr,是最新款。价格整体来讲性价比,比8p好。买了很快就到了,次日就。屏幕很大,仍是面容id。苹果x大不少。比***x小一点",
    "label": [
      {
        "labelName": "外观漂亮"
      },
      {
        "labelName": "作工精致"
      },
      {
        "labelName": "反应快"
      },
      {
        "labelName": "系统流畅"
      }
    ]
  },
  {
    "referenceName": "【券后低至5388】Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
    "creationTime": "2019-02-21 12:45:22",
    "content": "物流很棒,xr价格能够接受、性能稳定!",
    "label": []
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-02-22 12:14:55",
    "content": "很不错的手机除了有点厚,极致的体验,黑边彻底没有影响",
    "label": [
      {
        "labelName": "拍照效果好"
      },
      {
        "labelName": "待机时间长"
      },
      {
        "labelName": "电池耐用"
      },
      {
        "labelName": "性价比高 "
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
    "creationTime": "2019-02-13 00:28:03",
    "content": "很是很是好的商品。很不错,下次再来",
    "label": [
      {
        "labelName": "信号稳定"
      },
      {
        "labelName": "反应快"
      },
      {
        "labelName": "声音大"
      },
      {
        "labelName": "作工精致"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G 手机",
    "creationTime": "2019-04-02 17:29:43",
    "content": "此用户没有填写评价内容",
    "label": []
  }
]
[
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-04-05 18:13:14",
    "content": "满意嘻嘻",
    "label": [
      {
        "labelName": "音质好"
      },
      {
        "labelName": "拍照效果好"
      },
      {
        "labelName": "功能齐全"
      },
      {
        "labelName": "外观漂亮"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-03-21 00:13:17",
    "content": "棒棒哒",
    "label": []
  },
  {
    "referenceName": "【双12爆款】Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
    "creationTime": "2019-01-19 10:23:57",
    "content": "双十二买的正好遇上手机坏了就买了xr太贵没舍得买一直用苹果的系统用顺手了不肯意换",
    "label": [
      {
        "labelName": "反应快"
      },
      {
        "labelName": "作工精致"
      },
      {
        "labelName": "信号稳定"
      },
      {
        "labelName": "待机时间长"
      },
      {
        "labelName": "性价比通常般"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-03-21 12:47:34",
    "content": "用了几天感受还能够,只是信号不是很好,整体上是能够的",
    "label": [
      {
        "labelName": "音质好"
      },
      {
        "labelName": "分辨率高"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-03-09 08:32:15",
    "content": "苹果手机作工精细,手感不错,外观设计也是颇有时尚感!系统运行十分流畅,操做体验不错;功能齐全,屏幕分辨率高,整体来讲很满意!",
    "label": [
      {
        "labelName": "作工精致"
      },
      {
        "labelName": "系统流畅"
      },
      {
        "labelName": "功能齐全"
      }
    ]
  },
  {
    "referenceName": "【低至5399】Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
    "creationTime": "2019-01-15 22:44:53",
    "content": "真心喜欢,一直在徘徊x.仍是xr.我的以为真不错,一直信赖苏宁,新机为激活,价钱能接受,值得推荐,黑边什么的不影响。",
    "label": [
      {
        "labelName": "拍照效果好"
      },
      {
        "labelName": "外观漂亮"
      },
      {
        "labelName": "屏幕清晰"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-03-08 22:12:18",
    "content": "手机运行流畅,外观也漂亮,黑色彰显档次,音质也好,又是双卡双待手机,性价比不错,比起XS便宜很多。",
    "label": [
      {
        "labelName": "外观漂亮"
      },
      {
        "labelName": "系统流畅"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
    "creationTime": "2019-03-01 15:54:33",
    "content": "手机很好,电池很是抗用,价钱也很是美丽,值得购买。",
    "label": [
      {
        "labelName": "系统流畅"
      },
      {
        "labelName": "电池耐用"
      },
      {
        "labelName": "分辨率高"
      },
      {
        "labelName": "待机时间长"
      },
      {
        "labelName": "包装通常"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
    "creationTime": "2019-02-19 09:37:25",
    "content": "春节期间配送超快,手机没有任何问题,苏宁易购确实作到了全网最低价",
    "label": []
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-02-23 13:27:15",
    "content": "挺好的懒得拍照了借了几张黑边确实大",
    "label": [
      {
        "labelName": "待机时间长"
      },
      {
        "labelName": "反应快"
      },
      {
        "labelName": "作工精致"
      }
    ]
  }
]

The data consists of JSON arrays, so fastjson is used to parse it. To make the final storage step easier, a Comment class is needed.
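The sample above contains more than one array back to back (one per crawled page), which a JSON parser will not accept as a single document. As a minimal sketch, the pages can be separated with a bracket-depth scan before each one is handed to the parser. This is a different technique from the `][` replacement used in StoreData further down, and the class and method names (`PageSplitter`, `splitPages`) are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class PageSplitter {

    /**
     * Splits text made of back-to-back JSON arrays ("[...][...]")
     * into one string per top-level array.
     */
    static List<String> splitPages(String concatenated) {
        List<String> pages = new ArrayList<>();
        int depth = 0;            // current nesting depth of '[' ... ']'
        int start = -1;           // index where the current top-level array began
        boolean inString = false; // true while scanning inside a JSON string literal
        boolean escaped = false;  // true right after a backslash inside a string

        for (int i = 0; i < concatenated.length(); i++) {
            char c = concatenated.charAt(i);
            if (inString) {
                // Brackets inside string values must not affect the depth
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
                continue;
            }
            if (c == '"') {
                inString = true;
            } else if (c == '[') {
                if (depth == 0) {
                    start = i;
                }
                depth++;
            } else if (c == ']' && depth > 0) {
                depth--;
                if (depth == 0) {
                    pages.add(concatenated.substring(start, i + 1));
                }
            }
        }
        return pages;
    }

    public static void main(String[] args) {
        String twoPages = "[{\"content\":\"a\",\"label\":[]}][{\"content\":\"b\",\"label\":[]}]";
        for (String page : splitPages(twoPages)) {
            System.out.println(page);
        }
    }
}
```

Each returned string can then be passed to `JSON.parseArray` as in the original code. Unlike a plain text replacement, the scan is not confused by a comment whose text happens to contain `][`.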

Dependencies

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.9</version>
        </dependency>

        <!-- hbase -->
        <!-- <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>2.0.2</version>
        </dependency> -->

        <!-- phoenix -->
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>5.0.0-HBase-2.0</version>
        </dependency>

        <!-- phoenix_spark -->
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>5.0.0-HBase-2.0</version>
        </dependency>

        <!-- spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>

Comment.java

package cn.tele.bean;

/**
 *
 * @author Tele
 *
 */
public class Comment {
    private Integer id;
    private String name;
    private String content;
    private String creationtime;
    private String label;
    private String platform;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getCreationtime() {
        return creationtime;
    }

    public void setCreationtime(String creationtime) {
        this.creationtime = creationtime;
    }

    public String getLabel() {
        return label;
    }

    public void setLabel(String label) {
        this.label = label;
    }

    public String getPlatform() {
        return platform;
    }

    public void setPlatform(String platform) {
        this.platform = platform;
    }
}

StoreData.java

package cn.tele.spark;

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import cn.tele.bean.Comment;
import scala.Tuple2;

/**
 * Stores the crawled comments in HBase.
 *
 * @author Tele
 *
 */
public class StoreData {
    // Register Comment with Kryo while building the conf: the context copies the conf
    // when it is created, so registering afterwards (as in a later static block) has no effect
    private static SparkConf conf = new SparkConf().setAppName("storedata").setMaster("local")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(new Class[] { Comment.class });
    private static JavaSparkContext jsc = new JavaSparkContext(conf);
    private static SparkSession session = new SparkSession(jsc.sc());

    // Connection info
    private static final String DB_PHOENIX_URL = "jdbc:phoenix:hadoop002,hadoop003,hadoop004";
    /*
     * private static final String DB_PHOENIX_DRIVER =
     * "org.apache.phoenix.jdbc.PhoenixDriver"; private static final String
     * DB_PHOENIX_USER = ""; private static final String DB_PHOENIX_PASS = "";
     * private static final String DB_PHOENIX_FETCHSIZE = "10000";
     */

    public static void main(String[] args) throws SQLException {

        // Walk the comment folder
        Path path = Paths.get("F:\\comment\\");

        try {
            MyFileVisitor myFileVisitor = new MyFileVisitor();
            Files.walkFileTree(path, myFileVisitor);
            List<Map<String, Object>> list = myFileVisitor.getData();
            JavaRDD<Comment> commentRDD = getCommentRDD(list);
            // Store into HBase
            storeData(commentRDD);

        } catch (IOException e) {
            e.printStackTrace();
        }

        // Read the data
        /*
         * JavaRDD<String> rdd =
         * jsc.textFile("file:\\F:\\comment\\sn_comment\\iphonexr-2019-04-16-18-27-36\\"
         * ); List<Comment> commentList = getCommentList(rdd,"iphonexr");
         */

        jsc.close();

    }

    private static int storeData(JavaRDD<Comment> rdd) {
        int successCount = 0;
        /*
         * DataTypes.createStructType(Arrays.asList(
         * DataTypes.createStructField("id",DataTypes.IntegerType,false),
         * DataTypes.createStructField("name",DataTypes.StringType,false),
         * DataTypes.createStructField("content",DataTypes.StringType,false),
         * DataTypes.createStructField("creationtime",DataTypes.StringType,false),
         * DataTypes.createStructField("label",DataTypes.StringType,true),
         * DataTypes.createStructField("platform",DataTypes.StringType,false) ));
         */

        // The schema is inferred from the Comment bean's getters
        Dataset<Row> ds = session.createDataFrame(rdd, Comment.class);
        ds.write().mode(SaveMode.Overwrite).format("org.apache.phoenix.spark").option("zkUrl", DB_PHOENIX_URL)
                .option("table", "comment").save();

        return successCount;
    }

    @SuppressWarnings("unchecked")
    private static <U> JavaRDD<Comment> getCommentRDD(List<Map<String, Object>> list) {
        JavaRDD<Map<String, Object>> originalRDD = jsc.parallelize(list);
        JavaRDD<List<Comment>> listCommentRDD = originalRDD.map(new Function<Map<String, Object>, List<Comment>>() {

            private static final long serialVersionUID = 1L;

            public List<Comment> call(Map<String, Object> v1) throws Exception {
                // Use a fresh list per call; a list shared across calls would keep
                // accumulating and emit earlier comments again for every later map
                List<Comment> dataList = new ArrayList<>();
                Set<Entry<String, Object>> entrySet = v1.entrySet();
                Iterator<Entry<String, Object>> iterator = entrySet.iterator();
                while (iterator.hasNext()) {
                    Entry<String, Object> entry = iterator.next();
                    // The key has the form platform#referenceName
                    String referenceName = entry.getKey();
                    String platform = referenceName.split("#")[0];
                    List<Comment> commentList = (List<Comment>) entry.getValue();
                    commentList.forEach(cm -> {
                        cm.setPlatform(platform);
                        dataList.add(cm);
                    });
                    println(referenceName + " comment count------------------:" + commentList.size());
                }
                return dataList;
            }
        }).persist(StorageLevel.MEMORY_ONLY());

        // Flatten the per-directory lists into a single RDD of comments
        JavaRDD<Comment> commentRDD = listCommentRDD.flatMap(new FlatMapFunction<List<Comment>, Comment>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<Comment> call(List<Comment> t) throws Exception {
                return t.iterator();
            }
        });

        long totalSize = commentRDD.count();
        println("Total comments:-----------" + totalSize);

        // Assign ids
        JavaRDD<Comment> resultRDD = commentRDD.zipWithIndex().map(new Function<Tuple2<Comment, Long>, Comment>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Comment call(Tuple2<Comment, Long> v1) throws Exception {
                v1._1.setId(Integer.valueOf(v1._2.toString()));
                return v1._1;
            }
        });
        return resultRDD;
    }

    private static List<Comment> getCommentList(JavaRDD<String> rdd, String referenceName) {
        List<Comment> commentList = new ArrayList<Comment>();

        // Join all lines into one string
        String originalStr = rdd.reduce(new Function2<String, String, String>() {
            private static final long serialVersionUID = 1L;

            public String call(String v1, String v2) throws Exception {
                return v1.trim() + v2.trim();
            }
        });
        // Mark the boundary between consecutive arrays (pages) with a unique token
        String uuid = UUID.randomUUID().toString();
        originalStr = originalStr.replace("][", "]" + uuid + "[");

        // Parse the JSON
        String[] pages = originalStr.split(uuid);
        for (String page : pages) {
            try {
                JSONArray jsonArray = JSON.parseArray(page);
                for (Object obj : jsonArray) {
                    JSONObject jsonObject = (JSONObject) obj;
                    // String referenceName = jsonObject.getString("referenceName");
                    String creationTime = jsonObject.getString("creationTime");
                    String content = jsonObject.getString("content");
                    println("referenceName:" + referenceName);
                    println("creationTime:" + creationTime);
                    println("content:" + content);

                    // Wrap in a Comment
                    Comment comment = new Comment();
                    comment.setName(referenceName);
                    comment.setCreationtime(creationTime);
                    comment.setContent(content);

                    JSONArray labelArray = jsonObject.getJSONArray("label");
                    if (labelArray != null) {
                        // Labels are stored as one string, each name followed by '#'
                        String label = "";
                        for (Object labelObj : labelArray) {
                            JSONObject labelObject = (JSONObject) labelObj;
                            label += labelObject.getString("labelName") + "#";
                        }
                        comment.setLabel(label);
                        println("label:" + label);
                    }
                    commentList.add(comment);
                }
            } catch (Exception e) {
                // Skip pages that fail to parse
                continue;
            }
        }
        return commentList;
    }

    private static class MyFileVisitor extends SimpleFileVisitor<Path> {
        private List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
        String platform = null;

        @Override
        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
            println("Currently visiting folder------" + dir.toAbsolutePath() /* + dir.getFileName() */);
            String fileName = dir.getFileName().toString();
            if (fileName.contains("_")) {
                platform = fileName.split("_")[0];
            }
            if (platform != null) {
                if (fileName.contains("-")) {
                    String referenceName = fileName.split("-")[0];
                    JavaRDD<String> rdd = jsc.textFile(dir.toAbsolutePath().toString());
                    List<Comment> commentList = getCommentList(rdd, referenceName);
                    Map<String, Object> map = new HashMap<String, Object>();
                    // platform#brand -> comments
                    map.put(platform + "#" + referenceName, commentList);
                    list.add(map);
                }
            }

            return super.preVisitDirectory(dir, attrs);
        }

        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
            return super.visitFile(file, attrs);
        }

        @Override
        public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
            return super.visitFileFailed(file, exc);
        }

        @Override
        public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
            return super.postVisitDirectory(dir, exc);
        }

        public List<Map<String, Object>> getData() {
            return list;
        }

    }

    private static void println(Object obj) {
        System.out.println(obj);
    }

}
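The directory-name convention that MyFileVisitor relies on can be looked at in isolation, without Spark. The sketch below assumes the layout suggested by the commented-out path in the listing (`sn_comment\iphonexr-2019-04-16-18-27-36`): a platform folder named `<platform>_...` containing crawl folders named `<referenceName>-<timestamp>`. The class name `CommentDirScanner` and the `scan` method are hypothetical, and only the `platform#referenceName` keys are collected rather than the comments themselves.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;

class CommentDirScanner {

    /** Returns one "platform#referenceName" key per crawl folder found below root. */
    static List<String> scan(Path root) throws IOException {
        List<String> keys = new ArrayList<>();
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            // Platform of the folder currently being walked, e.g. "sn" for "sn_comment"
            private String platform = null;

            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
                Path namePath = dir.getFileName();
                if (namePath == null) {
                    // A filesystem root has no name component
                    return FileVisitResult.CONTINUE;
                }
                String name = namePath.toString();
                if (name.contains("_")) {
                    platform = name.split("_")[0];
                }
                if (platform != null && name.contains("-")) {
                    // Everything before the first '-' is the product name
                    keys.add(platform + "#" + name.split("-")[0]);
                }
                return FileVisitResult.CONTINUE;
            }
        });
        return keys;
    }

    public static void main(String[] args) throws IOException {
        // Build a throwaway tree that mimics the assumed layout, then scan it
        Path root = Files.createTempDirectory("comment");
        Files.createDirectories(root.resolve("sn_comment").resolve("iphonexr-2019-04-16-18-27-36"));
        System.out.println(scan(root));
    }
}
```

Because the walk is depth-first, the platform is always set by the parent folder before any of its crawl folders are visited, which is what lets a single mutable field carry it.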

The following exception may be reported, but it does not affect the result; it seems to be related to the HBase version I am using.

Check the result in the terminal.

Notes:

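1. StoreData can be optimized further. For example, the RDD could be built directly while parsing the JSON instead of going through a list, and the job could be turned into a version that runs on a cluster. Since my data volume is small, local mode is enough.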

2. SaveMode must be SaveMode.Overwrite; phoenix-spark does not support any other mode. In actual testing, however, the write still behaves as an append.

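3. The ids are generated with zipWithIndex, but consecutive ids easily cause hotspotting in the cluster, so it is best to salt the table when creating it in Phoenix.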

4. Spark's JDBC support can also be used to interact with Phoenix; see http://www.javashuo.com/article/p-otlcstfw-nq.html

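5. On directories: Spark can read multiple files under a directory to build an RDD, so the traversal only needs to go as far as the parent folder. Traversing down to each individual file also works.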
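To illustrate note 3: Phoenix's `SALT_BUCKETS` table option prepends a salt byte, derived from a hash of the row key, so that neighbouring keys land in different regions. The sketch below is a simplified stand-in for that idea, not Phoenix's actual implementation: the hash function, the plain big-endian key encoding, and the names `SaltDemo`, `saltBucket` and `intKey` are all assumptions made for illustration.

```java
import java.nio.ByteBuffer;

class SaltDemo {

    /** Encodes an id as 4 big-endian bytes (a simplified row key). */
    static byte[] intKey(int id) {
        return ByteBuffer.allocate(4).putInt(id).array();
    }

    /**
     * Picks a bucket for a row key: hash the key bytes, then reduce the hash
     * modulo the number of buckets. Illustrative hash only.
     */
    static int saltBucket(byte[] rowKey, int buckets) {
        int hash = 1;
        for (byte b : rowKey) {
            hash = 31 * hash + b;
        }
        return Math.abs(hash % buckets);
    }

    public static void main(String[] args) {
        int buckets = 4;
        // Consecutive ids, like those produced by zipWithIndex
        for (int id = 0; id < 8; id++) {
            System.out.println("id " + id + " -> bucket " + saltBucket(intKey(id), buckets));
        }
    }
}
```

Without a salt, ids 0 to 7 sort next to each other and would all be written to the same region. With the bucket number as a key prefix, consecutive ids are spread across the buckets, at the cost of range scans having to touch every bucket.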

Appendix: test case

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

/**
 *
 * @author Tele
 * Tests the integration of Spark and Phoenix
 */
public class SparkPhoneix {
    private static SparkConf conf = new SparkConf().setAppName("demo").setMaster("local");
    private static JavaSparkContext jsc = new JavaSparkContext(conf);
    private static SparkSession session = new SparkSession(jsc.sc());

    // Connection info
    private static final String DB_PHOENIX_URL = "jdbc:phoenix:hadoop002,hadoop003,hadoop004";
    /*private static final String DB_PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
    private static final String DB_PHOENIX_USER = "";
    private static final String DB_PHOENIX_PASS = "";
    private static final String DB_PHOENIX_FETCHSIZE = "10000";*/

    public static void main(String[] args) {
        // Read the existing table
        Dataset<Row> ds = session.read().format("org.apache.phoenix.spark")
                .option("zkUrl", DB_PHOENIX_URL)
                .option("table", "TEST")
                .load();
        ds.createOrReplaceTempView("test");

        ds.show();

        // Insert test; table columns: _SALT, ID, INFO.ITEM, INFO.CONTENT, INFO.LABEL
        StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("item", DataTypes.StringType, false),
                DataTypes.createStructField("content", DataTypes.StringType, false),
                DataTypes.createStructField("label", DataTypes.StringType, true)
        ));

        // Create the data
        List<Row> list = new ArrayList<Row>();
        Row row1 = RowFactory.create(3, "iphone", "不错", null);
        list.add(row1);

        Dataset<Row> dataset = session.createDataFrame(list, schema);
        // Phoenix only accepts Overwrite mode, but in practice the data is appended
        dataset.write().mode(SaveMode.Overwrite).format("org.apache.phoenix.spark")
                .option("zkUrl", DB_PHOENIX_URL)
                .option("table", "TEST").save();
        ds.show();
        session.stop();
        jsc.close();
    }
}