redis实现延时队列的两种方式(小结)

背景

项目中的流程监控,有几种节点,需要监控每一个节点是否超时。按传统的做法,肯定是通过定时任务,去扫描然后判断,但是定时任务有缺点:1,数据量大会慢;2,时间不好控制,太短,怕一次处理不完,太长状态就会有延迟。所以就想到用延迟队列的方式去实现。

一,redis的过期key监控

1,开启过期key监听

在redis的配置里把这个注释去掉

notify-keyspace-events ex

然后重启redis

2,使用redis过期监听实现延迟队列

继承keyexpirationeventmessagelistener类,实现父类的方法,就可以监听key过期时间了。当有key过期,就会执行这里。这里就把需要的key过滤出来,然后发送给kafka队列。

@component
@slf4j
public class rediskeyexpirationlistener extends keyexpirationeventmessagelistener {

  @autowired
  private kafkaproducerservice kafkaproducerservice;

  public rediskeyexpirationlistener(redismessagelistenercontainer listenercontainer) {
    super(listenercontainer);
  }

  /**
   * 针对 redis 数据失效事件,进行数据处理
   * @param message
   * @param pattern
   */
  @override
  public void onmessage(message message, byte[] pattern){
    if(message == null || stringutils.isempty(message.tostring())){
      return;
    }
    string content = message.tostring();
    //key的格式为  flag:时效类型:运单号 示例如下
    try {
      if(content.startswith(abnconstant.ems)){
        kafkaproducerservice.sendmessagesync(topicconstant.ems_waybill_abn_queue,content);
      }else if(content.startswith(abnconstant.yunda)){
        kafkaproducerservice.sendmessagesync(topicconstant.yunda_waybill_abn_queue,content);
      }
    } catch (exception e) {
      log.error("监控过期key,发送kafka异常,",e);
    }
  }
}

可以看的出来,这种方式其实是很简单的,但是有几个问题需要注意,一是,这个尽量单机运行,因为多台机器都会执行,浪费cpu,增加数据库负担。二是,机器频繁部署的时候,如果有时间间隔,会出现数据的漏处理。

二,redis的zset实现延迟队列

1,生产者实现

可以看到生产者很简单,其实就是利用zset的特性,给一个zset添加元素而已,而时间就是它的score。

public void produce(integer taskid, long exetime) {
  system.out.println("加入任务, taskid: " + taskid + ", exetime: " + exetime + ", 当前时间:" + localdatetime.now());
  redisops.getjedis().zadd(redisops.key, exetime, string.valueof(taskid));
}

2,消费者实现

消费者的代码也不难,就是把已经过期的zset中的元素给删除掉,然后处理数据。

public void consumer() {
  executors.newsinglethreadexecutor().submit(new runnable() {
    @override
    public void run() {
      while (true) {
        set<string> taskidset = redisops.getjedis().zrangebyscore(redisops.key, 0, system.currenttimemillis(), 0, 1);
        if (taskidset == null || taskidset.isempty()) {
          system.out.println("没有任务");
 
        } else {
          taskidset.foreach(id -> {
            long result = redisops.getjedis().zrem(redisops.key, id);
            if (result == 1l) {
              system.out.println("从延时队列中获取到任务,taskid:" + id + " , 当前时间:" + localdatetime.now());
            }
          });
        }
        try {
          timeunit.milliseconds.sleep(100);
        } catch (interruptedexception e) {
          e.printstacktrace();
        }
      }
    }
  });
}

可以看到这种方式其实是比上个方式要好的。因为,他的那两个缺点都被克服掉了。多台机器也没事儿,也不用再担心部署时间间隔长的问题。

总结

两个方式都是不错的,都能解决问题。碰到问题,多思考,多总结。

到此这篇关于redis实现延时队列的两种方式(小结)的文章就介绍到这了,更多相关redis 延时队列内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!

(0)
上一篇 2022年3月21日
下一篇 2022年3月21日

相关推荐