文章很长,并且持续更新,建议收藏起来,慢慢读! Java 高并发 发烧友社群:疯狂创客圈(总入口) 奉上如下珍贵的学习资源:html
入大厂 、作架构、大力提高Java 内功 必备的精彩博文 | 2021 秋招涨薪1W + 必备的精彩博文 |
---|---|
1:Redis 分布式锁 (图解-秒懂-史上最全) | 2:Zookeeper 分布式锁 (图解-秒懂-史上最全) |
3: Redis与MySQL双写一致性如何保证? (面试必备) | 4: 面试必备:秒杀超卖 解决方案 (史上最全) |
5:面试必备之:Reactor模式 | 6: 10分钟看懂, Java NIO 底层原理 |
7:TCP/IP(图解+秒懂+史上最全) | 8:Feign原理 (图解) |
9:DNS图解(秒懂 + 史上最全 + 高薪必备) | 10:CDN图解(秒懂 + 史上最全 + 高薪必备) |
10: 分布式事务( 图解 + 史上最全 + 吐血推荐 ) |
Java 面试题 30个专题 , 史上最全 , 面试必刷 | 阿里、京东、美团... 随意挑、横着走!!! |
---|---|
1: JVM面试题(史上最强、持续更新、吐血推荐) | 2:Java基础面试题(史上最全、持续更新、吐血推荐 |
3:架构设计面试题 (史上最全、持续更新、吐血推荐) | 4:设计模式面试题 (史上最全、持续更新、吐血推荐) |
1七、分布式事务面试题 (史上最全、持续更新、吐血推荐) | 一致性协议 (史上最全) |
2九、多线程面试题(史上最全) | 30、HR面经,过五关斩六将后,当心阴沟翻船! |
9.网络协议面试题(史上最全、持续更新、吐血推荐) | 更多专题, 请参见【 疯狂创客圈 高并发 总目录 】 |
SpringCloud 精彩博文 | |
---|---|
nacos 实战(史上最全) | sentinel (史上最全+入门教程) |
SpringCloud gateway (史上最全) | 更多专题, 请参见【 疯狂创客圈 高并发 总目录 】 |
美团面试题:Redis与MySQL双写一致性如何保证?java
这道题其实就是在问缓存和数据库在双写场景下,一致性是如何保证的?mysql
本文将很是全面的,跟你们一块儿来探讨如何回答这个问题。nginx
本文的行文次序,首先介绍集中式缓存的缓存模式和数据一致性,而后介绍 二级缓存的架构和数据一致性,最后介绍 三级缓存的架构和数据一致性git
不吹牛,本文在全网数据一致性的全部博文中,绝对算是史上最全的。github
本文最为全面的介绍了 redis 与 db 双写数据一致性解决方案,web
固然, 会参考了最新的一些文章, 可是解决那些 复制来复制去的bug,面试
另外,本文增长了 L2 、L3 多级缓存的一致性问题redis
本文很是经典,绝对的高分面试必备, 建议边学习、边思考,而且必定要实战算法
- 若是有问题,欢迎来疯狂创客圈找尼恩和18罗汉门一块儿交流
- 本文后续也会不断升级迭代,持续保持史上最全位置。
一致性就是数据保持一致,在分布式系统中,能够理解为多个节点中数据的值是一致的。
缓存能够提高性能、缓解数据库压力,可是使用缓存也会致使数据不一致性的问题。通常咱们是如何使用缓存呢?有三种经典的缓存模式:
Cache-Aside Pattern,即旁路缓存模式,它的提出是为了尽量地解决缓存与数据库的数据不一致问题。
Cache-Aside Pattern的读请求流程以下:
读的时候,先读缓存,缓存命中的话,直接返回数据;
缓存没有命中的话,就去读数据库,从数据库取出数据,放入缓存后,同时返回响应。
Cache-Aside Pattern的写请求流程以下:
更新的时候,先更新数据库,而后再删除缓存。
Read/Write Through模式中,服务端把缓存做为主要数据存储。应用程序跟数据库缓存交互,都是经过抽象缓存层完成的。
Read-Through的简要读流程以下
从缓存读取数据,读到直接返回
若是读取不到的话,从数据库加载,写入缓存后,再返回响应。
这个简要流程是否是跟Cache-Aside很像呢?
其实Read-Through就是多了一层Cache-Provider,流程以下:
Read-Through的优势
Read-Through实际只是在Cache-Aside之上进行了一层封装,它会让程序代码变得更简洁,同时也减小数据源上的负载。
Write-Through模式下,当发生写请求时,也是由缓存抽象层完成数据源和缓存数据的更新,流程以下:
Write behind跟Read-Through/Write-Through有类似的地方,都是由Cache Provider来负责缓存和数据库的读写。它两又有个很大的不一样:Read/Write Through是同步更新缓存和数据的,Write Behind则是只更新缓存,不直接更新数据库,经过批量异步的方式来更新数据库。
这种方式下,缓存和数据库的一致性不强,对一致性要求高的系统要谨慎使用。
可是它适合频繁写的场景,MySQL的InnoDB Buffer Pool机制就使用到这种模式。
Cache Aside 更新模式实现起来比较简单,可是须要维护两个数据存储:
Read/Write Through 的写模式须要维护一个数据存储(缓存),实现起来要复杂一些。
Write Behind Caching 更新模式和Read/Write Through 更新模式相似,区别是Write Behind Caching 更新模式的数据持久化操做是异步的,可是Read/Write Through 更新模式的数据持久化操做是同步的。
Write Behind Caching 的优势是直接操做内存速度快,屡次操做能够合并持久化到数据库。缺点是数据可能会丢失,例如系统断电等。
有些小伙伴可能会问, Cache-Aside在写入请求的时候,为何是删除缓存而不是更新缓存呢?
咱们在操做缓存的时候,到底应该删除缓存仍是更新缓存呢?咱们先来看个例子:
操做的次序以下:
线程A先发起一个写操做,第一步先更新数据库
线程B再发起一个写操做,第二步更新了数据库
如今,因为网络等缘由,线程B先更新了缓存, 线程A更新缓存。
这时候,缓存保存的是A的数据(老数据),数据库保存的是B的数据(新数据),数据不一致了,脏数据出现啦。若是是删除缓存取代更新缓存则不会出现这个脏数据问题。
更新缓存相对于删除缓存,还有两点劣势:
1 若是你写入的缓存值,是通过复杂计算才获得的话。 更新缓存频率高的话,就浪费性能啦。
2 在写多读少的状况下,数据不少时候还没被读取到,又被更新了,这也浪费了性能呢(实际上,写多的场景,用缓存也不是很划算了)
任何的措施,也不是绝对的好, 只有分场景看是否是适合,更新缓存的措施,也是有用的:
在读多写少的场景,价值大。
美团二面:Redis与MySQL双写一致性如何保证?
Cache-Aside缓存模式中,有些小伙伴仍是有疑问,在写入请求的时候,为何是先操做数据库呢?为何不先操做缓存呢?
假设有A、B两个请求,请求A作更新操做,请求B作查询读取操做。
A、B两个请求的操做流程以下:
酱紫就有问题啦,缓存和数据库的数据不一致了。
缓存保存的是老数据,数据库保存的是新数据。所以,Cache-Aside缓存模式,选择了先操做数据库而不是先操做缓存。
重要:缓存是经过牺牲强一致性来提升性能的。
这是由CAP理论决定的。缓存系统适用的场景就是非强一致性的场景,它属于CAP中的AP。
CAP理论,指的是在一个分布式系统中, Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼。
CAP理论做为分布式系统的基础理论,它描述的是一个分布式系统在如下三个特性中:
最多知足其中的两个特性。也就是下图所描述的。分布式系统要么知足CA,要么CP,要么AP。没法同时知足CAP。
I. 什么是 一致性、可用性和分区容错性
分区容错性:指的分布式系统中的某个节点或者网络分区出现了故障的时候,整个系统仍然能对外提供知足一致性和可用性的服务。也就是说部分故障不影响总体使用。
事实上咱们在设计分布式系统是都会考虑到bug,硬件,网络等各类缘由形成的故障,因此即便部分节点或者网络出现故障,咱们要求整个系统仍是要继续使用的
(不继续使用,至关于只有一个分区,那么也就没有后续的一致性和可用性了)
可用性: 一直能够正常的作读写操做。简单而言就是客户端一直能够正常访问并获得系统的正常响应。用户角度来看就是不会出现系统操做失败或者访问超时等问题。
一致性:在分布式系统完成某写操做后任何读操做,都应该获取到该写操做写入的那个最新的值。至关于要求分布式系统中的各节点时时刻刻保持数据的一致性。
因此使用缓存提高性能,就是会有数据更新的延迟。这须要咱们在设计时结合业务仔细思考是否适合用缓存。而后缓存必定要设置过时时间,这个时间过短、或者太长都很差:
可是,经过一些方案优化处理,是能够保证弱一致性,最终一致性的。
3种方案保证数据库与缓存的一致性
- 延时双删策略
- 删除缓存重试机制
- 读取biglog异步删除缓存
有些小伙伴可能会说,不必定要先操做数据库呀,采用缓存延时双删策略就好啦?
什么是延时双删呢?
1 先删除缓存
2 再更新数据库
3 休眠一会(好比1秒),再次删除缓存。
参考代码以下:
这个休眠一会,通常多久呢?都是1秒?
这个休眠时间 = 读业务逻辑数据的耗时 + 几百毫秒。
为了确保读请求结束,写请求能够删除读请求可能带来的缓存脏数据。
无论是延时双删仍是Cache-Aside的先操做数据库再删除缓存,若是第二步的删除缓存失败呢?
删除失败会致使脏数据哦~
删除失败就多删除几回呀,保证删除缓存成功呀~ 因此能够引入删除缓存重试机制
写请求更新数据库
缓存由于某些缘由,删除失败
把删除失败的key放到消息队列
消费消息队列的消息,获取要删除的key
重试删除缓存操做
重试删除缓存机制还能够,就是会形成好多业务代码入侵。
其实,还能够经过数据库的binlog来异步淘汰key。
以mysql为例 可使用阿里的canal将binlog日志采集发送到MQ队列里面,而后编写一个简单的缓存删除消息者订阅binlog日志,根据更新log删除缓存,而且经过ACK机制确认处理这条更新log,保证数据缓存一致性
PushConsumer为了保证消息确定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会从新投递。首先,消费的时候,咱们须要注入一个消费回调,具体sample代码以下:
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); delcache(key);//执行真正删除 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//返回消费成功 } });
业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
,RocketMQ才会认为这批消息(默认是1条)是消费完成的。
若是这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息须要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。
为了保证消息是确定被至少消费成功一次,RocketMQ会把这批消费失败的消息重发回Broker(topic不是原topic而是这个消费租的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而若是一直这样重复消费都持续失败到必定次数(默认16次),就会投递到DLQ死信队列。应用能够监控死信队列来作人工干预。
Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能。Pub/Sub是目前普遍使用的通讯模型,它采用事件做为基本的通讯机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。熟悉设计模式的朋友应该了解这与23种设计模式中的观察者模式极为类似。
Redis经过publish和subscribe命令实现订阅和发布的功能。订阅者能够经过subscribe向redis server订阅本身感兴趣的消息类型。redis将信息类型称为通道(channel)。当发布者经过publish命令向redis server发送特定类型的信息时,订阅该消息类型的所有订阅者都会收到此消息。
可是呢还有个问题, 「若是是主从数据库呢」?
由于主从DB同步存在延时时间。若是删除缓存以后,数据同步到备库以前已经有请求过来时, 「会从备库中读到脏数据」,如何解决呢?解决方案以下流程图:
综上所述,在分布式系统中,缓存和数据库同时存在时,若是有写操做的时候,「先操做数据库,再操做缓存」。以下:
1.读取缓存中是否有相关数据
2.若是缓存中有相关数据value,则返回
3.若是缓存中没有相关数据,则从数据库读取相关数据放入缓存中key->value,再返回
4.若是有更新数据,则先更新数据库,再删除缓存
5.为了保证第四步删除缓存成功,使用binlog异步删除
6.若是是主从数据库,binglog取自于从库
7.若是是一主多从,每一个从库都要采集binlog,而后消费端收到最后一台binlog数据才删除缓存,或者为了简单,收到一次更新log,删除一次缓存
在不少业务状况下,咱们都会在系统中加入redis缓存作查询优化, 使用es 作全文检索。
若是数据库数据发生更新,这时候就须要在业务代码中写一段同步更新redis的代码。这种数据同步的代码跟业务代码糅合在一块儿会不太优雅,能不能把这些数据同步的代码抽出来造成一个独立的模块呢,答案是能够的。
若是你还对SpringBoot
、canal
、RocketMQ
、MySQL
、ElasticSearch
不是很了解的话,这里我为你们整理个它们的官网网站,以下
这里主要介绍一下canal,其余的自行学习。
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费.。
canal是一个假装成slave订阅mysql的binlog,实现数据同步的中间件。
说明:
instance模块:
到这里咱们对canal
有了一个初步的认识,接下咱们就进入实战环节。
对于自建 MySQL
, 须要先开启 Binlog
写入功能,配置binlog-format
为ROW
模式,my.cnf 中配置以下
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 须要定义,不要和 canal 的 slaveId 重复
**
注意:**针对阿里云 RDS for MySQL
, 默认打开了 binlog , 而且帐号默认具备 binlog dump 权限 , 不须要任何权限或者 binlog 设置,能够直接跳过这一步
受权canal
链接 MySQL 帐号具备做为 MySQL slave
的权限, 若是已有帐户可直接 使用grant 命令受权。
#建立用户名和密码都为canal CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
canal提供web ui 进行Server管理、Instance管理。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
咱们先配置canal.admin以后。经过web ui来配置 cancal server,这样使用界面操做很是的方便。
vi conf/application.yml server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: 127.0.0.1:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1 canal: adminUser: admin adminPasswd: admin
初始化元数据库
mysql -h127.0.0.1 -uroot -p # 导入初始化SQL > source conf/canal_manager.sql
canal_manager.sql
sh bin/startup.sh
使用用户名:admin 密码为:123456 登陆
登陆成功,会自动跳转到以下界面。这时候咱们的canal.admin就搭建成功了。
下载 canal.deployer, 访问 release 页面 , 选择须要的包下载, 如以 1.1.4版本为例
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
解压完成能够看到以下结构:
进入conf 目录。能够看到以下的配置文件。
咱们先对canal.properties
不作任何修改。
使用canal_local.properties
的配置覆盖canal.properties
# register ip canal.register.ip = # canal admin config canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 # admin auto register canal.admin.register.auto = true canal.admin.register.cluster =
使用以下命令启动canal server
sh bin/startup.sh local
启动成功。同时咱们在canal.admin web ui中刷新 server 管理,能够到canal server 已经启动成功。
这时候咱们的canal.server 搭建已经成功。
选择Instance 管理-> 新建Instance
填写 Instance名称:cms_article
#mysql serverId canal.instance.mysql.slaveId = 1234 #position info,须要改为本身的数据库信息 canal.instance.master.address = 127.0.0.1:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #username/password,须要改为本身的数据库信息 canal.instance.dbUsername = canal canal.instance.dbPassword = canal #改为本身的数据库信息(须要监听的数据库) canal.instance.defaultDatabaseName = cms-manage canal.instance.connectionCharset = UTF-8 #table regex 须要过滤的表 这里数据库的中全部表 canal.instance.filter.regex = .\*\\..\* # MQ 配置 日志数据会发送到cms_article这个topic上 canal.mq.topic=cms_article # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* #单分区处理消息 canal.mq.partition=0
咱们这里为了演示之建立一张表。
配置好以后,我须要点击保存。此时在Instances 管理中就能够看到此时的实例信息。
canal 1.1.1版本以后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:
本案例以RocketMQ
为例
咱们仍然使用web ui 界面操做。点击 server 管理 - > 点击配置
修改配置文件
# ... # 可选项: tcp(默认), kafka, RocketMQ canal.serverMode = RocketMQ # ... # kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 canal.mq.servers = 192.168.0.200:9078 canal.mq.retries = 0 # flagMessage模式下能够调大该值, 但不要超过MQ消息体大小上限 canal.mq.batchSize = 16384 canal.mq.maxRequestSize = 1048576 # flatMessage模式下请将该值改大, 建议50-200 canal.mq.lingerMs = 1 canal.mq.bufferMemory = 33554432 # Canal的batch size, 默认50K, 因为kafka最大消息体限制请勿超过1M(900K如下) canal.mq.canalBatchSize = 50 # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时 canal.mq.canalGetTimeout = 100 # 是否为flat json格式对象 canal.mq.flatMessage = false canal.mq.compressionType = none canal.mq.acks = all # kafka消息投递是否使用事务 canal.mq.transaction = false
修改好以后保存。会自动重启。
此时咱们就能够在rocketmq的控制台看到一个cms_article topic已经自动建立了。
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency> <!-- 根据我的须要依赖 --> <dependency> <groupId>javax.persistence</groupId> <artifactId>persistence-api</artifactId> </dependency>
package com.crazymaker.springcloud.stock.consumer; import com.alibaba.otter.canal.protocol.FlatMessage; import com.crazymaker.springcloud.common.exception.BusinessException; import com.crazymaker.springcloud.common.util.JsonUtil; import com.crazymaker.springcloud.standard.redis.RedisRepository; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.util.ReflectionUtils; import javax.annotation.Resource; import javax.persistence.Id; import java.lang.reflect.Field; import java.lang.reflect.ParameterizedType; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; /** * 抽象CanalMQ通用处理服务 **/ @Slf4j public abstract class AbstractCanalMQ2RedisService<T> implements CanalSynService<T> { @Resource private RedisTemplate<String, Object> redisTemplate; @Resource RedisRepository redisRepository; private Class<T> classCache; /** * 获取Model名称 * * @return Model名称 */ protected abstract String getModelName(); @Override public void process(FlatMessage flatMessage) { if (flatMessage.getIsDdl()) { ddl(flatMessage); return; } Set<T> data = getData(flatMessage); if (SQLType.INSERT.equals(flatMessage.getType())) { insert(data); } if (SQLType.UPDATE.equals(flatMessage.getType())) { update(data); } if (SQLType.DELETE.equals(flatMessage.getType())) { delete(data); } } @Override public void ddl(FlatMessage flatMessage) { //TODO : DDL须要同步,删库清空,更新字段处理 } @Override public void insert(Collection<T> list) { insertOrUpdate(list); } @Override public void update(Collection<T> list) { insertOrUpdate(list); } private void insertOrUpdate(Collection<T> list) { redisTemplate.executePipelined((RedisConnection redisConnection) -> { for (T data : list) { String key = getWrapRedisKey(data); RedisSerializer keySerializer = redisTemplate.getKeySerializer(); RedisSerializer valueSerializer = redisTemplate.getValueSerializer(); redisConnection.set(keySerializer.serialize(key), valueSerializer.serialize(data)); } return null; }); } @Override public void delete(Collection<T> list) { Set<String> keys = Sets.newHashSetWithExpectedSize(list.size()); for (T data : list) { keys.add(getWrapRedisKey(data)); } //Set<String> keys = list.stream().map(this::getWrapRedisKey).collect(Collectors.toSet()); redisRepository.delAll(keys); } /** * 封装redis的key * * @param t 原对象 * @return key */ protected String getWrapRedisKey(T t) { // return new StringBuilder() // .append(ApplicationContextHolder.getApplicationName()) // .append(":") // .append(getModelName()) // .append(":") // .append(getIdValue(t)) // .toString(); throw new IllegalStateException( "基类 方法 'getWrapRedisKey' 还没有实现!"); } /** * 获取类泛型 * * @return 泛型Class */ protected Class<T> getTypeArguement() { if (classCache == null) { classCache = (Class) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]; } return classCache; } /** * 获取Object标有@Id注解的字段值 * * @param t 对象 * @return id值 */ protected Object getIdValue(T t) { Field fieldOfId = getIdField(); ReflectionUtils.makeAccessible(fieldOfId); return ReflectionUtils.getField(fieldOfId, t); } /** * 获取Class标有@Id注解的字段名称 * * @return id字段名称 */ protected Field getIdField() { Class<T> clz = getTypeArguement(); Field[] fields = clz.getDeclaredFields(); for (Field field : fields) { Id annotation = field.getAnnotation(Id.class); if (annotation != null) { return field; } } log.error("PO类未设置@Id注解"); throw new BusinessException("PO类未设置@Id注解"); } /** * 转换Canal的FlatMessage中data成泛型对象 * * @param flatMessage Canal发送MQ信息 * @return 泛型对象集合 */ protected Set<T> getData(FlatMessage flatMessage) { List<Map<String, String>> sourceData = flatMessage.getData(); Set<T> targetData = Sets.newHashSetWithExpectedSize(sourceData.size()); for (Map<String, String> map : sourceData) { T t = JsonUtil.mapToPojo(map, getTypeArguement()); targetData.add(t); } return targetData; } }
rocketMQ
是支持广播消费的,只须要在消费端进行配置便可,默认状况下使用的是集群消费,这就意味着若是咱们配置了多个消费者实例,只会有一个实例消费消息。
对于更新Redis来讲,一个实例消费消息,完成redis的更新,这就够了。
package com.crazymaker.springcloud.stock.consumer; import com.alibaba.otter.canal.protocol.FlatMessage; import com.crazymaker.springcloud.seckill.dao.po.SeckillGoodPO; import com.google.common.collect.Sets; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; import java.util.List; import java.util.Map; import java.util.Set; @Slf4j @Service //广播模式 //@RocketMQMessageListener(topic = "seckillgood", consumerGroup = "UpdateRedis", messageModel = MessageModel.BROADCASTING) //集群模式 @RocketMQMessageListener(topic = "seckillgood", consumerGroup = "UpdateRedis") @Data public class UpdateRedisGoodConsumer extends AbstractCanalMQ2RedisService<SeckillGoodPO> implements RocketMQListener<FlatMessage> { private String modelName = "seckillgood"; @Override public void onMessage(FlatMessage s) { process(s); } // @Cacheable(cacheNames = {"seckill"}, key = "'seckillgood:' + #goodId") /** * 封装redis的key * * @param t 原对象 * @return key */ protected String getWrapRedisKey(SeckillGoodPO t) { return new StringBuilder() // .append(ApplicationContextHolder.getApplicationName()) .append("seckill") .append(":") // .append(getModelName()) .append("seckillgood") .append(":") .append(t.getId()) .toString(); } /** * 转换Canal的FlatMessage中data成泛型对象 * * @param flatMessage Canal发送MQ信息 * @return 泛型对象集合 */ protected Set<SeckillGoodPO> getData(FlatMessage flatMessage) { List<Map<String, String>> sourceData = flatMessage.getData(); Set<SeckillGoodPO> targetData = Sets.newHashSetWithExpectedSize(sourceData.size()); for (Map<String, String> map : sourceData) { SeckillGoodPO po = new SeckillGoodPO(); po.setId(Long.valueOf(map.get("id"))); //省略其余的属性 targetData.add(po); } return targetData; } }
根据须要能够重写里面的方法,DDL
处理暂时还没完成,只是整个Demo,完整的实战活儿,仍是留给你们本身干吧。
尼恩的忠实建议:
理论水平的提高,看看视频、看看书,只有两个字,就是须要:多看。
实战水平的提高,只有两个字,就是须要:多干。
基于binlog同步的缓存的数据一致性实战,很是重要,建议你们必定要干一票。
尼恩的忠实建议:
理论水平的提高,看看视频、看看书,只有两个字,就是须要:多看。
实战水平的提高,只有两个字,就是须要:多干。
基于binlog同步的缓存的数据一致性实战的具体材料、源码、问题,欢迎来 疯狂创客圈社群交流。
高并发Java发烧友社群 - 疯狂创客圈 总入口 点击了解详情:
美团面试题:Redis与MySQL双写一致性如何保证?
若是回答完了上面的内容,可以获得 100分的话,加上下面的回答内容,你就能够获得120分,让面试官有惊奇、惊喜的感受了。
注意:让面试官有惊奇、惊喜的感受以后,基本面试就很容易经过。
了解到了咱们为何要使用缓存,以及缓存能解决咱们什么样的问题。可是使用缓存时也须要注意一些问题:
若是只是单纯的整合Redis缓存,那么可能出现以下的问题
为了解决以上可能出现的问题,让缓存层更稳定,健壮,咱们使用二级缓存架构
1级为本地缓存,或者进程内的缓存(如 Ehcache) —— 速度快,进程内可用
2级为集中式缓存(如 Redis)—— 可同时为多节点提供服务
为何要引入本地缓存
相对于IO操做 速度快,效率高 相对于Redis Redis是一种优秀的分布式缓存实现,受限于网卡等缘由,远水救不了近火
因此:
DB + Redis + LocalCache = 高效存储,高效访问
本地缓存通常适合于缓存只读、量少、高频率访问的数据。如秒杀商品数据。
或者每一个部署节点独立的数据,如长链接服务中,每一个部署节点因为都是维护了不一样的链接,每一个链接的数据都是独立的,而且随着链接的断开而删除。若是数据在集群的不一样部署节点须要共享和保持一致,则须要使用分布式缓存来统一存储,实现应用集群的全部应用进程都在该统一的分布式缓存中进行数据存取便可。
本地缓存位于同一个JVM的堆中,相对于分布式缓存的好处是,故性能更好,减小了跨网络传输,
可是本地缓存因为占用 JVM 内存空间 (或者进程的内存空间),故不能进行大数据量的数据存储。
本地缓存只支持被该应用进程访问,通常没法被其余应用进程访问,若是对应的数据库数据,存在数据更新,则须要同步更新不一样节点的本地缓存副本,来保证数据一致性
本地缓存的更新,复杂度较高而且容易出错,如基于 Redis 的发布订阅机制、或者消息队列MQ来同步更新各个部署节点。
数据库 | 本地缓存 | 分布式缓存 | |
---|---|---|---|
存储位置 | 存盘,数据不丢失 | 不存盘,以前的数据丢失 | 不存盘,数据丢失 |
持久化 | 能够 | 不能够 | 不能够 |
访问速度 | 慢 | 最快 | 快 |
可扩展 | 可存在其余机器的硬盘 | 只能存在本机内存 | 可存在其余机器的内存 |
使用场景 | 须要实现持久化保存 | 须要快速访问,但须要考虑内存大小 | 1)须要快速访问,不须要考虑内存大小 2)须要实现持久化,但会丢失一些数据 3)须要让缓存集中在一块儿,访问任一机器上内存中的数据均可以从缓存中获得 |
单独使用本地缓存与集中式缓存,都会有各自的短板。
有这么一个网站,某个页面天天的访问量是 1000万,每一个页面从缓存读取的数据是 50K。缓存数据存放在一个 Redis 服务,机器使用千兆网卡。那么这个 Redis 一天要承受 500G 的数据流,至关于平均每秒钟是 5.78M 的数据。而网站通常都会有高峰期和低峰期,两个时间流量的差别多是百倍以上。咱们假设高峰期每秒要承受的流量比平均值高 50 倍,也就是说高峰期 Redis 服务每秒要传输超过 250 兆的数据。请注意这个 250 兆的单位是 byte,而千兆网卡的单位是“bit” ,你懂了吗? 这已经远远超过 Redis 服务的网卡带宽。
因此若是你能发现这样的问题,通常你会这么作:
若是你采用第2种方法来解决上述的场景中碰到的问题,那么你最好准备 5 个 Redis 服务来支撑。
在缓存服务这块成本直接攀升了 5 倍。你有钱固然没任何问题,可是结构就变得很是复杂了,并且可能你缓存的数据量其实不大,1000 万高频次的缓存读写 Redis 也能轻松应付,但是由于带宽的问题,你不得不付出 5 倍的成本。
按照80/20原则,若是咱们把20%的热点数据,放在本地缓存,若是咱们不用每次页面访问的时候都去 Redis 读取数据,那么 Redis 上的数据流量至少下降 80%的带宽流量,甚至于一个很小的 Redis 集群能够轻松应付。
做为须要超高并发的访问数据,属于 20% 的热点数据
这属于提早预测静态热点数据类型。
具体参参见疯狂创客圈的 亿级 IM中台实战
这属于提早预测静态热点数据类型。
还有的是提早不能识别出来的,如电商系统中的热点商品那就完美了。
经过流计算识别出来的热点数据,可以动态地实时发现热点。
这属于实时预测动态热点数据类型。因为数据量大,能够经过流计算框架 storm 或者 fink 实现,
不够,此项工做,通常属于大数据团队的工做。
第一级缓存使用内存(同时支持 Ehcache 2.x、Ehcache 3.x 、Guava、 Caffeine),第二级缓存使用 Redis(推荐)/Memcached
本地缓存与集中式缓存的结合架构,大体的架构图,以下:
经过消息队列,或者其余广播模式的发布订阅,保持各个一级缓存的数据一致性。
这一点,与Cache-Aside模式不一样,Cache-Aside只是删除缓存便可。可是热点数据,若是删除,很容易致使缓存击穿。
对于秒杀这样的场景,瞬间有十几万甚至上百万的请求要同时读取商品。若是没有缓存,每个请求连带的数据操做都须要应用与数据库生成connection,而数据库的最大链接数是有限的,一旦超过数据库会直接宕机。这就是缓存击穿。
缓存击穿与 缓存穿透的简单区别:
缓存击穿是指数据库中有数据,可是缓存中没有,大量的请求打到数据库;
缓存穿透是指缓存和数据库中都没有的数据,而用户不断发起请求,如发起为id为“-1”的数据或id为特别大不存在的数据。这时的用户极可能是攻击者,攻击会致使数据库压力过大。
方案1:biglog同步保障数据一致性
方案2:使用程序方式发送更新消息,保障数据一致性
方案1,能够经过biglog同步,来保障二级缓存的数据一致性,具体的架构以下
rocketMQ
是支持广播消费的,只须要在消费端进行配置便可,rocketMQ
默认状况下使用的是集群消费,这就意味着若是咱们配置了多个消费者实例,只会有一个实例消费消息。
对于更新Redis来讲,一个实例消费消息,完成redis的更新,这就够了。
对于更新Guava或者其余1级缓存来讲,一个实例消费消息,是不够的,须要每个实例都消息,因此,必须设置 rocketMQ 客户端的消费模式,为 广播模式;
@RocketMQMessageListener(topic = "seckillgood", consumerGroup = "UpdateGuava", messageModel = MessageModel.BROADCASTING)
使用程序方式保障数据一致性的架构,能够编写一个通用的2级缓存通用组件,当数据更新的时候,去发送消息,具体的架构以下:
方案2和方案1 的总体区别不大,只不过 方案2 须要本身写代码(或者中间组件)发送数据的变化通知。
方案1 的一个优点:能够和 创建索引等其余的消费者,共用binlog的消息队列。
其余的区别,你们能够自行探索。
对于高并发的请求,接入层Nginx有着巨大的做用,能反向代理,负载均衡,动静分离以及和Lua整合,能够实现请求定向分发等很是有用的功能,同理Nginx层能够实现缓存的功能
能够利用接入层Nginx的进程内缓存,缓存极热数据的高并发访问,在接入层,当请求过来时,判断本地缓存中是否存在,若是存在着直接返回请求结果(或者展示静态资源的数据),这样的请求不会直接发送到后端服务层
为了解决以上可能出现的问题,让缓存层更稳定,健壮,咱们引入三级缓存架构
三级缓存架构 图: 具体以下图所示
原文: lua_shared_dict
syntax:lua_shared_dict <name> <size> default: no context: http phase: depends on usage
声明一个共享内存区域 name,以充当基于 Lua 字典 ngx.shared.<name>
的共享存储。
lua_shared_dict 指令定义的共享内存老是被当前 Nginx 服务器实例中全部的 Nginx worker 进程所共享。
size 参数接受大小单位,如 k,m:
http { #指定缓存信息 lua_shared_dict seckill_cache 128m; ... }
详细参见: ngx.shared.DICT
而后在lua脚本中使用:
local shared_memory = ngx.shared.seckill_cache
便可以取到放在共享内存中的数据。对共享内存的操做也是如set ,get 之类。
--优先从缓存获取,不然访问上游接口 local seckill_cache = ngx.shared.seckill_cache local goodIdCacheKey = "goodId_" .. goodId local goodCache = seckill_cache:get(goodIdCacheKey) if goodCache == "" or goodCache == nil then ngx.log(ngx.DEBUG,"cache not hited " .. goodId) --回源上游接口,好比Java 后端rest接口 local res = ngx.location.capture("/stock-provider/api/seckill/good/detail/v1", { method = ngx.HTTP_POST, -- args = requestBody , -- 重要:将请求参数,原样向上游传递 always_forward_body = false, -- 也能够设置为false 仅转发put和post请求方式中的body. }) --返回上游接口的响应体 body goodCache = res.body; --单位为s seckill_cache:set(goodIdCacheKey, goodCache, 10 * 60 * 60) end ngx.say(goodCache);
ngx.shared.DICT的实现是采用红黑树实现,当申请的缓存被占用完后若是有新数据须要存储则采用 LRU 算法淘汰掉“多余”数据。
LRU原理
LRU的设计原理就是,当数据在最近一段时间常常被访问,那么它在之后也会常常被访问。这就意味着,若是常常访问的数据,咱们须要然其可以快速命中,而不常访问的数据,咱们在容量超出限制内,要将其淘汰。
L3与L2同样,都是本地缓存,优势和缺点以下:
本地缓存只支持被该应用进程访问,通常没法被其余应用进程访问,若是对应的数据库数据,存在数据更新,则须要同步更新不一样节点的本地缓存副本,来保证数据一致性
本地缓存的更新,复杂度较高而且容易出错,如基于 Redis 的发布订阅机制、或者消息队列MQ来同步更新各个部署节点。
L3级缓存主要用于极热数据,如秒杀的商品数据(对于秒杀这样的场景,瞬间有十几万甚至上百万的请求要同时读取商品。若是没有命中本地缓存,可能致使缓存击穿。
缓存击穿与 缓存穿透的简单区别:
- 缓存击穿是指数据库中有数据,可是缓存中没有,大量的请求打到数据库;
- 缓存穿透是指缓存和数据库中都没有的数据,而用户不断发起请求,如发起为id为“-1”的数据或id为特别大不存在的数据。这时的用户极可能是攻击者,攻击会致使数据库压力过大。
为了防止缓存击穿,同时也保持数据一致性,具体的方案为:
L3级缓存的数据一致性保障以及防止缓存击穿方案:
1.数据预热(或者叫预加载)
2.设置热点数据永远不过时,经过 ngx.shared.DICT的缓存的LRU机制去淘汰
3.若是缓存主动更新,在快过时以前更新,若有变化,经过订阅变化的机制,主动本地刷新
4.提供兜底方案,若是本地缓存没有,则经过后端服务获取数据,而后缓存起来
L3级缓存的数据一致性实战,至关重要,建议必定要动手实战一票。
尼恩的忠实建议:
理论水平的提高,看看视频、看看书,只有两个字,就是须要:多看。
实战水平的提高,只有两个字,就是须要:多干。
L3级缓存的数据一致性实战的具体材料、源码、问题,欢迎来 疯狂创客圈社群交流。
高并发Java发烧友社群 - 疯狂创客圈 总入口 点击了解详情: