Spark DataFrame: querying data from MySQL

This example uses Spark's DataFrame API to fetch data from MySQL. Note that the DataFrame abstract class no longer exists as of Spark SQL 2.0.1; in the Java API, rows read from a relational database are handled as Dataset<Row> instead.

 

import com.alibaba.fastjson.JSONObject;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.StructField;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterator;

public class SparkDataFrame {

    private static final Logger logger = LoggerFactory.getLogger(SparkDataFrame.class);

    // Spark 2.x entry point: build the SparkSession directly instead of going
    // through SparkContext + SQLContext.
    private static final SparkSession ss = SparkSession.builder()
            .appName("SparkJdbcDs")
            .master("local[*]")
            .getOrCreate();

    public static void main(String[] args) {

        // One predicate per partition (manual partitioning); the ranges could be read from a config file.
        // Each predicate ends up in the WHERE clause of the query run for its partition.
        String[] predicates = new String[] {
                "insertdate between '2017-03-11 16:37:05' and '2017-03-14 09:52:28'",
                "insertdate between '2017-03-15 16:37:05' and '2017-03-16 09:52:28'",
                "insertdate between '2017-03-16 16:37:05' and '2017-03-17 09:52:28'",
                "insertdate between '2017-03-17 16:37:05' and '2017-03-18 09:52:28'",
                "insertdate between '2017-03-19 16:37:05' and '2017-03-20 09:52:28'" };


        String url = "jdbc:mysql://192.168.3.50:3306/database";
        String table = "t_electroniceinfowhole";
        Properties connectionProperties = new Properties();
        connectionProperties.setProperty("dbtable", table);// 设置表
        connectionProperties.setProperty("user", "root");// 设置用户名
        connectionProperties.setProperty("password", "123456");// 设置密码

        // Load the MySQL table as a Dataset<Row>, one partition per predicate
        Dataset<Row> jdbcDF = ss.read().jdbc(url, table, predicates, connectionProperties);

        jdbcDF.foreach(new ForeachFunction<Row>() {
            @Override
            public void call(Row row) throws Exception {
                Iterator<StructField> it = row.schema().iterator();
                JSONObject jsonObject = new JSONObject();
                while (it.hasNext()) {
                    StructField i = it.next();
                    int index = row.fieldIndex(i.name());
                    Object o = row.get(index);
                    jsonObject.put(i.name(), o);
                }
                logger.info(jsonObject.toJSONString());
                //TODO 
            }
        });
    }
}
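For comparison, the same read can also be written with the option-based DataFrameReader API. The following is a minimal runnable sketch that reuses the URL, table name, and credentials from the example above (the class name is only for illustration); it does not carry any partitioning information, so the whole table is read in a single partition.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkJdbcOptions {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkJdbcOptions")
                .master("local[*]")
                .getOrCreate();

        // "url", "dbtable", "user" and "password" are standard options of the jdbc data source;
        // this produces the same Dataset<Row> as ss.read().jdbc(url, table, connectionProperties).
        Dataset<Row> jdbcDF = spark.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://192.168.3.50:3306/database")
                .option("dbtable", "t_electroniceinfowhole")
                .option("user", "root")
                .option("password", "123456")
                .load();

        jdbcDF.printSchema();
        jdbcDF.show(10);

        spark.stop();
    }
}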

 

The source exposes two callable jdbc() overloads: one takes a String[] of predicates, with each predicate defining its own partition; the other partitions by lowerBound and upperBound, but it requires the table to have a column of type int or long (the columnName argument) to partition on, and without such a column that overload cannot split the read.
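A minimal sketch of the columnName-based overload, assuming the table has an integral column named id (hypothetical; the table in the example above was only filtered by insertdate) and using illustrative bounds:

import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkJdbcColumnPartition {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkJdbcColumnPartition")
                .master("local[*]")
                .getOrCreate();

        Properties props = new Properties();
        props.setProperty("user", "root");
        props.setProperty("password", "123456");

        // Partition on a numeric column: Spark splits [lowerBound, upperBound] into
        // numPartitions ranges over "id" (a hypothetical int/long column). The bounds only
        // control the stride of the ranges; rows outside them are still read, they just
        // fall into the first or last partition.
        Dataset<Row> jdbcDF = spark.read().jdbc(
                "jdbc:mysql://192.168.3.50:3306/database",
                "t_electroniceinfowhole",
                "id",       // columnName: must be an integral column in the table
                1L,         // lowerBound
                1000000L,   // upperBound
                4,          // numPartitions
                props);

        System.out.println("partitions = " + jdbcDF.rdd().getNumPartitions());

        spark.stop();
    }
}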

Source of the Spark SQL jdbc() methods (decompiled from Scala and lightly cleaned up for readability):

// The first overload takes a columnName and partitions by ranges of that column's values
public Dataset<Row> jdbc(String url, String table, String columnName, long lowerBound,
        long upperBound, int numPartitions, Properties connectionProperties) {
    JDBCPartitioningInfo partitioning =
            new JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions);
    Partition[] parts = JDBCRelation$.MODULE$.columnPartition(partitioning);
    return this.jdbc(url, table, parts, connectionProperties);
}

// The second overload is more flexible: the caller defines the partition ranges directly.
// In the original Scala the body is simply
//   predicates.zipWithIndex.map { case (part, i) => JDBCPartition(part, i) }
// which is written here as a plain loop instead of the decompiled anonymous class.
public Dataset<Row> jdbc(String url, String table, String[] predicates,
        Properties connectionProperties) {
    Partition[] parts = new Partition[predicates.length];
    for (int i = 0; i < predicates.length; i++) {
        // each predicate string becomes one JDBCPartition, tagged with its index
        parts[i] = new JDBCPartition(predicates[i], i);
    }
    return this.jdbc(url, table, parts, connectionProperties);
}
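Since each predicate string becomes one JDBCPartition, the resulting Dataset has exactly predicates.length partitions. The following sketch checks that, reusing the connection details from the example above with two non-overlapping date predicates (illustrative values):

import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkJdbcPredicatePartitions {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkJdbcPredicatePartitions")
                .master("local[*]")
                .getOrCreate();

        Properties props = new Properties();
        props.setProperty("user", "root");
        props.setProperty("password", "123456");

        // Two non-overlapping predicates -> two JDBCPartitions; each predicate is placed
        // in the WHERE clause of the query that its task runs against MySQL.
        String[] predicates = new String[] {
                "insertdate <  '2017-03-15 00:00:00'",
                "insertdate >= '2017-03-15 00:00:00'" };

        Dataset<Row> jdbcDF = spark.read().jdbc(
                "jdbc:mysql://192.168.3.50:3306/database",
                "t_electroniceinfowhole",
                predicates,
                props);

        // Expected to print 2: one partition per predicate string
        System.out.println("partitions = " + jdbcDF.rdd().getNumPartitions());

        spark.stop();
    }
}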