Spark- JdbcRDD以及注意事项

先上Demojava

package com.rz.spark.base

import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

object JdbcRDDDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val sc = new SparkContext(conf)

    val getConn=()=>{
      DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=utf-8","root","root")
    }

    // 建立RDD,这个RDD会记录之后从MySQL中读取数据
    val jdbcRDD: JdbcRDD[(Int, String, Int)] = new JdbcRDD(sc,
      getConn,
      "select * from logs where id >= ? and id <= ?",
      1,
      5,
      2, //分区数量
      rs => {
        val id = rs.getInt(1)
        val name = rs.getString(2)
        val age = rs.getInt(3)
        (id, name, age) //将数据库查询出来的数据集转成想要的数据格式
      }
    )
    val rs = jdbcRDD.collect()
    print(rs.toBuffer)
  }
}

返回查询结果正确mysql

现象

修改查询的SQL,返回的数据量不对。sql

"select * from logs where id >= ? and id < ?"

缘由

在触发Action的时候,Task在每一个分区上的业务逻辑是相同的(id >= ? and id < ?"),只是读取的数据和处理的数据不同。RDD根据数据量和分区数据,均匀地分配每一个分区Task读取数据的范围。数据库

分区1读取[1,2)的数据,分区2读取[3,5)的数据。apache

使用相同的逻辑分区1丢掉了id=2的数据,这是为何,id >= 1 and id < 5"只返回3条数据的缘由,若是只有一个分区的时候可以读取到正确的数据量。this

解决办法

为了不出现丢数据,读取数据时,区间两端都包含。id >= 1 and id < =5。spa

相关文章
相关标签/搜索