教育行業(yè)A股IPO第一股(股票代碼 003032)

全國咨詢/投訴熱線:400-618-4000

怎樣為程序添加延遲任務?延遲任務實現思路

更新時間:2023年06月21日18時41分 來源:傳智教育 瀏覽次數:

好口碑IT培訓

延遲任務有固定周期有明確出發(fā)時間,而延遲隊列沒有固定的開始時間它常常是由一個事件觸發(fā)的,而在這個事件觸發(fā)之后的一段時間內觸發(fā)另一個事件,任務可以立即執(zhí)行,也可以延遲。

延遲任務

延遲任務的應用場景:

場景一:訂單下單之后30分鐘后,如果用戶沒有付錢,則系統(tǒng)自動取消訂單;如果期間下單成功,任務取消

場景二:接口對接出現網絡問題,1分鐘后重試,如果失敗,2分鐘重試,直到出現閾值終止

實現延遲任務的兩種任務

1)DelayQueue

JDK自帶DelayQueue 是一個支持延時獲取元素的阻塞隊列, 內部采用優(yōu)先隊列 PriorityQueue 存儲元素,同時元素必須實現 Delayed 接口;在創(chuàng)建元素時可以指定多久才可以從隊列中獲取當前元素,只有在延遲期滿時才能從隊列中提取元素

DelayQueue

DelayQueue屬于排序隊列,它的特殊之處在于隊列的元素必須實現Delayed接口,該接口需要實現compareTo和getDelay方法

getDelay方法:獲取元素在隊列中的剩余時間,只有當剩余時間為0時元素才可以出隊列。

compareTo方法:用于排序,確定元素出隊列的順序。

實現:

1:在測試包jdk下創(chuàng)建延遲任務元素對象DelayedTask,實現compareTo和getDelay方法,

2:在main方法中創(chuàng)建DelayQueue并向延遲隊列中添加三個延遲任務,

3:循環(huán)的從延遲隊列中拉取任務

public class DelayedTask  implements Delayed{
    
    // 任務的執(zhí)行時間
    private int executeTime = 0;
    
    public DelayedTask(int delay){
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.SECOND,delay);
        this.executeTime = (int)(calendar.getTimeInMillis() /1000 );
    }

    /**
     * 元素在隊列中的剩余時間
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        Calendar calendar = Calendar.getInstance();
        return executeTime - (calendar.getTimeInMillis()/1000);
    }

    /**
     * 元素排序
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        long val = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
        return val == 0 ? 0 : ( val < 0 ? -1: 1 );
    }


    public static void main(String[] args) {
        DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
        
        queue.add(new DelayedTask(5));
        queue.add(new DelayedTask(10));
        queue.add(new DelayedTask(15));

        System.out.println(System.currentTimeMillis()/1000+" start consume ");
        while(queue.size() != 0){
            DelayedTask delayedTask = queue.poll();
            if(delayedTask !=null ){
                System.out.println(System.currentTimeMillis()/1000+" cosume task");
            }
            //每隔一秒消費一次
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }     
    }
}

DelayQueue實現完成之后思考一個問題:

使用線程池或者原生DelayQueue程序掛掉之后,任務都是放在內存,需要考慮未處理消息的丟失帶來的影響,如何保證數據不丟失,需要持久化(磁盤)

2)RabbitMQ實現延遲任務

TTL:Time To Live (消息存活時間)

死信隊列:Dead Letter Exchange(死信交換機),當消息成為Dead message后,可以重新發(fā)送另一個交換機(死信交換機)。

RabbitMQ實現延遲任務

3)redis實現

zset數據類型的去重有序(分數排序)特點進行延遲。例如:時間戳作為score進行排序。

redis實現

redis實現延遲任務的思路

redis延遲任務的思路

1.為什么任務需要存儲在數據庫中?

延遲任務是一個通用的服務,任何需要延遲得任務都可以調用該服務,需要考慮數據持久化的問題,存儲數據庫中是一種數據安全的考慮。

2.為什么redis中使用兩種數據類型,list和zset?

效率問題,算法的時間復雜度

3.在添加zset數據的時候,為什么不需要預加載?

任務模塊是一個通用的模塊,項目中任何需要延遲隊列的地方,都可以調用這個接口,要考慮到數據量的問題,如果數據量特別大,為了防止阻塞,只需要把未來幾分鐘要執(zhí)行的數據存入緩存即可。

4)延遲任務服務實現

搭建heima-leadnews-schedule模塊

leadnews-schedule是一個通用的服務,單獨創(chuàng)建模塊來管理任何類型的延遲任務

①:導入資料文件夾下的heima-leadnews-schedule模塊到heima-leadnews-service下,如下圖所示:

延遲任務實現

②:添加bootstrap.yml

server:
  port: 51701
spring:
  application:
    name: leadnews-schedule
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.200.130:8848
      config:
        server-addr: 192.168.200.130:8848
        file-extension: yml

③:在nacos中添加對應配置,并添加數據庫及mybatis-plus的配置

spring:
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    username: root
    password: root
# 設置Mapper接口所對應的XML文件位置,如果你在Mapper接口中有自定義方法,需要進行該配置
mybatis-plus:
  mapper-locations: classpath*:mapper/*.xml
  # 設置別名包掃描路徑,通過該屬性可以給包中的類注冊別名
  type-aliases-package: com.heima.model.schedule.pojos
0 分享到:
和我們在線交談!