“我们从60%的供应商调用中得到429错误。高峰时段我们的客户看到错误页面。” — 运营经理

“我们的应用程序每小时创建10,000+个连接。云提供商的账单正在爆炸式增长。” — DevOps工程师

这些都是缺少HTTP Dispatcher的典型症状。让我们深入探讨它如何解决这些问题。

第一部分:限流 - 节流的艺术

理解供应商限流

限流无处不在,但它不是统一的:

供应商类型 常见限制 限制类型 突发允许
大型OTA(Expedia, Agoda) 100-500 req/s 每IP/密钥
中型OTA(HotelBeds, Dida) 10-50 req/s 每API密钥 有时
小型/细分供应商 2-10 req/s 严格
GDS系统 5-20 req/s 每会话
床位银行 20-100 req/min 每合同

限流算法

算法1:令牌桶(最常见)

工作原理

桶有:
- 容量:最大令牌数(限流)
- 填充率:每秒添加的令牌
- 令牌成本:每个请求1个令牌

请求流程:
1. 检查桶是否有令牌
2. 如果有:消耗1个令牌,允许请求
3. 如果没有:排队或拒绝请求

实现

type TokenBucketRateLimiter struct {
    capacity  float64
    rate      float64
    tokens    float64
    lastRefill time.Time
    mu        sync.Mutex
}

func (tb *TokenBucketRateLimiter) Allow() bool {
    tb.mu.Lock()
    defer tb.mu.Unlock()

    now := time.Now()
    elapsed := now.Sub(tb.lastRefill).Seconds()
    tb.lastRefill = now

    // 填充令牌
    tb.tokens += elapsed * tb.rate
    if tb.tokens > tb.capacity {
        tb.tokens = tb.capacity
    }

    // 检查是否有令牌
    if tb.tokens >= 1.0 {
        tb.tokens -= 1.0
        return true
    }

    return false
}

可视化

令牌: ██████████ (10/10) → 就绪
令牌: ████████░░ (8/10) → 请求允许
令牌: ██░░░░░░░░ (2/10) → 请求允许
令牌: ░░░░░░░░░░ (0/10) → 请求排队

... 2秒后(每秒填充20个令牌)...
令牌: ██████░░░░ (6/10) → 再次就绪

算法2:漏桶(固定输出率)

工作原理

桶有:
- 容量:最大队列大小
- 漏率:每秒处理的请求数
- 水位:当前队列大小

请求流程:
1. 如果桶满:拒绝
2. 如果桶不满:添加到队列
3. 队列以固定速率泄漏

实现

type LeakyBucketRateLimiter struct {
    capacity int
    rate     float64 // 每秒请求数
    queue    chan struct{}
    ticker   *time.Ticker
}

func NewLeakyBucketRateLimiter(capacity int, rate float64) *LeakyBucketRateLimiter {
    lb := &LeakyBucketRateLimiter{
        capacity: capacity,
        rate:     rate,
        queue:    make(chan struct{}, capacity),
    }

    // 以固定速率开始泄漏
    interval := time.Duration(1.0/rate*1000) * time.Millisecond
    lb.ticker = time.NewTicker(interval)
    go func() {
        for range lb.ticker.C {
            select {
            case <-lb.queue:
                // 处理一个请求
            default:
                // 没有请求排队
            }
        }
    }()

    return lb
}

func (lb *LeakyBucketRateLimiter) Allow() bool {
    select {
    case lb.queue <- struct{}{}:
        return true
    default:
        return false // 桶满
    }
}

何时使用每个算法

算法 最适合 优点 缺点
令牌桶 可变速率,允许突发 很好处理突发 可能浪费令牌
漏桶 固定输出率 可预测的输出 延迟所有请求

HTTP Dispatcher的自适应限流

标准限流还不够。供应商可能:

  • 动态更改限制
  • 施加临时限制
  • 有分层限制(高级版 vs 标准版)

自适应限流

type AdaptiveRateLimiter struct {
    buckets map[string]*TokenBucketRateLimiter
    history map[string][]RateLimitEvent
    mu      sync.RWMutex
}

type RateLimitEvent struct {
    Timestamp time.Time
    Allowed   bool
    StatusCode int
}

func (arl *AdaptiveRateLimiter) Allow(supplier string) bool {
    arl.mu.Lock()
    defer arl.mu.Unlock()

    // 获取或为供应商创建桶
    bucket, exists := arl.buckets[supplier]
    if !exists {
        bucket = arl.createBucket(supplier)
        arl.buckets[supplier] = bucket
    }

    // 检查允许
    allowed := bucket.Allow()

    // 跟踪事件
    arl.history[supplier] = append(arl.history[supplier], RateLimitEvent{
        Timestamp: time.Now(),
        Allowed:   allowed,
    })

    // 分析和调整
    arl.analyzeAndAdapt(supplier)

    return allowed
}

func (arl *AdaptiveRateLimiter) analyzeAndAdapt(supplier string) {
    events := arl.history[supplier]

    // 只保留最近100个事件
    if len(events) > 100 {
        events = events[len(events)-100:]
    }

    // 计算429错误率
    var four29Count int
    for _, event := range events {
        if event.StatusCode == 429 {
            four29Count++
        }
    }

    four29Rate := float64(four29Count) / float64(len(events))

    // 如果429率 > 5%,降低20%速率
    if four29Rate > 0.05 {
        bucket := arl.buckets[supplier]
        bucket.rate *= 0.8
        log.Warn("降低限流速率", "supplier", supplier, "new_rate", bucket.rate)
    }
}

好处

  • 自适应调整到供应商行为
  • 自动减少429错误
  • 动态优化吞吐量

第二部分:连接池 - 效率提升器

为什么连接池很重要

HTTP连接开销

TCP 3次握手: 1 RTT
TLS握手: 2 RTTs (完整) 或 1 RTT (会话恢复)
DNS查找: 0-1 RTT (缓存)
─────────────────────────────────
总计: 每个新连接2-4 RTTs

假设50ms RTT:
新连接: 100-200ms
复用连接: 5-10ms

无连接池 vs 有连接池

场景 无连接池 有连接池 改进
100个请求 10-20秒 0.5-1秒 20倍更快
1000个请求 100-200秒 5-10秒 20倍更快
10000个请求 1000-2000秒 (16-33分钟) 50-100秒 20倍更快

Go的内置HTTP客户端连接池

好消息:Go的http.Client已经有连接池!

// 默认客户端使用默认传输
client := &http.Client{}
// 默认: MaxIdleConns: 100, MaxIdleConnsPerHost: 2

但对于酒店API集成,默认值不够

// 供应商API的更好配置
transport := &http.Transport{
    MaxIdleConns:        200,      // 总空闲连接
    MaxIdleConnsPerHost: 20,       // 每个供应商
    IdleConnTimeout:     90 * time.Second,
    DisableCompression:  false,
    // TLS配置
    TLSClientConfig: &tls.Config{
        InsecureSkipVerify: false,
        MinVersion:         tls.VersionTLS12,
    },
    // Keep-alive
    DisableKeepAlives: false,
    MaxConnsPerHost:   50, // 每个主机的最大活动连接
}

client := &http.Client{
    Transport: transport,
    Timeout:   30 * time.Second,
}

HTTP Dispatcher的智能池管理

关键特性

1. 每供应商池配置

type SupplierConfig struct {
    BaseURL             string
    RateLimit          float64
    MaxIdleConns       int
    MaxConnsPerHost    int
    ConnectionTimeout  time.Duration
    IdleTimeout        time.Duration
}

var supplierConfigs = map[string]SupplierConfig{
    "hotelbeds": {
        BaseURL:            "https://api.hotelbeds.com",
        RateLimit:         10.0,  // 10 req/s
        MaxIdleConns:      20,
        MaxConnsPerHost:   30,
        ConnectionTimeout: 5 * time.Second,
        IdleTimeout:       60 * time.Second,
    },
    "dida": {
        BaseURL:            "https://api.dida.travel",
        RateLimit:         20.0,  // 20 req/s
        MaxIdleConns:      30,
        MaxConnsPerHost:   40,
        ConnectionTimeout: 3 * time.Second,
        IdleTimeout:       90 * time.Second,
    },
}

2. 动态池大小调整

type PoolManager struct {
    pools map[string]*http.Client
    stats map[string]PoolStats
    mu    sync.RWMutex
}

type PoolStats struct {
    ActiveConnections int
    IdleConnections  int
    WaitTime         time.Duration
    ErrorRate        float64
}

func (pm *PoolManager) GetClient(supplier string) *http.Client {
    pm.mu.RLock()
    client, exists := pm.pools[supplier]
    pm.mu.RUnlock()

    if exists {
        return client
    }

    // 创建新池
    return pm.createPool(supplier)
}

func (pm *PoolManager) Monitor() {
    ticker := time.NewTicker(30 * time.Second)
    for range ticker.C {
        pm.adjustPoolSizes()
    }
}

func (pm *PoolManager) adjustPoolSizes() {
    pm.mu.Lock()
    defer pm.mu.Unlock()

    for supplier, stats := range pm.stats {
        config := supplierConfigs[supplier]

        // 如果错误率高,减少池大小
        if stats.ErrorRate > 0.1 {
            config.MaxConnsPerHost = int(float64(config.MaxConnsPerHost) * 0.8)
            log.Warn("由于错误减少池大小", "supplier", supplier)
        }

        // 如果等待时间长,增加池大小
        if stats.WaitTime > 100*time.Millisecond {
            config.MaxConnsPerHost = int(float64(config.MaxConnsPerHost) * 1.2)
            log.Info("由于等待时间增加池大小", "supplier", supplier)
        }
    }
}

3. 健康检查

func (pm *PoolManager) HealthCheck() {
    for supplier, client := range pm.pools {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()

        req, err := http.NewRequestWithContext(ctx, "GET", "/health", nil)
        if err != nil {
            log.Error("健康检查失败", "supplier", supplier, "error", err)
            continue
        }

        resp, err := client.Do(req)
        if err != nil {
            log.Error("健康检查失败", "supplier", supplier, "error", err)
            pm.resetPool(supplier)
            continue
        }

        resp.Body.Close()

        if resp.StatusCode >= 500 {
            log.Warn("供应商不健康", "supplier", supplier, "status", resp.StatusCode)
            pm.enableCircuitBreaker(supplier)
        }
    }
}

第三部分:整合

完整的HTTP Dispatcher流程

type HTTPDispatcher struct {
    rateLimiter    *AdaptiveRateLimiter
    poolManager    *PoolManager
    prioritizer    *RequestPrioritizer
    retryHandler   *RetryHandler
    metrics        *MetricsCollector
}

func (d *HTTPDispatcher) Submit(req *Request) (*Response, error) {
    start := time.Now()

    // 1. 带优先级入队
    d.prioritizer.Enqueue(req)

    // 2. 等待轮次(优先级队列)
    d.prioritizer.WaitTurn(req)

    // 3. 检查限流
    for !d.rateLimiter.Allow(req.Supplier) {
        time.Sleep(10 * time.Millisecond)
    }

    // 4. 从池获取连接
    client := d.poolManager.GetClient(req.Supplier)

    // 5. 执行请求(带重试)
    var resp *http.Response
    var err error

    for attempt := 0; attempt < 3; attempt++ {
        resp, err = d.doRequest(client, req)
        if err == nil {
            break
        }

        // 处理限流
        if resp != nil && resp.StatusCode == 429 {
            d.rateLimiter.Backoff(req.Supplier)
            time.Sleep(d.getBackoffDuration(attempt))
            continue
        }

        // 处理服务器错误
        if resp != nil && resp.StatusCode >= 500 {
            time.Sleep(d.getBackoffDuration(attempt))
            continue
        }

        break
    }

    // 6. 记录指标
    duration := time.Since(start)
    d.metrics.Record(req.Supplier, duration, resp.StatusCode)

    return &Response{
        StatusCode: resp.StatusCode,
        Body:       resp.Body,
        Duration:   duration,
    }, err
}

实际性能影响

案例研究:OTA平台

使用HTTP Dispatcher之前

并发请求:1000
429错误率:42%
P50延迟:800ms
P95延迟:5200ms
P99延迟:12400ms
连接数:5000+

使用HTTP Dispatcher之后

并发请求:1000
429错误率:0.8%
P50延迟:120ms
P95延迟:680ms
P99延迟:1200ms
连接数:150

指标

  • 429错误减少98%
  • P95延迟降低87%
  • P99延迟降低90%
  • 连接数减少97%
  • 吞吐量增加6倍

最佳实践

限流最佳实践

  1. 仔细跟踪供应商限制 - 它们可能在没有通知的情况下更改
  2. 使用自适应算法 - 静态限制在现实世界中不起作用
  3. 监控429错误 - 它们表明需要调整限制
  4. 尊重Retry-After头 - 供应商发送它们是有原因的
  5. 实现退避 - 被限制时不要锤击

连接池最佳实践

  1. 配置每主机限制 - 一刀切不适合所有
  2. 监控池健康 - 重置不健康的连接
  3. 使用连接超时 - 不要永远等待
  4. 启用keep-alive - 对性能至关重要
  5. 设置适当的空闲超时 - 平衡内存与重连

总结

HTTP Dispatcher解决两个关键问题:

限流

  • 令牌桶和漏桶算法
  • 基于429错误的自适应调整
  • 每供应商限制跟踪

连接池

  • 智能池大小调整
  • 每供应商配置
  • 健康监控和自动恢复

结合:吞吐量增加6倍,延迟降低90%,429错误减少98%。

下一篇在Go中实现HTTP Dispatcher


推荐阅读


系列导航

HTTP Dispatcher系列

  1. 什么是HTTP Dispatcher及为什么需要
  2. HTTP Dispatcher如何解决限流和连接池 ← 你在这里
  3. 在Go中实现HTTP Dispatcher
  4. 真实案例研究和性能改进