关于使用Redisson订阅数问题

目录
  • 一、前提
  • 二、源码分析
    • 1、redissonlock#lock() 方法
    • 2、详细看下subscribe()方法
    • 3、回到subscribe()方法主要逻辑还是交给了 lockpubsub#subscribe()里面
    • 4、publishsubscribeservice#subscribe逻辑如下:
  • 三 总结

    一、前提

    最近在使用分布式锁redisson时遇到一个线上问题:发现是subscriptionsperconnection or subscriptionconnectionpoolsize 的大小不够,需要提高配置才能解决。

    二、源码分析

    下面对其源码进行分析,才能找到到底是什么逻辑导致问题所在:

    1、redissonlock#lock() 方法

    private void lock(long leasetime, timeunit unit, boolean interruptibly) throws interruptedexception {
            long threadid = thread.currentthread().getid();
            // 尝试获取,如果ttl == null,则表示获取锁成功
            long ttl = tryacquire(leasetime, unit, threadid);
            // lock acquired
            if (ttl == null) {
                return;
            }
    
            // 订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题
            rfuture<redissonlockentry> future = subscribe(threadid);
            if (interruptibly) {
                commandexecutor.syncsubscriptioninterrupted(future);
            } else {
                commandexecutor.syncsubscription(future);
            }
    
            // 后面代码忽略
            try {
                // 无限循环获取锁,直到获取锁成功
                // ...
            } finally {
                // 取消订阅锁释放事件
                unsubscribe(future, threadid);
            }
    }

    总结下主要逻辑:

    • 获取当前线程的线程id;
    • tryaquire尝试获取锁,并返回ttl
    • 如果ttl为空,则结束流程;否则进入后续逻辑;
    • this.subscribe(threadid)订阅当前线程,返回一个rfuture;
    • 如果在指定时间没有监听到,则会产生如上异常。
    • 订阅成功后, 通过while(true)循环,一直尝试获取锁
    • fially代码块,会解除订阅

    所以上述这情况问题应该出现在subscribe()方法中

    2、详细看下subscribe()方法

    protected rfuture<redissonlockentry> subscribe(long threadid) {
        // entryname 格式:“id:name”;
        // channelname 格式:“redisson_lock__channel:name”;
        return pubsub.subscribe(getentryname(), getchannelname());
    }

    redissonlock#pubsub 是在redissonlock构造函数中初始化的:

    public redissonlock(commandasyncexecutor commandexecutor, string name) {
        // ....
        this.pubsub = commandexecutor.getconnectionmanager().getsubscribeservice().getlockpubsub();
    }

    而subscribeservice在masterslaveconnectionmanager的实现中又是通过如下方式构造的

    public masterslaveconnectionmanager(masterslaveserversconfig cfg, config config, uuid id) {
        this(config, id);
        this.config = cfg;
    
        // 初始化
        inittimer(cfg);
        initsingleentry();
    }
    
    protected void inittimer(masterslaveserversconfig config) {
        int[] timeouts = new int[]{config.getretryinterval(), config.gettimeout()};
        arrays.sort(timeouts);
        int mintimeout = timeouts[0];
        if (mintimeout % 100 != 0) {
            mintimeout = (mintimeout % 100) / 2;
        } else if (mintimeout == 100) {
            mintimeout = 50;
        } else {
            mintimeout = 100;
        }
    
        timer = new hashedwheeltimer(new defaultthreadfactory("redisson-timer"), mintimeout, timeunit.milliseconds, 1024, false);
    
        connectionwatcher = new idleconnectionwatcher(this, config);
    
        // 初始化:其中this就是masterslaveconnectionmanager实例,config则为masterslaveserversconfig实例:
        subscribeservice = new publishsubscribeservice(this, config);
    }

    publishsubscribeservice构造函数

    private final semaphorepubsub semaphorepubsub = new semaphorepubsub(this);
    public publishsubscribeservice(connectionmanager connectionmanager, masterslaveserversconfig config) {
        super();
        this.connectionmanager = connectionmanager;
        this.config = config;
        for (int i = 0; i < locks.length; i++) {
            // 这里初始化了一组信号量,每个信号量的初始值为1
            locks[i] = new asyncsemaphore(1);
        }
    }

    3、回到subscribe()方法主要逻辑还是交给了 lockpubsub#subscribe()里面

    private final concurrentmap<string, e> entries = new concurrenthashmap<>();
    
    public rfuture<e> subscribe(string entryname, string channelname) {
          // 从publishsubscribeservice获取对应的信号量。 相同的channelname获取的是同一个信号量
         // public asyncsemaphore getsemaphore(channelname channelname) {
        //    return locks[math.abs(channelname.hashcode() % locks.length)];
        // }
        asyncsemaphore semaphore = service.getsemaphore(new channelname(channelname));
    
        atomicreference<runnable> listenerholder = new atomicreference<runnable>();    
        rpromise<e> newpromise = new redissonpromise<e>() {
            @override
            public boolean cancel(boolean mayinterruptifrunning) {
                return semaphore.remove(listenerholder.get());
            }
        };
    
        runnable listener = new runnable() {
    
            @override
            public void run() {
                //  如果存在redissonlockentry, 则直接利用已有的监听
                e entry = entries.get(entryname);
                if (entry != null) {
                    entry.acquire();
                    semaphore.release();
                    entry.getpromise().oncomplete(new transferlistener<e>(newpromise));
                    return;
                }
    
                e value = createentry(newpromise);
                value.acquire();
    
                e oldvalue = entries.putifabsent(entryname, value);
                if (oldvalue != null) {
                    oldvalue.acquire();
                    semaphore.release();
                    oldvalue.getpromise().oncomplete(new transferlistener<e>(newpromise));
                    return;
                }
    
                // 创建监听,
                redispubsublistener<object> listener = createlistener(channelname, value);
                // 订阅监听
                service.subscribe(longcodec.instance, channelname, semaphore, listener);
            }
        };
    
        // 最终会执行listener.run方法
        semaphore.acquire(listener);
        listenerholder.set(listener);
    
        return newpromise;
    }

    asyncsemaphore#acquire()方法

    public void acquire(runnable listener) {
        acquire(listener, 1);
    }
    
    public void acquire(runnable listener, int permits) {
        boolean run = false;
    
        synchronized (this) {
            // counter初始化值为1
            if (counter < permits) {
                // 如果不是第一次执行,则将listener加入到listeners集合中
                listeners.add(new entry(listener, permits));
                return;
            } else {
                counter -= permits;
                run = true;
            }
        }
    
        // 第一次执行acquire, 才会执行listener.run()方法
        if (run) {
            listener.run();
        }
    }

    梳理上述逻辑:

    1、从publishsubscribeservice获取对应的信号量, 相同的channelname获取的是同一个信号量
    2、如果是第一次请求,则会立马执行listener.run()方法, 否则需要等上个线程获取到该信号量执行完方能执行;
    3、如果已经存在redissonlockentry, 则利用已经订阅就行
    4、如果不存在redissonlockentry, 则会创建新的redissonlockentry,然后进行。

    从上面代码看,主要逻辑是交给了publishsubscribeservice#subscribe方法

    4、publishsubscribeservice#subscribe逻辑如下:

    private final concurrentmap<channelname, pubsubconnectionentry> name2pubsubconnection = new concurrenthashmap<>();
    private final queue<pubsubconnectionentry> freepubsubconnections = new concurrentlinkedqueue<>();
    public rfuture<pubsubconnectionentry> subscribe(codec codec, string channelname, asyncsemaphore semaphore, redispubsublistener<?>... listeners) {
        rpromise<pubsubconnectionentry> promise = new redissonpromise<pubsubconnectionentry>();
        // 主要逻辑入口, 这里要主要channelname每次都是新对象, 但内部覆写hashcode+equals。
        subscribe(codec, new channelname(channelname), promise, pubsubtype.subscribe, semaphore, listeners);
        return promise;
    }
    private void subscribe(codec codec, channelname channelname,  rpromise<pubsubconnectionentry> promise, pubsubtype type, asyncsemaphore lock, redispubsublistener<?>... listeners) {
        pubsubconnectionentry connentry = name2pubsubconnection.get(channelname);
        if (connentry != null) {
            // 从已有connection中取,如果存在直接把listeners加入到pubsubconnectionentry中
            addlisteners(channelname, promise, type, lock, connentry, listeners);
            return;
        }
        // 没有时,才是最重要的逻辑
        freepubsublock.acquire(new runnable() {
            @override
            public void run() {
                if (promise.isdone()) {
                    lock.release();
                    freepubsublock.release();
                    return;
                }
                // 从队列中取头部元素
                pubsubconnectionentry freeentry = freepubsubconnections.peek();
                if (freeentry == null) {
                    // 第一次肯定是没有的需要建立
                    connect(codec, channelname, promise, type, lock, listeners);
                    return;
                }
                // 如果存在则尝试获取,如果remainfreeamount小于0则抛出异常终止了。
                int remainfreeamount = freeentry.tryacquire();
                if (remainfreeamount == -1) {
                    throw new illegalstateexception();
                }
                pubsubconnectionentry oldentry = name2pubsubconnection.putifabsent(channelname, freeentry);
                if (oldentry != null) {
                    freeentry.release();
                    freepubsublock.release();
                    addlisteners(channelname, promise, type, lock, oldentry, listeners);
                    return;
                }
                // 如果remainfreeamount=0, 则从队列中移除
                if (remainfreeamount == 0) {
                    freepubsubconnections.poll();
                }
                freepubsublock.release();
                // 增加监听
                rfuture<void> subscribefuture = addlisteners(channelname, promise, type, lock, freeentry, listeners);
                channelfuture future;
                if (pubsubtype.psubscribe == type) {
                    future = freeentry.psubscribe(codec, channelname);
                } else {
                    future = freeentry.subscribe(codec, channelname);
                }
                future.addlistener(new channelfuturelistener() {
                    @override
                    public void operationcomplete(channelfuture future) throws exception {
                        if (!future.issuccess()) {
                            if (!promise.isdone()) {
                                subscribefuture.cancel(false);
                            }
                            return;
                        }
                        connectionmanager.newtimeout(new timertask() {
                            @override
                            public void run(timeout timeout) throws exception {
                                subscribefuture.cancel(false);
                            }
                        }, config.gettimeout(), timeunit.milliseconds);
                    }
                });
            }
        });
    }
    private void connect(codec codec, channelname channelname, rpromise<pubsubconnectionentry> promise, pubsubtype type, asyncsemaphore lock, redispubsublistener<?>... listeners) {
        // 根据channelname计算出slot获取pubsubconnection
        int slot = connectionmanager.calcslot(channelname.getname());
        rfuture<redispubsubconnection> connfuture = nextpubsubconnection(slot);
        promise.oncomplete((res, e) -> {
            if (e != null) {
                ((rpromise<redispubsubconnection>) connfuture).tryfailure(e);
            }
        });
        connfuture.oncomplete((conn, e) -> {
            if (e != null) {
                freepubsublock.release();
                lock.release();
                promise.tryfailure(e);
                return;
            }
            // 这里会从配置中读取subscriptionsperconnection
            pubsubconnectionentry entry = new pubsubconnectionentry(conn, config.getsubscriptionsperconnection());
            // 每获取一次,subscriptionsperconnection就会减直到为0
            int remainfreeamount = entry.tryacquire();
            // 如果旧的存在,则将现有的entry释放,然后将listeners加入到oldentry中
            pubsubconnectionentry oldentry = name2pubsubconnection.putifabsent(channelname, entry);
            if (oldentry != null) {
                releasesubscribeconnection(slot, entry);
                freepubsublock.release();
                addlisteners(channelname, promise, type, lock, oldentry, listeners);
                return;
            }
            if (remainfreeamount > 0) {
                // 加入到队列中
                freepubsubconnections.add(entry);
            }
            freepubsublock.release();
            rfuture<void> subscribefuture = addlisteners(channelname, promise, type, lock, entry, listeners);
            // 这里真正的进行订阅(底层与redis交互)
            channelfuture future;
            if (pubsubtype.psubscribe == type) {
                future = entry.psubscribe(codec, channelname);
            } else {
                future = entry.subscribe(codec, channelname);
            }
            future.addlistener(new channelfuturelistener() {
                @override
                public void operationcomplete(channelfuture future) throws exception {
                    if (!future.issuccess()) {
                        if (!promise.isdone()) {
                            subscribefuture.cancel(false);
                        }
                        return;
                    }
                    connectionmanager.newtimeout(new timertask() {
                        @override
                        public void run(timeout timeout) throws exception {
                            subscribefuture.cancel(false);
                        }
                    }, config.gettimeout(), timeunit.milliseconds);
                }
            });
        });
    }

    pubsubconnectionentry#tryacquire方法, subscriptionsperconnection代表了每个连接的最大订阅数。当tryacqcurie的时候会减少这个数量:

     public int tryacquire() {
        while (true) {
            int value = subscribedchannelsamount.get();
            if (value == 0) {
                return -1;
            }
            if (subscribedchannelsamount.compareandset(value, value - 1)) {
                return value - 1;
            }
        }
    }

    梳理上述逻辑:

    1、还是进行重复判断, 根据channelname从name2pubsubconnection中获取,看是否存在已经订阅:pubsubconnectionentry; 如果存在直接把新的listener加入到pubsubconnectionentry。
    2、从队列freepubsubconnections中取公用的pubsubconnectionentry, 如果没有就进入connect()方法

    2.1 会根据subscriptionsperconnection创建pubsubconnectionentry, 然后调用其tryacquire()方法 – 每调用一次就会减1
    2.2 将新的pubsubconnectionentry放入全局的name2pubsubconnection, 方便后续重复使用;
    2.3 同时也将pubsubconnectionentry放入队列freepubsubconnections中。- remainfreeamount > 0
    2.4 后面就是进行底层的subscribe和addlistener

    3、如果已经存在pubsubconnectionentry,则利用已有的pubsubconnectionentry进行tryacquire;
    4、如果remainfreeamount < 0 会抛出illegalstateexception异常;如果remainfreeamount=0,则会将其从队列中移除, 那么后续请求会重新获取一个可用的连接
    5、最后也是进行底层的subscribe和addlistener;

    三 总结

    根因: 从上面代码分析, 导致问题的根因是因为publishsubscribeservice 会使用公共队列中的freepubsubconnections, 如果同一个key一次性请求超过subscriptionsperconnection它的默认值5时,remainfreeamount就可能出现-1的情况, 那么就会导致commandexecutor.syncsubscription(future)中等待超时,也就抛出如上异常subscribe timeout: (7500ms). increase ‘subscriptionsperconnection’ and/or ‘subscriptionconnectionpoolsize’ parameters.

    解决方法: 在初始化redisson可以可指定这个配置项的值。

    相关参数的解释以及默认值请参考官网:https://github.com/redisson/redisson/wiki/2.-configuration#23-common-settings

    到此这篇关于关于使用redisson订阅数问题的文章就介绍到这了,更多相关redisson 订阅数 内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!

    (0)
    上一篇 2022年3月21日
    下一篇 2022年3月21日

    相关推荐