3063 字
15 分钟
常见限流算法对比与令牌桶详解
常见限流算法对比与令牌桶详解
一、四种限流算法对比
1. 固定窗口计数器
原理:在固定时间窗口内统计请求数,超过阈值则拒绝。
优点:实现简单,内存占用小 缺点:存在”临界问题” - 窗口边界可能出现 2 倍流量突刺
窗口1: [0s-1s] 允许100个请求窗口2: [1s-2s] 允许100个请求问题:0.9s-1.1s 可能通过200个请求2. 滑动窗口计数器
原理:将时间窗口切分成多个小格子,统计滑动时间范围内的请求总数。
优点:解决固定窗口的临界问题,更平滑 缺点:需要存储多个时间格子的计数,内存占用较大
3. 漏桶算法 (Leaky Bucket)
原理:请求进入漏桶,以恒定速率流出处理。桶满则溢出拒绝。
优点:流量整形效果好,输出绝对平滑 缺点:无法应对突发流量,即使系统空闲也无法快速处理
请求 ↓↓↓ ┌─────────┐ │ 漏桶 │ │ ~~~~~~ │ └────↓────┘ 恒定速率流出4. 令牌桶算法 (Token Bucket) ⭐
原理:以恒定速率生成令牌放入桶中,请求需获取令牌才能通过。
优点:
- ✅ 支持突发流量 - 桶中积累的令牌可一次性使用
- ✅ 既能限制平均速率,又有弹性
- ✅ 适合大多数业务场景
缺点:实现相对复杂
二、令牌桶算法详解
工作原理
令牌生成器 → 恒定速率r ↓ ┌──────────────────┐ │ 令牌桶 (容量c) │ │ 🪙🪙🪙🪙🪙 │ └──────────────────┘ ↓ 获取令牌 请求处理核心机制:
- 令牌以固定速率
r生成(如每秒 100 个) - 桶的容量为
c(如最多存 200 个令牌) - 请求到达时尝试获取令牌:
- 有令牌:获取成功,请求通过
- 无令牌:拒绝请求或等待
- 令牌满了会丢弃新生成的令牌
为什么支持突发流量?
假设:r=100/s,c=200
- 平稳期:1 秒内最多处理 100 个请求(消耗=生成)
- 突发期:如果之前系统空闲,桶中积累了 200 个令牌,此时可瞬间处理 200 个请求
- 限制:突发过后,仍按 100/s 的平均速率处理
三、实现方式对比
单机实现
- 使用本地变量存储令牌数和时间戳
- 适合单体应用
- 性能高,无网络开销
分布式实现
常见方案:
- Redis + Lua 脚本:保证原子性,适合中小规模
- Redis + 限流中间件(如 Sentinel)
- 独立限流服务(如 Envoy Rate Limit Service)
核心挑战:
- 原子性:令牌获取必须是原子操作
- 一致性:多实例间的令牌计数同步
- 性能:减少网络调用开销
四、代码实现
Java 实现(单机版)
import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.ReentrantLock;
/** * 令牌桶限流器 */public class TokenBucketRateLimiter { // 桶的容量 private final long capacity; // 令牌生成速率(每秒) private final long refillRate; // 当前令牌数 private long tokens; // 上次刷新时间 private long lastRefillTime; // 线程安全锁 private final ReentrantLock lock = new ReentrantLock();
/** * @param capacity 桶容量(最大令牌数) * @param refillRate 每秒生成的令牌数 */ public TokenBucketRateLimiter(long capacity, long refillRate) { this.capacity = capacity; this.refillRate = refillRate; this.tokens = capacity; // 初始化时桶是满的 this.lastRefillTime = System.nanoTime(); }
/** * 尝试获取指定数量的令牌 * @param tokensRequired 需要的令牌数 * @return 是否成功获取 */ public boolean tryAcquire(long tokensRequired) { lock.lock(); try { refill(); if (tokens >= tokensRequired) { tokens -= tokensRequired; return true; } return false; } finally { lock.unlock(); } }
/** * 获取1个令牌 */ public boolean tryAcquire() { return tryAcquire(1); }
/** * 阻塞式获取令牌,直到成功 * @param tokensRequired 需要的令牌数 * @param timeout 超时时间 * @param unit 时间单位 * @return 是否在超时前获取成功 */ public boolean acquire(long tokensRequired, long timeout, TimeUnit unit) throws InterruptedException { long deadline = System.nanoTime() + unit.toNanos(timeout);
while (System.nanoTime() < deadline) { if (tryAcquire(tokensRequired)) { return true; } // 计算需要等待的时间 long waitTime = (tokensRequired - tokens) * 1_000_000_000L / refillRate; if (waitTime > 0) { Thread.sleep(Math.min(waitTime / 1_000_000, 100)); } } return false; }
/** * 补充令牌 */ private void refill() { long now = System.nanoTime(); long elapsedTime = now - lastRefillTime;
// 计算这段时间应该生成的令牌数 long tokensToAdd = (elapsedTime * refillRate) / 1_000_000_000L;
if (tokensToAdd > 0) { tokens = Math.min(capacity, tokens + tokensToAdd); lastRefillTime = now; } }
/** * 获取当前可用令牌数 */ public long getAvailableTokens() { lock.lock(); try { refill(); return tokens; } finally { lock.unlock(); } }
// 使用示例 public static void main(String[] args) throws InterruptedException { // 创建限流器:容量200,每秒生成100个令牌 TokenBucketRateLimiter limiter = new TokenBucketRateLimiter(200, 100);
System.out.println("=== 测试突发流量 ==="); // 模拟突发:瞬间150个请求 int successCount = 0; for (int i = 0; i < 150; i++) { if (limiter.tryAcquire()) { successCount++; } } System.out.println("突发150个请求,通过: " + successCount); System.out.println("剩余令牌: " + limiter.getAvailableTokens());
System.out.println("\n=== 测试持续流量 ==="); // 模拟持续请求 for (int i = 0; i < 5; i++) { Thread.sleep(1000); // 等待1秒 int pass = 0; for (int j = 0; j < 120; j++) { if (limiter.tryAcquire()) { pass++; } } System.out.println("第" + (i+1) + "秒: 尝试120个请求,通过 " + pass); }
System.out.println("\n=== 测试阻塞获取 ==="); boolean acquired = limiter.acquire(10, 2, TimeUnit.SECONDS); System.out.println("阻塞获取10个令牌: " + (acquired ? "成功" : "超时")); }}Go 实现(单机版)
package main
import ( "fmt" "sync" "time")
// TokenBucket 令牌桶限流器type TokenBucket struct { capacity int64 // 桶容量 refillRate int64 // 每秒生成的令牌数 tokens int64 // 当前令牌数 lastRefillTime time.Time // 上次刷新时间 mu sync.Mutex // 互斥锁}
// NewTokenBucket 创建令牌桶func NewTokenBucket(capacity, refillRate int64) *TokenBucket { return &TokenBucket{ capacity: capacity, refillRate: refillRate, tokens: capacity, // 初始化时桶是满的 lastRefillTime: time.Now(), }}
// TryAcquire 尝试获取指定数量的令牌func (tb *TokenBucket) TryAcquire(tokensRequired int64) bool { tb.mu.Lock() defer tb.mu.Unlock()
tb.refill()
if tb.tokens >= tokensRequired { tb.tokens -= tokensRequired return true } return false}
// TryAcquireOne 尝试获取1个令牌func (tb *TokenBucket) TryAcquireOne() bool { return tb.TryAcquire(1)}
// Acquire 阻塞式获取令牌,直到成功或超时func (tb *TokenBucket) Acquire(tokensRequired int64, timeout time.Duration) bool { deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) { if tb.TryAcquire(tokensRequired) { return true }
// 计算需要等待的时间 tb.mu.Lock() tb.refill() waitTokens := tokensRequired - tb.tokens tb.mu.Unlock()
if waitTokens > 0 { waitTime := time.Duration(waitTokens*1e9/tb.refillRate) * time.Nanosecond if waitTime > 100*time.Millisecond { waitTime = 100 * time.Millisecond } time.Sleep(waitTime) } } return false}
// refill 补充令牌(需要在持有锁的情况下调用)func (tb *TokenBucket) refill() { now := time.Now() elapsed := now.Sub(tb.lastRefillTime)
// 计算这段时间应该生成的令牌数 tokensToAdd := int64(elapsed.Seconds() * float64(tb.refillRate))
if tokensToAdd > 0 { tb.tokens += tokensToAdd if tb.tokens > tb.capacity { tb.tokens = tb.capacity } tb.lastRefillTime = now }}
// GetAvailableTokens 获取当前可用令牌数func (tb *TokenBucket) GetAvailableTokens() int64 { tb.mu.Lock() defer tb.mu.Unlock()
tb.refill() return tb.tokens}
func main() { // 创建限流器:容量200,每秒生成100个令牌 limiter := NewTokenBucket(200, 100)
fmt.Println("=== 测试突发流量 ===") // 模拟突发:瞬间150个请求 successCount := 0 for i := 0; i < 150; i++ { if limiter.TryAcquireOne() { successCount++ } } fmt.Printf("突发150个请求,通过: %d\n", successCount) fmt.Printf("剩余令牌: %d\n", limiter.GetAvailableTokens())
fmt.Println("\n=== 测试持续流量 ===") // 模拟持续请求 for i := 0; i < 5; i++ { time.Sleep(1 * time.Second) pass := 0 for j := 0; j < 120; j++ { if limiter.TryAcquireOne() { pass++ } } fmt.Printf("第%d秒: 尝试120个请求,通过 %d\n", i+1, pass) }
fmt.Println("\n=== 测试阻塞获取 ===") acquired := limiter.Acquire(10, 2*time.Second) if acquired { fmt.Println("阻塞获取10个令牌: 成功") } else { fmt.Println("阻塞获取10个令牌: 超时") }
fmt.Println("\n=== 并发测试 ===") // 重置限流器 limiter = NewTokenBucket(100, 50)
var wg sync.WaitGroup successTotal := int64(0) var successMu sync.Mutex
// 10个goroutine并发请求 for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() localSuccess := 0 for j := 0; j < 20; j++ { if limiter.TryAcquireOne() { localSuccess++ } time.Sleep(10 * time.Millisecond) } successMu.Lock() successTotal += int64(localSuccess) successMu.Unlock() fmt.Printf("Goroutine %d: 通过 %d 个请求\n", id, localSuccess) }(i) }
wg.Wait() fmt.Printf("并发总通过: %d 个请求\n", successTotal)}基于 Redis 的分布式实现(Go + Lua)
package main
import ( "context" "fmt" "time"
"github.com/redis/go-redis/v9")
// Lua脚本:原子性获取令牌const tokenBucketScript = `-- KEYS[1]: 令牌桶key-- KEYS[2]: 时间戳key-- ARGV[1]: 桶容量-- ARGV[2]: 令牌生成速率(每秒)-- ARGV[3]: 当前时间戳(纳秒)-- ARGV[4]: 请求令牌数
local capacity = tonumber(ARGV[1])local refill_rate = tonumber(ARGV[2])local now = tonumber(ARGV[3])local tokens_required = tonumber(ARGV[4])
-- 获取当前令牌数和上次刷新时间local tokens = tonumber(redis.call('GET', KEYS[1]) or capacity)local last_refill_time = tonumber(redis.call('GET', KEYS[2]) or now)
-- 计算应该生成的令牌数local elapsed = math.max(0, now - last_refill_time)local tokens_to_add = math.floor((elapsed / 1e9) * refill_rate)
-- 更新令牌数(不超过容量)if tokens_to_add > 0 then tokens = math.min(capacity, tokens + tokens_to_add) last_refill_time = nowend
-- 尝试获取令牌if tokens >= tokens_required then tokens = tokens - tokens_required -- 保存状态 redis.call('SET', KEYS[1], tokens) redis.call('SET', KEYS[2], last_refill_time) -- 设置过期时间(避免Redis内存泄漏) redis.call('EXPIRE', KEYS[1], 3600) redis.call('EXPIRE', KEYS[2], 3600) return 1 -- 成功else return 0 -- 失败end`
// DistributedTokenBucket 分布式令牌桶限流器type DistributedTokenBucket struct { rdb *redis.Client key string // Redis key前缀 capacity int64 // 桶容量 refillRate int64 // 每秒生成的令牌数 script *redis.Script}
// NewDistributedTokenBucket 创建分布式令牌桶func NewDistributedTokenBucket(rdb *redis.Client, key string, capacity, refillRate int64) *DistributedTokenBucket { return &DistributedTokenBucket{ rdb: rdb, key: key, capacity: capacity, refillRate: refillRate, script: redis.NewScript(tokenBucketScript), }}
// TryAcquire 尝试获取指定数量的令牌func (dtb *DistributedTokenBucket) TryAcquire(ctx context.Context, tokensRequired int64) (bool, error) { keys := []string{ dtb.key + ":tokens", dtb.key + ":timestamp", } args := []interface{}{ dtb.capacity, dtb.refillRate, time.Now().UnixNano(), tokensRequired, }
result, err := dtb.script.Run(ctx, dtb.rdb, keys, args...).Int() if err != nil { return false, err }
return result == 1, nil}
// TryAcquireOne 尝试获取1个令牌func (dtb *DistributedTokenBucket) TryAcquireOne(ctx context.Context) (bool, error) { return dtb.TryAcquire(ctx, 1)}
// GetAvailableTokens 获取当前可用令牌数(仅用于监控)func (dtb *DistributedTokenBucket) GetAvailableTokens(ctx context.Context) (int64, error) { result, err := dtb.rdb.Get(ctx, dtb.key+":tokens").Int64() if err == redis.Nil { return dtb.capacity, nil } return result, err}
func main() { // 连接Redis rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", // 根据实际情况设置 DB: 0, }) defer rdb.Close()
ctx := context.Background()
// 测试连接 _, err := rdb.Ping(ctx).Result() if err != nil { fmt.Printf("无法连接Redis: %v\n", err) fmt.Println("请确保Redis服务正在运行") return }
// 创建分布式限流器:容量200,每秒生成100个令牌 limiter := NewDistributedTokenBucket(rdb, "rate_limit:api", 200, 100)
fmt.Println("=== 分布式令牌桶测试 ===")
// 测试1:突发流量 fmt.Println("\n1. 测试突发流量") successCount := 0 for i := 0; i < 150; i++ { ok, err := limiter.TryAcquireOne(ctx) if err != nil { fmt.Printf("错误: %v\n", err) return } if ok { successCount++ } } fmt.Printf("突发150个请求,通过: %d\n", successCount)
tokens, _ := limiter.GetAvailableTokens(ctx) fmt.Printf("剩余令牌: %d\n", tokens)
// 测试2:持续流量 fmt.Println("\n2. 测试持续流量(每秒120个请求)") for i := 0; i < 3; i++ { time.Sleep(1 * time.Second) pass := 0 for j := 0; j < 120; j++ { ok, _ := limiter.TryAcquireOne(ctx) if ok { pass++ } } fmt.Printf("第%d秒: 通过 %d 个请求\n", i+1, pass) }
// 测试3:模拟多实例并发 fmt.Println("\n3. 模拟3个服务实例并发请求") successChan := make(chan int, 3)
for instance := 0; instance < 3; instance++ { go func(id int) { localSuccess := 0 for j := 0; j < 50; j++ { ok, _ := limiter.TryAcquireOne(ctx) if ok { localSuccess++ } time.Sleep(10 * time.Millisecond) } successChan <- localSuccess fmt.Printf("实例%d: 通过 %d 个请求\n", id, localSuccess) }(instance) }
totalSuccess := 0 for i := 0; i < 3; i++ { totalSuccess += <-successChan } fmt.Printf("总通过: %d 个请求\n", totalSuccess)
fmt.Println("\n测试完成!")}
/*使用说明:1. 安装依赖: go get github.com/redis/go-redis/v92. 启动Redis: docker run -p 6379:6379 redis3. 运行程序: go run main.go
优势:- 多实例间共享限流配额- 原子性操作保证一致性- 支持动态调整限流策略
注意事项:- Redis性能成为瓶颈时考虑本地缓存- 设置合理的key过期时间- 监控Redis连接和网络延迟*/Java 实现(单机版)### Go 实现(单机版)### 基于 Redis 的分布式实现(Go + Lua)## 五、选型建议
| 场景 | 推荐算法 | 原因 |
|---|---|---|
| API 网关 | 令牌桶 | 需要处理突发流量,用户体验好 |
| 消息队列消费 | 漏桶 | 需要绝对平滑的处理速率 |
| 简单计数 | 固定窗口 | 实现简单,资源占用少 |
| 精确控制 | 滑动窗口 | 更平滑,无临界问题 |
六、进阶优化
1. 预热机制
系统启动时桶可能是空的,可以初始化一定数量的令牌
2. 动态调整
根据系统负载动态调整 refillRate
3. 分层限流
全局限流(1000 QPS) ↓用户级限流(每用户 10 QPS) ↓接口级限流(单接口 5 QPS)4. 本地缓存 + Redis
// 本地桶处理大部分请求,减少Redis调用if localBucket.TryAcquire() { return true}// 本地失败时尝试Redisreturn redisBucket.TryAcquire()总结:令牌桶是最常用的限流算法,它在保证平均速率的同时支持突发流量,适合绝大多数业务场景。单机场景用本地实现即可,分布式场景推荐 Redis + Lua 方案。
常见限流算法对比与令牌桶详解
https://mizuki.mysqil.com/posts/盘牛客面经/快手一面实习/限流算法原理与令牌桶详解/