文章背景
最近遇到一个业务上的问题,用户进行一个操作,会同时生成两个 kafka 消息
在一个消费者消费消息的时候依赖另一个消费者生产的数据
被依赖方执行的速度比依赖方的慢,所以希望延迟一点消费这条数据
处理方法
kafka 生产消息的时候可以加入一个 delay 参数,用于控制消息的延迟消费
但是这里的问题是生产者面对非常多的消费者,加入这个参数风险不可控
所以决定在希望延迟消费的消费者这里加入一个时间轮,用于实现延迟消费的功能
所以有了这篇文章
时间轮代码
- package timewheel
-
- import (
- "container/list"
- "fmt"
- "sync"
- "time"
- )
-
- type Timer struct {
- expiration time.Time
- task func()
- }
-
- type TimeWheel struct {
- ticker *time.Ticker // 定时器
- slots []*list.List // 时间槽
- currentSlot int // 当前时间槽
- slotCount int // 时间槽数量
- duration time.Duration // 时间槽间隔
- lock sync.Mutex // 锁
- }
-
- // NewTimeWheel 创建时间轮
- func NewTimeWheel(slotCount int, duration time.Duration) *TimeWheel {
- slots := make([]*list.List, slotCount)
- for i := range slots {
- slots[i] = list.New()
- }
- return &TimeWheel{
- ticker: time.NewTicker(duration),
- slots: slots,
- slotCount: slotCount,
- duration: duration,
- currentSlot: 0,
- }
- }
-
- // AddTask 添加一个定时任务到时间轮
- func (tw *TimeWheel) AddTask(delay time.Duration, task func()) {
- tw.lock.Lock()
- defer tw.lock.Unlock()
-
- expiration := time.Now().Add(delay)
-
- // 计算定时任务在时间轮中的到期时间, 添加到对应的时间槽
- ticks := int(delay / tw.duration)
- slotIndex := (tw.currentSlot + ticks) % tw.slotCount
-
- timer := &Timer{expiration: expiration, task: task}
- tw.slots[slotIndex].PushBack(timer)
- }
-
- // Start 启动时间轮
- func (tw *TimeWheel) Start() {
- go func() {
- defer func() {
- if err := recover(); err != nil {
- fmt.Println("timeWheel panic: ", err)
- }
- }()
- for range tw.ticker.C {
- tw.tickHandler()
- }
- }()
- }
-
- // Stop 停止时间轮
- func (tw *TimeWheel) Stop() {
- tw.ticker.Stop()
- }
-
- func (tw *TimeWheel) tickHandler() {
- tw.lock.Lock()
- defer tw.lock.Unlock()
-
- slot := tw.slots[tw.currentSlot]
- tw.currentSlot = (tw.currentSlot + 1) % tw.slotCount
-
- for e := slot.Front(); e != nil; {
- next := e.Next()
- timer := e.Value.(*Timer)
- if timer.expiration.Before(time.Now()) || timer.expiration.Equal(time.Now()) {
- go timer.task()
- slot.Remove(e)
- }
- e = next
- }
- }
-
-
看到你的文章,我仿佛感受到了生活中的美好。http://www.zhiliuwushuajiansudianji.com
你的文章让我感受到了快乐,每天都要来看一看。http://www.ahfz001.com
你的才华让人惊叹,请继续保持。http://www.sxkyd.com
每次看到你的文章,我都觉得时间过得好快。http://www.wwysj.com