原创

透彻理解秒杀系统(六)——项目实战:秒杀抢购与限流

本章,我将讲解如何基于Nginx完成对秒杀抢购请求的限流,以及秒杀抢购服务对请求的处理。我们先来回顾一下,整个秒杀抢购链路的限流分为两块:

  • 全局限流:根据Nginx集群的可承载能力,控制总QPS;
  • 业务限流:根据每个场次的每个商品库存数量,控制QPS。


我会基于OpenResty实现限流脚本的编写,对OpenResty不熟悉的童鞋,可以先去看看我写的《分布式系统从理论到实战》系列的实战篇。

一、限流

使用OpenResty限流的核心就是编写Lua脚本,Lua本身是一门脚本语言,和Perl、Python之类的是差不多的。

关于Lua的基本使用,读者可以参考官网资料或者网上的教程:https://www.runoob.com/lua/lua-tutorial.html。

1.1 全局限流

全局限流的核心思想就是:

  1. 通过分布式配置中心或者磁盘文件或Redis动态配置全局流控参数;
  2. Nginx通过全局变量记录每秒内的请求数,每过来一个请求就将当前秒内的请求数累加1;
  3. 最后,将每秒内的请求数与全局流控参数比较,超过则拒绝访问,否则就放行。
-- 全局限流:流控参数,可以每隔一段时间从配置中心/Redis/文件读取
globalLimiting = 10000

-- 记录当前秒
currentTime = nil
-- 当前秒内的请求数
currentRequests = 0

timestamp = os.date("%Y-%m-%d %H:%M:%S")

if(currentTime == nil)
then
    currentTime = timestamp
end

if(currentTime == timestamp)
then
    if(currentRequests <= globalLimiting)
    then
        currentRequests = currentRequests + 1
    else
        -- 超过全局流控阈值,自定义一些响应给客户端
    end
else
    -- 新的一秒
    currentTime = timestamp
    currentRequests = 1
end

1.2 业务限流

业务限流的思路也是一样的:

  1. 动态配置读取每个秒杀商品的库存;
  2. Nginx通过全局变量记录每个商品的请求次数,每请求一次,次数累加1;
  3. 最后,将商品的请求数与配置的流控参数比较,超过则拒绝访问,否则就放行。
-- 业务限流:流控参数,可以每隔一段时间从配置中心/Redis/文件读取
currentSessionProductLimiting = {}
currentSessionProductLimiting[518] = 1000
currentSessionProductLimiting[629] = 10000
currentSessionProductLimiting[745] = 200

-- 记录每个商品的请求数
currentProductRequests = {}

-- 从Http请求参数中读取当前抢购的商品ID
local productId = 518
-- 读取该商品的已抢购次数
local productRequests = currentProductRequests[productId]

if(productRequests == nil or productRequests == 0)
    then
    -- 抢购次数+1
    currentProductRequests[productId] = 1
else
    -- 读取该商品的限流阈值
    local productLimiting = currentSessionProductLimiting[productId]
    -- 最多允许超过10%
    if(productRequests <= productLimiting * 1.1)
    then
        -- 放行,抢购次数+1
        currentProductRequests[productId] = productRequests + 1
    else
        -- 该秒杀商品的抢购次数已经超过了阈值的1.1倍
        -- 进行业务限流,自定义响应给客户端,说明抢购失败
    end
end

二、秒杀抢购

经过Nginx的限流之后,过滤的请求就进入了秒杀商品抢购服务seckill-flash-saleseckill-flash-sale主要就是做两件事:

  1. 利用Redis的Lua脚本执行库存扣减;
  2. 库存扣减成功后,发送下单通知到MQ。

2.1 接口

我们先来看秒杀抢购的接口逻辑,本质就是执行Redis Lua脚本完成库存扣减,扣减成功则发送下单通知到MQ:

/**
 * 秒杀抢购处理
 */
@RestController
@RequestMapping("/seckill/flash/sale")
public class FlashSaleController {

    /**
     * 用户对商品进行抢购,默认限定每个商品最多只能抢购一件
     *
     * @param userId    用户id
     * @param productId 商品id
     */
    @GetMapping("/deduct")
    public String flashSale(Long userId, Long productId) {
        // 1.基于Redis Lua进行库存扣减
        RedisCluster redisCluster = RedisCluster.getInstance();
        Boolean flashSaleResult = redisCluster.flashSale(userId, productId);

        // 2.如果秒杀抢购成功,发送下单消息到MQ
        if (flashSaleResult) {
            DefaultMQProducer producer = RocketMQProducer.getInstance().getProducer();
            try {
                JSONObject flashSaleSuccessInform = new JSONObject();
                flashSaleSuccessInform.put("userId", userId);
                flashSaleSuccessInform.put("productId", productId);
                Message message = new Message( "flash_sale_success_inform", null,
                        flashSaleSuccessInform.toJSONString().getBytes(RemotingHelper.DEFAULT_CHARSET)
                );
                SendResult sendResult = producer.send(message);
                System.out.printf("%s%n", sendResult);
                System.out.println("推送秒杀抢购成功通知到MQ......");
            } catch (Exception e) {
                System.err.println("秒杀抢购成功通知推送到MQ失败:" + e);
                // 降级处理,把消息写入本地磁盘做一个积压,搞一个后台线程不停的尝试MQ是否恢复
                // 如果MQ恢复了,就可以把本地磁盘积压的消息发送出去
                return "秒杀抢购成功,但是推送消息到MQ失败";
            }
            return "秒杀抢购成功";
        } else {
            return "秒杀抢购失败";
        }
    }
}

2.2 库存扣减

库存扣减直接通过Lua脚本完成,由Redis保证了原子性。这里要注意的是分片轮询扣减库存的策略,因为库存是分片存储在Redis节点中的,所以当某个节点扣减库存失败后,还需要轮询其它节点:

// RedisCluster.java


/**
 * 基于Redis集群进行秒杀抢购
 */
public Boolean flashSale(Long userId, Long productId) {
    // 随机选择一个Redis的节点
    int redisNodeCount = cluster.size();
    int chosenRedisNodeIndex = new Random().nextInt(redisNodeCount);
    Jedis chosenRedisNode = cluster.get(chosenRedisNodeIndex);

    // 向redis节点提交一个lua脚本进行抢购
    String flashSaleLuaScript = ""
        + "local productKey = 'seckill::product::" + productId + "::stock';"
        + "local salingStock = redis.call('hget', productKey, 'salingStock') + 0;"
        + "local lockedStock = redis.call('hget', productKey, 'lockedStock') + 0;"
        + "if(salingStock > 0) "
        + "then "
        + "redis.call('hset', productKey, 'salingStock', salingStock - 1);"
        + "redis.call('hset', productKey, 'lockedStock', lockedStock + 1);"
        + "return 'success';"
        + "else "
        + "return 'fail';"
        + "end;";

    String flashSaleResult = null;
    try {
        flashSaleResult = (String) chosenRedisNode.eval(flashSaleLuaScript);
    } catch(Exception e) {
        // 这里报错,说明某台Redis宕机了,则执行分片轮询扣减库存逻辑,即从其它机器进行秒杀
        try {
            return tryOtherStockShard(chosenRedisNodeIndex, flashSaleLuaScript, userId, productId);
        } catch(Exception e1) {
            // 这里报错,说明所有Redis都宕机了,则可以执行降级逻辑:将秒杀请求写入本地磁盘
            return false;
        }
    }

    // 如果秒杀抢购成功
    if("success".equals(flashSaleResult)) {
        // 记录抢购成功所在的分片,后续接受订单支付通知时有用
        JedisManager jedisManager = JedisManager.getInstance();
        Jedis jedis = jedisManager.getJedis();
        jedis.set("flash_sale::stock_shard::" + userId + "::" + productId,
                  String.valueOf(chosenRedisNodeIndex));
        return true;
    }
    // 如果秒杀抢购失败,则执行库存分片迁移
    else {
        try {
            return tryOtherStockShard(chosenRedisNodeIndex, flashSaleLuaScript, userId, productId);
        } catch(Exception e) {
            // 在这里就可以写所有的redis节点都崩溃的逻辑了
            // 就可以尝试把请求给写入到本地磁盘去,让抢购状态保持在抢购中
            return false;
        }
    }
}

/**
 * 分片轮询扣减库存
 */
private Boolean tryOtherStockShard(int failedStockShard, String flashSaleLuaScript,
                                   Long userId, Long productId) throws Exception {
    String flashSaleResult = null;
    Boolean flashSaleSuccess = false;
    Boolean allRedisNodeCrashed = true;

    for(int i = 0; i < cluster.size(); i++) {
        if(i != failedStockShard) {
            try {
                Jedis redisNode = cluster.get(i);
                flashSaleResult = (String) redisNode.eval(flashSaleLuaScript);
                allRedisNodeCrashed = false;
                if("success".equals(flashSaleResult)) {
                    JedisManager jedisManager = JedisManager.getInstance();
                    Jedis jedis = jedisManager.getJedis();
                    jedis.set("flash_sale::stock_shard::" + userId + "::" + productId,
                              String.valueOf(i));
                    flashSaleSuccess = true;
                    break;
                }
            } catch(Exception e) {
                // 在尝试其他节点进行抢购的时候,其他某个节点也出现了宕机问题
            }
        }
    }
    // 如果说所有的redis节点都崩溃了
    if(allRedisNodeCrashed) {
        throw new Exception("所有Redis节点都崩溃了!!!");
    }
    return flashSaleSuccess;
}

三、下单

秒杀抢购成功后,秒杀商品抢购服务seckill-flash-sale会将下单通知扔到MQ中。秒杀下单服务seckill-order监听到通知后,会调用订单中心接口执行下单操作:

@Component
public class BootListener implements CommandLineRunner {
    public static final Long ORDER_RATE_LIMIT = 500L;

    public void run(String... strings) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("seckill-order-consumer-group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("flash_sale_success_inform", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts,
                                                            ConsumeConcurrentlyContext context) {
                for(MessageExt messageExt : messageExts) {
                    JSONObject flashSaleSuccessInform = JSONObject.parseObject(
                            new String(messageExt.getBody()));】

                    // 1.检查消息时间戳,如果超过阈值,就认为出现了消息积压
                    // 则执行fail-fast机制,直接推“支付失败”的消息到MQ,这样秒杀库存管理服务监听到后就会对Redis库存回滚

                    // 2.模拟调用订单中心提供的接口进行下单
                    Long userId = flashSaleSuccessInform.getLong("userId");
                    Long productId = flashSaleSuccessInform.getLong("productId");

                    // 3.下单操作限流
                    JedisManager jedisManager = JedisManager.getInstance();
                    Jedis jedis = jedisManager.getJedis();
                    Boolean orderResult = false;
                    while(!orderResult) {
                        Date now = new Date();
                        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        String currentSecond = dateFormat.format(now);

                        Long result = jedis.incr("flash_sale::order::rate_limit::" + currentSecond);
                        if(result < ORDER_RATE_LIMIT) {
                            System.out.println("调用调用订单中心提供的接口进行秒杀抢购的下单,用户id: "
                                    + userId + ", 商品id: " + productId);
                            // 如果说订单系统崩溃了
                            // 那么你的消费线程应该进入阻塞,就不要消费后面的消息了
                            // 阻塞个几分钟过后,再尝试调用订单系统去进行下单
                            orderResult = true;
                        } else {
                            // 如果当前这一秒限流了,此时休眠一秒,下一秒继续进行下单就可以了
                            try {
                                Thread.sleep(1000);
                            } catch(InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动......");
    }
}

这里关键要注意的是,如果出现消息积压的情况,则需要执行fail-fast机制。

四、总结

本章,我对秒杀系统最核心的链路:秒杀抢购链路进行了讲解。秒杀抢购链路的核心时基于Redis完成库存的扣减,扣减成功后发送下单通知。

正文到此结束

感谢赞赏~

本文目录