首先说明下需求,一个用户中心产品,用户在试用产品有三天的期限,三天到期后准时准点通知用户,试用产品到期了。这个需求如果不是准时通知,而是每天定点通知就简单了。如果需要准时通知就只能上延迟队列了。使用场景除了如上,典型的业务场景还有电商中的延时未支付订单失效等等。
怎么封装便于业务使用。
1.首先定义一个延迟job,里面包含一个map参数,和队列执行器的具体实现class,触发任务执行时,map参数会被传递到具体的业务执行器实现内
1 2 3 4 5 6 7 8 |
/** * Created by kl on 2018/7/20. * Content :延时job */ public class DelayJob { private Map jobParams;//job执行参数 private Class aClass;//具体执行实例实现 } |
2.定义一个延迟job执行器接口,业务需要实现这个接口,然后在execute方法内写自己的业务逻辑
1 2 3 4 5 6 7 |
/** * Created by kl on 2018/7/20. * Content :延时job执行器接口 */ public interface ExecuteJob { void execute(DelayJob job); } |
3.消费已经到点的延时job服务,通过job参数调用业务执行器实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
@Component public class JobTimer { static final String jobsTag = "customer_jobtimer_jobs"; @Autowired private RedissonClient client; @Autowired private ApplicationContext context; ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); @PostConstruct public void startJobTimer() { RBlockingQueueblockingQueue = client.getBlockingQueue(jobsTag); new Thread() { @Override public void run() { while (true) { try { DelayJob job = blockingQueue.take(); executorService.execute(new ExecutorTask(context, job)); } catch (Exception e) { e.printStackTrace(); try { TimeUnit.SECONDS.sleep(60); } catch (Exception ex) { } } } } }.start(); } class ExecutorTask implements Runnable { private ApplicationContext context; private DelayJob delayJob; public ExecutorTask(ApplicationContext context, DelayJob delayJob) { this.context = context; this.delayJob = delayJob; } @Override public void run() { ExecuteJob service = (ExecuteJob) context.getBean(delayJob.getaClass()); service.execute(delayJob); } } } |
4.封装延时job服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
/** * Created by kl on 2018/7/20. * Content :延时job服务 */ @Component public class DelayJobService { @Autowired private RedissonClient client; public void submitJob(DelayJob job, Long delay, TimeUnit timeUnit){ RBlockingQueueblockingQueue = client.getBlockingQueue(JobTimer.jobsTag); RDelayedQueue delayedQueue = client.getDelayedQueue(blockingQueue); delayedQueue.offer(job,delay,timeUnit); } } |