使用springevent事件驱动模型(观察者模式)结合redis bitmap 运用 实现每日数据统计

观察者模式
当对象间存在一对多关系时,则使用观察者模式(Observer Pattern)。好比,当一个对象被修改时,则会自动通知它的依赖对象。观察者模式属于行为型模式。redis

主要解决:一个对象状态改变给其余对象通知的问题,并且要考虑到易用和低耦合,保证高度的协做。算法

什么时候使用:一个对象(目标对象)的状态发生改变,全部的依赖对象(观察者对象)都将获得通知,进行广播通知。数据库

如何解决:使用面向对象技术,能够将这种依赖关系弱化。设计模式

优势: 一、观察者和被观察者是抽象耦合的。 二、创建一套触发机制。缓存

缺点: 一、若是一个被观察者对象有不少的直接和间接的观察者的话,将全部的观察者都通知到会花费不少时间。 二、若是在观察者和观察目标之间有循环依赖的话,观察目标会触发它们之间进行循环调用,可能致使系统崩溃。 三、观察者模式没有相应的机制让观察者知道所观察的目标对象是怎么发生变化的,而仅仅只是知道观察目标发生了变化。异步

Spring Boot 之事件(Event)
Spring的事件通知机制是一项颇有用的功能,使用事件机制咱们能够将相互耦合的代码解耦,从而方便功能的修改与添加。本文我来学习并分析一下Spring中事件的原理。ide

举个例子,假设有一个添加评论的方法,在评论添加成功以后须要进行修改redis缓存、给用户添加积分等等操做。固然能够在添加评论的代码后面假设这些操做,可是这样的代码违反了设计模式的多项原则:单一职责原则、迪米特法则、开闭原则。一句话说就是耦合性太大了,好比未来评论添加成功以后还须要有另一个操做,这时候咱们就须要去修改咱们的添加评论代码了。学习

在之前的代码中,我使用观察者模式来解决这个问题。不过Spring中已经存在了一个升级版观察者模式的机制,这就是监听者模式。经过该机制咱们就能够发送接收任意的事件并处理。ui

Spring 官方文档翻译以下 :翻译

ApplicationContext 经过 ApplicationEvent 类和 ApplicationListener 接口进行事件处理。 若是将实现 ApplicationListener 接口的 bean 注入到上下文中,则每次使用 ApplicationContext 发布 ApplicationEvent 时,都会通知该 bean。 本质上,这是标准的观察者设计模式。

Spring的事件(Application Event)其实就是一个观察者设计模式,一个 Bean 处理完成任务后但愿通知其它 Bean 或者说 一个Bean 想观察监听另外一个Bean的行为。

Spring 事件只须要几步:

自定义事件,继承 ApplicationEvent
定义监听器,实现 ApplicationListener 或者经过 @EventListener 注解到方法上
定义发布者,经过 ApplicationEventPublisher
实际代码:
建立event文件夹

并建立event object类和handle类,一个handle类能够对应多个object类。

@Data
@AllArgsConstructor
@NoArgsConstructor
public class EverydayStatisticEventObject {
 
  private Integer id;
 
  private String os;
 
  private String proxy;
 
  private StatisticEventType statisticEventType;
 
}

建立枚举类 处理不一样的事件类型,运用观察者模式

public enum StatisticEventType {
   
  //注册数统计
  REGISTER_COUNTER,
  //活跃数统计
  ACTIVE_COUNTER,
  //裂变数统计
  FISSION_COUNTER,
  //播放数统计
  PLAYED_COUNTER,
  //广告点击数统计
  ADCLICK_COUNTER;
 
  private StatisticEventType() {
  }
}

在事务service类中注入

@Autowired
private ApplicationEventPublisher publisher;
处理完相应的业务逻辑后,调取publish操做,将事务发布出去

其一

public LoginLog increaseLoginLog(String ip, int uid, String username) {
    User user = mixinsService.getUser(uid);
    LoginLog loginLog = new LoginLog();
    loginLog.setLoginIp(ip);
    loginLog.setLoginTime(new Date());
    loginLog.setUid(uid);
    loginLog.setUsername(username);
    loginLog.setProxy(user.getProxy());
    loginLog.setChannel(user.getChannel());
    loginLog.setUserType(user.getUserType());
    loginLog.setOs(user.getOs());
    LoginLog log = loginLogRepository.save(loginLog);
    
    //发布事件
    publisher.publishEvent(new EverydayStatisticEventObject(log.getUid(), log.getOs(), log.getProxy(),StatisticEventType.ACTIVE_COUNTER));
    ChannelDailyDataManager.fireEvent(new UserActiveEvent(user.getChannel()));
    return log;
  }

Google Guava Cache缓存
Google Guava Cache是一种很是优秀本地缓存解决方案,提供了基于容量,时间和引用的缓存回收方式。基于容量的方式内部实现采用LRU算法,基于引用回收很好的利用了Java虚拟机的垃圾回收机制。其中的缓存构造器CacheBuilder采用构建者模式提供了设置好各类参数的缓存对象,缓存核心类LocalCache里面的内部类Segment与jdk1.7及之前的ConcurrentHashMap很是类似,都继承于ReetrantLock,还有六个队列,以实现丰富的本地缓存方案。

Guava Cache与ConcurrentMap的区别
Guava Cache与ConcurrentMap很类似,但也不彻底同样。最基本的区别是ConcurrentMap会一直保存全部添加的元素,直到显式地移除。相对地,Guava Cache为了限制内存占用,一般都设定为自动回收元素。在某些场景下,尽管LoadingCache 不回收元素,它也是颇有用的,由于它会自动加载缓存。

//bitmap的偏移量offset生产,offset越大,占用内存越多,因此以每日第一个id做为minid,做为被减数
//使用guava cache缓存机制获取最小id,设置过时时间为每一天,天天清空一次

private LoadingCache<String, Integer> minId = CacheBuilder.newBuilder().expireAfterWrite(1L, TimeUnit.DAYS).build(new CacheLoader<String, Integer>() {
    @Override
    public Integer load(String s) throws Exception {
      Date date = LocalDate.parse(StringUtils.substringAfter(s, "@")).toDate();
      if (ACTIVE_COUNTER.startsWith(s)) {
        LoginLog loginLog = loginLogRepository.getTopByLoginTimeBeforeOrderByIdDesc(date);
        if (loginLog != null) {
          return loginLog.getId();
        }
      } else if (PLAYED_COUNTER.startsWith(s)) {
        ViewHistory viewHistory = viewHistoryRepository.getTopByViewtimeBeforeOrderByIdDesc(date);
        if (viewHistory != null) {
          return viewHistory.getId();
        }
      } else if (ADCLICK_COUNTER.startsWith(s)) {
        AdvClickHistory advClickHistory = advClickHistoryRepository.getTopByCreateTimeBeforeOrderByIdDesc(date);
        if (advClickHistory != null) {
          return advClickHistory.getId();
        }
      }
      return 0;
    }
  });

用Redis bitmap统计活跃用户、留存
对于个int型的数来讲,若用来记录id,则只能记录一个,而若转换为二进制存储,则能够表示32个,空间的利用率提高了32倍.对于海量数据的处理,这样的存储方式会节省不少内存空间.对于未登录的用户,可使用Hash算法,把对应的用户标识哈希为一个数字id.对于一亿个数据来讲,咱们也只须要1000000000/8/1024/1024大约12M空间左右.

而Redis已经为咱们提供了SETBIT的方法,使用起来很是的方便,咱们在item页面能够不停地使用SETBIT命令,设置用户已经访问了该页面,也可使用GETBIT的方法查询某个用户是否访问。最后经过BITCOUNT统计该网页天天的访问数量。

优势: 占用内存更小,查询方便,能够指定查询某个用户,对于非登录的用户,可能不一样的key映射到同一个id,不然须要维护一个非登录用户的映射,有额外的开销。

//使用观察者模式,根据不一样的type来判断不一样的事务

public String progressChanged(EverydayStatisticEventObject registerEventObject) {
    String Type = "";
    StatisticEventType eventType = registerEventObject.getStatisticEventType();
    switch (eventType) {
      case REGISTER_COUNTER:
        Type = REGISTER_COUNTER;
        break;
      case ACTIVE_COUNTER:
        Type = ACTIVE_COUNTER;
        break;
      case FISSION_COUNTER:
        Type = FISSION_COUNTER;
        break;
      case PLAYED_COUNTER:
        Type = PLAYED_COUNTER;
        break;
      case ADCLICK_COUNTER:
        Type = ADCLICK_COUNTER;
        break;
      default:
        break;
    }
    return Type;
  }
//事件监听器
  //异步
  @EventListener
  @Async
  public void registerCountEvent(EverydayStatisticEventObject registerEventObject) {
 
 
    String date = LocalDate.now().toString(STATISTIC_DATE_FMT);
    String type = progressChanged(registerEventObject);
    
    //数据库主键id 减去当天第一个id 这样天天的偏移量都是从一开始能够有效减小偏移量对内存的占用。
    int offset = registerEventObject.getId() + 1 - minId.getUnchecked(StringUtils.join(type, "@", date));
 
    String key = StringUtils.join(STATISTIC_CACHE_KEY_PREFIX, type,
      date, ":", registerEventObject.getOs());
 
 
    setBitmap(offset, key);
 
    String proxyKey = StringUtils.join(STATISTIC_CACHE_KEY_PREFIX, type,
      date, ":", registerEventObject.getProxy(), ":", registerEventObject.getOs());
 
    setBitmap(offset, proxyKey);
 
        
       /* redisTemplate.execute((RedisCallback) connection -> {
            Long count = connection.bitCount(key.getBytes());
            log.info("key={},count = {},offset={}",key,count,offset);
            return true;
        });
        redisTemplate.execute((RedisCallback) connection -> {
            Long count = connection.bitCount(proxyKey.getBytes());
            log.info("proxyKey={},count = {},offset={}",proxyKey,count,offset);
            return true;
        });*/
  }
 
private void setBitmap(int offset, String key) {
 
    byte[] bitKey = key.getBytes();
 
    redisTemplate.execute((RedisCallback) connection -> {
      boolean exists = connection.getBit(bitKey, offset);
      if (!exists) {
        connection.setBit(bitKey, offset, true);
        //设置过时时间 天天的数据统计 只保留2天
        connection.expire(bitKey, 60L * 60 * 24 * 2);  //2 days
        return true;
      }
      return false;
    });
  }
相关文章
相关标签/搜索