目录
- 一、前提
- 二、源码分析
- 1、redissonlock#lock() 方法
- 2、详细看下subscribe()方法
- 3、回到subscribe()方法主要逻辑还是交给了 lockpubsub#subscribe()里面
- 4、publishsubscribeservice#subscribe逻辑如下:
- 三 总结
一、前提
最近在使用分布式锁redisson时遇到一个线上问题:发现是subscriptionsperconnection or subscriptionconnectionpoolsize 的大小不够,需要提高配置才能解决。
二、源码分析
下面对其源码进行分析,才能找到到底是什么逻辑导致问题所在:
1、redissonlock#lock() 方法
private void lock(long leasetime, timeunit unit, boolean interruptibly) throws interruptedexception {
long threadid = thread.currentthread().getid();
// 尝试获取,如果ttl == null,则表示获取锁成功
long ttl = tryacquire(leasetime, unit, threadid);
// lock acquired
if (ttl == null) {
return;
}
// 订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题
rfuture<redissonlockentry> future = subscribe(threadid);
if (interruptibly) {
commandexecutor.syncsubscriptioninterrupted(future);
} else {
commandexecutor.syncsubscription(future);
}
// 后面代码忽略
try {
// 无限循环获取锁,直到获取锁成功
// ...
} finally {
// 取消订阅锁释放事件
unsubscribe(future, threadid);
}
}
总结下主要逻辑:
- 获取当前线程的线程id;
- tryaquire尝试获取锁,并返回ttl
- 如果ttl为空,则结束流程;否则进入后续逻辑;
- this.subscribe(threadid)订阅当前线程,返回一个rfuture;
- 如果在指定时间没有监听到,则会产生如上异常。
- 订阅成功后, 通过while(true)循环,一直尝试获取锁
- fially代码块,会解除订阅
所以上述这情况问题应该出现在subscribe()方法中
2、详细看下subscribe()方法
protected rfuture<redissonlockentry> subscribe(long threadid) {
// entryname 格式:“id:name”;
// channelname 格式:“redisson_lock__channel:name”;
return pubsub.subscribe(getentryname(), getchannelname());
}
redissonlock#pubsub 是在redissonlock构造函数中初始化的:
public redissonlock(commandasyncexecutor commandexecutor, string name) {
// ....
this.pubsub = commandexecutor.getconnectionmanager().getsubscribeservice().getlockpubsub();
}
而subscribeservice在masterslaveconnectionmanager的实现中又是通过如下方式构造的
public masterslaveconnectionmanager(masterslaveserversconfig cfg, config config, uuid id) {
this(config, id);
this.config = cfg;
// 初始化
inittimer(cfg);
initsingleentry();
}
protected void inittimer(masterslaveserversconfig config) {
int[] timeouts = new int[]{config.getretryinterval(), config.gettimeout()};
arrays.sort(timeouts);
int mintimeout = timeouts[0];
if (mintimeout % 100 != 0) {
mintimeout = (mintimeout % 100) / 2;
} else if (mintimeout == 100) {
mintimeout = 50;
} else {
mintimeout = 100;
}
timer = new hashedwheeltimer(new defaultthreadfactory("redisson-timer"), mintimeout, timeunit.milliseconds, 1024, false);
connectionwatcher = new idleconnectionwatcher(this, config);
// 初始化:其中this就是masterslaveconnectionmanager实例,config则为masterslaveserversconfig实例:
subscribeservice = new publishsubscribeservice(this, config);
}
publishsubscribeservice构造函数
private final semaphorepubsub semaphorepubsub = new semaphorepubsub(this);
public publishsubscribeservice(connectionmanager connectionmanager, masterslaveserversconfig config) {
super();
this.connectionmanager = connectionmanager;
this.config = config;
for (int i = 0; i < locks.length; i++) {
// 这里初始化了一组信号量,每个信号量的初始值为1
locks[i] = new asyncsemaphore(1);
}
}
3、回到subscribe()方法主要逻辑还是交给了 lockpubsub#subscribe()里面
private final concurrentmap<string, e> entries = new concurrenthashmap<>();
public rfuture<e> subscribe(string entryname, string channelname) {
// 从publishsubscribeservice获取对应的信号量。 相同的channelname获取的是同一个信号量
// public asyncsemaphore getsemaphore(channelname channelname) {
// return locks[math.abs(channelname.hashcode() % locks.length)];
// }
asyncsemaphore semaphore = service.getsemaphore(new channelname(channelname));
atomicreference<runnable> listenerholder = new atomicreference<runnable>();
rpromise<e> newpromise = new redissonpromise<e>() {
@override
public boolean cancel(boolean mayinterruptifrunning) {
return semaphore.remove(listenerholder.get());
}
};
runnable listener = new runnable() {
@override
public void run() {
// 如果存在redissonlockentry, 则直接利用已有的监听
e entry = entries.get(entryname);
if (entry != null) {
entry.acquire();
semaphore.release();
entry.getpromise().oncomplete(new transferlistener<e>(newpromise));
return;
}
e value = createentry(newpromise);
value.acquire();
e oldvalue = entries.putifabsent(entryname, value);
if (oldvalue != null) {
oldvalue.acquire();
semaphore.release();
oldvalue.getpromise().oncomplete(new transferlistener<e>(newpromise));
return;
}
// 创建监听,
redispubsublistener<object> listener = createlistener(channelname, value);
// 订阅监听
service.subscribe(longcodec.instance, channelname, semaphore, listener);
}
};
// 最终会执行listener.run方法
semaphore.acquire(listener);
listenerholder.set(listener);
return newpromise;
}
asyncsemaphore#acquire()方法
public void acquire(runnable listener) {
acquire(listener, 1);
}
public void acquire(runnable listener, int permits) {
boolean run = false;
synchronized (this) {
// counter初始化值为1
if (counter < permits) {
// 如果不是第一次执行,则将listener加入到listeners集合中
listeners.add(new entry(listener, permits));
return;
} else {
counter -= permits;
run = true;
}
}
// 第一次执行acquire, 才会执行listener.run()方法
if (run) {
listener.run();
}
}
梳理上述逻辑:
1、从publishsubscribeservice获取对应的信号量, 相同的channelname获取的是同一个信号量
2、如果是第一次请求,则会立马执行listener.run()方法, 否则需要等上个线程获取到该信号量执行完方能执行;
3、如果已经存在redissonlockentry, 则利用已经订阅就行
4、如果不存在redissonlockentry, 则会创建新的redissonlockentry,然后进行。
从上面代码看,主要逻辑是交给了publishsubscribeservice#subscribe方法
4、publishsubscribeservice#subscribe逻辑如下:
private final concurrentmap<channelname, pubsubconnectionentry> name2pubsubconnection = new concurrenthashmap<>();
private final queue<pubsubconnectionentry> freepubsubconnections = new concurrentlinkedqueue<>();
public rfuture<pubsubconnectionentry> subscribe(codec codec, string channelname, asyncsemaphore semaphore, redispubsublistener<?>... listeners) {
rpromise<pubsubconnectionentry> promise = new redissonpromise<pubsubconnectionentry>();
// 主要逻辑入口, 这里要主要channelname每次都是新对象, 但内部覆写hashcode+equals。
subscribe(codec, new channelname(channelname), promise, pubsubtype.subscribe, semaphore, listeners);
return promise;
}
private void subscribe(codec codec, channelname channelname, rpromise<pubsubconnectionentry> promise, pubsubtype type, asyncsemaphore lock, redispubsublistener<?>... listeners) {
pubsubconnectionentry connentry = name2pubsubconnection.get(channelname);
if (connentry != null) {
// 从已有connection中取,如果存在直接把listeners加入到pubsubconnectionentry中
addlisteners(channelname, promise, type, lock, connentry, listeners);
return;
}
// 没有时,才是最重要的逻辑
freepubsublock.acquire(new runnable() {
@override
public void run() {
if (promise.isdone()) {
lock.release();
freepubsublock.release();
return;
}
// 从队列中取头部元素
pubsubconnectionentry freeentry = freepubsubconnections.peek();
if (freeentry == null) {
// 第一次肯定是没有的需要建立
connect(codec, channelname, promise, type, lock, listeners);
return;
}
// 如果存在则尝试获取,如果remainfreeamount小于0则抛出异常终止了。
int remainfreeamount = freeentry.tryacquire();
if (remainfreeamount == -1) {
throw new illegalstateexception();
}
pubsubconnectionentry oldentry = name2pubsubconnection.putifabsent(channelname, freeentry);
if (oldentry != null) {
freeentry.release();
freepubsublock.release();
addlisteners(channelname, promise, type, lock, oldentry, listeners);
return;
}
// 如果remainfreeamount=0, 则从队列中移除
if (remainfreeamount == 0) {
freepubsubconnections.poll();
}
freepubsublock.release();
// 增加监听
rfuture<void> subscribefuture = addlisteners(channelname, promise, type, lock, freeentry, listeners);
channelfuture future;
if (pubsubtype.psubscribe == type) {
future = freeentry.psubscribe(codec, channelname);
} else {
future = freeentry.subscribe(codec, channelname);
}
future.addlistener(new channelfuturelistener() {
@override
public void operationcomplete(channelfuture future) throws exception {
if (!future.issuccess()) {
if (!promise.isdone()) {
subscribefuture.cancel(false);
}
return;
}
connectionmanager.newtimeout(new timertask() {
@override
public void run(timeout timeout) throws exception {
subscribefuture.cancel(false);
}
}, config.gettimeout(), timeunit.milliseconds);
}
});
}
});
}
private void connect(codec codec, channelname channelname, rpromise<pubsubconnectionentry> promise, pubsubtype type, asyncsemaphore lock, redispubsublistener<?>... listeners) {
// 根据channelname计算出slot获取pubsubconnection
int slot = connectionmanager.calcslot(channelname.getname());
rfuture<redispubsubconnection> connfuture = nextpubsubconnection(slot);
promise.oncomplete((res, e) -> {
if (e != null) {
((rpromise<redispubsubconnection>) connfuture).tryfailure(e);
}
});
connfuture.oncomplete((conn, e) -> {
if (e != null) {
freepubsublock.release();
lock.release();
promise.tryfailure(e);
return;
}
// 这里会从配置中读取subscriptionsperconnection
pubsubconnectionentry entry = new pubsubconnectionentry(conn, config.getsubscriptionsperconnection());
// 每获取一次,subscriptionsperconnection就会减直到为0
int remainfreeamount = entry.tryacquire();
// 如果旧的存在,则将现有的entry释放,然后将listeners加入到oldentry中
pubsubconnectionentry oldentry = name2pubsubconnection.putifabsent(channelname, entry);
if (oldentry != null) {
releasesubscribeconnection(slot, entry);
freepubsublock.release();
addlisteners(channelname, promise, type, lock, oldentry, listeners);
return;
}
if (remainfreeamount > 0) {
// 加入到队列中
freepubsubconnections.add(entry);
}
freepubsublock.release();
rfuture<void> subscribefuture = addlisteners(channelname, promise, type, lock, entry, listeners);
// 这里真正的进行订阅(底层与redis交互)
channelfuture future;
if (pubsubtype.psubscribe == type) {
future = entry.psubscribe(codec, channelname);
} else {
future = entry.subscribe(codec, channelname);
}
future.addlistener(new channelfuturelistener() {
@override
public void operationcomplete(channelfuture future) throws exception {
if (!future.issuccess()) {
if (!promise.isdone()) {
subscribefuture.cancel(false);
}
return;
}
connectionmanager.newtimeout(new timertask() {
@override
public void run(timeout timeout) throws exception {
subscribefuture.cancel(false);
}
}, config.gettimeout(), timeunit.milliseconds);
}
});
});
}
pubsubconnectionentry#tryacquire方法, subscriptionsperconnection代表了每个连接的最大订阅数。当tryacqcurie的时候会减少这个数量:
public int tryacquire() {
while (true) {
int value = subscribedchannelsamount.get();
if (value == 0) {
return -1;
}
if (subscribedchannelsamount.compareandset(value, value - 1)) {
return value - 1;
}
}
}
梳理上述逻辑:
1、还是进行重复判断, 根据channelname从name2pubsubconnection中获取,看是否存在已经订阅:pubsubconnectionentry; 如果存在直接把新的listener加入到pubsubconnectionentry。
2、从队列freepubsubconnections中取公用的pubsubconnectionentry, 如果没有就进入connect()方法
2.1 会根据subscriptionsperconnection创建pubsubconnectionentry, 然后调用其tryacquire()方法 – 每调用一次就会减1
2.2 将新的pubsubconnectionentry放入全局的name2pubsubconnection, 方便后续重复使用;
2.3 同时也将pubsubconnectionentry放入队列freepubsubconnections中。- remainfreeamount > 0
2.4 后面就是进行底层的subscribe和addlistener
3、如果已经存在pubsubconnectionentry,则利用已有的pubsubconnectionentry进行tryacquire;
4、如果remainfreeamount < 0 会抛出illegalstateexception异常;如果remainfreeamount=0,则会将其从队列中移除, 那么后续请求会重新获取一个可用的连接
5、最后也是进行底层的subscribe和addlistener;
三 总结
根因: 从上面代码分析, 导致问题的根因是因为publishsubscribeservice 会使用公共队列中的freepubsubconnections, 如果同一个key一次性请求超过subscriptionsperconnection它的默认值5时,remainfreeamount就可能出现-1的情况, 那么就会导致commandexecutor.syncsubscription(future)中等待超时,也就抛出如上异常subscribe timeout: (7500ms). increase ‘subscriptionsperconnection’ and/or ‘subscriptionconnectionpoolsize’ parameters.
解决方法: 在初始化redisson可以可指定这个配置项的值。
相关参数的解释以及默认值请参考官网:https://github.com/redisson/redisson/wiki/2.-configuration#23-common-settings
到此这篇关于关于使用redisson订阅数问题的文章就介绍到这了,更多相关redisson 订阅数 内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!