透彻理解秒杀系统(六)——项目实战:秒杀抢购与限流
本章,我将讲解如何基于Nginx完成对秒杀抢购请求的限流,以及秒杀抢购服务对请求的处理。我们先来回顾一下,整个秒杀抢购链路的限流分为两块:
- 全局限流:根据Nginx集群的可承载能力,控制总QPS;
- 业务限流:根据每个场次的每个商品库存数量,控制QPS。

我会基于OpenResty实现限流脚本的编写,对OpenResty不熟悉的童鞋,可以先去看看我写的《分布式系统从理论到实战》系列的实战篇。
一、限流
使用OpenResty限流的核心就是编写Lua脚本,Lua本身是一门脚本语言,和Perl、Python之类的是差不多的。
关于Lua的基本使用,读者可以参考官网资料或者网上的教程:https://www.runoob.com/lua/lua-tutorial.html。
1.1 全局限流
全局限流的核心思想就是:
- 通过分布式配置中心或者磁盘文件或Redis动态配置全局流控参数;
- Nginx通过全局变量记录每秒内的请求数,每过来一个请求就将当前秒内的请求数累加1;
- 最后,将每秒内的请求数与全局流控参数比较,超过则拒绝访问,否则就放行。
-- 全局限流:流控参数,可以每隔一段时间从配置中心/Redis/文件读取
globalLimiting = 10000
-- 记录当前秒
currentTime = nil
-- 当前秒内的请求数
currentRequests = 0
timestamp = os.date("%Y-%m-%d %H:%M:%S")
if(currentTime == nil)
then
currentTime = timestamp
end
if(currentTime == timestamp)
then
if(currentRequests <= globalLimiting)
then
currentRequests = currentRequests + 1
else
-- 超过全局流控阈值,自定义一些响应给客户端
end
else
-- 新的一秒
currentTime = timestamp
currentRequests = 1
end
1.2 业务限流
业务限流的思路也是一样的:
- 动态配置读取每个秒杀商品的库存;
- Nginx通过全局变量记录每个商品的请求次数,每请求一次,次数累加1;
- 最后,将商品的请求数与配置的流控参数比较,超过则拒绝访问,否则就放行。
-- 业务限流:流控参数,可以每隔一段时间从配置中心/Redis/文件读取
currentSessionProductLimiting = {}
currentSessionProductLimiting[518] = 1000
currentSessionProductLimiting[629] = 10000
currentSessionProductLimiting[745] = 200
-- 记录每个商品的请求数
currentProductRequests = {}
-- 从Http请求参数中读取当前抢购的商品ID
local productId = 518
-- 读取该商品的已抢购次数
local productRequests = currentProductRequests[productId]
if(productRequests == nil or productRequests == 0)
then
-- 抢购次数+1
currentProductRequests[productId] = 1
else
-- 读取该商品的限流阈值
local productLimiting = currentSessionProductLimiting[productId]
-- 最多允许超过10%
if(productRequests <= productLimiting * 1.1)
then
-- 放行,抢购次数+1
currentProductRequests[productId] = productRequests + 1
else
-- 该秒杀商品的抢购次数已经超过了阈值的1.1倍
-- 进行业务限流,自定义响应给客户端,说明抢购失败
end
end
二、秒杀抢购
经过Nginx的限流之后,过滤的请求就进入了秒杀商品抢购服务seckill-flash-sale
。seckill-flash-sale
主要就是做两件事:
- 利用Redis的Lua脚本执行库存扣减;
- 库存扣减成功后,发送下单通知到MQ。
2.1 接口
我们先来看秒杀抢购的接口逻辑,本质就是执行Redis Lua脚本完成库存扣减,扣减成功则发送下单通知到MQ:
/**
* 秒杀抢购处理
*/
@RestController
@RequestMapping("/seckill/flash/sale")
public class FlashSaleController {
/**
* 用户对商品进行抢购,默认限定每个商品最多只能抢购一件
*
* @param userId 用户id
* @param productId 商品id
*/
@GetMapping("/deduct")
public String flashSale(Long userId, Long productId) {
// 1.基于Redis Lua进行库存扣减
RedisCluster redisCluster = RedisCluster.getInstance();
Boolean flashSaleResult = redisCluster.flashSale(userId, productId);
// 2.如果秒杀抢购成功,发送下单消息到MQ
if (flashSaleResult) {
DefaultMQProducer producer = RocketMQProducer.getInstance().getProducer();
try {
JSONObject flashSaleSuccessInform = new JSONObject();
flashSaleSuccessInform.put("userId", userId);
flashSaleSuccessInform.put("productId", productId);
Message message = new Message( "flash_sale_success_inform", null,
flashSaleSuccessInform.toJSONString().getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
System.out.println("推送秒杀抢购成功通知到MQ......");
} catch (Exception e) {
System.err.println("秒杀抢购成功通知推送到MQ失败:" + e);
// 降级处理,把消息写入本地磁盘做一个积压,搞一个后台线程不停的尝试MQ是否恢复
// 如果MQ恢复了,就可以把本地磁盘积压的消息发送出去
return "秒杀抢购成功,但是推送消息到MQ失败";
}
return "秒杀抢购成功";
} else {
return "秒杀抢购失败";
}
}
}
2.2 库存扣减
库存扣减直接通过Lua脚本完成,由Redis保证了原子性。这里要注意的是分片轮询扣减库存的策略,因为库存是分片存储在Redis节点中的,所以当某个节点扣减库存失败后,还需要轮询其它节点:
// RedisCluster.java
/**
* 基于Redis集群进行秒杀抢购
*/
public Boolean flashSale(Long userId, Long productId) {
// 随机选择一个Redis的节点
int redisNodeCount = cluster.size();
int chosenRedisNodeIndex = new Random().nextInt(redisNodeCount);
Jedis chosenRedisNode = cluster.get(chosenRedisNodeIndex);
// 向redis节点提交一个lua脚本进行抢购
String flashSaleLuaScript = ""
+ "local productKey = 'seckill::product::" + productId + "::stock';"
+ "local salingStock = redis.call('hget', productKey, 'salingStock') + 0;"
+ "local lockedStock = redis.call('hget', productKey, 'lockedStock') + 0;"
+ "if(salingStock > 0) "
+ "then "
+ "redis.call('hset', productKey, 'salingStock', salingStock - 1);"
+ "redis.call('hset', productKey, 'lockedStock', lockedStock + 1);"
+ "return 'success';"
+ "else "
+ "return 'fail';"
+ "end;";
String flashSaleResult = null;
try {
flashSaleResult = (String) chosenRedisNode.eval(flashSaleLuaScript);
} catch(Exception e) {
// 这里报错,说明某台Redis宕机了,则执行分片轮询扣减库存逻辑,即从其它机器进行秒杀
try {
return tryOtherStockShard(chosenRedisNodeIndex, flashSaleLuaScript, userId, productId);
} catch(Exception e1) {
// 这里报错,说明所有Redis都宕机了,则可以执行降级逻辑:将秒杀请求写入本地磁盘
return false;
}
}
// 如果秒杀抢购成功
if("success".equals(flashSaleResult)) {
// 记录抢购成功所在的分片,后续接受订单支付通知时有用
JedisManager jedisManager = JedisManager.getInstance();
Jedis jedis = jedisManager.getJedis();
jedis.set("flash_sale::stock_shard::" + userId + "::" + productId,
String.valueOf(chosenRedisNodeIndex));
return true;
}
// 如果秒杀抢购失败,则执行库存分片迁移
else {
try {
return tryOtherStockShard(chosenRedisNodeIndex, flashSaleLuaScript, userId, productId);
} catch(Exception e) {
// 在这里就可以写所有的redis节点都崩溃的逻辑了
// 就可以尝试把请求给写入到本地磁盘去,让抢购状态保持在抢购中
return false;
}
}
}
/**
* 分片轮询扣减库存
*/
private Boolean tryOtherStockShard(int failedStockShard, String flashSaleLuaScript,
Long userId, Long productId) throws Exception {
String flashSaleResult = null;
Boolean flashSaleSuccess = false;
Boolean allRedisNodeCrashed = true;
for(int i = 0; i < cluster.size(); i++) {
if(i != failedStockShard) {
try {
Jedis redisNode = cluster.get(i);
flashSaleResult = (String) redisNode.eval(flashSaleLuaScript);
allRedisNodeCrashed = false;
if("success".equals(flashSaleResult)) {
JedisManager jedisManager = JedisManager.getInstance();
Jedis jedis = jedisManager.getJedis();
jedis.set("flash_sale::stock_shard::" + userId + "::" + productId,
String.valueOf(i));
flashSaleSuccess = true;
break;
}
} catch(Exception e) {
// 在尝试其他节点进行抢购的时候,其他某个节点也出现了宕机问题
}
}
}
// 如果说所有的redis节点都崩溃了
if(allRedisNodeCrashed) {
throw new Exception("所有Redis节点都崩溃了!!!");
}
return flashSaleSuccess;
}
三、下单
秒杀抢购成功后,秒杀商品抢购服务seckill-flash-sale
会将下单通知扔到MQ中。秒杀下单服务seckill-order
监听到通知后,会调用订单中心接口执行下单操作:
@Component
public class BootListener implements CommandLineRunner {
public static final Long ORDER_RATE_LIMIT = 500L;
public void run(String... strings) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("seckill-order-consumer-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("flash_sale_success_inform", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts,
ConsumeConcurrentlyContext context) {
for(MessageExt messageExt : messageExts) {
JSONObject flashSaleSuccessInform = JSONObject.parseObject(
new String(messageExt.getBody()));】
// 1.检查消息时间戳,如果超过阈值,就认为出现了消息积压
// 则执行fail-fast机制,直接推“支付失败”的消息到MQ,这样秒杀库存管理服务监听到后就会对Redis库存回滚
// 2.模拟调用订单中心提供的接口进行下单
Long userId = flashSaleSuccessInform.getLong("userId");
Long productId = flashSaleSuccessInform.getLong("productId");
// 3.下单操作限流
JedisManager jedisManager = JedisManager.getInstance();
Jedis jedis = jedisManager.getJedis();
Boolean orderResult = false;
while(!orderResult) {
Date now = new Date();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String currentSecond = dateFormat.format(now);
Long result = jedis.incr("flash_sale::order::rate_limit::" + currentSecond);
if(result < ORDER_RATE_LIMIT) {
System.out.println("调用调用订单中心提供的接口进行秒杀抢购的下单,用户id: "
+ userId + ", 商品id: " + productId);
// 如果说订单系统崩溃了
// 那么你的消费线程应该进入阻塞,就不要消费后面的消息了
// 阻塞个几分钟过后,再尝试调用订单系统去进行下单
orderResult = true;
} else {
// 如果当前这一秒限流了,此时休眠一秒,下一秒继续进行下单就可以了
try {
Thread.sleep(1000);
} catch(InterruptedException e) {
e.printStackTrace();
}
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动......");
}
}
这里关键要注意的是,如果出现消息积压的情况,则需要执行fail-fast机制。
四、总结
本章,我对秒杀系统最核心的链路:秒杀抢购链路进行了讲解。秒杀抢购链路的核心时基于Redis完成库存的扣减,扣减成功后发送下单通知。
感谢赞赏~