Redis
主页 > 数据库 > Redis >

分布式利器redis及redisson的延迟队列实践

2022-03-01 | 秩名 | 点击:

前言碎语

首先说明下需求,一个用户中心产品,用户在试用产品有三天的期限,三天到期后准时准点通知用户,试用产品到期了。这个需求如果不是准时通知,而是每天定点通知就简单了。如果需要准时通知就只能上延迟队列了。使用场景除了如上,典型的业务场景还有电商中的延时未支付订单失效等等。

延迟队列多种实现方式

redisson中的延迟队列实现

怎么封装便于业务使用。

1.首先定义一个延迟job,里面包含一个map参数,和队列执行器的具体实现class,触发任务执行时,map参数会被传递到具体的业务执行器实现内

1

2

3

4

5

6

7

8

/**

 * Created by kl on 2018/7/20.

 * Content :延时job

 */

public class DelayJob {

    private Map jobParams;//job执行参数

    private Class aClass;//具体执行实例实现

}

2.定义一个延迟job执行器接口,业务需要实现这个接口,然后在execute方法内写自己的业务逻辑

1

2

3

4

5

6

7

/**

 * Created by kl on 2018/7/20.

 * Content :延时job执行器接口

 */

public interface ExecuteJob {

     void execute(DelayJob job);

}

3.消费已经到点的延时job服务,通过job参数调用业务执行器实现

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

@Component

public class JobTimer {

    static final String jobsTag = "customer_jobtimer_jobs";

    @Autowired

    private RedissonClient client;

    @Autowired

    private ApplicationContext context;

    ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

    @PostConstruct

    public void startJobTimer() {

        RBlockingQueueblockingQueue = client.getBlockingQueue(jobsTag);

        new Thread() {

            @Override

            public void run() {

                while (true) {

                    try {

                        DelayJob job = blockingQueue.take();

                        executorService.execute(new ExecutorTask(context, job));

                    } catch (Exception e) {

                        e.printStackTrace();

                        try {

                            TimeUnit.SECONDS.sleep(60);

                        } catch (Exception ex) {

                        }

                    }

                }

            }

        }.start();

    }

    class ExecutorTask implements Runnable {

        private ApplicationContext context;

        private DelayJob delayJob;

        public ExecutorTask(ApplicationContext context, DelayJob delayJob) {

            this.context = context;

            this.delayJob = delayJob;

        }

        @Override

        public void run() {

            ExecuteJob service = (ExecuteJob) context.getBean(delayJob.getaClass());

            service.execute(delayJob);

        }

    }

}

4.封装延时job服务

1

2

3

4

5

6

7

8

9

10

11

12

13

14

/**

 * Created by kl on 2018/7/20.

 * Content :延时job服务

 */

@Component

public class DelayJobService {

    @Autowired

    private RedissonClient client;

    public void submitJob(DelayJob job, Long delay, TimeUnit timeUnit){

        RBlockingQueueblockingQueue = client.getBlockingQueue(JobTimer.jobsTag);

        RDelayedQueue delayedQueue = client.getDelayedQueue(blockingQueue);

        delayedQueue.offer(job,delay,timeUnit);

    }

}

原文链接:http://www.kailing.pub/article/index/arcid/207.html
相关文章
最新更新