Flink维表关联方式

在实际生产中,咱们常常会有这样的需求,须要以原始数据流做为基础,而后关联大量的外部表来补充一些属性。例如,咱们在订单数据中,但愿能获得订单收货人所在省的名称,通常来讲订单中会记录一个省的 ID,那么须要根据 ID 去查询外部的维度表补充省名称属性。mysql

在 Flink 流式计算中,咱们的一些维度属性通常存储在 MySQL/HBase/Redis 中,这些维表数据存在定时更新,须要咱们根据业务进行关联。根据咱们业务对维表数据关联的时效性要求,有如下几种解决方案:sql

  1. 实时查询维表关联数据库

  2. 预加载维表关联json

  3. 热存储关联缓存

  4. 其余网络

 

实时查询维表关联

实时查询维表是指用户在 Flink 算子中直接访问外部数据库,好比用 MySQL 来进行关联,这种方式是同步方式,数据保证是最新的。可是,当咱们的流计算数据过大,会对外部系统带来巨大的访问压力,一旦出现好比链接失败、线程池满等状况,因为咱们是同步调用,因此通常会致使线程阻塞、Task 等待数据返回,影响总体任务的吞吐量。并且这种方案对外部系统的 QPS 要求较高,在大数据实时计算场景下,QPS 远远高于普通的后台系统,峰值高达十万到几十万,总体做业瓶颈转移到外部系统。并发

这种方式的核心是,咱们能够在 Flink 的 Map 算子中创建访问外部系统的链接。下面以订单数据为例,咱们根据下单用户的城市 ID,去关联城市名称,核心代码实现以下:异步

public class Order {
    private Integer cityId;
    private String userName;
    private String items;
    private String cityName;
​
    public Order(Integer cityId, String userName, String items, String cityName) {
        this.cityId = cityId;
        this.userName = userName;
        this.items = items;
        this.cityName = cityName;
    }
​
    public Order() {
    }
​
    public Integer getCityId() {
        return cityId;
    }
​
    public void setCityId(Integer cityId) {
        this.cityId = cityId;
    }
​
    public String getUserName() {
        return userName;
    }
​
    public void setUserName(String userName) {
        this.userName = userName;
    }
​
    public String getItems() {
        return items;
    }
​
    public void setItems(String items) {
        this.items = items;
    }
​
    public String getCityName() {
        return cityName;
    }
​
    public void setCityName(String cityName) {
        this.cityName = cityName;
    }
​
    @Override
    public String toString() {
        return "Order{" +
                "cityId=" + cityId +
                ", userName='" + userName + '\'' +
                ", items='" + items + '\'' +
                ", cityName='" + cityName + '\'' +
                '}';
    }
}

 

public class DimSync extends RichMapFunction<String,Order> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DimSync.class);
    private Connection conn = null;
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
    }
    public Order map(String in) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(in);
        Integer cityId = jsonObject.getInteger("city_id");
        String userName = jsonObject.getString("user_name");
        String items = jsonObject.getString("items");
        //根据city_id 查询 city_name
        PreparedStatement pst = conn.prepareStatement("select city_name from info where city_id = ?");
        pst.setInt(1,cityId);
        ResultSet resultSet = pst.executeQuery();
        String cityName = null;
        while (resultSet.next()){
            cityName = resultSet.getString(1);
        }
        pst.close();
        return new Order(cityId,userName,items,cityName);
    }
    public void close() throws Exception {
        super.close();
        conn.close();
    }
}

 

在上面这段代码中,RichMapFunction 中封装了整个查询维表,而后进行关联这个过程。须要注意的是,通常咱们在查询小数据量的维表状况下才使用这种方式,而且要妥善处理链接外部系统的线程,通常还会用到线程池。最后,为了保证链接及时关闭和释放,必定要在最后的 close 方式释放链接,不然会将 MySQL 的链接数打满致使任务失败。async

预加载维表关联

全量预加载数据是为了解决每条数据流经咱们的数据系统都会对外部系统发起访问,以及对外部系统频繁访问而致使的链接和性能问题。这种思路是,每当咱们的系统启动时,就将维度表数据所有加载到内存中,而后数据在内存中进行关联,不须要直接访问外部数据库。ide

这种方式的优点是咱们只须要一次性地访问外部数据库,大大提升了效率。但问题在于,一旦咱们的维表数据发生更新,那么 Flink 任务是没法感知的,可能会出现维表数据不一致,针对这种状况咱们能够采起定时拉取维表数据。而且这种方式因为是将维表数据缓存在内存中,对计算节点的内存消耗很高,因此不能适用于数量很大的维度表。

咱们仍是用上面的场景,根据下单用户的城市 ID 去关联城市名称,核心代码实现以下:

public class WholeLoad extends RichMapFunction<String,Order> {
​
    private static final Logger LOGGER = LoggerFactory.getLogger(WholeLoad.class);
    ScheduledExecutorService executor = null;
    private Map<String,String> cache;
​
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    load();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        },5,5, TimeUnit.MINUTES);//每隔 5 分钟拉取一次维表数据
    }
​
    @Override
    public Order map(String value) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(value);
        Integer cityId = jsonObject.getInteger("city_id");
        String userName = jsonObject.getString("user_name");
        String items = jsonObject.getString("items");
        String cityName = cache.get(cityId);
        return new Order(cityId,userName,items,cityName);
    }
​
    public void load() throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
        PreparedStatement statement = con.prepareStatement("select city_id,city_name from info");
        ResultSet rs = statement.executeQuery();
        //全量更新维度数据到内存
        while (rs.next()) {
            String cityId = rs.getString("city_id");
            String cityName = rs.getString("city_name");
            cache.put(cityId, cityName);
        }
        con.close();
    }
}

 

在上面的例子中,咱们使用 ScheduledExecutorService 每隔 5 分钟拉取一次维表数据。这种方式适用于那些实时场景不是很高,维表数据较小的场景。

优势:实现简单

缺点:仅支持小数据量维表

适用场景:维表小,变动频率低,对变动及时性要求低

方案2:

经过Distributed Cache 分发本地维度文件到task manager后加载到内存关联。

实现方式:

经过env.registerCachedFile注册文件。

实现RichFunction,在open()中经过RuntimeContext获取cache文件。

解析和使用文件数据。

优势:不须要外部数据库

缺点:支持维度数据量比较小,更新须要更改文件并重启做业

适用场景:维度数据是以文件形式,数据量小,更新频率低。好比:静态码表,配置文件。

 

热存储关联

 

 

 

在这里推荐使用 Guava 库提供的 CacheBuilder 来建立咱们的缓存:

CacheBuilder.newBuilder()
        //最多存储10000条
        .maximumSize(10000)
        //过时时间为1分钟
        .expireAfterWrite(60, TimeUnit.SECONDS)
        .build();

总体的实现思路是:咱们利用 Flink 的 RichAsyncFunction 读取 Hbase 的数据到缓存中,咱们在关联维度表时先去查询缓存,若是缓存中不存在这条数据,就利用客户端去查询 Hbase,而后插入到缓存中。

首先咱们须要一个 Hbase 的异步客户端:

<dependency>
    <groupId>org.hbase</groupId>
    <artifactId>asynchbase</artifactId>
    <version>1.8.2</version>
</dependency>

核心的代码实现以下:

public class LRU extends RichAsyncFunction<String,Order> {
​
    private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class);
    String table = "info";
    Cache<String, String> cache = null;
    private HBaseClient client = null;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //建立hbase客户端
        client = new HBaseClient("127.0.0.1","7071");
        cache = CacheBuilder.newBuilder()
                //最多存储10000条
                .maximumSize(10000)
                //过时时间为1分钟
                .expireAfterWrite(60, TimeUnit.SECONDS)
                .build();
    }
​
    @Override
    public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception {
​
        JSONObject jsonObject = JSONObject.parseObject(input);
        Integer cityId = jsonObject.getInteger("city_id");
        String userName = jsonObject.getString("user_name");
        String items = jsonObject.getString("items");
        //读缓存
        String cacheCityName = cache.getIfPresent(cityId);
​
        if(cacheCityName != null){
            Order order = new Order();
            order.setCityId(cityId);
            order.setItems(items);
            order.setUserName(userName);
            order.setCityName(cacheCityName);
            resultFuture.complete(Collections.singleton(order));
        }else {
            //若是缓存获取失败再从hbase获取维度数据
            client.get(new GetRequest(table,String.valueOf(cityId))).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
                for (KeyValue kv : arg) {
                    String value = new String(kv.value());
                    Order order = new Order();
                    order.setCityId(cityId);
                    order.setItems(items);
                    order.setUserName(userName);
                    order.setCityName(value);
                    resultFuture.complete(Collections.singleton(order));
                    cache.put(String.valueOf(cityId), value);
                }
                return null;
            });
​
        }
    }
​
}

 

这里须要特别注意的是,咱们用到了异步 IO (RichAsyncFunction),这个功能的出现就是为了解决与外部系统交互时网络延迟成为系统瓶颈的问题。

咱们在流计算环境中,在查询外部维表时,假如访问是同步进行的,那么总体能力势必受限于外部系统。正是由于异步 IO 的出现使得访问外部系统能够并发的进行,而且不须要同步等待返回,大大减轻了由于网络等待时间等引发的系统吞吐和延迟问题。

咱们在使用异步 IO 时,必定要使用异步客户端,若是没有异步客户端咱们能够本身建立线程池模拟异步请求。

优势:维度数据不受限于内存,支持较多维度数据

缺点:须要热存储资源,维度更新反馈到结果有延迟(热存储导入,cache)

适用场景:维度数据量大,可接受维度更新有必定的延迟。

其余

除了上述常见的处理方式,咱们还能够经过将维表消息广播出去,或者自定义异步线程池访问维表,甚至还能够本身扩展 Flink SQL 中关联维表的方式直接使用 SQL Join 方法关联查询结果。

整体来说,关联维表的方式就以上几种方式,而且基于这几种方式还会衍生出各类各样的解决方案。咱们在评价一个方案的优劣时,应该从业务自己出发,不一样的业务场景下使用不一样的方式。

相关文章
相关标签/搜索