This example reads data from MySQL using Spark's DataFrame API. Note that the separate DataFrame abstract class no longer exists in Spark SQL 2.0.1; Dataset&lt;Row&gt; is used instead to hold the rows queried from the relational database.
```java
import com.alibaba.fastjson.JSONObject;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.StructField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterator;

import java.util.Properties;

public class SparkDataFrame {

    private static final Logger logger = LoggerFactory.getLogger(SparkDataFrame.class);

    private static final SparkContext sc =
            new SparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
    private static final SparkSession ss = new SparkSession(sc);
    private static final SQLContext sqlContext = new SQLContext(ss);

    public static void main(String[] args) {
        // Each predicate defines one partition. The ranges are partitioned by hand here,
        // but they could just as well be read from a configuration file.
        String[] predicates = new String[] {
                "insertdate between '2017-03-11 16:37:05' and '2017-03-14 09:52:28'",
                "insertdate between '2017-03-15 16:37:05' and '2017-03-16 09:52:28'",
                "insertdate between '2017-03-16 16:37:05' and '2017-03-17 09:52:28'",
                "insertdate between '2017-03-17 16:37:05' and '2017-03-18 09:52:28'",
                "insertdate between '2017-03-19 16:37:05' and '2017-03-20 09:52:28'"
        };

        String url = "jdbc:mysql://192.168.3.50:3306/database";
        String table = "t_electroniceinfowhole";

        Properties connectionProperties = new Properties();
        connectionProperties.setProperty("dbtable", table);     // table name
        connectionProperties.setProperty("user", "root");       // user
        connectionProperties.setProperty("password", "123456"); // password

        // Load the MySQL query result as a Dataset<Row>; Spark issues one JDBC query per predicate.
        Dataset<Row> jdbcDF = sqlContext.read().jdbc(url, table, predicates, connectionProperties);

        jdbcDF.foreach(new ForeachFunction<Row>() {
            @Override
            public void call(Row row) throws Exception {
                // Walk the schema and copy every column of the row into a JSON object.
                Iterator<StructField> it = row.schema().iterator();
                JSONObject jsonObject = new JSONObject();
                while (it.hasNext()) {
                    StructField field = it.next();
                    int index = row.fieldIndex(field.name());
                    jsonObject.put(field.name(), row.get(index));
                }
                logger.info(jsonObject.toJSONString());
                // TODO: process the row
            }
        });
    }
}
```
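As a side note, in Spark 2.x the usual entry point is SparkSession.builder() rather than constructing a SparkContext and SQLContext by hand as above. A minimal sketch of the same read through that entry point (the URL, table and credentials are the same placeholders as in the example):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public class SparkJdbcWithBuilder {
    public static void main(String[] args) {
        // Build the session directly; the SparkContext is created internally.
        SparkSession spark = SparkSession.builder()
                .appName("SparkJdbcDs")
                .master("local[*]")
                .getOrCreate();

        Properties props = new Properties();
        props.setProperty("user", "root");
        props.setProperty("password", "123456");

        // Same idea of manual partitioning: one predicate string per partition.
        String[] predicates = new String[] {
                "insertdate between '2017-03-11 16:37:05' and '2017-03-14 09:52:28'",
                "insertdate between '2017-03-15 16:37:05' and '2017-03-16 09:52:28'"
        };

        Dataset<Row> jdbcDF = spark.read().jdbc(
                "jdbc:mysql://192.168.3.50:3306/database",
                "t_electroniceinfowhole",
                predicates,
                props);

        // One partition per predicate.
        System.out.println("partitions: " + jdbcDF.rdd().getNumPartitions());
        spark.stop();
    }
}
```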
In the source there are two callable jdbc() overloads: one lets you pass in a String[] of predicates yourself, one predicate per partition; the other partitions by lowerBound and upperBound, but that variant requires the relational table to have a column of type int or long (columnName) to split on, and without such a column it cannot partition at all (a usage sketch of that variant follows the source listing below).
The source of the Spark SQL jdbc() methods (decompiled to Java):
```java
// This overload takes a columnName and partitions on the range of that column's values.
public Dataset<Row> jdbc(String url, String table, String columnName, long lowerBound,
        long upperBound, int numPartitions, Properties connectionProperties) {
    JDBCPartitioningInfo partitioning =
            new JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions);
    Partition[] parts = org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation..MODULE$
            .columnPartition(partitioning);
    return this.jdbc(url, table, parts, connectionProperties);
}

// This overload is nicer: the caller defines the partition ranges itself.
public Dataset<Row> jdbc(String url, String table, String[] predicates,
        Properties connectionProperties) {
    Partition[] parts = (Partition[])scala.Predef..MODULE$.refArrayOps(
            (Object[])scala.Predef..MODULE$.refArrayOps((Object[])predicates)
                    .zipWithIndex(scala.Array..MODULE$.canBuildFrom(
                            scala.reflect.ClassTag..MODULE$.apply(Tuple2.class))))
            .map(new Serializable(this) {
                public static final long serialVersionUID = 0L;

                public final Partition apply(Tuple2<String, Object> x0$1) {
                    if (x0$1 != null) {
                        String part = (String)x0$1._1();
                        int i = x0$1._2$mcI$sp();
                        JDBCPartition var5 = new JDBCPartition(part, i);
                        return var5;
                    } else {
                        throw new MatchError(x0$1);
                    }
                }
            }, scala.Array..MODULE$.canBuildFrom(
                    scala.reflect.ClassTag..MODULE$.apply(Partition.class)));
    return this.jdbc(url, table, parts, connectionProperties);
}
```
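For comparison, here is a rough usage sketch of the column-based overload. The numeric column `id` and the bounds are assumptions for illustration only, not columns of the table above; lowerBound and upperBound control the stride of the generated WHERE clauses rather than filtering rows outside the range.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public class SparkJdbcColumnPartition {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkJdbcColumnPartition")
                .master("local[*]")
                .getOrCreate();

        Properties props = new Properties();
        props.setProperty("user", "root");
        props.setProperty("password", "123456");

        // Partition on a numeric column: "id" is a hypothetical auto-increment primary key.
        Dataset<Row> jdbcDF = spark.read().jdbc(
                "jdbc:mysql://192.168.3.50:3306/database", // same placeholder URL as above
                "t_electroniceinfowhole",
                "id",       // columnName (assumed to exist and be int/long)
                1L,         // lowerBound
                1000000L,   // upperBound
                4,          // numPartitions
                props);

        System.out.println("rows: " + jdbcDF.count());
        spark.stop();
    }
}
```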