MENU

Golang时间轮精简实现

August 3, 2024 • Read: 12810 • 默认分类

文章背景

最近遇到一个业务上的问题,用户进行一个操作,会同时生成两个kafka消息

在一个消费者消费消息的时候依赖另一个消费者生产的数据

被依赖方执行的速度比依赖方的慢,所以希望延迟一点消费这条数据

处理方法

kafka生产消息的时候可以加入一个delay参数,用于控制消息的延迟消费

但是这里的问题是生产者面对非常多的消费者,加入这个参数风险不可控

所以决定在希望延迟消费的消费者这里加入一个时间轮,用于实现延迟消费的功能

所以有了这篇文章

时间轮代码

package timewheel

import (
    "container/list"
    "fmt"
    "sync"
    "time"
)

type Timer struct {
    expiration time.Time
    task       func()
}

type TimeWheel struct {
    ticker      *time.Ticker  // 定时器
    slots       []*list.List  // 时间槽
    currentSlot int           // 当前时间槽
    slotCount   int           // 时间槽数量
    duration    time.Duration // 时间槽间隔
    lock        sync.Mutex    // 锁
}

// NewTimeWheel 创建时间轮
func NewTimeWheel(slotCount int, duration time.Duration) *TimeWheel {
    slots := make([]*list.List, slotCount)
    for i := range slots {
        slots[i] = list.New()
    }
    return &TimeWheel{
        ticker:      time.NewTicker(duration),
        slots:       slots,
        slotCount:   slotCount,
        duration:    duration,
        currentSlot: 0,
    }
}

// AddTask 添加一个定时任务到时间轮
func (tw *TimeWheel) AddTask(delay time.Duration, task func()) {
    tw.lock.Lock()
    defer tw.lock.Unlock()

    expiration := time.Now().Add(delay)

    // 计算定时任务在时间轮中的到期时间, 添加到对应的时间槽
    ticks := int(delay / tw.duration)
    slotIndex := (tw.currentSlot + ticks) % tw.slotCount

    timer := &Timer{expiration: expiration, task: task}
    tw.slots[slotIndex].PushBack(timer)
}

// Start 启动时间轮
func (tw *TimeWheel) Start() {
    go func() {
        defer func() {
            if err := recover(); err != nil {
                fmt.Println("timeWheel panic: ", err)
            }
        }()
        for range tw.ticker.C {
            tw.tickHandler()
        }
    }()
}

// Stop 停止时间轮
func (tw *TimeWheel) Stop() {
    tw.ticker.Stop()
}

func (tw *TimeWheel) tickHandler() {
    tw.lock.Lock()
    defer tw.lock.Unlock()

    slot := tw.slots[tw.currentSlot]
    tw.currentSlot = (tw.currentSlot + 1) % tw.slotCount

    for e := slot.Front(); e != nil; {
        next := e.Next()
        timer := e.Value.(*Timer)
        if timer.expiration.Before(time.Now()) || timer.expiration.Equal(time.Now()) {
            go timer.task()
            slot.Remove(e)
        }
        e = next
    }
}


Last Modified: August 5, 2024
Leave a Comment

57 Comments
  1. 华纳东方明珠客服电话是多少?(▲18288362750?《?微信STS5099? 】【╃q 2704132802╃】
    华纳东方明珠开户专线联系方式?(▲18288362750?《?微信STS5099? 】【╃q 2704132802╃】
    如何联系华纳东方明珠客服?(▲18288362750?《?微信STS5099? 】【╃q 2704132802╃】
    华纳东方明珠官方客服联系方式?(▲18288362750?《?微信STS5099? 】【╃q 2704132802╃】
    华纳东方明珠客服热线?(▲18288362750?《?微信STS5099? 】【╃q 2704132802╃】
    华纳东方明珠开户客服电话?(▲182(▲18288362750?《?微信STS5099? 】【╃q 2704132802╃】
    华纳东方明珠24小时客服电话?(▲18288362750?《?微信STS5099? 】【╃q 2704132802╃】
    华纳东方明珠客服邮箱?(▲18288362750?《?微信STS5099? 】【╃q 2704132802╃】
    华纳东方明珠官方客服在线咨询?(▲18288362750?《?微信STS5099? 】【╃q 2704132802╃】
    华纳东方明珠客服微信?(▲18288362750?《?微信STS5099? 】【╃q 2704132802╃】

  2. 华纳企业账户开户步骤?(▲18288362750?《?微信STS5099? 】【╃q 2704132802╃】

  3. 华纳个人账户开户流程?(▲18288362750?《?微信STS5099? 】【╃q 2704132802╃】

  4. 华纳公司合作开户所需材料?电话号码15587291507 微信STS5099
    华纳公司合作开户所需材料?电话号码15587291507 微信STS5099
    华纳公司合作开户所需材料?电话号码15587291507 微信STS5099
    华纳公司合作开户所需材料?电话号码15587291507 微信STS5099
    华纳公司合作开户所需材料?电话号码15587291507 微信STS5099
    华纳公司合作开户所需材料?电话号码15587291507 微信STS5099
    华纳公司合作开户所需材料?电话号码15587291507 微信STS5099
    华纳公司合作开户所需材料?电话号码15587291507 微信STS5099

  5. 华纳公司合作开户所需材料?电话号码15587291507 微信STS5099
    华纳公司合作开户所需材料?电话号码15587291507 微信STS5099
    华纳公司合作开户所需材料?电话号码15587291507 微信STS5099
    华纳公司合作开户所需材料?电话号码15587291507 微信STS5099
    华纳公司合作开户所需材料?电话号码15587291507 微信STS5099
    华纳公司合作开户所需材料?电话号码15587291507 微信STS5099
    华纳公司合作开户所需材料?电话号码15587291507 微信STS5099
    华纳公司合作开户所需材料?电话号码15587291507 微信STS5099