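Requirement: store the comments I scraped a few days ago in HBase.
Approach:
First parse the comments with fastjson, then build an RDD from them, and finally use Spark to talk to Phoenix and write the data into HBase.
Sample data: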
[
  {"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待", "creationTime": "2019-04-08 01:13:42", "content": "此用户没有填写评价内容", "label": []},
  {"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待", "creationTime": "2019-03-29 11:49:36", "content": "不错", "label": []},
  {"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待", "creationTime": "2019-03-21 21:13:07", "content": "正品没什么毛病。信号好像是照别的差一点,可是还能够,不是特别差。分辨率不是那么好,可是也不是特别差。通常般。手机不卡。打游戏很顺畅。官方正品没有翻车。", "label": [{"labelName": "功能齐全"}]},
  {"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待", "creationTime": "2019-03-22 09:56:22", "content": "不错是正品", "label": [{"labelName": "系统流畅"}, {"labelName": "声音大"}, {"labelName": "作工精致"}, {"labelName": "待机时间长"}]},
  {"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待", "creationTime": "2019-03-13 07:27:56", "content": "性价比很高的手机,用习惯了ios转安卓7个月,从新回归,只能说,用苹果省心。苏宁质量有保障,送货快,价格优惠,推荐购买!", "label": [{"labelName": "系统流畅"}, {"labelName": "功能齐全"}, {"labelName": "包装通常"}]},
  {"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待", "creationTime": "2019-02-25 22:03:18", "content": "弟弟就想要一个苹果手机。原本打算买8p的。而后我,推荐他这款xr,是最新款。价格整体来讲性价比,比8p好。买了很快就到了,次日就。屏幕很大,仍是面容id。苹果x大不少。比***x小一点", "label": [{"labelName": "外观漂亮"}, {"labelName": "作工精致"}, {"labelName": "反应快"}, {"labelName": "系统流畅"}]},
  {"referenceName": "【券后低至5388】Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待", "creationTime": "2019-02-21 12:45:22", "content": "物流很棒,xr价格能够接受、性能稳定!", "label": []},
  {"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待", "creationTime": "2019-02-22 12:14:55", "content": "很不错的手机除了有点厚,极致的体验,黑边彻底没有影响", "label": [{"labelName": "拍照效果好"}, {"labelName": "待机时间长"}, {"labelName": "电池耐用"}, {"labelName": "性价比高 "}]},
  {"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待", "creationTime": "2019-02-13 00:28:03", "content": "很是很是好的商品。很不错,下次再来", "label": [{"labelName": "信号稳定"}, {"labelName": "反应快"}, {"labelName": "声音大"}, {"labelName": "作工精致"}]},
  {"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G 手机", "creationTime": "2019-04-02 17:29:43", "content": "此用户没有填写评价内容", "label": []}
]
[
  {"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待", "creationTime": "2019-04-05 18:13:14", "content": "满意嘻嘻", "label": [{"labelName": "音质好"}, {"labelName": "拍照效果好"}, {"labelName": "功能齐全"}, {"labelName": "外观漂亮"}]},
  {"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待", "creationTime": "2019-03-21 00:13:17", "content": "棒棒哒", "label": []},
  {"referenceName": "【双12爆款】Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待", "creationTime": "2019-01-19 10:23:57", "content": "双十二买的正好遇上手机坏了就买了xr太贵没舍得买一直用苹果的系统用顺手了不肯意换", "label": [{"labelName": "反应快"}, {"labelName": "作工精致"}, {"labelName": "信号稳定"}, {"labelName": "待机时间长"}, {"labelName": "性价比通常般"}]},
  {"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待", "creationTime": "2019-03-21 12:47:34", "content": "用了几天感受还能够,只是信号不是很好,整体上是能够的", "label": [{"labelName": "音质好"}, {"labelName": "分辨率高"}]},
  {"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待", "creationTime": "2019-03-09 08:32:15", "content": "苹果手机作工精细,手感不错,外观设计也是颇有时尚感!系统运行十分流畅,操做体验不错;功能齐全,屏幕分辨率高,整体来讲很满意!", "label": [{"labelName": "作工精致"}, {"labelName": "系统流畅"}, {"labelName": "功能齐全"}]},
  {"referenceName": "【低至5399】Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待", "creationTime": "2019-01-15 22:44:53", "content": "真心喜欢,一直在徘徊x.仍是xr.我的以为真不错,一直信赖苏宁,新机为激活,价钱能接受,值得推荐,黑边什么的不影响。", "label": [{"labelName": "拍照效果好"}, {"labelName": "外观漂亮"}, {"labelName": "屏幕清晰"}]},
  {"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待", "creationTime": "2019-03-08 22:12:18", "content": "手机运行流畅,外观也漂亮,黑色彰显档次,音质也好,又是双卡双待手机,性价比不错,比起XS便宜很多。", "label": [{"labelName": "外观漂亮"}, {"labelName": "系统流畅"}]},
  {"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待", "creationTime": "2019-03-01 15:54:33", "content": "手机很好,电池很是抗用,价钱也很是美丽,值得购买。", "label": [{"labelName": "系统流畅"}, {"labelName": "电池耐用"}, {"labelName": "分辨率高"}, {"labelName": "待机时间长"}, {"labelName": "包装通常"}]},
  {"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待", "creationTime": "2019-02-19 09:37:25", "content": "春节期间配送超快,手机没有任何问题,苏宁易购确实作到了全网最低价", "label": []},
  {"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待", "creationTime": "2019-02-23 13:27:15", "content": "挺好的懒得拍照了借了几张黑边确实大", "label": [{"labelName": "待机时间长"}, {"labelName": "反应快"}, {"labelName": "作工精致"}]}
]
The data is made of JSON arrays, so I parse it with fastjson. To make the final write easier, I define a Comment class to hold each parsed comment.
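The sample above is really two JSON arrays (one per page of comments) written back to back, so a whole file is not a single valid JSON document. getCommentList below handles this by replacing "][" with a UUID separator, which would split a page in the wrong place if a comment's text ever contained "][". A minimal JDK-only sketch of a sturdier split that tracks bracket depth and skips string literals (the class and method names are hypothetical, not part of the original code):

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: splits text made of top-level JSON arrays written back to back,
 * e.g. "[...][...]", into one string per array, without a parser dependency.
 * Brackets inside string literals (including escaped quotes) are ignored.
 */
class JsonArraySplitter {

    static List<String> splitTopLevelArrays(String text) {
        List<String> arrays = new ArrayList<>();
        int depth = 0;            // nesting depth of [ ] and { }
        int start = 0;            // index where the current top-level value began
        boolean inString = false; // currently inside a "..." literal?
        boolean escaped = false;  // previous char was a backslash inside a literal?

        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (inString) {
                if (escaped) {
                    escaped = false;      // this char is escaped, e.g. \" or \\
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;     // end of the string literal
                }
                continue;
            }
            if (c == '"') {
                inString = true;
            } else if (c == '[' || c == '{') {
                if (depth == 0) {
                    start = i;            // a new top-level value begins here
                }
                depth++;
            } else if ((c == ']' || c == '}') && depth > 0) {
                depth--;
                if (depth == 0) {
                    arrays.add(text.substring(start, i + 1));
                }
            }
        }
        return arrays;
    }

    public static void main(String[] args) {
        // The second array's content contains "][", which a plain replace("][", ...) would split on
        String text = "[{\"content\":\"ok\"}]\n[{\"content\":\"a][b\"}]";
        for (String page : splitTopLevelArrays(text)) {
            System.out.println(page);
        }
    }
}
```

Each returned string is one complete page and can be handed to JSON.parseArray exactly as the loop in getCommentList does.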
Dependencies:
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.9</version>
</dependency>

<!-- hbase -->
<!-- <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>2.0.2</version>
</dependency> -->

<!-- phoenix -->
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>5.0.0-HBase-2.0</version>
</dependency>

<!-- phoenix_spark -->
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>5.0.0-HBase-2.0</version>
</dependency>

<!-- spark -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.2</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.2</version>
</dependency>
Comment.java
package cn.tele.bean;

import java.io.Serializable;

/**
 * One scraped comment. createDataFrame turns each bean property into a column,
 * so the property names must match the columns of the Phoenix table.
 * Serializable as a fallback in case Kryo is not configured.
 *
 * @author Tele
 */
public class Comment implements Serializable {
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String name;
    private String content;
    private String creationtime;
    private String label;
    private String platform;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getCreationtime() {
        return creationtime;
    }

    public void setCreationtime(String creationtime) {
        this.creationtime = creationtime;
    }

    public String getLabel() {
        return label;
    }

    public void setLabel(String label) {
        this.label = label;
    }

    public String getPlatform() {
        return platform;
    }

    public void setPlatform(String platform) {
        this.platform = platform;
    }
}
StoreData.java
package cn.tele.spark;

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import cn.tele.bean.Comment;
import scala.Tuple2;

/**
 * Stores the scraped comments in HBase (through Phoenix).
 *
 * @author Tele
 */
public class StoreData {
    // Kryo classes must be registered BEFORE the context is created: a static block that
    // registers them after new JavaSparkContext(conf) has no effect on the running context.
    private static SparkConf conf = new SparkConf().setAppName("storedata").setMaster("local")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(new Class<?>[] { Comment.class });
    private static SparkSession session = SparkSession.builder().config(conf).getOrCreate();
    private static JavaSparkContext jsc = new JavaSparkContext(session.sparkContext());

    // Connection info: the ZooKeeper quorum Phoenix uses
    private static final String DB_PHOENIX_URL = "jdbc:phoenix:hadoop002,hadoop003,hadoop004";
    /*
     * private static final String DB_PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
     * private static final String DB_PHOENIX_USER = "";
     * private static final String DB_PHOENIX_PASS = "";
     * private static final String DB_PHOENIX_FETCHSIZE = "10000";
     */

    public static void main(String[] args) {
        // Walk the comment folder
        Path path = Paths.get("F:\\comment\\");
        try {
            MyFileVisitor myFileVisitor = new MyFileVisitor();
            Files.walkFileTree(path, myFileVisitor);
            List<Map<String, Object>> list = myFileVisitor.getData();
            JavaRDD<Comment> commentRDD = getCommentRDD(list);
            // Store in HBase
            storeData(commentRDD);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // Reading a single folder directly:
        /*
         * JavaRDD<String> rdd = jsc.textFile("file:\\F:\\comment\\sn_comment\\iphonexr-2019-04-16-18-27-36\\");
         * List<Comment> commentList = getCommentList(rdd, "iphonexr");
         */

        jsc.close();
    }

    private static void storeData(JavaRDD<Comment> rdd) {
        /*
         * Alternative: an explicit schema instead of inferring it from the bean
         * DataTypes.createStructType(Arrays.asList(
         *     DataTypes.createStructField("id", DataTypes.IntegerType, false),
         *     DataTypes.createStructField("name", DataTypes.StringType, false),
         *     DataTypes.createStructField("content", DataTypes.StringType, false),
         *     DataTypes.createStructField("creationtime", DataTypes.StringType, false),
         *     DataTypes.createStructField("label", DataTypes.StringType, true),
         *     DataTypes.createStructField("platform", DataTypes.StringType, false)));
         */

        // Column names come from the Comment bean properties
        Dataset<Row> ds = session.createDataFrame(rdd, Comment.class);
        // phoenix-spark only accepts SaveMode.Overwrite; the rows are actually upserted
        ds.write().mode(SaveMode.Overwrite).format("org.apache.phoenix.spark")
                .option("zkUrl", DB_PHOENIX_URL)
                .option("table", "comment")
                .save();
    }

    @SuppressWarnings("unchecked")
    private static JavaRDD<Comment> getCommentRDD(List<Map<String, Object>> list) {
        JavaRDD<Map<String, Object>> originalRDD = jsc.parallelize(list);
        JavaRDD<List<Comment>> listCommentRDD = originalRDD.map(new Function<Map<String, Object>, List<Comment>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public List<Comment> call(Map<String, Object> v1) throws Exception {
                // A new list per call: a list held in a field of this function object would be
                // shared by every call in the task and keep growing, duplicating comments
                List<Comment> dataList = new ArrayList<>();
                for (Entry<String, Object> entry : v1.entrySet()) {
                    // key format: platform#referenceName
                    String referenceName = entry.getKey();
                    String platform = referenceName.split("#")[0];
                    List<Comment> commentList = (List<Comment>) entry.getValue();
                    commentList.forEach(cm -> {
                        cm.setPlatform(platform);
                        dataList.add(cm);
                    });
                    println(referenceName + " comment count ------------------: " + commentList.size());
                }
                return dataList;
            }
        }).persist(StorageLevel.MEMORY_ONLY());

        JavaRDD<Comment> commentRDD = listCommentRDD.flatMap(new FlatMapFunction<List<Comment>, Comment>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<Comment> call(List<Comment> t) throws Exception {
                return t.iterator();
            }
        });

        long totalSize = commentRDD.count();
        println("Total comments: -----------" + totalSize);

        // Assign ids: zipWithIndex pairs each element with a consecutive Long index
        JavaRDD<Comment> resultRDD = commentRDD.zipWithIndex().map(new Function<Tuple2<Comment, Long>, Comment>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Comment call(Tuple2<Comment, Long> v1) throws Exception {
                // toIntExact fails loudly instead of silently overflowing Integer
                v1._1().setId(Math.toIntExact(v1._2()));
                return v1._1();
            }
        });
        return resultRDD;
    }

    private static List<Comment> getCommentList(JavaRDD<String> rdd, String referenceName) {
        List<Comment> commentList = new ArrayList<Comment>();

        // Concatenate every line of the folder into one string
        String originalStr = rdd.reduce(new Function2<String, String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(String v1, String v2) throws Exception {
                return v1.trim() + v2.trim();
            }
        });
        // Pages are JSON arrays written back to back ("]["): put a unique separator between them
        String uuid = UUID.randomUUID().toString();
        originalStr = originalStr.replace("][", "]" + uuid + "[");

        // Parse the JSON page by page
        String[] pages = originalStr.split(uuid);
        for (String page : pages) {
            try {
                JSONArray jsonArray = JSON.parseArray(page);
                for (Object obj : jsonArray) {
                    JSONObject jsonObject = (JSONObject) obj;
                    // String referenceName = jsonObject.getString("referenceName");
                    String creationTime = jsonObject.getString("creationTime");
                    String content = jsonObject.getString("content");
                    println("referenceName:" + referenceName);
                    println("creationTime:" + creationTime);
                    println("content:" + content);

                    // Wrap into a Comment
                    Comment comment = new Comment();
                    comment.setName(referenceName);
                    comment.setCreationtime(creationTime);
                    comment.setContent(content);

                    // Labels are stored as one "#"-joined string, e.g. "labelA#labelB#"
                    JSONArray labelArray = jsonObject.getJSONArray("label");
                    if (labelArray != null) {
                        StringBuilder label = new StringBuilder();
                        for (Object labelObj : labelArray) {
                            JSONObject labelObject = (JSONObject) labelObj;
                            label.append(labelObject.getString("labelName")).append("#");
                        }
                        comment.setLabel(label.toString());
                        println("label:" + label);
                    }
                    commentList.add(comment);
                }
            } catch (Exception e) {
                // Skip a page that fails to parse, but report it instead of swallowing it
                println("Failed to parse a page: " + e.getMessage());
            }
        }
        return commentList;
    }

    private static class MyFileVisitor extends SimpleFileVisitor<Path> {
        private List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
        private String platform = null;

        @Override
        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
            println("Visiting directory ------" + dir.toAbsolutePath());
            String fileName = dir.getFileName().toString();
            // Platform folders are named <platform>_xxx, e.g. sn_comment
            if (fileName.contains("_")) {
                platform = fileName.split("_")[0];
            }
            // Product folders are named <referenceName>-<timestamp>, e.g. iphonexr-2019-04-16-18-27-36
            if (platform != null && fileName.contains("-")) {
                String referenceName = fileName.split("-")[0];
                // textFile on a directory reads every file in it
                JavaRDD<String> rdd = jsc.textFile(dir.toAbsolutePath().toString());
                List<Comment> commentList = getCommentList(rdd, referenceName);
                Map<String, Object> map = new HashMap<String, Object>();
                // key: platform#referenceName, value: that product's comments
                map.put(platform + "#" + referenceName, commentList);
                list.add(map);
            }
            return super.preVisitDirectory(dir, attrs);
        }

        public List<Map<String, Object>> getData() {
            return list;
        }
    }

    private static void println(Object obj) {
        System.out.println(obj);
    }
}
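Why the list in getCommentRDD has to be created inside call() rather than held in a field: Spark deserializes one copy of the function object per task and calls it once per element, so a field-level list is shared by every element of the partition. The returned lists then overlap, and flatMap emits the same comments several times. A JDK-only illustration with made-up class names (no Spark involved), where mapThenFlatten roughly stands in for map + persist + flatMap inside a single task:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** JDK-only illustration (no Spark) of state kept in a function object's field. */
class SharedStateDemo {

    /** Buggy: the list is a field, so every call appends to and returns the same list. */
    static class FieldListFunction implements Function<List<String>, List<String>> {
        private final List<String> dataList = new ArrayList<>();

        @Override
        public List<String> apply(List<String> input) {
            dataList.addAll(input);
            return dataList;
        }
    }

    /** Fixed: a fresh list is created on every call. */
    static class LocalListFunction implements Function<List<String>, List<String>> {
        @Override
        public List<String> apply(List<String> input) {
            List<String> dataList = new ArrayList<>();
            dataList.addAll(input);
            return dataList;
        }
    }

    /**
     * Applies ONE function object to every element of a "partition", keeps all results
     * (roughly what persist(MEMORY_ONLY) does), then flattens them like flatMap.
     */
    static List<String> mapThenFlatten(Function<List<String>, List<String>> f,
                                       List<List<String>> partition) {
        List<List<String>> mapped = new ArrayList<>();
        for (List<String> element : partition) {
            mapped.add(f.apply(element));
        }
        List<String> flat = new ArrayList<>();
        for (List<String> result : mapped) {
            flat.addAll(result);
        }
        return flat;
    }

    public static void main(String[] args) {
        List<List<String>> partition = Arrays.asList(Arrays.asList("a"), Arrays.asList("b", "c"));
        System.out.println(mapThenFlatten(new FieldListFunction(), partition)); // [a, b, c, a, b, c]
        System.out.println(mapThenFlatten(new LocalListFunction(), partition)); // [a, b, c]
    }
}
```

Three inputs come out as six with the field-level list, which is the kind of inflated total the "Total comments" line would report.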
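An exception may be reported while the job runs, but it does not affect the result; it seems to be related to the HBase version I am using.
Then check the stored data from the terminal.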
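Notes:
1. StoreData could be optimized further. For example, the JSON could be parsed straight into an RDD instead of a List, and the job could be adapted to run on a cluster. Since my data volume is small, local mode is enough.
2. The SaveMode must be SaveMode.Overwrite, because phoenix-spark rejects every other mode. Despite the name, testing shows the data is appended: Phoenix writes rows with UPSERT, so the table is not truncated and only rows whose primary key already exists are overwritten.
3. IDs are generated with zipWithIndex, but consecutive IDs send all writes to the same region and easily cause hotspots on the cluster, so it is best to salt the table (SALT_BUCKETS) when creating it in Phoenix.
4. Spark's JDBC data source can also be used to talk to Phoenix; see http://www.javashuo.com/article/p-otlcstfw-nq.html
5. About directories: Spark can build one RDD from all the files in a directory, so the walk only needs to reach the folder that contains the files. Walking down to each individual file works too.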
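Notes 1 and 5 point toward reading every product folder in one go instead of calling textFile per folder on the driver. One option, sketched here as an assumption rather than what StoreData does, is JavaSparkContext.wholeTextFiles, which returns (file path, file content) pairs and accepts Hadoop glob patterns, so each page could be parsed inside Spark. The platform and product name would then have to come from the path, which is typically returned as a URI such as file:/.... A hypothetical JDK-only helper that applies the same folder conventions as MyFileVisitor:

```java
/**
 * Hypothetical helper: recovers platform and product name from a comment file path,
 * using the folder layout MyFileVisitor expects:
 *   .../<platform>_xxx/<referenceName>-<timestamp>/<file>
 * Splits on both '/' and '\' so Windows paths and file:/ URIs both work.
 */
final class CommentPath {
    final String platform;
    final String referenceName;

    private CommentPath(String platform, String referenceName) {
        this.platform = platform;
        this.referenceName = referenceName;
    }

    /** Returns null when the path does not follow the folder convention. */
    static CommentPath parse(String filePath) {
        String[] parts = filePath.split("[/\\\\]");
        if (parts.length < 3) {
            return null;
        }
        String productDir = parts[parts.length - 2];  // e.g. iphonexr-2019-04-16-18-27-36
        String platformDir = parts[parts.length - 3]; // e.g. sn_comment
        if (!platformDir.contains("_") || !productDir.contains("-")) {
            return null;
        }
        // Text before the first '_' / '-', same as split(...)[0] in MyFileVisitor
        String platform = platformDir.substring(0, platformDir.indexOf('_'));
        String referenceName = productDir.substring(0, productDir.indexOf('-'));
        return new CommentPath(platform, referenceName);
    }

    /** Same key format MyFileVisitor builds: platform#referenceName. */
    String key() {
        return platform + "#" + referenceName;
    }

    public static void main(String[] args) {
        CommentPath p = parse("file:/F:/comment/sn_comment/iphonexr-2019-04-16-18-27-36/part-00000");
        System.out.println(p.key()); // sn#iphonexr
    }
}
```

In a flatMap over the wholeTextFiles pairs, CommentPath.parse(pair._1()).key() would give the same platform#referenceName key that MyFileVisitor currently puts into its map.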
Appendix: test case
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

/**
 * Tests the Spark-Phoenix integration.
 *
 * @author Tele
 */
public class SparkPhoneix {
    private static SparkConf conf = new SparkConf().setAppName("demo").setMaster("local");
    private static SparkSession session = SparkSession.builder().config(conf).getOrCreate();

    // Connection info
    private static final String DB_PHOENIX_URL = "jdbc:phoenix:hadoop002,hadoop003,hadoop004";
    /*
     * private static final String DB_PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
     * private static final String DB_PHOENIX_USER = "";
     * private static final String DB_PHOENIX_PASS = "";
     * private static final String DB_PHOENIX_FETCHSIZE = "10000";
     */

    public static void main(String[] args) {
        // Read the Phoenix table TEST as a DataFrame
        Dataset<Row> ds = session.read().format("org.apache.phoenix.spark")
                .option("zkUrl", DB_PHOENIX_URL)
                .option("table", "TEST")
                .load();
        ds.createOrReplaceTempView("test");
        ds.show();

        // Insert test; the table's columns are _SALT, ID, INFO.ITEM, INFO.CONTENT, INFO.LABEL
        StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("item", DataTypes.StringType, false),
                DataTypes.createStructField("content", DataTypes.StringType, false),
                DataTypes.createStructField("label", DataTypes.StringType, true)));

        // Build the test data
        List<Row> list = new ArrayList<Row>();
        Row row1 = RowFactory.create(3, "iphone", "不错", null);
        list.add(row1);

        Dataset<Row> dataset = session.createDataFrame(list, schema);
        // Phoenix only accepts SaveMode.Overwrite, but in practice the rows are appended (upserted)
        dataset.write().mode(SaveMode.Overwrite).format("org.apache.phoenix.spark")
                .option("zkUrl", DB_PHOENIX_URL)
                .option("table", "TEST")
                .save();
        // ds is not cached, so show() reads TEST again and should include the new row
        ds.show();
        // Stopping the session also stops the underlying SparkContext
        session.stop();
    }
}
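Note 3 recommends salting, but the definition of the comment table that StoreData writes to is not shown. A sketch of how such a table could be created over plain JDBC, reusing the connection string from the code above; the class name, the column types, the default column family (the TEST table above uses an INFO family instead) and the bucket count of 4 are all assumptions. With SALT_BUCKETS, Phoenix prepends a hash-derived bucket byte to each row key, so the consecutive IDs produced by zipWithIndex are spread across regions:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * Sketch: create a salted Phoenix table for the Comment bean over plain JDBC.
 * Column types, the default column family and the bucket count are assumptions.
 */
class CreateCommentTable {

    /** Builds the DDL; Phoenix accepts SALT_BUCKETS values from 1 to 256. */
    static String buildDdl(String table, int saltBuckets) {
        if (saltBuckets < 1 || saltBuckets > 256) {
            throw new IllegalArgumentException("SALT_BUCKETS must be between 1 and 256");
        }
        // Column names match the Comment bean properties used by createDataFrame(rdd, Comment.class)
        return "CREATE TABLE IF NOT EXISTS " + table + " ("
                + "ID INTEGER NOT NULL PRIMARY KEY, "
                + "NAME VARCHAR, "
                + "CONTENT VARCHAR, "
                + "CREATIONTIME VARCHAR, "
                + "LABEL VARCHAR, "
                + "PLATFORM VARCHAR"
                + ") SALT_BUCKETS = " + saltBuckets;
    }

    /** Runs the DDL; needs phoenix-core on the classpath and a reachable cluster. */
    static void create(String jdbcUrl, String table, int saltBuckets)
            throws SQLException, ClassNotFoundException {
        // Same driver class as the commented-out DB_PHOENIX_DRIVER constant
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate(buildDdl(table, saltBuckets));
        }
    }

    public static void main(String[] args) throws Exception {
        // 4 buckets is an arbitrary example value
        create("jdbc:phoenix:hadoop002,hadoop003,hadoop004", "COMMENT", 4);
    }
}
```

Salting is fixed at creation time, so the table would need to be created this way before the first StoreData run rather than altered afterwards.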