Java Spark 从 MySQL 中读取数据

本样例演示如何使用 Spark（Java API）通过 JdbcRDD 从 MySQL 中读取数据。

import java.io.Serializable;
import java.sql.*;
import java.util.Properties;

import com.alibaba.fastjson.JSONObject;
import com.yhxd.einvoicegd.InvoiceAPI;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.rdd.JdbcRDD;
import scala.reflect.ClassManifestFactory$;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;

/**
 * Sample Spark job that reads every row of the MySQL table {@code b_fo} through a {@link JdbcRDD},
 * converts each row to a fastjson {@link JSONObject} and prints it.
 *
 * <p>Connection settings default to the constants below and can be overridden with the JVM system
 * properties {@code mysql.driver}, {@code mysql.url}, {@code mysql.user} and {@code mysql.password},
 * so real credentials do not have to be compiled into the class.
 */
public class Main {

    private static final org.apache.log4j.Logger LOGGER = org.apache.log4j.Logger.getLogger(Main.class);

    private static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
    private static final String MYSQL_CONNECTION_URL = "jdbc:mysql://192.168.3.50:3306/9dfp";
    private static final String MYSQL_USERNAME = "root";
    // NOTE(review): sample-only default; pass the real password with -Dmysql.password instead.
    private static final String MYSQL_PWD = "123456";

    /**
     * Entry point: creates the Spark context, loads the table and prints each row.
     *
     * <p>The context is created here rather than in a static field. The nested {@link DbConnection}
     * reads {@link #LOGGER}, which initializes {@code Main}. A static context field would therefore
     * try to start a second SparkContext on whichever JVM first runs a connection factory.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        JavaSparkContext sc =
                new JavaSparkContext(new SparkConf().setAppName("SparkJdbc").setMaster("local[*]"));
        try {
            DbConnection dbConnection = new DbConnection(
                    System.getProperty("mysql.driver", MYSQL_DRIVER),
                    System.getProperty("mysql.url", MYSQL_CONNECTION_URL),
                    System.getProperty("mysql.user", MYSQL_USERNAME),
                    System.getProperty("mysql.password", MYSQL_PWD));

            // JdbcRDD requires two '?' placeholders, which it binds to each partition's slice of the
            // inclusive range 0..10 (split into 2 partitions). The predicate compares the constant 0
            // rather than a key column, so it is true only for the slice that starts at 0. The whole
            // table is therefore read once, entirely by the first partition; the other partition
            // returns no rows.
            JdbcRDD<JSONObject> jdbcRDD =
                    new JdbcRDD<>(sc.sc(), dbConnection, "select * from b_fo where 0 >= ? and 0 <= ?", 0,
                            10, 2, new MapResult(), ClassManifestFactory$.MODULE$.fromClass(JSONObject.class));

            // Wrap the Scala RDD so the Java-friendly API can be used.
            JavaRDD<JSONObject> javaRDD =
                    JavaRDD.fromRDD(jdbcRDD, ClassManifestFactory$.MODULE$.fromClass(JSONObject.class));

            // Executed on the executors; with master local[*] the output appears in this console.
            javaRDD.foreach(new VoidFunction<JSONObject>() {
                @Override
                public void call(JSONObject jsonObject) throws Exception {
                    System.out.println(jsonObject);
                }
            });
        } finally {
            // Release the context's threads, UI port and temp files even when the job fails.
            sc.stop();
        }
    }

    /**
     * Serializable connection factory passed to {@link JdbcRDD}.
     *
     * <p>JdbcRDD invokes it to obtain an open connection and takes care of closing it.
     */
    static class DbConnection extends AbstractFunction0<Connection> implements Serializable {

        private static final long serialVersionUID = 1L;

        private final String driverClassName;
        private final String connectionUrl;
        private final String userName;
        private final String password;

        /**
         * @param driverClassName JDBC driver class to load before connecting
         * @param connectionUrl   JDBC URL of the database
         * @param userName        database user
         * @param password        database password
         */
        public DbConnection(String driverClassName, String connectionUrl, String userName, String password) {
            this.driverClassName = driverClassName;
            this.connectionUrl = connectionUrl;
            this.userName = userName;
            this.password = password;
        }

        /**
         * Opens a new JDBC connection with the configured credentials.
         *
         * @return an open connection, never {@code null}
         * @throws IllegalStateException if {@link DriverManager} cannot establish the connection
         */
        @Override
        public Connection apply() {
            try {
                Class.forName(driverClassName);
            } catch (ClassNotFoundException e) {
                // Logged rather than thrown: DriverManager may still locate a self-registering
                // (JDBC 4) driver; if it cannot, getConnection below fails and is reported.
                LOGGER.error("Failed to load driver class", e);
            }

            Properties properties = new Properties();
            properties.setProperty("user", userName);
            properties.setProperty("password", password);

            try {
                return DriverManager.getConnection(connectionUrl, properties);
            } catch (SQLException e) {
                // Fail fast. A null connection would only surface later as an opaque
                // NullPointerException inside JdbcRDD.
                throw new IllegalStateException("Connection failed: " + connectionUrl, e);
            }
        }
    }

    /**
     * Converts the current row of a {@link ResultSet} into a {@link JSONObject}. Each column is
     * keyed by its label and its value is read with {@link ResultSet#getString(int)}.
     */
    static class MapResult extends AbstractFunction1<ResultSet, JSONObject> implements Serializable {

        private static final long serialVersionUID = 1L;

        /**
         * @param resultSet cursor positioned on the row to convert
         * @return the row as JSON; columns that are SQL NULL are put with a {@code null} value
         * @throws IllegalStateException if the row or its metadata cannot be read
         */
        @Override
        public JSONObject apply(ResultSet resultSet) {
            JSONObject jsonObj = new JSONObject();
            try {
                ResultSetMetaData metaData = resultSet.getMetaData();
                int columnCount = metaData.getColumnCount();
                // JDBC column indices are 1-based.
                for (int i = 1; i <= columnCount; i++) {
                    // Read by index rather than by label. This avoids a per-cell label lookup and
                    // always reads the column whose label was just fetched, even when labels repeat.
                    jsonObj.put(metaData.getColumnLabel(i), resultSet.getString(i));
                }
            } catch (SQLException e) {
                // Fail the task instead of emitting a partially populated row.
                throw new IllegalStateException("Failed to map result set row", e);
            }
            return jsonObj;
        }
    }
}

参考链接：

http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/