工控智汇

工控智汇

spring boot 并发消费消息,如何保证入库的数据是最新的

admin 73 162

springboot/cloud(十九)并发消费消息,如何保证入库的数据是最新的?

消息中间件在解决异步处理,模块间解耦和,和高流量场景的削峰,等情况下有着很广泛的应用.

本文将跟大家一起讨论以下其中的异常场景,如题.

场景

在实际工作中,大家可能也都遇到过这样的需求:

如:系统A中的某些重要的数据,想在每次数据变更的时候,将当前最新的数据备份下来,当然,这个备份的动作不能影响当前数据变更的进程.

也更不期望因为备份的操作,影响当前进程的性能.

分析

这是一个比较常见的,可以异步处理的需求,业务数据变更和数据备份之间并没有强一致性的要求,大致的架构如下:

producer作为消息产生者,会通过指定的交换机(exchange)和路由键(routingkey),将消息传输到指定的队列(queue)中,通常producer也会有多个节点

consume作为消息的消费者,会依次从队列(queue)中拿到消息,进行业务处理,最终将数据写入数据库中,并且为了更快速的消费消息,consume通常会部署多个节点,并且每个节点中也会有多个线程同时消费消息

queue作为消息队列,保证了消息被消费的时序性,以及唯一性(一条消息只能被消费一次)

dlxQueue作为死信队列,当queue中的的消息无法被正常消费时,当重处理N次后,将会被放入死信队列,并有专门的consume来消费和处理,比如:通知相关人员进行人工干预,等.

问题

producer会源源不断的产生消息,有新的数据,也有更新老的数据,

而consume则是拿到消息,做insert或者update的操作.

但是由于consume是多线程并发消费消息的,那么就会出现当前线程拿到的消息并非最新版本的消息,如果这个时候进行了update操作的话,很有可能会覆盖掉已经是最新版本的数据了

如:当前数据库里的数据为1,业务操作先是将1改为了2,然后马上的又将2改为了3,这两个操作时间非常接近,几乎是同时,然后产生的消息也几乎同时的进入了消息中间件,

但是在queue里依然有先后,2在前3在后(queue机制保证),那么这个时候,consume来消费了,由于consume是多线程的,所以,2和3会被分配到两条线程中同时被处理

这时,如果2的这条线程先结束,3的这条线程后结束,那么则数据正常,最终数据被更新成3

但是,如果3的这条线程先结束了,2的这条线程是后结束的,那么,最新的数据就会被老数据覆盖掉

这种情况显然是不满足需求记录当前最新的数据的,

并且这种情况很容易发生,虽然queue里保证了消息的先后,以及唯一性,但是消息被consume在线程中消费确实同时处理的

脏读的问题

通常以上这种情况,网络上的一些解决方案,都是在数据中加入版本(version)的概念来解决,本文也是(上文提及的1,2,3,其实就是版本的概念).

通常网络上的描述是,在update的时候,根据数据库中的最新版本号,如果当前消息的版本号小于数据库最新的版本号,则放弃更新,大于,则更新

这一段逻辑很简单,但是也很容易产生误解,最大的问题在于获得最新版本号,在多线程环境下,数据库的脏读也是蛮严重的,脏读的存在,导致你查询出来的数据并非是最新的数据

如:上面的一个场景,数据库中最新的版本号是1,有版本号2和3两个消息是即将要消费的,按照上面的逻辑,处理程序应该先查数据库,拿到当前最新的版本.

这个时候,两条线程查询到的结果有可能都是1,这时21,并且31,两条线程依然都会执行,同样的:

如果2的这条线程先结束,3的这条线程后结束,那么则数据正常,最终数据被更新成3

如果3的这条线程先结束了,2的这条线程是后结束的,那么,最新的数据就会被老数据覆盖掉

同样达不到想要的效果

如何保证入库的数据是最新的?

其实要实现很简单,首先,要知道,对于同一行数据,sql的执行也是有先后顺序的,其实到底更新为2的sql语句先执行,还是更新为3的sql语句先执行,并不重要

重要的是,将判断版本号的语句放入更新条件中进行判断.

例子:同样是上面的场景,数据库中的版本为1,这时2和3同时更新,谁先结束,谁也不知道,也无法控制(其实有办法,但是损失性能,当前场景需要的是高效)

但是我们可以在条件中加入"version2"这样的条件

SQL语句样例:

UPDATETEST_MSGSETVERSION={data},LAST_UPDATE_DATE={businessKey}ANDVERSION{updateVersion}"的方法进行更新,保证了将最新的数据更新到数据库中

最后的线程休眠,是为了模拟处理时间,以便造成更多的并发情况

intcount(@Param("businessKey")StringbusinessKey);intinsert(TestMsgtestMsg);intupdate(TestMsgtestMsg);selectid="count"resultType="int"SELECTCOUNT(*)FROMTEST_MSGWHEREBUSINESS_KEY={businessKey},{data},{version},DATA={lastUpdatedDate}WHEREBUSINESS_KEY={version}/update

以上为相关的sqlmap定义以及mapper接口的定义

CREATETABLETEST_MSG(BUSINESS_KEYVARCHAR(100)NOTNULLPRIMARYKEY,VERSIONBIGINTNOTNULL,DATAVARCHAR(100)NOTNULL,LAST_UPDATE_DATEDATETIMENOTNULL)COMMENT'TEST_MSG';

以上为表结构的定义