在使用spring-cloud-gateway和nacos中遇到的ConcurrentModification异常分析

最近项目中升级了spring-boot版本(2.1.3.RELEASE),同时也把之前的Zuul替换成了spring-cloud-gateway(2.1.1.RELEASE),并且集成了nacos(1.0.0)。

发布到测试环境的时候,出现了如下异常


reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.ConcurrentModificationException
Caused by: java.util.ConcurrentModificationException: null
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
at java.util.ArrayList$Itr.next(ArrayList.java:859)
at reactor.core.publisher.FluxIterable$IterableSubscription.poll(FluxIterable.java:389)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:634)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:540)
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onSubscribe(FluxFlatMap.java:924)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:139)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:63)
at reactor.core.publisher.Flux.subscribe(Flux.java:7777)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:389)
at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:243)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:201)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:335)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:139)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:63)
at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)
at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62)
at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62)
at reactor.core.publisher.Flux.subscribe(Flux.java:7777)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:389)
at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:243)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:201)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:335)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:139)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:63)
at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)
at reactor.core.publisher.MonoCollectList.subscribe(MonoCollectList.java:59)
at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)
at reactor.core.publisher.MonoFlattenIterable.subscribe(MonoFlattenIterable.java:101)
at reactor.core.publisher.FluxMaterialize.subscribe(FluxMaterialize.java:40)
at reactor.core.publisher.MonoCollectList.subscribe(MonoCollectList.java:59)
at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74)
at reactor.core.publisher.MonoFlattenIterable.subscribe(MonoFlattenIterable.java:101)
at reactor.core.publisher.FluxDematerialize.subscribe(FluxDematerialize.java:39)
at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
at reactor.core.publisher.Flux.subscribe(Flux.java:7777)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:7941)
at reactor.core.publisher.Flux.subscribe(Flux.java:7770)
at reactor.core.publisher.Flux.subscribe(Flux.java:7734)
at reactor.core.publisher.Flux.subscribe(Flux.java:7652)
at org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter.lambda$onApplicationEvent$0(WeightCalculatorWebFilter.java:133)
at org.springframework.beans.factory.ObjectProvider.ifAvailable(ObjectProvider.java:93)
at org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter.onApplicationEvent(WeightCalculatorWebFilter.java:133)
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:402)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:359)
at org.springframework.cloud.gateway.route.RouteRefreshListener.reset(RouteRefreshListener.java:68)
at org.springframework.cloud.gateway.route.RouteRefreshListener.resetIfNeeded(RouteRefreshListener.java:63)
at org.springframework.cloud.gateway.route.RouteRefreshListener.onApplicationEvent(RouteRefreshListener.java:57)
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:402)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:359)
at org.springframework.cloud.alibaba.nacos.discovery.NacosWatch.lambda$nacosServicesWatch$0(NacosWatch.java:156)
at com.alibaba.nacos.client.naming.core.EventDispatcher$Notifier.run(EventDispatcher.java:128)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

分析

上述异常是spring-cloud-gateway的WeightCalculatorWebFilter在监听到RefreshRoutesEvent时,回调onApplicationEvent抛出的

/**
 * Dispatches an application event by its concrete type:
 * a PredicateArgsEvent is passed to handle(...), a WeightDefinedEvent has its
 * weight config passed to addWeightConfig(...), and a RefreshRoutesEvent (when
 * routeLocator is non-null) triggers a subscription to the locator's routes.
 * Any other event type is ignored.
 *
 * @param event the event being delivered to this listener
 */
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof PredicateArgsEvent) {
handle((PredicateArgsEvent) event);
}
else if (event instanceof WeightDefinedEvent) {
addWeightConfig(((WeightDefinedEvent) event).getWeightConfig());
}
else if (event instanceof RefreshRoutesEvent && routeLocator != null) {
// locator.getRoutes().subscribe() here ends up iterating the elements of
// the routes list held by GatewayProperties (this is the iteration that
// fails in the stack trace shown earlier in this article).
routeLocator.ifAvailable(locator -> locator.getRoutes().subscribe()); // forces
// initialization
}

}

出现了ConcurrentModification这个异常,那就说明了routes中的元素肯定被修改了,话不多说,直接debug

首先在GatewayProperties中的setRoutes()方法中打一个断点,当断点被触发的时候,记下List routes的实例id

然后到AbstractList中找到modCount字段,设置断点

下面就静静的等待断点触发

大概就十几秒吧

当断点被触发时,顺着堆栈,找到源头

上面的截图中的方法属于NacosContextRefresher 这个类,这个方法发布了一个RefreshEvent

RefreshEventListener 监听了这个事件回调后,会执行下面这个方法

/**
 * Reacts to a RefreshEvent by invoking this.refresh.refresh() and logging the
 * set of keys it returns. Nothing happens while the ready flag is still false.
 *
 * @param event the refresh event; only its description is read, for the debug log
 */
public void handle(RefreshEvent event) {
if (this.ready.get()) { // don't handle events before app is ready
log.debug("Event received " + event.getEventDesc());
// Delegates the actual refresh; the returned value is the set of changed keys.
Set<String> keys = this.refresh.refresh();
log.info("Refresh keys changed: " + keys);
}
}

这个方法会调用ContextRefresher中的refresh方法


/**
 * Refreshes the environment and then refreshes every bean in the scope.
 * The method is synchronized, so calls on the same instance do not overlap.
 *
 * @return the keys returned by refreshEnvironment()
 */
public synchronized Set<String> refresh() {
// Step 1: refresh the environment and collect the keys it reports.
Set<String> keys = refreshEnvironment();
// Step 2: refresh all scoped beans; this runs after the environment refresh.
this.scope.refreshAll();
return keys;
}

上述方法会使Environment中的配置属性刷新,重新加载,同时一些ConfigurationProperties类的属性会被重新绑定(这个时候GatewayProperties中之前的routes中的元素会被清除,然后设置新的路由信息,进而modCount这个值也被修改了),然后发布RefreshScopeRefreshedEvent

RouteRefreshListener 监听了 RefreshScopeRefreshedEvent这个事件,当回调时,发布RefreshRoutesEvent事件

spring-cloud-gateway 中的CachingRouteLocator和上面提到的WeightCalculatorWebFilter 都监听了RefreshRoutesEvent事件

然后触发各个监听器的回调方法

由于上述的GatewayProperties中之前的routes集合已被修改过,当WeightCalculatorWebFilter中的方法回调时,会调用CachingRouteLocator的getRoutes方法,然后迭代routes中的元素

再来看CachingRouteLocator中的代码实现


/**
 * RouteLocator that wraps another RouteLocator (the delegate) and serves its
 * routes through a cache-backed Flux. It also listens for RefreshRoutesEvent
 * and clears the cache when one arrives.
 */
public class CachingRouteLocator
implements RouteLocator, ApplicationListener<RefreshRoutesEvent> {

// The wrapped locator that supplies routes on a cache miss.
private final RouteLocator delegate;


// The cache-backed route Flux handed out by getRoutes() and refresh().
private final Flux<Route> routes;

// Routes cache; the only key used in this class is "routes".
// NOTE(review): a plain HashMap, with no synchronization visible in this class.
private final Map<String, List> cache = new HashMap<>();

public CachingRouteLocator(RouteLocator delegate) {
this.delegate = delegate;
// The Flux consults the cache first (key "routes"). On a cache hit the
// cached value is used; on a miss it falls back to the delegate's routes,
// sorted with AnnotationAwareOrderComparator. Per this article's analysis,
// the delegate ultimately reads the routes from GatewayProperties.
routes = CacheFlux.lookup(cache, "routes", Route.class)
.onCacheMissResume(() -> this.delegate.getRoutes()
.sort(AnnotationAwareOrderComparator.INSTANCE));
}

// Returns the cache-backed Flux<Route>; the same instance on every call.
@Override
public Flux<Route> getRoutes() {
return this.routes;
}

/**
* Clears the routes cache.
* @return routes flux
*/
public Flux<Route> refresh() {
this.cache.clear();
return this.routes;
}

// Clears the cache whenever a RefreshRoutesEvent is received.
@Override
public void onApplicationEvent(RefreshRoutesEvent event) {
refresh();
}

// Deprecated test hook; simply delegates to refresh().
@Deprecated
/* for testing */ void handleRefresh() {
refresh();
}

}

上述Map<String,List> cache中有routes的缓存,所以getRoutes获取到的是被修改过的List<Route> routes,所以这个时候100%触发ConcurrentModificationException

下面上一张简要的流程图,来看整个流程(图画得丑)

如何解决

  1. 要想解决上述问题,需要让CachingRouteLocator先于WeightCalculatorWebFilter触发RefreshRoutesEvent事件回调

    但是spring事件广播器在获取对应事件的监听器时会对listener做排序(升序),由于WeightCalculatorWebFilter实现了Ordered,而CachingRouteLocator没有(优先级排最后),这似乎有点难搞啊

    好在我们有AOP,可以对CachingRouteLocator做“增强”,代理CachingRouteLocator并且让他实现Ordered接口。相关代码如下

    import org.aopalliance.intercept.MethodInterceptor;
    import org.aopalliance.intercept.MethodInvocation;
    import org.springframework.aop.framework.AopProxyUtils;
    import org.springframework.aop.framework.ProxyFactory;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.config.BeanPostProcessor;
    import org.springframework.cloud.gateway.route.CachingRouteLocator;
    import org.springframework.core.Ordered;
    import org.springframework.stereotype.Component;

    /**
     * Bean post-processor that wraps the {@link CachingRouteLocator} bean in a
     * class-based proxy which additionally implements {@link Ordered}.
     *
     * <p>The proxy answers {@code getOrder()} with a fixed low value so that the
     * event multicaster sorts the locator ahead of {@code WeightCalculatorWebFilter}
     * when delivering {@code RefreshRoutesEvent}; every other call is forwarded to
     * the original bean unchanged.
     *
     * @author suchu
     */
    @Component
    public class CachingRouteLocatorHook implements BeanPostProcessor {

        /**
         * Order reported by the proxy. The exact value is not important; it only
         * has to be smaller than the order of WeightCalculatorWebFilter.
         */
        private static final int CACHING_ROUTE_LOCATOR_ORDER = 1;

        /** Name of the single {@link Ordered} method answered by the proxy. */
        private static final String GET_ORDER_METHOD = "getOrder";

        /**
         * Replaces the {@link CachingRouteLocator} bean with an {@link Ordered} proxy
         * and returns every other bean untouched.
         *
         * @param bean     the fully initialized bean instance
         * @param beanName the name of the bean (not used)
         * @return the ordered proxy for the caching route locator, otherwise {@code bean}
         * @throws BeansException declared by the interface; not thrown directly here
         */
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            // Skip beans that already expose an order (for example one this hook
            // has already proxied), so the locator is never wrapped twice.
            if (bean instanceof Ordered) {
                return bean;
            }
            // Look through any existing proxy and compare by class identity
            // rather than by class-name string.
            Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
            if (!CachingRouteLocator.class.equals(targetClass)) {
                return bean;
            }

            ProxyFactory factory = new ProxyFactory();
            factory.addInterface(Ordered.class);
            factory.setTarget(bean);
            // Class-based proxy: the result must still be a CachingRouteLocator.
            factory.setProxyTargetClass(true);
            factory.addAdvice((MethodInterceptor) methodInvocation -> {
                // Only the zero-argument getOrder() is answered here; an overload
                // with the same name would be forwarded to the target instead.
                boolean isGetOrder = GET_ORDER_METHOD.equals(methodInvocation.getMethod().getName())
                        && methodInvocation.getMethod().getParameterCount() == 0;
                if (isGetOrder) {
                    return CACHING_ROUTE_LOCATOR_ORDER;
                }
                return methodInvocation.proceed();
            });
            return factory.getProxy();
        }
    }
  2. 等spring-cloud-gateway后面修复吧,或者换成其他网关(2333)

总结

  1. 学到新的debug技巧(只触发指定实例id的断点)

  2. 了解nacos的一些基本原理

  3. 了解了reactor的一些基本概念,和基本操作

  4. 自己太菜了,还得多学习