延时队列(Delayed)实现(支持失败重试机制自定义重试时间)

1+

最初的业务场景就是:

需要需要使用restTemplate调用个接口并且调用失败后需要延时重复调用(最多3次),第一次5秒,第二次10秒,第三次15秒。

1. 主要功能

最起初的话思考如果只是简单这样的话其实也好实现,重写httpClient里面就有相关的超时重试机制,但是如果要是实现了某个整体的方法来进行失败重试那不能更好。所以就根据这个想法实现了以下的几个功能:

  1. 可以更简单的针对整个方法来进行延时或者有失败重试的调用执行。
  2. 可以设置首次是否延迟执行以及延迟执行的时间。
  3. 可以设置失败重试次数以及开发者并可自定义配置重试延时时间策略(默认四种:渐进步长、固定时间、固定步长、斐波那契数列)。
  4. 支持查看每次执行结果(包括失败重试的执行结果)。
  5. 执行器统一管理所有任务。
  6. 支持任务自定义顺序完成(流水线完成任务) 例如1 -> 2 -> 3,以及支持查看流水线任务中每个任务的执行结果。

2. 具体实现代码

具体实现的话是根据Delayed延时队列基础之上来实现的。关于怎么Delayed的使用在这里也就不多说了,大家自行百度学习。

1.1 使用方式:

这个我已经打包发布至maven中央仓库中,大家可以直接在项目的pom文件中添加下面依赖即可,并且在此com.b0c0.common依赖中还正在不断完善,现在是只有这个延时队列和一个通用的方法日志拦截输出。

<dependency>
    <groupId>com.b0c0</groupId>
    <artifactId>common</artifactId>
    <version>0.0.8</version>
</dependency>

1.2 使用示例:

package com.b0c0.common.delayedQueue;

import com.b0c0.common.delayedQueue.base.GeneralQueueConsumerable;
import com.b0c0.common.domain.vo.GeneralResultVo;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Logger;

public class GeneralDelayedQueueExecuteTest {

    private static final Logger logger = Logger.getLogger(GeneralDelayedQueueExecuteTest.class.getName());

    public static void main(String[] args) {
        GeneralDelayedQueueExecuteTest test = new GeneralDelayedQueueExecuteTest();
//        test.run();
//        try {
//            test.runAsync();
//        } catch (ExecutionException e) {
//            e.printStackTrace();
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        test.runLine();
    }


    /**
     * 同步执行示例
     */
    public void run() {
        GeneralDelayedQueue delayedQueue1 = new GeneralDelayedQueue(
                new TestConsumer1(), "1", "body", 1, 500, 50);
        GeneralDelayedQueue delayedQueue2 = new GeneralDelayedQueue(
                new TestConsumer1(), "2", "body", 3, 100, 100);
        GeneralDelayedQueue delayedQueue3 = new GeneralDelayedQueue(
                new TestConsumer1(), "3", "body", 3, 150, 150);

        GeneralDelayedQueueExecute.run(delayedQueue1);
        GeneralDelayedQueueExecute.run(delayedQueue2);
        GeneralDelayedQueueExecute.run(delayedQueue3);
    }

    /**
     * 异步执行示例
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public void runAsync() throws ExecutionException, InterruptedException {
        GeneralDelayedQueue delayedQueue1 = new GeneralDelayedQueue(
                new TestConsumer1(), "1", "body", 5, 500, 5);
        GeneralDelayedQueue delayedQueue2 = new GeneralDelayedQueue(
                new TestConsumer1(), "2", "body", 3, 10000, 2);
        GeneralDelayedQueue delayedQueue3 = new GeneralDelayedQueue(
                new TestConsumer1(), "3", "body", 3, 1500, 300);
        Future<GeneralResultVo<String>> future1 = GeneralDelayedQueueExecute.runAsync(delayedQueue1);
        Future<GeneralResultVo<String>> future2 = GeneralDelayedQueueExecute.runAsync(delayedQueue2);
        Future<GeneralResultVo<String>> future3 = GeneralDelayedQueueExecute.runAsync(delayedQueue3);
        System.out.println("time ->" + System.currentTimeMillis() + " future1:" + future1.get().getReslutData());
        System.out.println("time ->" + System.currentTimeMillis() + " future2:" + future2.get().getReslutData());
        System.out.println("time ->" + System.currentTimeMillis() + " future3:" + future3.get().getReslutData());

    }

    /**
     * 流水线执行示例
     */
    public void runLine() {
        GeneralDelayedQueue delayedQueue1 = new GeneralDelayedQueue(
                new TestConsumer1(), "1", "body", 2, 500, 5);
        GeneralDelayedQueue delayedQueue2 = new GeneralDelayedQueue(
                new TestConsumer2(), "2", "body", 3, 10, 2);
        GeneralDelayedQueue delayedQueue3 = new GeneralDelayedQueue(
                new TestConsumer1(), "3", "body", 3, 600, 100);
        List<GeneralDelayedQueue> list = new ArrayList<>();
        list.add(delayedQueue1);
        list.add(delayedQueue2);
        list.add(delayedQueue3);
        GeneralResultVo<String> a = GeneralDelayedQueueExecute.runLine(list);
        TestVo t = (TestVo) delayedQueue3.getBodyData().getPreResult();
        System.out.println(t.getA());
        System.out.println(a.getReslutData());

    }


    static class TestConsumer1 implements GeneralQueueConsumerable {


        @Override
        public GeneralResultVo<String> run(GeneralDelayedQueue task) {
            GeneralDelayedQueue.BodyData<String,String> resultVo = task.getBodyData();
            String body = resultVo.getBody();
            String id = task.getId();
            int currExecuteNum = task.getCurrExecuteNum();
            logger.info("thread ->" + Thread.currentThread().getId() + " time ->" + System.currentTimeMillis() + " 消费延时队列 id -> " + id + " ,第 -> " + (currExecuteNum + 1) + " 次,body -> " + body);
            if (task.getId().equals("3")) {
                return GeneralResultVo.fail();
            } else {
                return GeneralResultVo.success("sss");
            }
        }
    }

    static class TestConsumer2 implements GeneralQueueConsumerable {

        @Override
        public GeneralResultVo<TestVo> run(GeneralDelayedQueue task) {
            GeneralDelayedQueue.BodyData<String,String> resultVo = task.getBodyData();
            String body = resultVo.getBody();
            String id = task.getId();
            int currExecuteNum = task.getCurrExecuteNum();
            logger.info("thread ->" + Thread.currentThread().getId() + "time ->" + System.currentTimeMillis() + " 消费延时队列 id -> " + id + " ,第 -> " + (currExecuteNum + 1) + " 次,body -> " + body);
            TestVo testVo = new TestVo();
            testVo.setA("a");
            testVo.setB("b");
            return GeneralResultVo.success(testVo);
        }
    }

    public static class TestVo {
        private String a;
        private String b;

        public String getA() {
            return a;
        }

        public void setA(String a) {
            this.a = a;
        }

        public String getB() {
            return b;
        }

        public void setB(String b) {
            this.b = b;
        }
    }


}

 

2.1.延时队列实体类:

首先我们要是实现这个的话既然要根据Delayed延时队列机制来实现,我们就首先实现一个Delayed接口的基础延时队列实体,里面来保存我们的信息,例如延时时间、重试时间、执行次数、以及开发者根据业务需要自定义的主题内容等。该类如下所示:

package com.b0c0.common.delayedQueue;


import com.b0c0.common.delayedQueue.base.GeneralQueueConsumerable;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;


/**
 * @program: springbootdemo
 * @description: 通用延时队列实体
 * @author: lidongsheng
 * @createData: 2020-09-21 14:01
 * @updateAuthor: lidongsheng
 * @updateData: 2020-09-21 14:01
 * @updateContent:
 * @Version: 0.0.8
 * @email: lidongshenglife@163.com
 * @blog: https://www.b0c0.com
 * @csdn: https://blog.csdn.net/LDSWAN0
 * ************************************************
 * Copyright @ 李东升 2020. All rights reserved
 * ************************************************
 */
public class GeneralDelayedQueue<T> implements Delayed {


    public static class BodyData<T,V>{
        /**
         * 开发者自定义的需要的数据体.
         */
        private T body;

        /**
         * 如果为任务链任务,上一个任务的执行结果会保存到这里,
         */
        private V preResult;

        public T getBody() {
            return body;
        }

        protected void setBody(T body) {
            this.body = body;
        }

        public V getPreResult() {
            return preResult;
        }

        protected void setPreResult(V preResult) {
            this.preResult = preResult;
        }
    }

    //任务的唯一id
    private String id;
    /**
     * 任务的自定义数据体
     */
    private BodyData bodyData;
    /**
     * 任务当前的执行次数(可设置此值为maxExecuteNum来达到强制中断之后的重试执行)
     *
     */
    private int currExecuteNum;
    /**
     * 最大执行次数
     * 此值为1 表示只执行一次,不开启重发
     * 此值大于1 表示开启重发,并且此值为最大执行次数(包含首次执行)。
     */
    private int maxExecuteNum;

    /**
     * 任务的首次执行的延时时间,只有任务的首次执行时会用到此值进行延时执行
     */
    private long delayedTime;

    /**
     * 任务的重发延时时间,重发自定义延时策略会用到此值
     */
    private long retryTime;
    /**
     * 任务的过期时间,任务到达了过期时间就会执行
     * 检测延迟任务是否到期
     */
    private long expireTime;
    /**
     * 上次的延时时间
     */
    private long lastTime = -1;
    /**
     * 时间单位
     */
    private TimeUnit timeUnit;

    /**
     * 执行结果一直保存,可在执行器中随时获取,直至开发人员手动调用删除
     * 注意:如果设置为true了,请务必手动调用GeneralDelayedQueueExecute.clearTask 进行删除。否则任务相关信息将一直存在于内存中
     */
    private boolean keepResults;

    private GeneralQueueConsumerable consumerable;

    public String getId() {
        return id;
    }

    public <T,V>BodyData<T,V> getBodyData() {
        return bodyData;
    }

    public static<T,V> BodyData<T,V> initBodyData(T userData) {
        BodyData<T,V> bodyData= new BodyData<>();
        bodyData.setBody(userData);
        return bodyData;
    }

    public int getCurrExecuteNum() {
        return currExecuteNum;
    }

    public int getMaxExecuteNum() {
        return maxExecuteNum;
    }

    public long getDelayedTime() {
        return delayedTime;
    }

    public long getRetryTime() {
        return retryTime;
    }

    public long getLastTime() {
        return lastTime;
    }

    protected void setLastTime(long lastTime) {
        this.lastTime = lastTime;
    }

    protected void setCurrExecuteNum(int currExecuteNum) {
        this.currExecuteNum = currExecuteNum;
    }

    protected void setExpireTime(long expireTime) {
        this.expireTime = expireTime;
    }

    public TimeUnit getTimeUnit() {
        return timeUnit;
    }

    public GeneralQueueConsumerable getConsumerable() {
        return consumerable;
    }

    public void setConsumerable(GeneralQueueConsumerable consumerable) {
        this.consumerable = consumerable;
    }

    public boolean isKeepResults() {
        return keepResults;
    }

    /**
     * 完整参数的构造方法
     *
     * @param consumerable  具体任务方法
     * @param id            唯一标识
     * @param userData      主题内容
     * @param keepResults   true表示执行结果一直保存,可在执行器中随时获取,直至开发人员手动调用删除
     * @param maxExecuteNum 最大执行次数
     * @param delayedTime   首次执行延时时间
     * @param retryTime     重试延时时间
     * @param timeUnit      时间单位
     */
    public GeneralDelayedQueue(GeneralQueueConsumerable consumerable,String id, T userData,boolean keepResults, int maxExecuteNum, long delayedTime, long retryTime,TimeUnit timeUnit) {
        this.consumerable = consumerable;
        this.id = id;
        this.bodyData = initBodyData(userData);
        this.keepResults = keepResults;
        this.currExecuteNum = 0;
        this.maxExecuteNum = maxExecuteNum;
        this.delayedTime = delayedTime;
        this.retryTime = retryTime;
        this.timeUnit = timeUnit;

    }


    /**
     * 构造方法 默认时间单位秒,自动捕获异常
     *
     * @param consumerable  具体任务方法
     * @param id            唯一标识
     * @param userData      主题内容
     * @param maxExecuteNum 最大执行次数
     * @param delayedTime   首次执行延时时间
     * @param retryTime     重试延时时间
     */
    public GeneralDelayedQueue(GeneralQueueConsumerable consumerable,String id, T userData, int maxExecuteNum, long delayedTime, long retryTime) {
        this(consumerable,id, userData,false, maxExecuteNum, delayedTime, retryTime, TimeUnit.MILLISECONDS);
    }


    @Override
    public int compareTo(Delayed delayed) {
        long result = this.getDelay(TimeUnit.NANOSECONDS)
                - delayed.getDelay(TimeUnit.NANOSECONDS);
        if (result < 0) {
            return -1;
        } else if (result > 0) {
            return 1;
        } else {
            return 0;
        }
    }


    /**
     * 检测延迟任务是否到期
     * 如果返回的是负数则说明到期否则还没到期
     *
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expireTime - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

}

 

 

2.2  延时队列消费者

延时队列消费者,也就是开发者要进行执行的具体业务方法,开发者实现这个GeneralQueueConsumerable接口里面的run方法即可执行自己的业务逻辑,该接口如下所示:

package com.b0c0.common.delayedQueue.base;


import com.b0c0.common.delayedQueue.GeneralDelayedQueue;
import com.b0c0.common.domain.vo.GeneralResultVo;

/**
 * @program: springbootdemo
 * @description: 通用延时队列消费
 * @author: lidongsheng
 * @createData: 2020-09-21 15:01
 * @updateAuthor: lidongsheng
 * @updateData: 2020-09-21 15:01
 * @updateContent:
 * @Version: 0.0.8
 * @email: lidongshenglife@163.com
 * @blog: https://www.b0c0.com
 * @csdn: https://blog.csdn.net/LDSWAN0
 * ************************************************
 * Copyright @ 李东升 2020. All rights reserved
 * ************************************************
 */


/**
 * 开发者实现这个GeneralQueueConsumerable接口来实现重试
 */
public interface GeneralQueueConsumerable {

    /**
     * 开发者要进行执行的具体业务方法,重写run方法即可执行自己的业务逻辑。
     * @param task 具体任务
     * @return 返回false根据重发的策略进行重发执行方法,如果设置了自定义的重发,则会根据重试机制进行重试,true表示执行结束。
     */

    <T>GeneralResultVo<T> run(GeneralDelayedQueue task);
}

 

2.3  自定义重试时间

自定义的重试时间实现RetryTimeTypeable接口里面的getTime方法即可。该接口如下所示:
package com.b0c0.common.delayedQueue.base;

import com.b0c0.common.delayedQueue.GeneralDelayedQueue;

/**
 * @program: springbootdemo
 * @description: 重试时间的type接口,自定义的重试时间实现此接口即可
 * @author: lidongsheng
 * @createData: 2020-09-25 19:07
 * @updateAuthor: lidongsheng
 * @updateData: 2020-09-25 19:07
 * @updateContent: 重试时间的type接口
 * @Version: 0.0.8
 * @email: lidongshenglife@163.com
 * @blog: https://www.b0c0.com
 * @csdn: https://blog.csdn.net/LDSWAN0
 * ************************************************
 * Copyright @ 李东升 2020. All rights reserved
 * ************************************************
 */

public interface RetryTimeTypeable {
    /**
     * 返回延时时间
     * 开发者实现这个此接口重写里面的getTime方法即可根据具体需要进行定义重试机制延时时间。
     * @param task
     * @return 返回此次的延时时间,单位和GeneralDelayedQueue构造方法中设置的时间单位一致(默认为秒)。
     */
    long getTime(GeneralDelayedQueue task);
}

 

2.4  默认的四种重试时间实现

  1. 渐进步长 retryTime越大,重试延时时间的间隔时间就会越来越大
  2. 固定时间
  3. 固定步长
  4. 斐波那契数列

这四种感觉是最常用的重试时间,已经默认集成,使用时可直接调用。该类如下所示:

package com.b0c0.common.delayedQueue;


import com.b0c0.common.delayedQueue.base.RetryTimeTypeable;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @program: springbootdemo
 * @description: 默认的重试时间实现接口
 * @author: lidongsheng
 * @createData: 2020-09-25 19:41
 * @updateAuthor: lidongsheng
 * @updateData: 2020-09-25 19:41
 * @updateContent:
 * @Version: 0.0.8
 * @email: lidongshenglife@163.com
 * @blog: https://www.b0c0.com
 * @csdn: https://blog.csdn.net/LDSWAN0
 * ************************************************
 * Copyright @ 李东升 2020. All rights reserved
 * ************************************************
 */

public class DefaultRetryTimeTypeator {


    /**
     * 渐进步长 retryTime越大,重试延时时间的间隔时间就会越来越大
     * 栗子 retryTime = 5
     * 第一次重试延时 5 = 0 + 1 * 5
     * 第二次重试延时 15 = 5 + 2 * 5
     * 第三次重试延时  30 = 15 + 3 * 5
     * 第四次重试延时  50 = 30 + 4 * 5
     * ... 第10次重试延时  225
     * ... 第20次重试延时  950
     * ... 第30次重试延时  2175
     * ... 第40次重试延时  3900
     * ... 第50次重试延时  6125
     * @return
     */
    public static RetryTimeTypeable AdvanceStepTimeRetryTimeTypeator() {
        return new AdvanceStepTimeRetryTimeTypeator();
    }

    /**
     * 固定时间
     * 栗子 retryTime = 5
     * 第一次重试延时 5 第二次重试延时 5 第三次重试延时 5
     *
     * @return
     */
    public static RetryTimeTypeable FixDelayedRetryTimeTypeator() {
        return new FixDelayedRetryTimeTypeator();
    }

    /**
     * 固定步长
     * 栗子 retryTime = 5
     * 第一次重试延时 5 第二次重试延时 10 第三次重试延时 15
     *
     * @return
     */
    public static RetryTimeTypeable FixStepTimeRetryTimeTypeator() {
        return new FixStepTimeRetryTimeTypeator();
    }

    /**
     * 斐波那契数列 建议不要超过30
     * CurrExecuteNum = 10  return 55
     * CurrExecuteNum = 20  return 6765
     * CurrExecuteNum = 30  return 832040
     * CurrExecuteNum = 40  return 102334155
     * .....
     * @return
     */
    public static RetryTimeTypeable FibonacciSeriesRetryTimeTypeator() {
        return new FibonacciSeriesRetryTimeTypeator();
    }

    private static class AdvanceStepTimeRetryTimeTypeator implements RetryTimeTypeable {

        @Override
        public long getTime(GeneralDelayedQueue task) {
            return (task.getCurrExecuteNum() == 1 ? 0 : task.getLastTime()) + task.getCurrExecuteNum() * task.getRetryTime();
        }
    }

    private static class FixDelayedRetryTimeTypeator implements RetryTimeTypeable {

        @Override
        public long getTime(GeneralDelayedQueue task) {
            return task.getRetryTime();
        }
    }

    private static class FixStepTimeRetryTimeTypeator implements RetryTimeTypeable {
        @Override
        public long getTime(GeneralDelayedQueue task) {
            return task.getCurrExecuteNum() * task.getRetryTime();
        }
    }

    private static class FibonacciSeriesRetryTimeTypeator implements RetryTimeTypeable {
        @Override
        public long getTime(GeneralDelayedQueue task) {
            int a = 0, b = 1, sum;
            int n = task.getCurrExecuteNum();
            for (int i = 0; i < n; i++) {
                sum = a + b;
                a = b;
                b = sum;
            }
            return a;
        }
    }

//    public static void main(String[] args) {
//        AdvanceStepTimeRetryTimeTypeator retryTimeTypeator = new AdvanceStepTimeRetryTimeTypeator();
//        GeneralDelayedQueue delayedQueue = new GeneralDelayedQueue(
//                UUID.randomUUID().toString(),
//                null,
//                8, 0, 150,TimeUnit.MILLISECONDS);
//        for (int i = 0; i < 8; i++) {
//            delayedQueue.setCurrExecuteNum(i);
//            long time = retryTimeTypeator.getTime(delayedQueue);
//            delayedQueue.setLastTime(time);
//            System.out.println(time);
//        }
//    }
}

 

2.5  延时队列执行器(主要)

延时队列执行器就是调用的入口类,此类应该时储存一下延时队列实体以及具体的执行方法实体等,以及承担具体执行。并且能够支持多线程调用,所以此类已经实现了Runnable接口,开发者可以根据需要来多线程调用或者直接同步调用。该类如下所示:

package com.b0c0.common.delayedQueue;


import com.b0c0.common.delayedQueue.base.RetryTimeTypeable;
import com.b0c0.common.domain.vo.GeneralResultCodeEnum;
import com.b0c0.common.domain.vo.GeneralResultVo;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.logging.Logger;


/**
 * @program: springbootdemo
 * @description: 通用延时队列执行器
 * @author: lidongsheng
 * @createData: 2020-09-21 15:50
 * @updateAuthor: lidongsheng
 * @updateData: 2020-09-21 15:50
 * @updateContent:
 * @Version: 0.0.8
 * @email: lidongshenglife@163.com
 * @blog: https://www.b0c0.com
 * @csdn: https://blog.csdn.net/LDSWAN0
 * ************************************************
 * Copyright @ 李东升 2020. All rights reserved
 * ************************************************
 */

/**
 * 延时队列执行器就是调用的入口类,此类应该时储存一下延时队列实体以及具体的执行方法实体等,以及承担具体执行。
 * 并且能够支持多线程调用,所以此类已经实现了Runnable接口,开发者可以根据需要来多线程调用或者直接同步调用。
 */
public class GeneralDelayedQueueExecute {

    private static final Logger logger = Logger.getLogger(GeneralDelayedQueueExecute.class.getName());

    //延时队列
    private static Map<String, DelayQueue<GeneralDelayedQueue>> delayQueueMap = new ConcurrentHashMap<>();
    //延时队列主题信息
    private static Map<String, GeneralDelayedQueue> taskMap = new ConcurrentHashMap<>();
    //重试时间的具体实现
    private static Map<String, RetryTimeTypeable> retryTimeTypeableMap = new ConcurrentHashMap<>();
    //用来保证程序执行完成之后才能获取到执行结果
    private static Map<String, CountDownLatch> countDownLatchMap = new ConcurrentHashMap<>();
    //存储每次执行的具体结果信息
    private static Map<String, List> resultListMap = new ConcurrentHashMap<>();
    //异步执行的线程池
    private static ExecutorService executor;

    static {
        final ThreadFactory GENERAL_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("com.b0c0.commom.delayedQueue-pool-general-%d").build();
        //核心线程池大小
        final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() << 1 + 1;
        //最大线程池大小
        final int MAX_POOL_SIZE = CORE_POOL_SIZE << 1;
        //线程任务队列大小
        final int QUEUE_CAPACITY = 500;
        //空闲线程的存活时间.默认情况下核心线程不会退出
        final int KEEP_ALIVE_TIME = 15;
        executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(QUEUE_CAPACITY), GENERAL_THREAD_FACTORY, new ThreadPoolExecutor.DiscardOldestPolicy());
    }

    /**
     * 用户可自定义异步执行时候的线程池
     *
     * @param executor
     */
    public static void setExecutor(ExecutorService executor) {
        GeneralDelayedQueueExecute.executor = executor;
    }

    /**
     * 执行方法
     *
     * @param task              具体任务
     * @param retryTimeTypeator 重试延时时间策略
     */
    public static <T> GeneralResultVo<T> run(GeneralDelayedQueue task, RetryTimeTypeable retryTimeTypeator) {
        DelayQueue<GeneralDelayedQueue> queue = new DelayQueue<>();
        initTask(task, retryTimeTypeator, queue);
        return execute(task);
    }

    public static <T> GeneralResultVo<T> run(GeneralDelayedQueue task) {
        return run(task, DefaultRetryTimeTypeator.FixDelayedRetryTimeTypeator());
    }

    /**
     * 异步执行方法,可以单独为某任务根据传入一个线程池执行
     *
     * @param task              具体任务
     * @param retryTimeTypeator 重试延时时间策略
     * @param executor          用户自定义线程池
     */
    public static <T> Future<GeneralResultVo<T>> runAsync(GeneralDelayedQueue task, RetryTimeTypeable retryTimeTypeator, ExecutorService executor) {
        return executor.submit(() -> run(task, retryTimeTypeator));
    }

    /**
     * 异步执行方法 默认内置线程池
     *
     * @param task              具体任务
     * @param retryTimeTypeator 重试延时时间策略
     */
    public static <T> Future<GeneralResultVo<T>> runAsync(GeneralDelayedQueue task, RetryTimeTypeable retryTimeTypeator) {
        return executor.submit(() -> run(task, retryTimeTypeator));
    }

    /**
     * 异步执行方法  默认内置线程池
     *
     * @param task 具体任务
     */
    public static <T> Future<GeneralResultVo<T>> runAsync(GeneralDelayedQueue task) {
        return executor.submit(() -> run(task));
    }

    /**
     * 任务链的执行方法 自定义顺序完成(流水线完成任务) 例如A -> B -> C
     * 并且任务的执行结果会自动传递给下一任务。比如A任务的执行结果,会传递给B任务。
     * 注意:
     * 此方法返回的为流水线最后一个任务的值,若想在最后得到某个任务或者所有任务的具体执行结果,须将 GeneralDelayedQueue.keepResults,设置为true;
     *
     * @param tasks              具体任务list集合,会按照集合的添加顺序来流水线顺序执行任务
     * @param retryTimeTypeators 重试延时时间策略
     * @param <T>
     * @return 任务链执行返回值为:返回的为最后一个运行任务的返回值。
     */
    public static <T> GeneralResultVo<T> runLine(List<GeneralDelayedQueue> tasks, List<RetryTimeTypeable> retryTimeTypeators) {

        if (tasks == null || tasks.isEmpty() || retryTimeTypeators == null || retryTimeTypeators.isEmpty()) {
            return GeneralResultVo.fail(GeneralResultCodeEnum.PARAM_ERROR.getCode(), "任务集合和重试延时时间策略集合不能为空");
        }
        int taskSize = tasks.size();
        if (taskSize != retryTimeTypeators.size()) {
            return GeneralResultVo.fail(GeneralResultCodeEnum.PARAM_ERROR.getCode(), "任务集合和重试延时时间策略集合大小不一致,无法相互对应");
        }
        DelayQueue<GeneralDelayedQueue> queue = new DelayQueue<>();
        GeneralResultVo<T> resultVo = GeneralResultVo.fail();
        for (int i = 0; i < taskSize; i++) {
            initTask(tasks.get(i), retryTimeTypeators.get(i), queue);
            resultVo = execute(tasks.get(i));
            if (resultVo.isSuccess()) {
                if (i != 0 && i != taskSize - 1) {
                    tasks.get(i + 1).getBodyData().setPreResult(resultVo.getReslutData());
                }
            } else {
                break;
            }
        }
        return resultVo;
    }

    public static <T> GeneralResultVo<T> runLine(List<GeneralDelayedQueue> tasks) {
        int taskSize = tasks.size();
        List<RetryTimeTypeable> retryTimeTypeators = new ArrayList<>();
        for (int i = 0; i < taskSize; i++) {
            retryTimeTypeators.add(DefaultRetryTimeTypeator.FixDelayedRetryTimeTypeator());
        }
        return runLine(tasks, retryTimeTypeators);
    }

    /**
     * 异步执行任务链方法,可以单独为某任务根据传入一个线程池执行
     *
     * @param tasks              具体任务
     * @param retryTimeTypeators 重试延时时间策略
     * @param executor           用户自定义线程池
     */
    public static <T> Future<GeneralResultVo<T>> runLinesync(List<GeneralDelayedQueue> tasks, List<RetryTimeTypeable> retryTimeTypeators, ExecutorService executor) {
        return executor.submit(() -> runLine(tasks, retryTimeTypeators));
    }

    /**
     * 异步执行任务链方法 默认内置线程池
     *
     * @param tasks              具体任务
     * @param retryTimeTypeators 重试延时时间策略
     */
    public static <T> Future<GeneralResultVo<T>> runLinesync(List<GeneralDelayedQueue> tasks, List<RetryTimeTypeable> retryTimeTypeators) {
        return executor.submit(() -> runLine(tasks, retryTimeTypeators));
    }

    /**
     * 异步执行任务链方法  默认内置线程池
     *
     * @param tasks 具体任务
     */
    public static <T> Future<GeneralResultVo<T>> runLinesync(List<GeneralDelayedQueue> tasks) {
        return executor.submit(() -> runLine(tasks));
    }

    private static <T> GeneralResultVo<T> execute(GeneralDelayedQueue task) {

        GeneralResultVo<T> result = GeneralResultVo.fail();
        String id = task.getId();
        RetryTimeTypeable retryTimeTypeator = retryTimeTypeableMap.get(id);
        List<GeneralResultVo<T>> resultList = resultListMap.get(id);
        CountDownLatch countDownLatch = countDownLatchMap.get(id);
        DelayQueue<GeneralDelayedQueue> queue = delayQueueMap.get(id);
        try {
            result = task.getConsumerable().run(queue.take());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            task.setLastTime(retryTimeTypeator.getTime(task));
            //添加执行结果
            resultList.add(result);
            countDownLatch.countDown();
            //延时执行
            if (task.getCurrExecuteNum() < task.getMaxExecuteNum() - 1 && !result.isSuccess()) {
                task.setCurrExecuteNum(task.getCurrExecuteNum() + 1);
                setExpireTime(task);
                queue.offer(task);
                execute(task);
            } else {
                while ((countDownLatch.getCount()) > 0) {
                    countDownLatch.countDown();
                }
            }
            if (!task.isKeepResults()) {
                clearTask(task.getId());
            }
        }
        return result;
    }

    private static void setExpireTime(GeneralDelayedQueue task) {
        long expireTime = 0;
        RetryTimeTypeable retryTimeTypeator = retryTimeTypeableMap.get(task.getId());
        if (task.getCurrExecuteNum() == 0) {
            expireTime = TimeUnit.NANOSECONDS.convert(
                    task.getDelayedTime(), task.getTimeUnit()) + System.nanoTime();
        } else {
            expireTime = TimeUnit.NANOSECONDS.convert(
                    retryTimeTypeator.getTime(task), task.getTimeUnit()) + System.nanoTime();
        }
        task.setExpireTime(expireTime);
    }

    /**
     * 得到全部的执行结果
     *
     * @param taskId     任务id标识
     * @param fastReturn 立即返回 true 代表立即返回, false 代表必须等到最大执行次数后返回(list.size = maxExecuteNum)
     * @param outTime    超时时间
     * @param timeUnit   时间单位
     * @return 执行结果列表
     */
    public static <T> List<GeneralResultVo<T>> getResultList(String taskId, boolean fastReturn, long outTime, TimeUnit timeUnit) {
        try {
            if (taskMap.containsKey(taskId)) {
                if (!fastReturn) {
                    awaitCountDown(taskId, outTime, timeUnit);
                }
                return resultListMap.get(taskId);
            } else {
                GeneralResultVo generalResultVo = GeneralResultVo.fail(
                        GeneralResultCodeEnum.TASK_EXIST.getCode(), GeneralResultCodeEnum.TASK_EXIST.getDesc());
                List<GeneralResultVo<T>> res = new ArrayList<>();
                res.add(generalResultVo);
                return res;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 等待任务全部完成
     *
     * @param id       任务id
     * @param timeOut  等待超时时间
     * @param timeUnit 时间单位
     * @return countDownLatch当前计数值
     * @throws InterruptedException
     */
    private static long awaitCountDown(String id, Long timeOut, TimeUnit timeUnit) throws InterruptedException {
        CountDownLatch countDownLatch = countDownLatchMap.get(id);
        if (timeOut == null) {
            countDownLatch.await();
        } else {
            countDownLatch.await(timeOut, timeUnit);
        }
        return countDownLatch.getCount();
    }

    /**
     * 根据任务初始化任务信息
     *
     * @param task
     * @param retryTimeTypeator
     */
    private static void initTask(GeneralDelayedQueue task, RetryTimeTypeable retryTimeTypeator, DelayQueue<GeneralDelayedQueue> queue) {
        String id = task.getId();
        delayQueueMap.put(id, queue);
        taskMap.put(id, task);
        retryTimeTypeableMap.put(id, retryTimeTypeator);
        CountDownLatch countDownLatch = new CountDownLatch(task.getMaxExecuteNum());
        countDownLatchMap.put(id, countDownLatch);
        setExpireTime(task);
        resultListMap.put(id, new ArrayList<>());
        queue.offer(task);
    }


    /**
     * 根据任务id清除任务全部的map信息
     * GeneralDelayedQueue的keepResults如果设置为true了,请务必手动调用此方法进行删除。否则任务相关信息将一直存在于内存中
     *
     * @param taskId 任务id
     */
    public static void clearTask(String taskId) {
        CountDownLatch countDownLatch = countDownLatchMap.get(taskId);
        while (countDownLatch != null && countDownLatch.getCount() > 0) {
            countDownLatch.countDown();
        }
        delayQueueMap.remove(taskId);
        taskMap.remove(taskId);
        countDownLatchMap.remove(taskId);
        taskMap.remove(taskId);
        resultListMap.remove(taskId);
        retryTimeTypeableMap.remove(taskId);
    }
}

 

关于更多com.b00c.common 依赖的源码详情、使用方法、更新历史请去下面这个链接查看。

https://github.com/DeBug-Bug/common

1+

发表评论

邮箱地址不会被公开。

6 条评论 “延时队列(Delayed)实现(支持失败重试机制自定义重试时间)”