目录
- 生产消息服务
- 消费消息服务,定时任务
- 日志
- 测试
leftpush消息入队,rightpop对应,消息出队。
rightpop(redisconstant.mq_list, 0l, timeunit.seconds)阻塞出队,0表示永久阻塞
生产消息服务
@service
public class redisservice {
@autowired
private redistemplate<string, string> redistemplate;
public object publish() {
orderdto dto = new orderdto();
dto.setid(1);
dto.setcreatetime(new date());
dto.setmoney("12.34");
dto.setorderno("orderno1");
string s = json.tojsonstring(dto);
listoperations<string, string> listoperations = redistemplate.opsforlist();
//leftpush和rightpop对应,左边入队,右边出队
listoperations.leftpush(redisconstant.mq_list, s);
//因为出队是阻塞读取的,所以上一步入队后,数据立刻就被驱走了,下一步size=0
long size = listoperations.size(redisconstant.mq_list);
list<string> list = new arraylist<>();
if (size != null && size > 0) {
list = listoperations.range(redisconstant.mq_list, 0, size - 1);
}
return list;
}
}
测试
@restcontroller
@requestmapping("redislist")
public class redislistcontroller {
@autowired
private redisservice redisservice;
@getmapping("publish")
public object publish() {
return redisservice.publish();
}
}
消费消息服务,定时任务
@component
public class redisconsumetask {
@autowired
private redisservice redisservice;
@tasklock(redisconstant.consume_redis_list)
@scheduled(cron = "0/10 * * * * ?")
public void consumemqlist() {
redisservice.consumemqlist();
}
}
@service
@slf4j
public class redisservice {
@autowired
private redistemplate<string, string> redistemplate;
public void consumemqlist() {
listoperations<string, string> listoperations = redistemplate.opsforlist();
//0时间,表示阻塞永久
//待机一小时后,再次发消息,消费不了了,阻塞有问题啊。还得轮寻啊
//string s = listoperations.rightpop(redisconstant.mq_list, 0l, timeunit.seconds);
string s = listoperations.rightpop(redisconstant.mq_list);
if (s == null) {
return;
}
log.info("{} = {}", redisconstant.mq_list, s);
orderdto dto = json.parseobject(s, orderdto.class);
log.info("dto = {}", dto);
}
}
日志
@component
@aspect
public class tasklockaop {
@autowired
private redislockregistry redislockregistry;
@around("execution(@tasklock * * (..))")
public object taskaround(proceedingjoinpoint pjp) throws throwable {
tasklock taskannotation = ((methodsignature)pjp.getsignature()).getmethod().getannotation(tasklock.class);
string lockkey = taskannotation.value();
lock lock = redislockregistry.obtain(lockkey);
try {
lock.trylock(30l, timeunit.seconds);
system.out.println("任务开始, " + lockkey + ", " + new date());
return pjp.proceed();
} finally {
lock.unlock();
system.out.println("任务结束, " + lockkey + ", " + new date());
}
}
}
测试
http://localhost:9040/redislist/publish
[“{“createtime”:1574394538430,“id”:1,“money”:“12.34”,“orderno”:“orderno1”}”]
下面一直阻塞,任务开始了,不收到消息,永远不会结束。
阻塞有问题,改用轮询了。
先启动发送消息服务,发送消息。后启动消费消息服务,可以消费消息。这一点,比发布订阅要稳定。
关联项目https://github.com/mingwulipo/cloud-demo.git
到此这篇关于redis用list做消息队列的实现示例的文章就介绍到这了,更多相关redis list消息队列内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!