Redis延迟队列和分布式延迟队列的简答实现

        最近,又重新学习了下redis,redis不仅能快还能慢,简直利器,今天就为大家介绍一下redis延迟队列和分布式延迟队列的简单实现。

  在我们的工作中,很多地方使用延迟队列,比如订单到期没有付款取消订单,制订一个提醒的任务等都需要延迟队列,那么我们需要实现延迟队列。我们本文的梗概如下,同学们可以选择性阅读。

1. 实现一个简单的延迟队列。

  我们知道目前java可以有delayedqueue,我们首先开一个delayqueue的结构类图。delayqueue实现了delay、blockingqueue接口。也就是delayqueue是一种阻塞队列。

  我们在看一下delay的类图。delayed接口也实现了comparable接口,也就是我们使用delayed的时候需要实现compareto方法。因为队列中的数据需要排一下先后,根据我们自己的实现。delayed接口里边有一个方法就是getdelay方法,用于获取延迟时间,判断是否时间已经到了延迟的时间,如果到了延迟的时间就可以从队列里边获取了。

  我们创建一个message类,实现了delayed接口,我们主要把getdelay和compareto进行实现。在message的构造方法的地方传入延迟的时间,单位是毫秒,计算好触发时间firetime。同时按照延迟时间的升序进行排序。我重写了里边的tostring方法,用于将message按照我写的方法进行输出。

  里边的main方法里边声明了两个message,一个延迟5秒,一个延迟7秒,时间到了之后会将接取出并且打印。输出的结果如下,正是我们所期望的。

1587218430786:start
1587218435789:hello
1587218437793:world

  这个方法实现起来真的非常简单。但是缺点也是很明显的,就是数据在内存里边,数据比较容易丢失。那么我们需要采用redis实现分布式的任务处理。

  2. 使用redis的list实现分布式延迟队列。

  本地需要安装一个redis,我自己是使用docker构建一个redis,非常快速,命令也没多少。我们直接启动redis并且暴露6379端口。进入之后直接使用客户端命令即可查看和调试数据。

  我本地采用spring-boot的方式连接redis,pom文件列一下,供大家参考。

  加上redis的配置放到application.properties里边即可实现redis连接,非常的方便。

  接下来实现一个基于redis的list数据类型进行实现的一个类。我们使用redistemplate操作redis,这个里边封装好我们所需要的redis的一些方法,用起来非常方便。这个类允许延迟任务做多有10w个,也是避免数据量过大对redis造成影响。如果在线上使用的时候也需要考虑延迟任务的多少。太多几百万几千万的时候可能数据量非常大,我们需要计算redis的空间是否够。这个代码也是非常的简单,一个用于存放需要延迟的消息,采用offer的方法。另外一个是启动一个线程, 如果消息时间到了,那么就将数据lpush到redis里边。

  接下来我们看一下,我们写一个测试的controller。大家看一下这个请求/redis/listdelayedqueue的代码位置。我们也是生成了两个消息,然后把消息放到队列里边,另外我们在启动一个线程任务,用于将数据从redis的list中获取。方法也非常简单。

  我就不把运行结果写出来了,感兴趣的同学自己自行试验。当然这个方法也是从内存中拿出数据,到时间之后放到redis里边,还是会存在程序启动的时候,任务进行丢失。我们继续看另外一种方法更好的进行这个问题的处理。

3.使用redis的zset实现分布式延迟队列。

  我们需要再写一个zset的队列处理。下边的offermessage主要是把消息直接放入缓存中。采用redis的zset的zadd方法。zadd(key, value, score) 即将key=value的数据赋予一个score, 放入缓存中。score就是计算出来延迟的毫秒数。

  上边的controller方法已经写好了测试的方法。/redis/zsetdelayedqueue,里边主要使用zset的zrangebyscore(key, min, max)。主要是从score从0,当前时间的毫秒数获取。取出数据后再采用removerangebyscore,将数据删除。这样数据可以直接写到redis里边,然后取出数据后直接处理。这种方法比前边的方法稍微好一些,但是实际上还存在一些问题,因为依赖redis,如果redis内存不足或者连不上的时候,系统将变得不可用。

4. 总结一下,另外还有哪些可以延迟队列。

  上面的方法其实还是存在问题的,比如系统重启的时候还是会造成任务的丢失。所以我们在生产上使用的时候,我们还需要将任务保存起来,比如放到数据库和文件存储系统将数据存储起来,这样做到double-check,双重检查,最终达到任务的99.999%能够处理。

  其实还有很多东西可以实现延迟队列。

  1) rabbitmq就可以实现此功能。这个消息队列可以把数据保存起来并且进行处理。

  2)kafka也可以实现这个功能。

  3)netty的hashedwheeltimer也可以实现这个功能。

最后放上我的代码: https://github.com/stonehqs/delayqueue

到此这篇关于redis延迟队列和分布式延迟队列的简答实现的文章就介绍到这了,更多相关redis延迟队列和分布式延迟队列内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!

(0)
上一篇 2022年3月21日
下一篇 2022年3月21日

相关推荐