3063 字
15 分钟
常见限流算法对比与令牌桶详解

常见限流算法对比与令牌桶详解#

一、四种限流算法对比#

1. 固定窗口计数器#

原理:在固定时间窗口内统计请求数,超过阈值则拒绝。

优点:实现简单,内存占用小 缺点:存在”临界问题” - 窗口边界可能出现 2 倍流量突刺

窗口1: [0s-1s] 允许100个请求
窗口2: [1s-2s] 允许100个请求
问题:0.9s-1.1s 可能通过200个请求

2. 滑动窗口计数器#

原理:将时间窗口切分成多个小格子,统计滑动时间范围内的请求总数。

优点:解决固定窗口的临界问题,更平滑 缺点:需要存储多个时间格子的计数,内存占用较大

3. 漏桶算法 (Leaky Bucket)#

原理:请求进入漏桶,以恒定速率流出处理。桶满则溢出拒绝。

优点:流量整形效果好,输出绝对平滑 缺点无法应对突发流量,即使系统空闲也无法快速处理

请求 ↓↓↓
┌─────────┐
│ 漏桶 │
│ ~~~~~~ │
└────↓────┘
恒定速率流出

4. 令牌桶算法 (Token Bucket)#

原理:以恒定速率生成令牌放入桶中,请求需获取令牌才能通过。

优点

  • 支持突发流量 - 桶中积累的令牌可一次性使用
  • ✅ 既能限制平均速率,又有弹性
  • ✅ 适合大多数业务场景

缺点:实现相对复杂


二、令牌桶算法详解#

工作原理#

令牌生成器 → 恒定速率r
┌──────────────────┐
│ 令牌桶 (容量c) │
│ 🪙🪙🪙🪙🪙 │
└──────────────────┘
↓ 获取令牌
请求处理

核心机制

  1. 令牌以固定速率 r 生成(如每秒 100 个)
  2. 桶的容量为 c(如最多存 200 个令牌)
  3. 请求到达时尝试获取令牌:
    • 有令牌:获取成功,请求通过
    • 无令牌:拒绝请求或等待
  4. 令牌满了会丢弃新生成的令牌

为什么支持突发流量?#

假设:r=100/s,c=200

  • 平稳期:1 秒内最多处理 100 个请求(消耗=生成)
  • 突发期:如果之前系统空闲,桶中积累了 200 个令牌,此时可瞬间处理 200 个请求
  • 限制:突发过后,仍按 100/s 的平均速率处理

三、实现方式对比#

单机实现#

  • 使用本地变量存储令牌数和时间戳
  • 适合单体应用
  • 性能高,无网络开销

分布式实现#

常见方案:

  1. Redis + Lua 脚本:保证原子性,适合中小规模
  2. Redis + 限流中间件(如 Sentinel)
  3. 独立限流服务(如 Envoy Rate Limit Service)

核心挑战

  • 原子性:令牌获取必须是原子操作
  • 一致性:多实例间的令牌计数同步
  • 性能:减少网络调用开销

四、代码实现#

Java 实现(单机版)#

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* 令牌桶限流器
*/
public class TokenBucketRateLimiter {
// 桶的容量
private final long capacity;
// 令牌生成速率(每秒)
private final long refillRate;
// 当前令牌数
private long tokens;
// 上次刷新时间
private long lastRefillTime;
// 线程安全锁
private final ReentrantLock lock = new ReentrantLock();
/**
* @param capacity 桶容量(最大令牌数)
* @param refillRate 每秒生成的令牌数
*/
public TokenBucketRateLimiter(long capacity, long refillRate) {
this.capacity = capacity;
this.refillRate = refillRate;
this.tokens = capacity; // 初始化时桶是满的
this.lastRefillTime = System.nanoTime();
}
/**
* 尝试获取指定数量的令牌
* @param tokensRequired 需要的令牌数
* @return 是否成功获取
*/
public boolean tryAcquire(long tokensRequired) {
lock.lock();
try {
refill();
if (tokens >= tokensRequired) {
tokens -= tokensRequired;
return true;
}
return false;
} finally {
lock.unlock();
}
}
/**
* 获取1个令牌
*/
public boolean tryAcquire() {
return tryAcquire(1);
}
/**
* 阻塞式获取令牌,直到成功
* @param tokensRequired 需要的令牌数
* @param timeout 超时时间
* @param unit 时间单位
* @return 是否在超时前获取成功
*/
public boolean acquire(long tokensRequired, long timeout, TimeUnit unit)
throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
while (System.nanoTime() < deadline) {
if (tryAcquire(tokensRequired)) {
return true;
}
// 计算需要等待的时间
long waitTime = (tokensRequired - tokens) * 1_000_000_000L / refillRate;
if (waitTime > 0) {
Thread.sleep(Math.min(waitTime / 1_000_000, 100));
}
}
return false;
}
/**
* 补充令牌
*/
private void refill() {
long now = System.nanoTime();
long elapsedTime = now - lastRefillTime;
// 计算这段时间应该生成的令牌数
long tokensToAdd = (elapsedTime * refillRate) / 1_000_000_000L;
if (tokensToAdd > 0) {
tokens = Math.min(capacity, tokens + tokensToAdd);
lastRefillTime = now;
}
}
/**
* 获取当前可用令牌数
*/
public long getAvailableTokens() {
lock.lock();
try {
refill();
return tokens;
} finally {
lock.unlock();
}
}
// 使用示例
public static void main(String[] args) throws InterruptedException {
// 创建限流器:容量200,每秒生成100个令牌
TokenBucketRateLimiter limiter = new TokenBucketRateLimiter(200, 100);
System.out.println("=== 测试突发流量 ===");
// 模拟突发:瞬间150个请求
int successCount = 0;
for (int i = 0; i < 150; i++) {
if (limiter.tryAcquire()) {
successCount++;
}
}
System.out.println("突发150个请求,通过: " + successCount);
System.out.println("剩余令牌: " + limiter.getAvailableTokens());
System.out.println("\n=== 测试持续流量 ===");
// 模拟持续请求
for (int i = 0; i < 5; i++) {
Thread.sleep(1000); // 等待1秒
int pass = 0;
for (int j = 0; j < 120; j++) {
if (limiter.tryAcquire()) {
pass++;
}
}
System.out.println("第" + (i+1) + "秒: 尝试120个请求,通过 " + pass);
}
System.out.println("\n=== 测试阻塞获取 ===");
boolean acquired = limiter.acquire(10, 2, TimeUnit.SECONDS);
System.out.println("阻塞获取10个令牌: " + (acquired ? "成功" : "超时"));
}
}

Go 实现(单机版)#

package main
import (
"fmt"
"sync"
"time"
)
// TokenBucket 令牌桶限流器
type TokenBucket struct {
capacity int64 // 桶容量
refillRate int64 // 每秒生成的令牌数
tokens int64 // 当前令牌数
lastRefillTime time.Time // 上次刷新时间
mu sync.Mutex // 互斥锁
}
// NewTokenBucket 创建令牌桶
func NewTokenBucket(capacity, refillRate int64) *TokenBucket {
return &TokenBucket{
capacity: capacity,
refillRate: refillRate,
tokens: capacity, // 初始化时桶是满的
lastRefillTime: time.Now(),
}
}
// TryAcquire 尝试获取指定数量的令牌
func (tb *TokenBucket) TryAcquire(tokensRequired int64) bool {
tb.mu.Lock()
defer tb.mu.Unlock()
tb.refill()
if tb.tokens >= tokensRequired {
tb.tokens -= tokensRequired
return true
}
return false
}
// TryAcquireOne 尝试获取1个令牌
func (tb *TokenBucket) TryAcquireOne() bool {
return tb.TryAcquire(1)
}
// Acquire 阻塞式获取令牌,直到成功或超时
func (tb *TokenBucket) Acquire(tokensRequired int64, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if tb.TryAcquire(tokensRequired) {
return true
}
// 计算需要等待的时间
tb.mu.Lock()
tb.refill()
waitTokens := tokensRequired - tb.tokens
tb.mu.Unlock()
if waitTokens > 0 {
waitTime := time.Duration(waitTokens*1e9/tb.refillRate) * time.Nanosecond
if waitTime > 100*time.Millisecond {
waitTime = 100 * time.Millisecond
}
time.Sleep(waitTime)
}
}
return false
}
// refill 补充令牌(需要在持有锁的情况下调用)
func (tb *TokenBucket) refill() {
now := time.Now()
elapsed := now.Sub(tb.lastRefillTime)
// 计算这段时间应该生成的令牌数
tokensToAdd := int64(elapsed.Seconds() * float64(tb.refillRate))
if tokensToAdd > 0 {
tb.tokens += tokensToAdd
if tb.tokens > tb.capacity {
tb.tokens = tb.capacity
}
tb.lastRefillTime = now
}
}
// GetAvailableTokens 获取当前可用令牌数
func (tb *TokenBucket) GetAvailableTokens() int64 {
tb.mu.Lock()
defer tb.mu.Unlock()
tb.refill()
return tb.tokens
}
func main() {
// 创建限流器:容量200,每秒生成100个令牌
limiter := NewTokenBucket(200, 100)
fmt.Println("=== 测试突发流量 ===")
// 模拟突发:瞬间150个请求
successCount := 0
for i := 0; i < 150; i++ {
if limiter.TryAcquireOne() {
successCount++
}
}
fmt.Printf("突发150个请求,通过: %d\n", successCount)
fmt.Printf("剩余令牌: %d\n", limiter.GetAvailableTokens())
fmt.Println("\n=== 测试持续流量 ===")
// 模拟持续请求
for i := 0; i < 5; i++ {
time.Sleep(1 * time.Second)
pass := 0
for j := 0; j < 120; j++ {
if limiter.TryAcquireOne() {
pass++
}
}
fmt.Printf("第%d秒: 尝试120个请求,通过 %d\n", i+1, pass)
}
fmt.Println("\n=== 测试阻塞获取 ===")
acquired := limiter.Acquire(10, 2*time.Second)
if acquired {
fmt.Println("阻塞获取10个令牌: 成功")
} else {
fmt.Println("阻塞获取10个令牌: 超时")
}
fmt.Println("\n=== 并发测试 ===")
// 重置限流器
limiter = NewTokenBucket(100, 50)
var wg sync.WaitGroup
successTotal := int64(0)
var successMu sync.Mutex
// 10个goroutine并发请求
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
localSuccess := 0
for j := 0; j < 20; j++ {
if limiter.TryAcquireOne() {
localSuccess++
}
time.Sleep(10 * time.Millisecond)
}
successMu.Lock()
successTotal += int64(localSuccess)
successMu.Unlock()
fmt.Printf("Goroutine %d: 通过 %d 个请求\n", id, localSuccess)
}(i)
}
wg.Wait()
fmt.Printf("并发总通过: %d 个请求\n", successTotal)
}

基于 Redis 的分布式实现(Go + Lua)#

package main
import (
"context"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
// Lua脚本:原子性获取令牌
const tokenBucketScript = `
-- KEYS[1]: 令牌桶key
-- KEYS[2]: 时间戳key
-- ARGV[1]: 桶容量
-- ARGV[2]: 令牌生成速率(每秒)
-- ARGV[3]: 当前时间戳(纳秒)
-- ARGV[4]: 请求令牌数
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local tokens_required = tonumber(ARGV[4])
-- 获取当前令牌数和上次刷新时间
local tokens = tonumber(redis.call('GET', KEYS[1]) or capacity)
local last_refill_time = tonumber(redis.call('GET', KEYS[2]) or now)
-- 计算应该生成的令牌数
local elapsed = math.max(0, now - last_refill_time)
local tokens_to_add = math.floor((elapsed / 1e9) * refill_rate)
-- 更新令牌数(不超过容量)
if tokens_to_add > 0 then
tokens = math.min(capacity, tokens + tokens_to_add)
last_refill_time = now
end
-- 尝试获取令牌
if tokens >= tokens_required then
tokens = tokens - tokens_required
-- 保存状态
redis.call('SET', KEYS[1], tokens)
redis.call('SET', KEYS[2], last_refill_time)
-- 设置过期时间(避免Redis内存泄漏)
redis.call('EXPIRE', KEYS[1], 3600)
redis.call('EXPIRE', KEYS[2], 3600)
return 1 -- 成功
else
return 0 -- 失败
end
`
// DistributedTokenBucket 分布式令牌桶限流器
type DistributedTokenBucket struct {
rdb *redis.Client
key string // Redis key前缀
capacity int64 // 桶容量
refillRate int64 // 每秒生成的令牌数
script *redis.Script
}
// NewDistributedTokenBucket 创建分布式令牌桶
func NewDistributedTokenBucket(rdb *redis.Client, key string, capacity, refillRate int64) *DistributedTokenBucket {
return &DistributedTokenBucket{
rdb: rdb,
key: key,
capacity: capacity,
refillRate: refillRate,
script: redis.NewScript(tokenBucketScript),
}
}
// TryAcquire 尝试获取指定数量的令牌
func (dtb *DistributedTokenBucket) TryAcquire(ctx context.Context, tokensRequired int64) (bool, error) {
keys := []string{
dtb.key + ":tokens",
dtb.key + ":timestamp",
}
args := []interface{}{
dtb.capacity,
dtb.refillRate,
time.Now().UnixNano(),
tokensRequired,
}
result, err := dtb.script.Run(ctx, dtb.rdb, keys, args...).Int()
if err != nil {
return false, err
}
return result == 1, nil
}
// TryAcquireOne 尝试获取1个令牌
func (dtb *DistributedTokenBucket) TryAcquireOne(ctx context.Context) (bool, error) {
return dtb.TryAcquire(ctx, 1)
}
// GetAvailableTokens 获取当前可用令牌数(仅用于监控)
func (dtb *DistributedTokenBucket) GetAvailableTokens(ctx context.Context) (int64, error) {
result, err := dtb.rdb.Get(ctx, dtb.key+":tokens").Int64()
if err == redis.Nil {
return dtb.capacity, nil
}
return result, err
}
func main() {
// 连接Redis
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // 根据实际情况设置
DB: 0,
})
defer rdb.Close()
ctx := context.Background()
// 测试连接
_, err := rdb.Ping(ctx).Result()
if err != nil {
fmt.Printf("无法连接Redis: %v\n", err)
fmt.Println("请确保Redis服务正在运行")
return
}
// 创建分布式限流器:容量200,每秒生成100个令牌
limiter := NewDistributedTokenBucket(rdb, "rate_limit:api", 200, 100)
fmt.Println("=== 分布式令牌桶测试 ===")
// 测试1:突发流量
fmt.Println("\n1. 测试突发流量")
successCount := 0
for i := 0; i < 150; i++ {
ok, err := limiter.TryAcquireOne(ctx)
if err != nil {
fmt.Printf("错误: %v\n", err)
return
}
if ok {
successCount++
}
}
fmt.Printf("突发150个请求,通过: %d\n", successCount)
tokens, _ := limiter.GetAvailableTokens(ctx)
fmt.Printf("剩余令牌: %d\n", tokens)
// 测试2:持续流量
fmt.Println("\n2. 测试持续流量(每秒120个请求)")
for i := 0; i < 3; i++ {
time.Sleep(1 * time.Second)
pass := 0
for j := 0; j < 120; j++ {
ok, _ := limiter.TryAcquireOne(ctx)
if ok {
pass++
}
}
fmt.Printf("第%d秒: 通过 %d 个请求\n", i+1, pass)
}
// 测试3:模拟多实例并发
fmt.Println("\n3. 模拟3个服务实例并发请求")
successChan := make(chan int, 3)
for instance := 0; instance < 3; instance++ {
go func(id int) {
localSuccess := 0
for j := 0; j < 50; j++ {
ok, _ := limiter.TryAcquireOne(ctx)
if ok {
localSuccess++
}
time.Sleep(10 * time.Millisecond)
}
successChan <- localSuccess
fmt.Printf("实例%d: 通过 %d 个请求\n", id, localSuccess)
}(instance)
}
totalSuccess := 0
for i := 0; i < 3; i++ {
totalSuccess += <-successChan
}
fmt.Printf("总通过: %d 个请求\n", totalSuccess)
fmt.Println("\n测试完成!")
}
/*
使用说明:
1. 安装依赖: go get github.com/redis/go-redis/v9
2. 启动Redis: docker run -p 6379:6379 redis
3. 运行程序: go run main.go
优势:
- 多实例间共享限流配额
- 原子性操作保证一致性
- 支持动态调整限流策略
注意事项:
- Redis性能成为瓶颈时考虑本地缓存
- 设置合理的key过期时间
- 监控Redis连接和网络延迟
*/

Java 实现(单机版)### Go 实现(单机版)### 基于 Redis 的分布式实现(Go + Lua)## 五、选型建议#

场景推荐算法原因
API 网关令牌桶需要处理突发流量,用户体验好
消息队列消费漏桶需要绝对平滑的处理速率
简单计数固定窗口实现简单,资源占用少
精确控制滑动窗口更平滑,无临界问题

六、进阶优化#

1. 预热机制#

系统启动时桶可能是空的,可以初始化一定数量的令牌

2. 动态调整#

根据系统负载动态调整 refillRate

3. 分层限流#

全局限流(1000 QPS)
用户级限流(每用户 10 QPS)
接口级限流(单接口 5 QPS)

4. 本地缓存 + Redis#

// 本地桶处理大部分请求,减少Redis调用
if localBucket.TryAcquire() {
return true
}
// 本地失败时尝试Redis
return redisBucket.TryAcquire()

总结:令牌桶是最常用的限流算法,它在保证平均速率的同时支持突发流量,适合绝大多数业务场景。单机场景用本地实现即可,分布式场景推荐 Redis + Lua 方案。

常见限流算法对比与令牌桶详解
https://mizuki.mysqil.com/posts/盘牛客面经/快手一面实习/限流算法原理与令牌桶详解/
作者
Laoli
发布于
2025-10-19
许可协议
CC BY-NC-SA 4.0
封面
示例歌曲
示例艺术家
封面
示例歌曲
示例艺术家
0:00 / 0:00