HTTP Dispatcher系列(2):HTTP Dispatcher如何解决限流和连接池问题
深入探讨HTTP Dispatcher的限流算法和连接池策略。了解令牌桶、漏桶、自适应限流以及高效连接管理,以实现高吞吐量的酒店API集成。
“我们从60%的供应商调用中得到429错误。高峰时段我们的客户看到错误页面。” — 运营经理
“我们的应用程序每小时创建10,000+个连接。云提供商的账单正在爆炸式增长。” — DevOps工程师
这些都是缺少HTTP Dispatcher的典型症状。让我们深入探讨它如何解决这些问题。
第一部分:限流 - 节流的艺术
理解供应商限流
限流无处不在,但它不是统一的:
| 供应商类型 | 常见限制 | 限制类型 | 突发允许 |
|---|---|---|---|
| 大型OTA(Expedia, Agoda) | 100-500 req/s | 每IP/密钥 | 是 |
| 中型OTA(HotelBeds, Dida) | 10-50 req/s | 每API密钥 | 有时 |
| 小型/细分供应商 | 2-10 req/s | 严格 | 否 |
| GDS系统 | 5-20 req/s | 每会话 | 否 |
| 床位银行 | 20-100 req/min | 每合同 | 否 |
限流算法
算法1:令牌桶(最常见)
工作原理:
桶有:
- 容量:最大令牌数(限流)
- 填充率:每秒添加的令牌
- 令牌成本:每个请求1个令牌
请求流程:
1. 检查桶是否有令牌
2. 如果有:消耗1个令牌,允许请求
3. 如果没有:排队或拒绝请求
实现:
type TokenBucketRateLimiter struct {
capacity float64
rate float64
tokens float64
lastRefill time.Time
mu sync.Mutex
}
func (tb *TokenBucketRateLimiter) Allow() bool {
tb.mu.Lock()
defer tb.mu.Unlock()
now := time.Now()
elapsed := now.Sub(tb.lastRefill).Seconds()
tb.lastRefill = now
// 填充令牌
tb.tokens += elapsed * tb.rate
if tb.tokens > tb.capacity {
tb.tokens = tb.capacity
}
// 检查是否有令牌
if tb.tokens >= 1.0 {
tb.tokens -= 1.0
return true
}
return false
}
可视化:
令牌: ██████████ (10/10) → 就绪
令牌: ████████░░ (8/10) → 请求允许
令牌: ██░░░░░░░░ (2/10) → 请求允许
令牌: ░░░░░░░░░░ (0/10) → 请求排队
... 2秒后(每秒填充20个令牌)...
令牌: ██████░░░░ (6/10) → 再次就绪
算法2:漏桶(固定输出率)
工作原理:
桶有:
- 容量:最大队列大小
- 漏率:每秒处理的请求数
- 水位:当前队列大小
请求流程:
1. 如果桶满:拒绝
2. 如果桶不满:添加到队列
3. 队列以固定速率泄漏
实现:
type LeakyBucketRateLimiter struct {
capacity int
rate float64 // 每秒请求数
queue chan struct{}
ticker *time.Ticker
}
func NewLeakyBucketRateLimiter(capacity int, rate float64) *LeakyBucketRateLimiter {
lb := &LeakyBucketRateLimiter{
capacity: capacity,
rate: rate,
queue: make(chan struct{}, capacity),
}
// 以固定速率开始泄漏
interval := time.Duration(1.0/rate*1000) * time.Millisecond
lb.ticker = time.NewTicker(interval)
go func() {
for range lb.ticker.C {
select {
case <-lb.queue:
// 处理一个请求
default:
// 没有请求排队
}
}
}()
return lb
}
func (lb *LeakyBucketRateLimiter) Allow() bool {
select {
case lb.queue <- struct{}{}:
return true
default:
return false // 桶满
}
}
何时使用每个算法:
| 算法 | 最适合 | 优点 | 缺点 |
|---|---|---|---|
| 令牌桶 | 可变速率,允许突发 | 很好处理突发 | 可能浪费令牌 |
| 漏桶 | 固定输出率 | 可预测的输出 | 延迟所有请求 |
HTTP Dispatcher的自适应限流
标准限流还不够。供应商可能:
- 动态更改限制
- 施加临时限制
- 有分层限制(高级版 vs 标准版)
自适应限流:
type AdaptiveRateLimiter struct {
buckets map[string]*TokenBucketRateLimiter
history map[string][]RateLimitEvent
mu sync.RWMutex
}
type RateLimitEvent struct {
Timestamp time.Time
Allowed bool
StatusCode int
}
func (arl *AdaptiveRateLimiter) Allow(supplier string) bool {
arl.mu.Lock()
defer arl.mu.Unlock()
// 获取或为供应商创建桶
bucket, exists := arl.buckets[supplier]
if !exists {
bucket = arl.createBucket(supplier)
arl.buckets[supplier] = bucket
}
// 检查允许
allowed := bucket.Allow()
// 跟踪事件
arl.history[supplier] = append(arl.history[supplier], RateLimitEvent{
Timestamp: time.Now(),
Allowed: allowed,
})
// 分析和调整
arl.analyzeAndAdapt(supplier)
return allowed
}
func (arl *AdaptiveRateLimiter) analyzeAndAdapt(supplier string) {
events := arl.history[supplier]
// 只保留最近100个事件
if len(events) > 100 {
events = events[len(events)-100:]
}
// 计算429错误率
var four29Count int
for _, event := range events {
if event.StatusCode == 429 {
four29Count++
}
}
four29Rate := float64(four29Count) / float64(len(events))
// 如果429率 > 5%,降低20%速率
if four29Rate > 0.05 {
bucket := arl.buckets[supplier]
bucket.rate *= 0.8
log.Warn("降低限流速率", "supplier", supplier, "new_rate", bucket.rate)
}
}
好处:
- 自适应调整到供应商行为
- 自动减少429错误
- 动态优化吞吐量
第二部分:连接池 - 效率提升器
为什么连接池很重要
HTTP连接开销:
TCP 3次握手: 1 RTT
TLS握手: 2 RTTs (完整) 或 1 RTT (会话恢复)
DNS查找: 0-1 RTT (缓存)
─────────────────────────────────
总计: 每个新连接2-4 RTTs
假设50ms RTT:
新连接: 100-200ms
复用连接: 5-10ms
无连接池 vs 有连接池:
| 场景 | 无连接池 | 有连接池 | 改进 |
|---|---|---|---|
| 100个请求 | 10-20秒 | 0.5-1秒 | 20倍更快 |
| 1000个请求 | 100-200秒 | 5-10秒 | 20倍更快 |
| 10000个请求 | 1000-2000秒 (16-33分钟) | 50-100秒 | 20倍更快 |
Go的内置HTTP客户端连接池
好消息:Go的http.Client已经有连接池!
// 默认客户端使用默认传输
client := &http.Client{}
// 默认: MaxIdleConns: 100, MaxIdleConnsPerHost: 2
但对于酒店API集成,默认值不够:
// 供应商API的更好配置
transport := &http.Transport{
MaxIdleConns: 200, // 总空闲连接
MaxIdleConnsPerHost: 20, // 每个供应商
IdleConnTimeout: 90 * time.Second,
DisableCompression: false,
// TLS配置
TLSClientConfig: &tls.Config{
InsecureSkipVerify: false,
MinVersion: tls.VersionTLS12,
},
// Keep-alive
DisableKeepAlives: false,
MaxConnsPerHost: 50, // 每个主机的最大活动连接
}
client := &http.Client{
Transport: transport,
Timeout: 30 * time.Second,
}
HTTP Dispatcher的智能池管理
关键特性:
1. 每供应商池配置
type SupplierConfig struct {
BaseURL string
RateLimit float64
MaxIdleConns int
MaxConnsPerHost int
ConnectionTimeout time.Duration
IdleTimeout time.Duration
}
var supplierConfigs = map[string]SupplierConfig{
"hotelbeds": {
BaseURL: "https://api.hotelbeds.com",
RateLimit: 10.0, // 10 req/s
MaxIdleConns: 20,
MaxConnsPerHost: 30,
ConnectionTimeout: 5 * time.Second,
IdleTimeout: 60 * time.Second,
},
"dida": {
BaseURL: "https://api.dida.travel",
RateLimit: 20.0, // 20 req/s
MaxIdleConns: 30,
MaxConnsPerHost: 40,
ConnectionTimeout: 3 * time.Second,
IdleTimeout: 90 * time.Second,
},
}
2. 动态池大小调整
type PoolManager struct {
pools map[string]*http.Client
stats map[string]PoolStats
mu sync.RWMutex
}
type PoolStats struct {
ActiveConnections int
IdleConnections int
WaitTime time.Duration
ErrorRate float64
}
func (pm *PoolManager) GetClient(supplier string) *http.Client {
pm.mu.RLock()
client, exists := pm.pools[supplier]
pm.mu.RUnlock()
if exists {
return client
}
// 创建新池
return pm.createPool(supplier)
}
func (pm *PoolManager) Monitor() {
ticker := time.NewTicker(30 * time.Second)
for range ticker.C {
pm.adjustPoolSizes()
}
}
func (pm *PoolManager) adjustPoolSizes() {
pm.mu.Lock()
defer pm.mu.Unlock()
for supplier, stats := range pm.stats {
config := supplierConfigs[supplier]
// 如果错误率高,减少池大小
if stats.ErrorRate > 0.1 {
config.MaxConnsPerHost = int(float64(config.MaxConnsPerHost) * 0.8)
log.Warn("由于错误减少池大小", "supplier", supplier)
}
// 如果等待时间长,增加池大小
if stats.WaitTime > 100*time.Millisecond {
config.MaxConnsPerHost = int(float64(config.MaxConnsPerHost) * 1.2)
log.Info("由于等待时间增加池大小", "supplier", supplier)
}
}
}
3. 健康检查
func (pm *PoolManager) HealthCheck() {
for supplier, client := range pm.pools {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", "/health", nil)
if err != nil {
log.Error("健康检查失败", "supplier", supplier, "error", err)
continue
}
resp, err := client.Do(req)
if err != nil {
log.Error("健康检查失败", "supplier", supplier, "error", err)
pm.resetPool(supplier)
continue
}
resp.Body.Close()
if resp.StatusCode >= 500 {
log.Warn("供应商不健康", "supplier", supplier, "status", resp.StatusCode)
pm.enableCircuitBreaker(supplier)
}
}
}
第三部分:整合
完整的HTTP Dispatcher流程
type HTTPDispatcher struct {
rateLimiter *AdaptiveRateLimiter
poolManager *PoolManager
prioritizer *RequestPrioritizer
retryHandler *RetryHandler
metrics *MetricsCollector
}
func (d *HTTPDispatcher) Submit(req *Request) (*Response, error) {
start := time.Now()
// 1. 带优先级入队
d.prioritizer.Enqueue(req)
// 2. 等待轮次(优先级队列)
d.prioritizer.WaitTurn(req)
// 3. 检查限流
for !d.rateLimiter.Allow(req.Supplier) {
time.Sleep(10 * time.Millisecond)
}
// 4. 从池获取连接
client := d.poolManager.GetClient(req.Supplier)
// 5. 执行请求(带重试)
var resp *http.Response
var err error
for attempt := 0; attempt < 3; attempt++ {
resp, err = d.doRequest(client, req)
if err == nil {
break
}
// 处理限流
if resp != nil && resp.StatusCode == 429 {
d.rateLimiter.Backoff(req.Supplier)
time.Sleep(d.getBackoffDuration(attempt))
continue
}
// 处理服务器错误
if resp != nil && resp.StatusCode >= 500 {
time.Sleep(d.getBackoffDuration(attempt))
continue
}
break
}
// 6. 记录指标
duration := time.Since(start)
d.metrics.Record(req.Supplier, duration, resp.StatusCode)
return &Response{
StatusCode: resp.StatusCode,
Body: resp.Body,
Duration: duration,
}, err
}
实际性能影响
案例研究:OTA平台
使用HTTP Dispatcher之前:
并发请求:1000
429错误率:42%
P50延迟:800ms
P95延迟:5200ms
P99延迟:12400ms
连接数:5000+
使用HTTP Dispatcher之后:
并发请求:1000
429错误率:0.8%
P50延迟:120ms
P95延迟:680ms
P99延迟:1200ms
连接数:150
指标:
- 429错误减少98%
- P95延迟降低87%
- P99延迟降低90%
- 连接数减少97%
- 吞吐量增加6倍
最佳实践
限流最佳实践
- 仔细跟踪供应商限制 - 它们可能在没有通知的情况下更改
- 使用自适应算法 - 静态限制在现实世界中不起作用
- 监控429错误 - 它们表明需要调整限制
- 尊重Retry-After头 - 供应商发送它们是有原因的
- 实现退避 - 被限制时不要锤击
连接池最佳实践
- 配置每主机限制 - 一刀切不适合所有
- 监控池健康 - 重置不健康的连接
- 使用连接超时 - 不要永远等待
- 启用keep-alive - 对性能至关重要
- 设置适当的空闲超时 - 平衡内存与重连
总结
HTTP Dispatcher解决两个关键问题:
限流:
- 令牌桶和漏桶算法
- 基于429错误的自适应调整
- 每供应商限制跟踪
连接池:
- 智能池大小调整
- 每供应商配置
- 健康监控和自动恢复
结合:吞吐量增加6倍,延迟降低90%,429错误减少98%。
推荐阅读
系列导航
HTTP Dispatcher系列:
- 什么是HTTP Dispatcher及为什么需要
- HTTP Dispatcher如何解决限流和连接池 ← 你在这里
- 在Go中实现HTTP Dispatcher
- 真实案例研究和性能改进
评论