一、利用阿里的开源数据库同步工具-canal来解析不一样的数据库表binlog日志,解析完成的数据,咱们要入库不一样的数据库,不一样的表。
二、每一个表对应一个Mapper类,插入不一样的表,咱们须要选择不一样的Mapper来执行同一个函数:insertSelective
三、普通的完成方式,咱们须要根据不一样的表名采用“if”条件判断,选择不一样的Mapper,这样的代码是比较冗余的。java
package com.jane.binlog.dao; public interface IProcessor<T> { int insertSelective(T record); int updateSelective(T record); }
package com.jane.binlog.dao; import java.lang.annotation.*; @Target({ ElementType.METHOD, ElementType.TYPE }) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface ProcessorMapper { String value() default ""; }
一、注解的值就是表的名字。
二、Mapper实现接口IProcessor,传入操做对象的类型spring
import java.util.Date; @Repository @ProcessorMapper("pos_sale") public interface PosSaleMapper extends IProcessor<PosSale> { int insert(PosSale record); }
package com.jane.binlog.service; import com.alibaba.fastjson.JSONObject; import com.jane.binlog.dao.IProcessor; import com.jane.binlog.dao.ProcessorMapper; import com.jane.binlog.entity.PosSale; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.HashMap; import java.util.Map; @Service public class PosSaleService { private static Logger LOG = LoggerFactory.getLogger(PosSaleService.class); public static Map<String, Class> map = new HashMap<>(); //定义表和对象的关系 static { map.put("pos_sale", PosSale.class); } private Map<String, IProcessor> mapperMap = new HashMap<String, IProcessor>(); @Autowired private ApplicationContext applicationContext; //利用spring上下文,注入带有自定义注解的全部mapper到一个hashmap中。 @PostConstruct public void init() { String[] classNames = applicationContext.getBeanNamesForAnnotation(ProcessorMapper.class); for (String name: classNames) { Class<?> type = applicationContext.getType(name); boolean posSaleMapper = type.isAnnotationPresent(ProcessorMapper.class); if (posSaleMapper) { String value = type.getAnnotation(ProcessorMapper.class).value(); mapperMap.put(value, (IProcessor) applicationContext.getBean(name)); } } } /** * 数据写入操做 * @param table * @param data * @param op */ public void binlogInsert(String table, Map<String, Object> data, String op) { try { //获取表对应的对象类 Class clazz = map.get(table); //组装对象数据 Object obj = this.assumePosSale(data, clazz); //实现写操做 IProcessor processor = mapperMap.get(table); processor.insertSelective(obj); } catch(Exception e) { throw e; } } /** * 把数据map转化为对应表对象 * @param data * @param clazz * @return */ private Object assumePosSale(Map<String, Object> data, Class clazz) { return JSONObject.parseObject(JSONObject.toJSONString(data), clazz); } }