延时队列(Delayed)实现(支持失败重试机制自定义重试时间)

1+

最初的业务场景就是:

需要需要使用restTemplate调用个接口并且调用失败后需要延时重复调用(最多3次),第一次5秒,第二次10秒,第三次15秒。

1. 主要功能

最起初的话思考如果只是简单这样的话其实也好实现,重写httpClient里面就有相关的超时重试机制,但是如果要是实现了某个整体的方法来进行失败重试那不能更好。所以就根据这个想法实现了以下的几个功能:

  1. 可以更简单的针对整个方法来进行延时或者有失败重试的调用执行。
  2. 可以设置首次是否延迟执行以及延迟执行的时间。
  3. 可以设置失败重试调用的次数以及延迟时间。
  4. 失败重试调用的延迟时间开发者可以根据自身业务需要自定义(无侵入式),当然也内置了4钟常见的重试延时时间机制可以供调用。

2. 具体实现代码

具体实现的话是根据Delayed延时队列基础之上来实现的。关于怎么Delayed的使用在这里也就不多说了,大家自行百度学习。

 

2.1.延时队列实体类:

首先我们要是实现这个的话既然要根据Delayed延时队列机制来实现,我们就首先实现一个Delayed接口的基础延时队列实体,里面来保存我们的信息,例如延时时间、重试时间、执行次数、以及开发者根据业务需要自定义的主题内容等。该类如下所示:

package com.b0c0.common.delayedQueue;


import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;


/**
 * @program: springbootdemo
 * @description: 通用延时队列实体
 * @author: lidongsheng
 * @createData: 2020-09-21 14:01
 * @updateAuthor: lidongsheng
 * @updateData: 2020-09-21 14:01
 * @updateContent:
 * @Version: 1.0.0
 * @email: lidongshenglife@163.com
 * @blog: https://www.b0c0.com
 * @csdn: https://blog.csdn.net/LDSWAN0
 * ************************************************
 * Copyright @ 李东升 2020. All rights reserved
 * ************************************************
 */
public class GeneralDelayedQueue implements Delayed {

    // 每次请求的唯一id
    private String requestId;
    //主题内容
    private String body;
    //当前的执行次数(可设置此值为maxExecuteNum来达到强制中断之后的重试执行)
    private int currExecuteNum;
    //最大执行次数 如果大于1 表示开启失败重试
    private int maxExecuteNum;
    //延时时间
    private long delayedTime;
    //重试时间
    private long retryTime;
    //过期时间
    private long expireTime;
    //上次的延时时间
    private long lastTime = -1;

    //时间单位
    private TimeUnit timeUnit;


    public String getRequestId() {
        return requestId;
    }

    public String getBody() {
        return body;
    }

    public int getCurrExecuteNum() {
        return currExecuteNum;
    }

    public int getMaxExecuteNum() {
        return maxExecuteNum;
    }

    public long getDelayedTime() {
        return delayedTime;
    }

    public long getRetryTime() {
        return retryTime;
    }

    public long getLastTime() {
        return lastTime;
    }

    protected void setLastTime(long lastTime) {
        this.lastTime = lastTime;
    }

    protected void setCurrExecuteNum(int currExecuteNum) {
        this.currExecuteNum = currExecuteNum;
    }

    protected void setExpireTime(long expireTime) {
        this.expireTime = expireTime;
    }

    public TimeUnit getTimeUnit() {
        return timeUnit;
    }

    /**
     * 完整参数的构造方法
     *
     * @param requestId     唯一标识
     * @param body          主题内容
     * @param maxExecuteNum 最大执行次数
     * @param delayedTime   首次执行延时时间
     * @param retryTime     重试延时时间
     * @param timeUnit      时间单位
     */
    public GeneralDelayedQueue(String requestId, String body, int maxExecuteNum, long delayedTime, long retryTime,TimeUnit timeUnit) {
        this.requestId = requestId;
        this.body = body;
        this.currExecuteNum = 0;
        this.maxExecuteNum = maxExecuteNum;
        this.delayedTime = delayedTime;
        this.retryTime = retryTime;
        this.timeUnit = timeUnit;
    }


    /**
     * 构造方法 默认时间单位秒,自动捕获异常
     *
     * @param requestId     唯一标识
     * @param body          主题内容
     * @param maxExecuteNum 最大执行次数
     * @param delayedTime   首次执行延时时间
     * @param retryTime     重试延时时间
     */
    public GeneralDelayedQueue(String requestId, String body, int maxExecuteNum, long delayedTime, long retryTime) {
        this(requestId, body, maxExecuteNum, delayedTime, retryTime, TimeUnit.SECONDS);
    }


    @Override
    public int compareTo(Delayed delayed) {
        long result = this.getDelay(TimeUnit.NANOSECONDS)
                - delayed.getDelay(TimeUnit.NANOSECONDS);
        if (result < 0) {
            return -1;
        } else if (result > 0) {
            return 1;
        } else {
            return 0;
        }
    }


    /**
     * 检测延迟任务是否到期
     * 如果返回的是负数则说明到期否则还没到期
     *
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expireTime - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

}

 

2.2  延时队列消费者

延时队列消费者,也就是开发者要进行执行的具体业务方法,开发者实现这个GeneralQueueConsumerable接口里面的run方法即可执行自己的业务逻辑,该接口如下所示:

package com.b0c0.common.delayedQueue.base;


import com.b0c0.common.delayedQueue.GeneralDelayedQueue;

/**
 * @program: springbootdemo
 * @description: 通用延时队列消费
 * @author: lidongsheng
 * @createData: 2020-09-21 15:01
 * @updateAuthor: lidongsheng
 * @updateData: 2020-09-21 15:01
 * @updateContent:
 * @Version: 1.0.0
 * @email: lidongshenglife@163.com
 * @blog: https://www.b0c0.com
 * @csdn: https://blog.csdn.net/LDSWAN0
 * ************************************************
 * Copyright @ 李东升 2020. All rights reserved
 * ************************************************
 */

public interface GeneralQueueConsumerable {

    /**
     * 开发者要进行执行的具体业务方法,开发者实现这个GeneralQueueConsumerable接口重写里面的run方法即可执行自己的业务逻辑。
     * @param task 延时队列实体
     * @return false表示执行业务失败,如果设置了失败重试,则会根据重试机制进行重试,true表示执行成功
     */
    boolean run(GeneralDelayedQueue task);
}

 

2.3  自定义重试时间

自定义的重试时间实现RetryTimeTypeable接口里面的getTime方法即可。该接口如下所示:
package com.b0c0.common.delayedQueue.base;

import com.b0c0.common.delayedQueue.GeneralDelayedQueue;

/**
 * @program: springbootdemo
 * @description: 重试时间的type接口,自定义的重试时间实现此接口即可
 * @author: lidongsheng
 * @createData: 2020-09-25 19:07
 * @updateAuthor: lidongsheng
 * @updateData: 2020-09-25 19:07
 * @updateContent: 重试时间的type接口
 * @Version: 1.0.0
 * @email: lidongshenglife@163.com
 * @blog: https://www.b0c0.com
 * @csdn: https://blog.csdn.net/LDSWAN0
 * ************************************************
 * Copyright @ 李东升 2020. All rights reserved
 * ************************************************
 */

public interface RetryTimeTypeable {
    /**
     * 返回延时时间
     * 开发者实现这个此接口重写里面的getTime方法即可根据具体需要进行定义重试机制延时时间。
     * @param task
     * @return 返回此次的延时时间,单位和GeneralDelayedQueue构造方法中设置的时间单位一致(默认为秒)。
     */
    long getTime(GeneralDelayedQueue task);
}

 

2.4  默认的四种重试时间实现

  1. 渐进步长 retryTime越大,重试延时时间的间隔时间就会越来越大
  2. 固定时间
  3. 固定步长
  4. 斐波那契数列

这四种感觉是最常用的重试时间,已经默认集成,使用时可直接调用。该类如下所示:

package com.b0c0.common.delayedQueue;


import com.b0c0.common.delayedQueue.base.RetryTimeTypeable;

import java.util.UUID;

/**
 * @program: springbootdemo
 * @description: 默认的重试时间实现接口
 * @author: lidongsheng
 * @createData: 2020-09-25 19:41
 * @updateAuthor: lidongsheng
 * @updateData: 2020-09-25 19:41
 * @updateContent:
 * @Version: 1.0.0
 * @email: lidongshenglife@163.com
 * @blog: https://www.b0c0.com
 * @csdn: https://blog.csdn.net/LDSWAN0
 * ************************************************
 * Copyright @ 李东升 2020. All rights reserved
 * ************************************************
 */

public class DefaultRetryTimeTypeator {


    /**
     * 渐进步长 retryTime越大,重试延时时间的间隔时间就会越来越大
     * 栗子 retryTime = 5
     * 第一次重试延时 5 = 0 + 1 * 5
     * 第二次重试延时 15 = 5 + 2 * 5
     * 第三次重试延时  30 = 15 + 3 * 5
     * 第四次重试延时  50 = 30 + 4 * 5
     * ... 第10次重试延时  225
     * ... 第20次重试延时  950
     * ... 第30次重试延时  2175
     * ... 第40次重试延时  3900
     * ... 第50次重试延时  6125
     * @return
     */
    public static RetryTimeTypeable AdvanceStepTimeRetryTimeTypeator() {
        return new AdvanceStepTimeRetryTimeTypeator();
    }

    /**
     * 固定时间
     * 栗子 retryTime = 5
     * 第一次重试延时 5 第二次重试延时 5 第三次重试延时 5
     *
     * @return
     */
    public static RetryTimeTypeable FixDelayedRetryTimeTypeator() {
        return new FixDelayedRetryTimeTypeator();
    }

    /**
     * 固定步长
     * 栗子 retryTime = 5
     * 第一次重试延时 5 第二次重试延时 10 第三次重试延时 15
     *
     * @return
     */
    public static RetryTimeTypeable FixStepTimeRetryTimeTypeator() {
        return new FixStepTimeRetryTimeTypeator();
    }

    /**
     * 斐波那契数列 建议不要超过30
     * CurrExecuteNum = 10  return 55
     * CurrExecuteNum = 20  return 6765
     * CurrExecuteNum = 30  return 832040
     * CurrExecuteNum = 40  return 102334155
     * .....
     * @return
     */
    public static RetryTimeTypeable FibonacciSeriesRetryTimeTypeator() {
        return new FibonacciSeriesRetryTimeTypeator();
    }

    private static class AdvanceStepTimeRetryTimeTypeator implements RetryTimeTypeable {

        @Override
        public long getTime(GeneralDelayedQueue task) {
            return (task.getCurrExecuteNum() == 1 ? 0 : task.getLastTime()) + task.getCurrExecuteNum() * task.getRetryTime();
        }
    }

    private static class FixDelayedRetryTimeTypeator implements RetryTimeTypeable {

        @Override
        public long getTime(GeneralDelayedQueue task) {
            return task.getRetryTime();
        }
    }

    private static class FixStepTimeRetryTimeTypeator implements RetryTimeTypeable {
        @Override
        public long getTime(GeneralDelayedQueue task) {
            return task.getCurrExecuteNum() * task.getRetryTime();
        }
    }

    private static class FibonacciSeriesRetryTimeTypeator implements RetryTimeTypeable {
        @Override
        public long getTime(GeneralDelayedQueue task) {
            int a = 0, b = 1, sum;
            int n = task.getCurrExecuteNum();
            for (int i = 0; i < n; i++) {
                sum = a + b;
                a = b;
                b = sum;
            }
            return a;
        }
    }
}

 

2.5  延时队列执行器(主要)

延时队列执行器就是调用的入口类,此类应该时储存一下延时队列实体以及具体的执行方法实体等,以及承担具体执行。并且能够支持多线程调用,所以此类已经实现了Runnable接口,开发者可以根据需要来多线程调用或者直接同步调用。该类如下所示:

package com.b0c0.common.delayedQueue;


import com.b0c0.common.delayedQueue.base.GeneralQueueConsumerable;
import com.b0c0.common.delayedQueue.base.RetryTimeTypeable;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;


/**
 * @program: springbootdemo
 * @description: 通用延时队列执行器
 * @author: lidongsheng
 * @createData: 2020-09-21 15:50
 * @updateAuthor: lidongsheng
 * @updateData: 2020-09-21 15:50
 * @updateContent:
 * @Version: 1.0.0
 * @email: lidongshenglife@163.com
 * @blog: https://www.b0c0.com
 * @csdn: https://blog.csdn.net/LDSWAN0
 * ************************************************
 * Copyright @ 李东升 2020. All rights reserved
 * ************************************************
 */

/**
 * 延时队列执行器就是调用的入口类,此类应该时储存一下延时队列实体以及具体的执行方法实体等,以及承担具体执行。
 * 并且能够支持多线程调用,所以此类已经实现了Runnable接口,开发者可以根据需要来多线程调用或者直接同步调用。
 */
public class GeneralDelayedQueueExecute implements Runnable {


    //队列消费者
    private GeneralQueueConsumerable consumer;

    //延时队列
    private DelayQueue<GeneralDelayedQueue> queue = new DelayQueue<>();

    //延时队列实体
    private GeneralDelayedQueue task;

    //重试时间的具体实现
    private RetryTimeTypeable retryTimeTypeator;

    /**
     * 构造方法(默认为FixDelayedRetryTimeTypeator延时重试方法)
     *
     * @param consumer 具体的消费者
     * @param task     延时队列实体
     */
    public GeneralDelayedQueueExecute(GeneralQueueConsumerable consumer, GeneralDelayedQueue task) {
        this.consumer = consumer;
        this.task = task;
        this.retryTimeTypeator = DefaultRetryTimeTypeator.FixDelayedRetryTimeTypeator();
        setExpireTime();
        queue.offer(task);
    }

    /**
     * 构造方法
     *
     * @param consumer 具体的消费者
     * @param task     延时队列实体
     */
    public GeneralDelayedQueueExecute(GeneralQueueConsumerable consumer, GeneralDelayedQueue task, RetryTimeTypeable retryTimeTypeator) {
        this.consumer = consumer;
        this.task = task;
        this.retryTimeTypeator = retryTimeTypeator;
        setExpireTime();
        queue.offer(task);
    }

    @Override
    public void run() {
        boolean result = false;
        try {
            result = consumer.run(queue.take());
            task.setLastTime(retryTimeTypeator.getTime(task));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (task.getCurrExecuteNum() < task.getMaxExecuteNum() - 1 && !result) {
                task.setCurrExecuteNum(task.getCurrExecuteNum() + 1);
                setExpireTime();
                queue.offer(task);
                this.run();
            }
        }
    }

    private void setExpireTime() {
        long expireTime = 0;
        if (task.getCurrExecuteNum() == 0) {
            expireTime = TimeUnit.NANOSECONDS.convert(
                    task.getDelayedTime(), task.getTimeUnit()) + System.nanoTime();
        } else {
            expireTime = TimeUnit.NANOSECONDS.convert(
                    retryTimeTypeator.getTime(task), task.getTimeUnit()) + System.nanoTime();
        }
        task.setExpireTime(expireTime);
    }

}

 

3. 使用示例测试类

这个我已经打包发布至maven中央仓库中,大家可以直接在项目的pom文件中添加下面依赖即可,并且在此com.b00c.common依赖中还正在不断完善,现在是只有这个延时队列和一个通用的方法日志拦截输出:

<dependency>
    <groupId>com.b0c0</groupId>
    <artifactId>common</artifactId>
    <version>0.0.4</version>
</dependency>

 

就此整体的就好了,在此贴出一个使用示例类,这个TestConsumer 就是开发者自己要实现的接口,根据自己的业务需求进行执行的具体方法:

package com.b0c0.common.delayedQueue;


import com.b0c0.common.delayedQueue.base.GeneralQueueConsumerable;

import java.util.UUID;

public class GeneralDelayedQueueExecuteTest {

    public static void main(String[] args) {

        GeneralDelayedQueue delayedQueue = new GeneralDelayedQueue(
                UUID.randomUUID().toString(),
                "jsonbody",
                4, 5, 5);
        new GeneralDelayedQueueExecute(
                new TestConsumer(),
                delayedQueue,
                DefaultRetryTimeTypeator.AdvanceStepTimeRetryTimeTypeator()).run();
    }

    static class TestConsumer implements GeneralQueueConsumerable {

        @Override
        public boolean run(GeneralDelayedQueue task) {
            String body = task.getBody();
            String requestId = task.getRequestId();
            int currExecuteNum = task.getCurrExecuteNum();
            System.out.println("消费延时队列 requestId -> " + requestId + " ,第 -> " + (currExecuteNum + 1) + " 次,body -> " + body);
            return false;
        }
    }
}

 

关于更多com.b00c.common 依赖的源码详情、使用方法、更新历史请去下面这个链接查看。

https://github.com/DeBug-Bug/common

1+

发表评论

电子邮件地址不会被公开。