MENU

Golang时间轮精简实现

August 3, 2024 • Read: 1477 • 默认分类

文章背景

最近遇到一个业务上的问题,用户进行一个操作,会同时生成两个kafka消息

在一个消费者消费消息的时候依赖另一个消费者生产的数据

被依赖方执行的速度比依赖方的慢,所以希望延迟一点消费这条数据

处理方法

kafka生产消息的时候可以加入一个delay参数,用于控制消息的延迟消费

但是这里的问题是生产者面对非常多的消费者,加入这个参数风险不可控

所以决定在希望延迟消费的消费者这里加入一个时间轮,用于实现延迟消费的功能

所以有了这篇文章

时间轮代码

package timewheel

import (
    "container/list"
    "fmt"
    "sync"
    "time"
)

type Timer struct {
    expiration time.Time
    task       func()
}

type TimeWheel struct {
    ticker      *time.Ticker  // 定时器
    slots       []*list.List  // 时间槽
    currentSlot int           // 当前时间槽
    slotCount   int           // 时间槽数量
    duration    time.Duration // 时间槽间隔
    lock        sync.Mutex    // 锁
}

// NewTimeWheel 创建时间轮
func NewTimeWheel(slotCount int, duration time.Duration) *TimeWheel {
    slots := make([]*list.List, slotCount)
    for i := range slots {
        slots[i] = list.New()
    }
    return &TimeWheel{
        ticker:      time.NewTicker(duration),
        slots:       slots,
        slotCount:   slotCount,
        duration:    duration,
        currentSlot: 0,
    }
}

// AddTask 添加一个定时任务到时间轮
func (tw *TimeWheel) AddTask(delay time.Duration, task func()) {
    tw.lock.Lock()
    defer tw.lock.Unlock()

    expiration := time.Now().Add(delay)

    // 计算定时任务在时间轮中的到期时间, 添加到对应的时间槽
    ticks := int(delay / tw.duration)
    slotIndex := (tw.currentSlot + ticks) % tw.slotCount

    timer := &Timer{expiration: expiration, task: task}
    tw.slots[slotIndex].PushBack(timer)
}

// Start 启动时间轮
func (tw *TimeWheel) Start() {
    go func() {
        defer func() {
            if err := recover(); err != nil {
                fmt.Println("timeWheel panic: ", err)
            }
        }()
        for range tw.ticker.C {
            tw.tickHandler()
        }
    }()
}

// Stop 停止时间轮
func (tw *TimeWheel) Stop() {
    tw.ticker.Stop()
}

func (tw *TimeWheel) tickHandler() {
    tw.lock.Lock()
    defer tw.lock.Unlock()

    slot := tw.slots[tw.currentSlot]
    tw.currentSlot = (tw.currentSlot + 1) % tw.slotCount

    for e := slot.Front(); e != nil; {
        next := e.Next()
        timer := e.Value.(*Timer)
        if timer.expiration.Before(time.Now()) || timer.expiration.Equal(time.Now()) {
            go timer.task()
            slot.Remove(e)
        }
        e = next
    }
}


Last Modified: August 5, 2024
Leave a Comment

34 Comments
  1. 感谢大佬的贡献

  2. 《暗红》剧情片高清在线免费观看:https://www.jgz518.com/xingkong/131068.html

  3. 你的文章内容非常精彩,让人回味无穷。 https://www.4006400989.com/qyvideo/66700.html

  4. 每次看到你的文章,我都觉得时间过得好快。 http://www.55baobei.com/k86xnIqwEF.html

  5. 你的文章让我感受到了艺术的魅力,谢谢! http://www.55baobei.com/8V44rtHeT3.html