文章背景
最近遇到一个业务上的问题,用户进行一个操作,会同时生成两个kafka消息
在一个消费者消费消息的时候依赖另一个消费者生产的数据
被依赖方执行的速度比依赖方的慢,所以希望延迟一点消费这条数据
处理方法
kafka生产消息的时候可以加入一个delay参数,用于控制消息的延迟消费
但是这里的问题是生产者面对非常多的消费者,加入这个参数风险不可控
所以决定在希望延迟消费的消费者这里加入一个时间轮,用于实现延迟消费的功能
所以有了这篇文章
时间轮代码
package timewheel
import (
"container/list"
"fmt"
"sync"
"time"
)
type Timer struct {
expiration time.Time
task func()
}
type TimeWheel struct {
ticker *time.Ticker // 定时器
slots []*list.List // 时间槽
currentSlot int // 当前时间槽
slotCount int // 时间槽数量
duration time.Duration // 时间槽间隔
lock sync.Mutex // 锁
}
// NewTimeWheel 创建时间轮
func NewTimeWheel(slotCount int, duration time.Duration) *TimeWheel {
slots := make([]*list.List, slotCount)
for i := range slots {
slots[i] = list.New()
}
return &TimeWheel{
ticker: time.NewTicker(duration),
slots: slots,
slotCount: slotCount,
duration: duration,
currentSlot: 0,
}
}
// AddTask 添加一个定时任务到时间轮
func (tw *TimeWheel) AddTask(delay time.Duration, task func()) {
tw.lock.Lock()
defer tw.lock.Unlock()
expiration := time.Now().Add(delay)
// 计算定时任务在时间轮中的到期时间, 添加到对应的时间槽
ticks := int(delay / tw.duration)
slotIndex := (tw.currentSlot + ticks) % tw.slotCount
timer := &Timer{expiration: expiration, task: task}
tw.slots[slotIndex].PushBack(timer)
}
// Start 启动时间轮
func (tw *TimeWheel) Start() {
go func() {
defer func() {
if err := recover(); err != nil {
fmt.Println("timeWheel panic: ", err)
}
}()
for range tw.ticker.C {
tw.tickHandler()
}
}()
}
// Stop 停止时间轮
func (tw *TimeWheel) Stop() {
tw.ticker.Stop()
}
func (tw *TimeWheel) tickHandler() {
tw.lock.Lock()
defer tw.lock.Unlock()
slot := tw.slots[tw.currentSlot]
tw.currentSlot = (tw.currentSlot + 1) % tw.slotCount
for e := slot.Front(); e != nil; {
next := e.Next()
timer := e.Value.(*Timer)
if timer.expiration.Before(time.Now()) || timer.expiration.Equal(time.Now()) {
go timer.task()
slot.Remove(e)
}
e = next
}
}
你的文章让我心情愉悦,每天都要来看一看。 https://www.4006400989.com/qyvideo/59281.html
你的文章内容非常专业,让人佩服。 https://www.4006400989.com/qyvideo/61123.html
你的才华让人惊叹,你是我的榜样。 https://www.yonboz.com/video/64652.html
你的文章内容非常用心,让人感动。 https://www.yonboz.com/video/6348.html
《影圣》短片剧高清在线免费观看:https://www.jgz518.com/xingkong/13886.html