MENU

Golang时间轮精简实现

August 3, 2024 • Read: 2527 • 默认分类

文章背景

最近遇到一个业务上的问题,用户进行一个操作,会同时生成两个 kafka 消息

在一个消费者消费消息的时候依赖另一个消费者生产的数据

被依赖方执行的速度比依赖方的慢,所以希望延迟一点消费这条数据

处理方法

kafka 生产消息的时候可以加入一个 delay 参数,用于控制消息的延迟消费

但是这里的问题是生产者面对非常多的消费者,加入这个参数风险不可控

所以决定在希望延迟消费的消费者这里加入一个时间轮,用于实现延迟消费的功能

所以有了这篇文章

时间轮代码

  • package timewheel
  • import (
  • "container/list"
  • "fmt"
  • "sync"
  • "time"
  • )
  • type Timer struct {
  • expiration time.Time
  • task func()
  • }
  • type TimeWheel struct {
  • ticker *time.Ticker // 定时器
  • slots []*list.List // 时间槽
  • currentSlot int // 当前时间槽
  • slotCount int // 时间槽数量
  • duration time.Duration // 时间槽间隔
  • lock sync.Mutex // 锁
  • }
  • // NewTimeWheel 创建时间轮
  • func NewTimeWheel(slotCount int, duration time.Duration) *TimeWheel {
  • slots := make([]*list.List, slotCount)
  • for i := range slots {
  • slots[i] = list.New()
  • }
  • return &TimeWheel{
  • ticker: time.NewTicker(duration),
  • slots: slots,
  • slotCount: slotCount,
  • duration: duration,
  • currentSlot: 0,
  • }
  • }
  • // AddTask 添加一个定时任务到时间轮
  • func (tw *TimeWheel) AddTask(delay time.Duration, task func()) {
  • tw.lock.Lock()
  • defer tw.lock.Unlock()
  • expiration := time.Now().Add(delay)
  • // 计算定时任务在时间轮中的到期时间, 添加到对应的时间槽
  • ticks := int(delay / tw.duration)
  • slotIndex := (tw.currentSlot + ticks) % tw.slotCount
  • timer := &Timer{expiration: expiration, task: task}
  • tw.slots[slotIndex].PushBack(timer)
  • }
  • // Start 启动时间轮
  • func (tw *TimeWheel) Start() {
  • go func() {
  • defer func() {
  • if err := recover(); err != nil {
  • fmt.Println("timeWheel panic: ", err)
  • }
  • }()
  • for range tw.ticker.C {
  • tw.tickHandler()
  • }
  • }()
  • }
  • // Stop 停止时间轮
  • func (tw *TimeWheel) Stop() {
  • tw.ticker.Stop()
  • }
  • func (tw *TimeWheel) tickHandler() {
  • tw.lock.Lock()
  • defer tw.lock.Unlock()
  • slot := tw.slots[tw.currentSlot]
  • tw.currentSlot = (tw.currentSlot + 1) % tw.slotCount
  • for e := slot.Front(); e != nil; {
  • next := e.Next()
  • timer := e.Value.(*Timer)
  • if timer.expiration.Before(time.Now()) || timer.expiration.Equal(time.Now()) {
  • go timer.task()
  • slot.Remove(e)
  • }
  • e = next
  • }
  • }
Last Modified: August 5, 2024
Leave a Comment

34 Comments
  1. 看到你的文章,我仿佛感受到了生活中的美好。http://www.zhiliuwushuajiansudianji.com

  2. 你的文章让我感受到了快乐,每天都要来看一看。http://www.ahfz001.com

  3. 你的才华让人惊叹,请继续保持。http://www.sxkyd.com

  4. 每次看到你的文章,我都觉得时间过得好快。http://www.wwysj.com