文章背景
最近遇到一个业务上的问题,用户进行一个操作,会同时生成两个kafka消息
在一个消费者消费消息的时候依赖另一个消费者生产的数据
被依赖方执行的速度比依赖方的慢,所以希望延迟一点消费这条数据
处理方法
kafka生产消息的时候可以加入一个delay参数,用于控制消息的延迟消费
但是这里的问题是生产者面对非常多的消费者,加入这个参数风险不可控
所以决定在希望延迟消费的消费者这里加入一个时间轮,用于实现延迟消费的功能
所以有了这篇文章
时间轮代码
package timewheel
import (
"container/list"
"fmt"
"sync"
"time"
)
type Timer struct {
expiration time.Time
task func()
}
type TimeWheel struct {
ticker *time.Ticker // 定时器
slots []*list.List // 时间槽
currentSlot int // 当前时间槽
slotCount int // 时间槽数量
duration time.Duration // 时间槽间隔
lock sync.Mutex // 锁
}
// NewTimeWheel 创建时间轮
func NewTimeWheel(slotCount int, duration time.Duration) *TimeWheel {
slots := make([]*list.List, slotCount)
for i := range slots {
slots[i] = list.New()
}
return &TimeWheel{
ticker: time.NewTicker(duration),
slots: slots,
slotCount: slotCount,
duration: duration,
currentSlot: 0,
}
}
// AddTask 添加一个定时任务到时间轮
func (tw *TimeWheel) AddTask(delay time.Duration, task func()) {
tw.lock.Lock()
defer tw.lock.Unlock()
expiration := time.Now().Add(delay)
// 计算定时任务在时间轮中的到期时间, 添加到对应的时间槽
ticks := int(delay / tw.duration)
slotIndex := (tw.currentSlot + ticks) % tw.slotCount
timer := &Timer{expiration: expiration, task: task}
tw.slots[slotIndex].PushBack(timer)
}
// Start 启动时间轮
func (tw *TimeWheel) Start() {
go func() {
defer func() {
if err := recover(); err != nil {
fmt.Println("timeWheel panic: ", err)
}
}()
for range tw.ticker.C {
tw.tickHandler()
}
}()
}
// Stop 停止时间轮
func (tw *TimeWheel) Stop() {
tw.ticker.Stop()
}
func (tw *TimeWheel) tickHandler() {
tw.lock.Lock()
defer tw.lock.Unlock()
slot := tw.slots[tw.currentSlot]
tw.currentSlot = (tw.currentSlot + 1) % tw.slotCount
for e := slot.Front(); e != nil; {
next := e.Next()
timer := e.Value.(*Timer)
if timer.expiration.Before(time.Now()) || timer.expiration.Equal(time.Now()) {
go timer.task()
slot.Remove(e)
}
e = next
}
}
华纳公司官方开户渠道?(183-8890-9465)-薇-STS5099【6011643】
如何通过官方渠道申请华纳公司账户?(183-8890-9465)-薇-STS5099【6011643】
华纳总公司官方开户指南?(183-8890-9465)-薇-STS5099【6011643】
华纳公司官方开户所需材料?(183-8890-9465)-薇-STS5099【6011643】
华纳官方开户流程?(183-8890-9465)-薇-STS5099【6011643】
华纳公司官方开户申请步骤?(183-8890-9465)-薇-STS5099【6011643】
华纳官方开户指南?(183-8890-9465)-薇-STS5099【6011643】
华纳总公司官方开户?(183-8890-9465)-薇-STS5099【6011643】
华纳公司官方开户所需材料?(183-8890-9465)-薇-STS5099【6011643】
华纳官方开户申请流程?(183-8890-9465)-薇-STS5099【6011643】
《华纳圣淘沙公司开户流程全解析》→ 官方顾问一对一指导??? 安全联系:183第三段8890第四段9465
《华纳圣淘沙开户步骤详解》→ 」专属通道快速办理??? 安全联系:183第三段8890第四段9465
《华纳圣淘沙账户注册指南》→ 扫码获取完整资料清单?「微?? 安全联系:183第三段8890第四段9465
《新手开通华纳圣淘沙公司账户指南》→ 限时免费咨询开放??? 安全联系:183第三段8890第四段9465
《华纳圣淘沙企业开户标准流程》→ 资深顾问实时解答疑问??? 安全联系:183第三段8890第四段9465
《华纳圣淘沙开户步骤全景图》→ 点击获取极速开户方案??? 安全联系:183第三段8890第四段9465
《华纳圣淘沙账户创建全流程手册》→ 预约顾问免排队服务?9?? 安全联系:183第三段8890第四段9465 《从零开通华纳圣淘沙公司账户》→ 添加客服领取开户工具包?? 安全联系:183第三段8890第四段9465
《官方授权:华纳圣淘沙开户流程》→ 认证顾问全程代办?」?? 安全联系:183第三段8890第四段9465
《华纳圣淘沙开户说明书》→立即联系获取电子版文件??? 安全联系:183第三段8890第四段9465
寻找华纳圣淘沙公司开户代理(183-8890-9465薇-STS5099】
华纳圣淘沙官方合作开户渠道(183-8890-9465薇-STS5099】
华纳圣淘沙公司开户代理服务(183-8890-9465薇-STS5099】
华纳圣淘沙公司开户咨询热线(183-8890-9465薇-STS5099】
联系客服了解华纳圣淘沙开户
(183-8890-9465薇-STS5099】
华纳圣淘沙公司开户专属顾问
(183-8890-9465薇-STS5099】
华纳圣淘沙公司开户新手教程
零基础学会(183-8890-9465薇-STS5099)
华纳圣淘沙公司开户
华纳圣淘沙公司开户保姆级教程(183-8890-9465薇-STS5099)
一步步教你开通华纳圣淘沙公司账户(183-8890-9465薇-STS5099)
华纳圣淘沙公司开户分步图解
首次开户必看:(183-8890-9465薇-STS5099)
华纳圣淘沙全攻略
华纳圣淘沙公司开户实操手册(183-8890-9465薇-STS5099)
华纳圣淘沙开户流程视频教程
手把手教学:(183-8890-9465薇-STS5099)
华纳圣淘沙公司开户
华纳圣淘沙公司开户完全指南(183-8890-9465薇-STS5099)
华纳圣淘沙开户步骤详解(183-8890-9465—?薇-STS5099【6011643】
华纳圣淘沙公司开户流程全解析(183-8890-9465—?薇-STS5099【6011643】
华纳圣淘沙公司账户注册指南(183-8890-9465—?薇-STS5099【6011643】
新手如何开通华纳圣淘沙公司账户(183-8890-9465—?薇-STS5099【6011643】
华纳圣淘沙企业开户标准流程(183-8890-9465—?薇-STS5099【6011643】
华纳圣淘沙公司开户:从零到一(183-8890-9465—?薇-STS5099【6011643】
官方指南:华纳圣淘沙公司开户流程(183-8890-9465—?薇-STS5099【6011643】
华纳圣淘沙公司开户流程说明书(183-8890-9465—?薇-STS5099【6011643】