这是”为什么酒店API集成这么难?”系列的第3篇。

第1篇:认证地狱 | 第2篇:数据混乱 | 第4篇:错误处理(即将发布)


你的搜索功能上线第1天就被供应商拉黑了吗?

真实场景

你的酒店搜索功能上线了!

第1天

  • 早上9点:一切正常,搜索响应时间300ms
  • 上午10点:用户量增加,搜索QPS达到100
  • 上午10:05:供应商A开始返回429 Too Many Requests
  • 上午10:10:供应商B返回403 Forbidden
  • 上午10:15:供应商C返回429 Too Many Requests

你的用户投诉

  • “为什么搜不到XX酒店?”
  • “为什么搜索一直转圈?”
  • “为什么显示错误?”

你打开供应商后台,看到了这个

Rate Limit Status: EXCEEDED
Next Reset Time: 2026-02-06 16:00:00 UTC (3 hours later)
Your API Key has been temporarily suspended.

问题来了

你只接入了5家供应商,上线第1天就被拉黑了3家。

  • 为什么?
  • 供应商的限流规则是什么?
  • 为什么有些酒店永远搜不到?

痛点1:每个供应商限流规则不同

5家供应商,5种不同的限流规则

供应商 QPS限制 时间窗口 限流策略 超限后
A 10 每秒 固定窗口 429错误
B 100 每分钟 滑动窗口 403错误
C 1000 每小时 令牌桶 429错误
D 500 每分钟 滑动窗口 暂停API Key
E 无限制 N/A N/A

限流策略的差异

固定窗口(供应商A)

时间窗口:每秒
限制:10 QPS

问题

  • ❌ 第1秒0.99秒:发送10个请求 → OK
  • ❌ 第2秒0.01秒:发送10个请求 → OK
  • ❌ 0.02秒内发送了20个请求 → 突发流量

供应商视角:你遵守了QPS限制,但是…

用户视角:为什么搜索有时候快,有时候慢?

滑动窗口(供应商B)

时间窗口:每分钟
限制:100 QPS

更精确,但更复杂

  • 记录每个请求的时间戳
  • 统计过去60秒内的请求数
  • 如果超过100,拒绝

问题

  • ⚠️ 需要维护请求时间戳列表
  • ⚠️ 内存占用大(如果QPS很高)
  • ⚠️ 实现复杂

令牌桶(供应商C)

时间窗口:每小时
限制:1000 QPS
令牌填充速率:~16.7 tokens/second
桶容量:1000 tokens

工作原理

  • 每秒向桶里添加16.7个令牌
  • 每个请求消耗1个令牌
  • 如果桶里没有令牌,拒绝请求

问题

  • ⚠️ 突发流量:如果桶是满的,可以瞬间发送1000个请求
  • ⚠️ 冷启动:刚开始时桶可能是空的

API Key暂停(供应商D)

超限后:暂停API Key 1小时
第二次超限:暂停API Key 24小时
第三次超限:永久禁用API Key

问题

  • ❌ 第一次超限就暂停1小时
  • ❌ 无法快速恢复
  • ❌ 你的搜索功能完全不可用

痛点2:热门酒店有更严格的限流

真实场景

你的用户在搜索伦敦的热门酒店

  • London Central Hotel(热门,搜索量是普通酒店的100倍)
  • London Budget Hotel(冷门,搜索量很低)

供应商的限流规则

  • 全局QPS限制:100/分钟
  • 单酒店QPS限制:10/分钟

问题来了

第1个用户搜索

  • London Central Hotel → 搜索成功
  • London Budget Hotel → 搜索成功

第100个用户搜索

  • London Central Hotel → 429 Too Many Requests
  • London Budget Hotel → 搜索成功

用户投诉

  • “为什么我搜不到London Central Hotel?”
  • “为什么有时候能搜到,有时候搜不到?”

热点检测的重要性

供应商的视角

  • 热门酒店:搜索量太大,保护供应商的后端
  • 单酒店限流:防止某个热门酒店拖垮整个系统

你的问题

  • 你不知道哪些是热门酒店
  • 你不知道每个供应商的单酒店限流规则
  • 你不知道如何避开热点

痛点3:突发流量导致超限

真实场景

你的搜索功能平时稳定

  • 平均QPS:50
  • 响应时间:300ms
  • 没有被限流

某天上午10点

  • 某个网红发布了你们网站的推荐
  • 突然涌入10000个用户
  • 瞬间QPS达到500

结果

  • 供应商A:429 Too Many Requests
  • 供应商B:403 Forbidden
  • 供应商C:429 Too Many Requests
  • 供应商D:API Key暂停
  • 供应商E:超时(后端过载)

用户投诉

  • “为什么网站这么慢?”
  • “为什么一直显示错误?”
  • “我用不了了!”

问题来了

你的系统设计时假设平均QPS是50,但突发流量是500。

  • 你应该怎么做?
  • 提前预估流量?
  • 实施缓存?
  • 队列化管理?

痛点4:缓存失效导致重复调用

真实场景

你实施了缓存

  • 搜索结果缓存5分钟
  • 用户搜索同一个查询,直接返回缓存

效果

  • 响应时间:300ms → 50ms
  • QPS:500 → 50
  • 供应商API调用减少90%

但是…

某个热门酒店的价格突然变化了

  • 用户A搜索:$150(缓存)
  • 用户B搜索:$150(缓存)
  • 用户C搜索:$150(缓存)
  • 供应商价格变了:$180

用户D搜索

  • 缓存失效
  • 调用供应商API
  • 价格:$180

用户投诉

  • “为什么价格变了?”
  • “我朋友看到$150,我为什么是$180?”

问题来了

如果你不缓存

  • 响应时间慢(300ms)
  • QPS高(500)
  • 容易被限流

如果你缓存

  • 响应时间快(50ms)
  • QPS低(50)
  • 但价格不准确

如何平衡?


痛点5:多供应商并发调用导致超限

真实场景

你的搜索功能并发调用5家供应商

async def search_hotel(hotel_id):
    tasks = []
    for supplier in suppliers:
        task = asyncio.create_task(supplier.search(hotel_id))
        tasks.append(task)
    
    results = await asyncio.gather(*tasks)
    return results

单个用户的搜索

  • 并发调用5家供应商
  • 每家供应商收到1个请求
  • QPS:5

100个用户同时搜索

  • 并发调用500家供应商(5家×100用户)
  • 每家供应商收到100个请求
  • QPS:100

500个用户同时搜索

  • 并发调用2500家供应商(5家×500用户)
  • 每家供应商收到500个请求
  • QPS:500

供应商A的限流规则

  • 10 QPS(每秒)

结果

  • 100个用户同时搜索:超限(100 > 10)
  • 500个用户同时搜索:超限(500 > 10)

问题来了

你的用户增长很快,但你被限流了。

  • 你应该限制用户并发吗?
  • 你应该实施队列吗?
  • 你应该缓存吗?

我们如何解决

核心思路:智能限流管理

问题:5家供应商,5种不同的限流规则,很容易超限。

解决方案:智能限流管理 + 热点识别 + 多级缓存 + 队列化管理

用户请求 → 队列化管理 → 智能限流 → 热点检测 → 多级缓存 → 供应商API

1. 智能限流管理

自动识别限流规则

class RateLimiter:
    def __init__(self, supplier):
        self.supplier = supplier
        self.rules = self._detect_rate_limit_rules()
    
    def _detect_rate_limit_rules(self):
        # 1. 从供应商文档读取限流规则
        docs_rules = self.supplier.rate_limit_docs
        
        # 2. 测试限流规则(渐进式测试)
        test_rules = self._test_rate_limits()
        
        # 3. 选择最严格的规则
        return self._merge_rules(docs_rules, test_rules)
    
    def _test_rate_limits(self):
        # 渐进式测试:从低QPS开始,逐步提高
        qps = 1
        max_qps = 1000
        
        while qps <= max_qps:
            success = self._test_qps(qps)
            if not success:
                break
            qps *= 2
        
        return {
            "qps_limit": qps // 2,
            "time_window": self._detect_time_window(),
            "strategy": self._detect_strategy()
        }

动态调整请求速率

async def request_with_rate_limit(supplier, request):
    rate_limiter = RateLimiter(supplier)
    
    # 1. 检查当前请求速率
    current_qps = rate_limiter.get_current_qps()
    max_qps = rate_limiter.get_max_qps()
    
    # 2. 如果接近限流,降低速率
    if current_qps > max_qps * 0.8:
        # 降低80%的速率
        await asyncio.sleep(1.0 / (max_qps * 0.2))
    
    # 3. 发送请求
    response = await supplier.request(request)
    
    # 4. 检查是否被限流
    if response.status_code == 429:
        # 被限流,等待重试
        retry_after = response.headers.get('Retry-After', 60)
        await asyncio.sleep(retry_after)
        return await request_with_rate_limit(supplier, request)
    
    return response

2. 热点检测

实时识别热门酒店

class HotSpotDetector:
    def __init__(self):
        self.request_counts = {}  # hotel_id -> (count, timestamp)
        self.hotspots = set()
    
    def record_request(self, hotel_id):
        now = time.time()
        
        # 1. 记录请求
        self.request_counts[hotel_id] = (
            self.request_counts.get(hotel_id, (0, 0))[0] + 1,
            now
        )
        
        # 2. 清理过期数据(> 1分钟)
        self._cleanup_expired_data(now - 60)
        
        # 3. 检测热点
        self._detect_hotspots()
    
    def _detect_hotspots(self):
        # 1. 计算每个酒店的QPS
        hotel_qps = {}
        for hotel_id, (count, timestamp) in self.request_counts.items():
            hotel_qps[hotel_id] = count / 60.0  # 每分钟
        
        # 2. 识别热点(QPS > 平均值 * 10)
        avg_qps = sum(hotel_qps.values()) / len(hotel_qps)
        self.hotspots = {
            hotel_id for hotel_id, qps in hotel_qps.items()
            if qps > avg_qps * 10
        }
    
    def is_hotspot(self, hotel_id):
        return hotel_id in self.hotspots

热点酒店的限流策略

async def search_hotel_with_hotspot_protection(hotel_id):
    hotspot_detector = HotSpotDetector()
    
    # 1. 记录请求
    hotspot_detector.record_request(hotel_id)
    
    # 2. 检测是否是热点
    if hotspot_detector.is_hotspot(hotel_id):
        # 热点酒店:降低请求频率
        await asyncio.sleep(1.0)  # 每秒最多1个请求
    else:
        # 普通酒店:正常请求
        pass
    
    # 3. 搜索酒店
    return await search_hotel(hotel_id)

3. 多级缓存

缓存层次

L1: 本地内存缓存(热数据,秒级TTL)
  ↓ miss
L2: Redis缓存(温数据,分钟级TTL)
  ↓ miss
L3: 供应商API(实时数据)

自适应TTL

def adaptive_ttl(hotel_id, check_in_date):
    # 1. 基础TTL:1分钟
    base_ttl = 60  # 秒
    
    # 2. 距离入住时间
    days_until = (check_in_date - datetime.now()).days
    time_factor = min(1.0, days_until / 30.0)
    
    # 3. 热门酒店:缩短TTL
    hotspot_detector = HotSpotDetector()
    if hotspot_detector.is_hotspot(hotel_id):
        hotspot_factor = 0.5  # 热点酒店TTL减半
    else:
        hotspot_factor = 1.0
    
    # 4. 计算最终TTL
    final_ttl = int(base_ttl * time_factor * hotspot_factor)
    
    return final_ttl

缓存实现

async def search_hotel_with_cache(hotel_id, check_in_date):
    cache_key = f"hotel:{hotel_id}:{check_in_date}"
    
    # 1. L1:本地缓存
    if cache_key in local_cache:
        return local_cache[cache_key]
    
    # 2. L2:Redis缓存
    cached = await redis.get(cache_key)
    if cached:
        data = json.loads(cached)
        local_cache[cache_key] = data
        return data
    
    # 3. L3:供应商API
    result = await search_hotel_from_supplier(hotel_id)
    
    # 4. 写入缓存
    ttl = adaptive_ttl(hotel_id, check_in_date)
    await redis.setex(cache_key, ttl, json.dumps(result))
    local_cache[cache_key] = result
    
    return result

4. 队列化管理

请求队列

class RequestQueue:
    def __init__(self, max_concurrent=10):
        self.queue = asyncio.Queue()
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.workers = []
    
    async def enqueue(self, request):
        await self.queue.put(request)
    
    async def start_workers(self):
        for i in range(self.max_concurrent):
            worker = asyncio.create_task(self._worker(i))
            self.workers.append(worker)
    
    async def _worker(self, worker_id):
        while True:
            request = await self.queue.get()
            
            async with self.semaphore:
                try:
                    result = await process_request(request)
                    request.complete(result)
                except Exception as e:
                    request.fail(e)
                finally:
                    self.queue.task_done()
    
    async def stop(self):
        for worker in self.workers:
            worker.cancel()

突发流量保护

async def search_hotel_with_queue_protection(hotel_id):
    # 1. 检查队列长度
    queue_length = request_queue.qsize()
    
    if queue_length > 1000:
        # 队列太长,拒绝新请求
        raise TooManyRequests("Please try again later")
    
    # 2. 加入队列
    future = asyncio.Future()
    await request_queue.enqueue(SearchRequest(
        hotel_id=hotel_id,
        future=future
    ))
    
    # 3. 等待结果
    result = await future
    return result

5. 降级策略

优雅降级

async def search_hotel_with_fallback(hotel_id):
    # 1. 尝试主供应商
    try:
        result = await search_hotel_from_supplier(supplier_a, hotel_id)
        return result
    except RateLimitError:
        # 被限流,尝试备用供应商
        pass
    
    # 2. 尝试备用供应商
    try:
        result = await search_hotel_from_supplier(supplier_b, hotel_id)
        return result
    except RateLimitError:
        # 备用供应商也被限流,返回缓存
        pass
    
    # 3. 返回缓存(可能过期)
    cached = await redis.get(f"hotel:{hotel_id}")
    if cached:
        return json.loads(cached)
    
    # 4. 实在没有办法了,返回错误
    raise ServiceUnavailable("All suppliers are rate limited")

我们的优势

1. 智能限流管理

  • 自动识别每个供应商的限流规则
  • 动态调整请求速率
  • 避免被限流
  • 自动重试被限流的请求

2. 热点检测

  • 实时识别热门酒店
  • 自动降低热点酒店的请求频率
  • 避免单酒店限流

3. 多级缓存

  • L1本地缓存 + L2 Redis缓存
  • 自适应TTL(热门酒店缩短TTL)
  • 减少90%的供应商API调用

4. 队列化管理

  • 请求队列避免突发流量
  • 限制并发数保护供应商
  • 优雅降级(缓存 → 备用供应商)

5. 完善的监控

  • 实时监控每个供应商的QPS
  • 实时监控被限流的次数
  • 实时监控缓存命中率
  • 实时监控队列长度

行动号召

你的搜索功能上线第1天就被拉黑了吗?

问题

  • 5家供应商,5种不同的限流规则
  • 热门酒店有更严格的限流
  • 突发流量导致超限
  • 多供应商并发调用导致超限
  • 缓存失效导致重复调用

我们的解决方案

  • 智能限流管理(自动识别限流规则,动态调整速率)
  • 热点检测(实时识别热门酒店,降低请求频率)
  • 多级缓存(L1+L2,减少90%API调用)
  • 队列化管理(避免突发流量,优雅降级)

你只需要

import "github.com/hotelbyte-com/sdk-go/hotelbyte"

client := hotelbyte.NewClient("YOUR_API_KEY")

// 搜索酒店(自动处理限流、热点、缓存)
result, err := client.SearchHotels(&hotelbyte.SearchRequest{
    Destination: "London",
    CheckIn:     time.Date(2026, 2, 10, 0, 0, 0, 0, time.UTC),
    CheckOut:    time.Date(2026, 2, 12, 0, 0, 0, 0, time.UTC),
    Guests:      2,
})

// 我们自动处理:
// - 限流管理
// - 热点检测
// - 多级缓存
// - 队列化管理
// - 优雅降级

for _, hotel := range result.Hotels {
    fmt.Printf("%s: $%.2f\n", hotel.Name, hotel.TotalPrice)
}

没有限流噩梦。没有429错误。只有稳定的搜索。


下一步

免费试用

  • 30天免费试用
  • 无需信用卡
  • 立即开始测试

免费试用30天

查看文档

联系我们

有问题?直接联系我们的工程师。

联系我们


系列导航


下一篇预告:错误处理 - 同一个错误,5种不同的HTTP状态码和错误消息


阅读时间:约15分钟 难度:中等(需要了解API限流、缓存、队列管理)