FAT ,基于springboot , 使用zookeeper,redis , spring async , spring transactionManager的强一致性分布式事务解决方案
纯编码方式,强一致性。
使用redis/zookeeper作为注册中心 ,代理事务的执行,使用spring async异步处理事务线程。
基于注解使用,对业务代码可以说是零入侵,目前内置适配spring-cloud(Feign调用) , dubbo。
同时具备一定的扩展性与兼容性,因为存在自定义的服务框架,或者以后会涌现出更多的流行分布式服务框架,所以会提供一些组件适配自定义服务框架。
<dependency>
<groupId>com.github.cjyican</groupId>
<artifactId>fat-common</artifactId>
<version>1.0.6-RELEASE</version>
</dependency>
@SpringBootApplication
@EnableEurekaClient
@EnableDiscoveryClient
@EnableFeignClients
@EnableFat
public class FatboyEurekaRibbonApplication {
public static void main(String[] args) {
SpringApplication.run(FatboyEurekaRibbonApplication.class, args);
}
}
使用redis/zookeeper作为注册中心,优先使用zookeeper。为隔离业务使用的redis和注册中心的redis,提供了一套属性配置。 在业务redis/zookeeper作为注册中心与注册中心相同时,也需要配置。 请保证各个服务的注册中心配置一致,否则无法协调分布式事务。
#Fat
# Redis数据库索引(默认为0)
fat.redis.database=0
# Redis服务器地址
fat.redis.host=
# Redis服务器连接端口
fat.redis.port=6379
# Redis服务器连接密码(默认为空)
fat.redis.password=
# 连接池最大连接数(使用负值表示没有限制)
fat.redis.pool.max-active=20
# 连接池最大阻塞等待时间(使用负值表示没有限制)
fat.redis.pool.max-wait=-1
# 连接池中的最大空闲连接
fat.redis.pool.max-idle=10
# 连接池中的最小空闲连接
fat.redis.pool.min-idle=2
# 连接超时时间(毫秒)
fat.redis.timeout=1000
# 集群模式,如有配置,将优先使用集群
fat.redis.cluster.nodes=x.x.x.x:x,x.x.x.x:x
# zookeeper服务器地址
fat.zookeeper.host=x.x.x.x:x,x.x.x.x:x
# zookeeper活跃时间
fat.zookeeper.sessionTimeout=x.x.x.x:x,x.x.x.x:x
应用标识,与spirng.application.name一致,必须配置
spring.application.name=fatboy-eureka-ribbon
在需要开启分布式事务管理的入口方法中加入注解@FatServiceRegister,注意不要重复添加。dubbo的直接加在serviceImpl.method上面就可以了。
@RequestMapping("/user-service/{userId}/updateUserOrderNum1")
@FatServiceRegister
public Integer updateUserOrderNum1(@PathVariable("userId") Long userId , @RequestParam("lastOrderId") Long lastOrderId ) throws Exception {
int userResult = service.updateUserOrderNum(userId , lastOrderId);
prodFeign.updateStock(1l);
//int i = 10 / 0;测试使用
return userResult;
}
注解解析
注意@FatTransaction必须要与@Transactional配合使用已获取用户配置的事务信息,否则将会报错
@FatTransaction
@Transactional
public Integer updateUserOrderNum(Long userId ,Long lastOrderId ) throws Exception {
User user = new User();
user.setLastOrderId(lastOrderId);
user.setUserId(userId);
int i = mapper.updateUserOrderNum(user);
//int j = 10 /0 ;//测试使用
return i;
}
注解解析
/**
* 等待当前服务返回值的超时时间 , 默认3秒
*/
long waitResultMillisSeconds() default 3000;
/**
* 服务等待提交时间 ,默认3秒
*/
long waitCommitMillisSeconds() default 3000;
OK,到这里这个接口的服务链路已经完成了,可以跑起来了。简单吧,嘿嘿嘿。
图不重要,重要的思想和代码,下面介绍一下FAT的一些设计和源码
采取了异步执行业务操作,操作事务的方式。需要阻塞当前事务提交线程,主线程会不会响应很慢?在FAT里面,主线程得到响应是非常快的,因为在服务链路的场景里,调用服务B,需要使用到服务A的结果,而这个服务A的结果,实际上是不需要等到事务提交的,所以调用服务A的时间上是业务执行的结果时间,不是事务提交的时间。
FAT对于事务的监控阻塞,目前设计是三个阶段:
1,业务执行完毕的阻塞,即服务执行完业务操作,向注册中心注册业务完成标示,此为第一次阻塞,用于等待服务链路其他服务的执行。此时是可以通过等待超时回滚整个分布式事务的。此阻塞还有另一个意义,让整个分布式事务的时间线趋于平衡状态,降低超时出错几率。
2,所有服务执行完毕,事务准备提交的阻塞。即所有服务的业务操作已经完毕,注册中心已经获取到所有服务的完成标识。此阻塞将不会有超时限制。
3,分组协调器的阻塞,在服务调服务的场景中,会进行事务分组,每个事务组完成,将会到分组协调器注册标识,当所有事务分组完成,事务才开始进行提交。
注意:
在阶段3之前,也就是业务操作过程,业务超时 or 服务链路挂了 or 客户端挂了 or 注册中心挂了,整个事务都会由于协调超时而回滚,不会出现不一致的情况,但是某个服务挂了,由于事务尚未提交,该服务的事务需要DB手工操作。
阶段3,某个服务挂了,将不影响其他服务的事务提交,但是某个服务挂了,由于事务尚未提交,该服务的事务需要DB手工操作。注册中心挂了,已经提交的不影响,剩余的将会回滚。
综上,应当保证服务的业务操作效率以及注册中心的稳定性。
直接看代码,注释非常清晰
主要处理流程都集中在
https://github.com/cjyican/fat/tree/master/src/main/java/com/cjy/fat/resolve
https://github.com/cjyican/fat/blob/master/src/main/java/com/cjy/fat/resolve/ServiceRegisterAspect.java
https://github.com/cjyican/fat/blob/master/src/main/java/com/cjy/fat/resolve/ServiceRegisterResolver.java
https://github.com/cjyican/fat/blob/master/src/main/java/com/cjy/fat/resolve/TransactionAspect.java
https://github.com/cjyican/fat/blob/master/src/main/java/com/cjy/fat/resolve/handler/ServiceRunningHandler.java
https://github.com/cjyican/fat/blob/master/src/main/java/com/cjy/fat/resolve/CommitResolver.java
FAT使用Sping Async处理事务流程,自然需要用到线程池,线程池默认有配置,也可以根据项目运行情况自定义,以下为配置信息
@Value("${fat.thread.core_pool_size:20}")
private int corePoolSize ;
@Value("${fat.thread.max_pool_size:50}")
private int maxPoolSize ;
@Value("${fat.thread.queue_capacity:200}")
private int queueCapacity ;
@Value("${fat.thread.keep_alive_seconds:60}")
private int keepAliveSeconds;
根据项目运行情况配置,默认0.1秒
/**
* 间歇消费时间(毫秒)默认200毫秒
* 争抢可提交标识的时候,可能发生错误,避免继续阻塞,导致jdbcConnection/数据库事务迟迟不肯放手,为了提高响应速度,
* 将pop的阻塞时间分段请求
*/
@Value("${tx.commit.blankTime:100}")
private long commitBlankTime ;
@Value("${tx.waitResult.blankTime:100}")
private long waitResultBlankTime;
以后应该会有更多的流行服务框架,所以一个个适配是不可能的了,这辈子都不可能出一个适配一个的了(开玩笑的,希望我的设计可以兼容吧)
/**
* 获取事务信息提供给自定义拦截器
* @return
*/
public static final Map<String , String> buildRemoteData(){
//返回新的对象,不开放修改入口,避免被客户端串改
Map<String , String> map = new HashMap<>();
map.put(STR_ROOT_TX_KEY, getRootTxKey());
return map;
}
自定义服务框架,需要在调用服务时,把这个map数据包的key-value传达到服务上下文。 例如,dubbo的自定义SPI-Filter,需要遍历此map,RpcContext.getContext().setAttachment(key,value)
自定义服务框架需要提供该接口,并需要加入到spring上下文中,让FAT获取到数据源,进而把分布式上下文信息传播到本地
public interface CustomRemoteDataAdapter {
/**
* eg:dubbo:RpcContent.getAttachments();
* @return
*/
Map<String , String> convertRemoteDataToMap();
}
例如Dubbo
@Compoent
public Class DubboRemoteDataAdapter implements CustomRemoteDataAdapter{
/**
* eg:dubbo:RpcContent.getAttachments();
* 在github的readme.md写代码是一种怎么样的体验
*/
public Map<String , String> convertRemoteDataToMap(){
return RpcContent.getAttachments();
}
}
1,调用服务,建议把服务提取到事务方法外执行,比如feign,可以提取到controller中调用,避免事务耗时过长
2,调用多个服务时,建议把耗时长的服务优先调用
3,建议分开业务redis与注册中心redis,避免业务操作的redis IO压力过大
4,建议设定事务时间,避免事务长期保持,占据数据库资源
|-修复上个版本自定义服务api,无法获取到分布式事务上下文的bug
|-支持注册中心集群模式,参看上文(使用示例)
|-FAT处理事务的线程池配置属性名变更(不兼容旧版本) fb.thread.xx --> fat.thread.xx
|-添加@EnableFat注解使用,Fat分布式事务开关,ps:在dubbo场景中,因为使用的是SPI自定义Filter,在添加了fat的依赖,却不添加@EnableFat注解的话会报错。
|-性能优化:1,本地业务操作等待返回值,不再需要使用JSON序列化,提高主线程等待速度,事务协调速度
2,事务出错监听性能优化,在服务链路需要开启开启了多个事务组的情况下,下层服务出错将更快事务分组协调器感知,响应速度更快
3,本地异常捕捉提高准确度,事务执行线程的异常将通过地址传递到主线程,服务响应速度更快,异常信息更准确
4,分布式事务协调过程的阻塞过程优化,此前为三段阻塞确认(参看上文的‘性能分析’)优化为两段阻塞监听:
5,由于以上优化,注册中心redis将会更节省空间,减缓IO压力
6,支持自定义的事务管理器,JpaTransactionManager,DataSourceTransactionManager阶段1:本地业务完成的等待阻塞。此阻塞发生在当本地业务操作完成后,进行阻塞监听注册中心所在事务协调器,等待所在事务组所有服务完成业务操作。该阻塞监听将会有超时限制。此阶段出现业务异常 OR 某服务挂了 OR 注册中心挂了 OR 客户端挂了 OR 等待超时,都会由于事务确认机制而回滚,确保一致。但某个服务挂了,由于该服务的事务尚未完成,需要DB手工操作。
阶段2,各事务组完成后,分组协调器的阻塞。此阻塞发生在所在事务组,所有服务已经完成其业务操作,监听注册中心事务分组协调器等待其他事务组整体完成的节点。此阻塞监听不会有超时限制。此阶段出现业务异常 OR 某服务 OR 注册中心 OR 客户端 OR 等待超时,都会由于事务确认机制而回滚,确保一致。但某个服务挂了,由于该服务的事务尚未完成,需要DB手工操作。
阶段2后说明整个事务链路已经可以提交,注册中心挂了,已经获取到提交标识的,提交不影响,未提交的事务(已经设置事务超时时间的将会回滚,未设置的,将会保持,需要DB手工操作),服务之间再无联系。
|-第一个稳定版本
1,会持续更新维护 2,(学学前端?)打造FAT分布式事务管控平台FAT-monitor
FAT是我学java以来第一次想与大家分享的东西
希望可以相互学习,互相交流!
thank you for your star.