实战篇
优惠券秒杀
全局唯一ID
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:
- id的规律性太明显
- 受单表数据量限制
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足以下列特性:
- 唯一性
- 高可用
- 高性能
- 递增性
- 安全性
ID的组成部分:
- 符号位:1bit,永远为0
- 时间戳:31bit,以秒为单位,可以使用69年
- 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
工具类编写代码实现
package com.hmdp.utils;import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;/*** @author xc* @date 2023/4/26 14:59*/
@Component
public class RedisIdWorker {/*** 开始时间戳*/private static final long BEGIN_TIMESTAMP = 1640995200L;/*** 左移位数,防止以后需要修改*/private static final int COUNT_BITS = 32;private StringRedisTemplate stringRedisTemplate;public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public long nextId(String keyPrefix) {// 1.生成时间戳LocalDateTime now = LocalDateTime.now();long end = now.toEpochSecond(ZoneOffset.UTC);long timestamp = end - BEGIN_TIMESTAMP;// 2.生成序列号// 2.1获取当前日期,精确到天String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));Long increment = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);// 3.拼接返回return timestamp << COUNT_BITS | increment;}
}
实现优惠券秒杀下单
下单时需要判断两点:
- 秒杀是否开始或者结束,如果尚未开始或已经结束则无法下单
- 库存是否充足
流程:
根据流程实现具体业务
@Resourceprivate ISeckillVoucherService iSeckillVoucherService;@Resourceprivate IVoucherService iVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Override@Transactionalpublic Result seckillVoucher(Long voucherId) {if (voucherId == null || voucherId < 0) {return Result.fail("请求id错误");}Voucher voucher = iVoucherService.getById(voucherId);if (voucher == null) {return Result.fail("当前优惠券不存在");}SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);if (seckillVoucher == null) {return Result.fail("秒杀优惠券不存在");}LocalDateTime beginTime = seckillVoucher.getBeginTime();if (LocalDateTime.now().isBefore(beginTime)) {return Result.fail("秒杀未开始");}LocalDateTime endTime = seckillVoucher.getEndTime();if (LocalDateTime.now().isAfter(endTime)) {return Result.fail("秒杀已结束");}int leftStock = seckillVoucher.getStock();if (leftStock <= 0) {return Result.fail("优惠券已被抢空");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).update();if (!update) {return Result.fail("服务器内部错误");}VoucherOrder voucherOrder = new VoucherOrder();long orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(UserHolder.getUser().getId());voucherOrder.setVoucherId(voucherId);voucherOrder.setPayType(voucher.getType());boolean save = this.save(voucherOrder);if (!save) {return Result.fail("服务器内部错误");}return Result.ok(orderId);}
超卖问题
乐观锁
- 版本号法
使用CAS方法:
int leftStock = seckillVoucher.getStock();if (leftStock <= 0) {return Result.fail("优惠券已被抢空");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId)// 更新的时候判断当前剩余库存量是否跟开始查询的时候相等.eq("stock",leftStock).update();
弊端:
成功率很低
库存改为大于0
int leftStock = seckillVoucher.getStock();if (leftStock <= 0) {return Result.fail("优惠券已被抢空");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId)// 更新的时候判断当前剩余库存量是否大于0.gt("stock",0).update();
一人一单
在抢购前判断数据库是否存在已经的订单
// 查询秒杀优惠券,该用户是否已经抢到int count = query().eq("user_id", userId).eq("voucher_id",voucherId).count();if (count > 0) {return Result.fail("您已经抢过了");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).eq("stock",leftStock).update();if (!update) {return Result.fail("服务器内部错误");}
问题:
在多线程上会出现都走到代码第6行,然后再一起执行更新操作,就会出现一人多单情况
解决:
对操作进行加锁
@Overridepublic Result seckillVoucher(Long voucherId) {if (voucherId == null || voucherId < 0) {return Result.fail("请求id错误");}Voucher voucher = iVoucherService.getById(voucherId);if (voucher == null) {return Result.fail("当前优惠券不存在");}SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);if (seckillVoucher == null) {return Result.fail("秒杀优惠券不存在");}LocalDateTime beginTime = seckillVoucher.getBeginTime();if (LocalDateTime.now().isBefore(beginTime)) {return Result.fail("秒杀未开始");}LocalDateTime endTime = seckillVoucher.getEndTime();if (LocalDateTime.now().isAfter(endTime)) {return Result.fail("秒杀已结束");}int leftStock = seckillVoucher.getStock();if (leftStock <= 0) {return Result.fail("优惠券已被抢空");}return getResult(voucherId, voucher, leftStock);}@Transactionalpublic Result getResult(Long voucherId, Voucher voucher, int leftStock) {Long userId = UserHolder.getUser().getId();// 查询秒杀优惠券,该用户是否已经抢到long orderId;// 对相同用户并发请求加锁 new// String类型也可能出现不同对象 new // intern()的作用是返回字符串常量池中对象的地址 new synchronized (userId.toString().intern()) { int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("您已经抢过了");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).eq("stock", leftStock).update();if (!update) {return Result.fail("服务器内部错误");}VoucherOrder voucherOrder = new VoucherOrder();orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);voucherOrder.setPayType(voucher.getType());boolean save = this.save(voucherOrder);if (!save) {return Result.fail("服务器内部错误");}}return Result.ok(orderId);}
}
此时还会出现一个问题:
- 当一个线程释放锁后,但是事务还没提交,那么还是会出现一人多单的情况,所以需要对整个方法调用进行加锁
@Overridepublic Result seckillVoucher(Long voucherId) {if (voucherId == null || voucherId < 0) {return Result.fail("请求id错误");}Voucher voucher = iVoucherService.getById(voucherId);if (voucher == null) {return Result.fail("当前优惠券不存在");}SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);if (seckillVoucher == null) {return Result.fail("秒杀优惠券不存在");}LocalDateTime beginTime = seckillVoucher.getBeginTime();if (LocalDateTime.now().isBefore(beginTime)) {return Result.fail("秒杀未开始");}LocalDateTime endTime = seckillVoucher.getEndTime();if (LocalDateTime.now().isAfter(endTime)) {return Result.fail("秒杀已结束");}int leftStock = seckillVoucher.getStock();if (leftStock <= 0) {return Result.fail("优惠券已被抢空");}Long userId = UserHolder.getUser().getId();// new synchronized (userId.toString().intern()) {return getResult(voucherId, voucher, leftStock);}}@Transactionalpublic Result getResult(Long voucherId, Voucher voucher, int leftStock) {Long userId = UserHolder.getUser().getId();// 查询秒杀优惠券,该用户是否已经抢到long orderId;// 对相同用户并发请求加锁// String类型也可能出现不同对象// intern()的作用是返回字符串常量池中对象的地址int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("您已经抢过了");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).eq("stock", leftStock).update();if (!update) {return Result.fail("服务器内部错误");}VoucherOrder voucherOrder = new VoucherOrder();orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);voucherOrder.setPayType(voucher.getType());boolean save = this.save(voucherOrder);if (!save) {return Result.fail("服务器内部错误");}return Result.ok(orderId);}
对于spring事务管理熟悉的话,在seckillVoucher方法中调用有事务的getResult这个方法,会出现事务失效。因为相当于this.getResult,用的不是代理类。
解决方法:
@Overridepublic Result seckillVoucher(Long voucherId) {if (voucherId == null || voucherId < 0) {return Result.fail("请求id错误");}Voucher voucher = iVoucherService.getById(voucherId);if (voucher == null) {return Result.fail("当前优惠券不存在");}SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);if (seckillVoucher == null) {return Result.fail("秒杀优惠券不存在");}LocalDateTime beginTime = seckillVoucher.getBeginTime();if (LocalDateTime.now().isBefore(beginTime)) {return Result.fail("秒杀未开始");}LocalDateTime endTime = seckillVoucher.getEndTime();if (LocalDateTime.now().isAfter(endTime)) {return Result.fail("秒杀已结束");}int leftStock = seckillVoucher.getStock();if (leftStock <= 0) {return Result.fail("优惠券已被抢空");}Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) {// 获取当前对象的代理对象 newIVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.getResult(voucherId, voucher, leftStock);}}/*** xc* @param voucherId* @param voucher* @param leftStock* @return*/@Override@Transactionalpublic Result getResult(Long voucherId, Voucher voucher, int leftStock) {Long userId = UserHolder.getUser().getId();// 查询秒杀优惠券,该用户是否已经抢到long orderId;// 对相同用户并发请求加锁// String类型也可能出现不同对象// intern()的作用是返回字符串常量池中对象的地址int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("您已经抢过了");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).eq("stock", leftStock).update();if (!update) {return Result.fail("服务器内部错误");}VoucherOrder voucherOrder = new VoucherOrder();orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);voucherOrder.setPayType(voucher.getType());boolean save = this.save(voucherOrder);if (!save) {return Result.fail("服务器内部错误");}return Result.ok(orderId);}
需要导入aspectj的依赖
<dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency>
然后在主启动类上暴露代理对象
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
@EnableTransactionManagement
@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {public static void main(String[] args) {SpringApplication.run(HmDianPingApplication.class, args);}}
分布式锁
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了
模拟集群效果:
怎么添加idea的Serivces
在Service中添加SpringBoot项目,通过不同的端口启动
# 在VM options中添加此段代码,以指定端口启动
-Dserver.port=8082
前端nginx通过负载均衡访问后端接口
在集群模式下:
有多个JVM实例的存在,所以又会出现超卖问题
使用分布式锁解决:
流程:
基于Redis实现分布式锁初级版本:
需求:定义一个类,实现下面接口,利用Redis实现分布式锁功能。
public interface Ilock {/*** 尝试获取锁* @param timeoutSec 过期时间* @return 获取锁是否成功*/boolean tryLock(long timeoutSec);void unlock();
}
简单锁实现类
package com.hmdp.utils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;/*** @author: xc* @date: 2023/4/27 20:46*/public class SimpleRedisLock implements ILock {/*** 锁的统一前缀*/private static final String KEY_PREFIX = "lock:";/*** 锁的名称*/private String name;// 因为不是spring管理的bean所以需要构造方法private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(long timeoutSec) {// 当前线程idLong threadId = Thread.currentThread().getId();// setIfAbsent:如果不存在就设置Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId+"", timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {// 删除keystringRedisTemplate.delete(KEY_PREFIX + name);}
}
实现初级redis分布式锁版本
Long userId = UserHolder.getUser().getId();// 因为只对同一个用户加锁,所以用 order:+userId 作为锁的keySimpleRedisLock lock = new SimpleRedisLock("order:"+userId, stringRedisTemplate);// 获取锁boolean tryLock = lock.tryLock(LOCK_TIMEOUT);if (!tryLock) {// 获取锁失败return Result.fail("不允许重复下单");}try {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.getResult(voucherId, voucher, leftStock);} finally {// 拿到锁的释放锁lock.unlock();}
会出现的问题:
会释放别人的锁
解决方案:释放锁的时候先看一下是不是自己的锁
流程:
改进Redis的分布式锁:
-
在获取锁时存入线程表示(可以用UUID表示)
-
在释放锁时获取线程ID,判断是否与当前线程标示一致
-
- 如果一致则释放锁
- 如果不一致则不释放锁
修改获取锁和释放锁的逻辑
package com.hmdp.utils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
import java.util.UUID;/*** @author: xc* @date: 2023/4/27 20:46*/public class SimpleRedisLock implements ILock {/*** 锁的统一前缀*/private static final String KEY_PREFIX = "lock:";/*** 随机生成线程uuid的前缀*/private static final String ID_PREFIX = UUID.randomUUID() +"-";/*** 锁的名称*/private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(long timeoutSec) {long threadId = Thread.currentThread().getId();// 标识位 ID_PREFIX+threadIdBoolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, ID_PREFIX+threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {long threadId = Thread.currentThread().getId();// 判断标识是否一致if ((ID_PREFIX+threadId).equals(stringRedisTemplate.opsForValue().get(KEY_PREFIX + name))) {stringRedisTemplate.delete(KEY_PREFIX + name);}}
}
出现问题:
- 线程1如果判断完是自己的锁后,出现gc阻塞线程知道锁过期,此时线程2过来获取到锁执行自己的业务,然后线程1又阻塞完毕回到删除锁,就会将线程2的锁删除。然而又有线程3来过来获取锁没获取到,就会出现线程2和线程3同时执行代码。
解决办法: 保证判断锁和释放锁的原子性
使用Redis的Lua脚本:
关于redis的基本语法
执行Lua脚本
再次改进Redis的分布式锁
总结
基于Redis的分布式锁实现思路:
- 利用set nx ex获取锁,并设置过期时间,保存线程标示
- 释放锁时先判断线程标示是否与自己一致,一致则删除锁
特性:
- 利用set nx满足互斥性
- 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
- 利用Redis集群保证高可用和高并发特性
基于Redis的分布式锁的优化:
基于setnx实现的分布式锁存在下面的问题:
Redisson入门
引入依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>
配置Redisson客户端
/*** @author xc* @date 2023/4/28 9:16*/
@Configuration
public class RedissonConfig {public RedissonClient redissonClient() {// 配置Config config = new Config();config.setTransportMode(TransportMode.EPOLL);config.useSingleServer().setAddress("redis://127.0.0.1:6379");return Redisson.create(config);}}
使用Redisson的分布式锁
Redisson可重入锁的原理
流程图:
获取锁Lua脚本:
释放锁Lua脚本:
Redisson底层源码讲解(P66、P67)
Redisson分布式锁原理:
- 可重入:利用hash结构记录线程id和重入次数
- 可重试:利用信号量和PubSub功能实现等待、唤醒、获取锁失败的重试机制
- 超时续约:利用watchDog,每个一段时间(releaseTime/3),重置超时时间
解决主从一致(P68)
Redis优化秒杀
改进秒杀业务,提高并发性能
需求:
- 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
// 引入redis@Resourceprivate StringRedisTemplate stringRedisTemplate;// 在保存秒杀优惠券的时候,也将优惠券的id和库存保存到redis中stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
- 基于Lua脚本,判断秒杀库存、一人一单、决定用户是否抢购成功
lua脚本
---
--- Generated by Luanalysis
--- Created by xc.
--- DateTime: 2023/4/28 11:48
---
local voucherId = ARGV[1]
local userId = ARGV[2]local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherIdif(tonumber(redis.call('get',stockKey)) <= 0) thenreturn 1
endif(redis.call('sismember',orderKey,userId) == 1) thenreturn 2
end
redis.call('incrby',stockKey,-1)
redis.call('sadd',orderKey,userId)
return 0
java代码
private static final DefaultRedisScript<Long> SECKILL_SCIPT;static {SECKILL_SCIPT = new DefaultRedisScript<>();ClassPathResource pathResource = new ClassPathResource("seckill.lua");SECKILL_SCIPT.setLocation(pathResource);SECKILL_SCIPT.setResultType(Long.class);}@Overridepublic Result seckillVoucher(Long voucherId) {// 1.执行lua脚本Long userId = UserHolder.getUser().getId();long execute = stringRedisTemplate.execute(SECKILL_SCIPT,Collections.EMPTY_LIST,voucherId.toString(), userId.toString());// 2.判断结果是否为0if (execute != 0) {// 2.1 不为0,代表没有购买资格// 为1时库存不足,2时重复下单return Result.fail(execute == 1 ? "库存不足" : "重复下单");}// 2.2 为0 ,有购买资格,把下单信息保存到阻塞队列long orderId = redisIdWorker.nextId("order");
// new ArrayBlockingQueue<>()// 3.返回订单idreturn Result.ok(orderId);}
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
// 阻塞队列private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);@Overridepublic Result seckillVoucher(Long voucherId) {// 1.执行lua脚本Long userId = UserHolder.getUser().getId();long execute = stringRedisTemplate.execute(SECKILL_SCIPT,Collections.EMPTY_LIST,voucherId.toString(), userId.toString());// 2.判断结果是否为0if (execute != 0) {// 2.1 不为0,代表没有购买资格// 为1时库存不足,2时重复下单return Result.fail(execute == 1 ? "库存不足" : "重复下单");}VoucherOrder voucherOrder = new VoucherOrder();// 2.2 为0 ,有购买资格,把下单信息保存到阻塞队列long orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);// 加入到阻塞队列 orderTasks.add(voucherOrder);proxy = (IVoucherOrderService) AopContext.currentProxy();// 3.返回订单idreturn Result.ok(orderId);}
- 开启线程任务,不断从阻塞队列中回去信息,实现异步下单功能
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();/*** 在类初始化完后会执行该方法*/@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable{@Overridepublic void run() {while (true){// 获取队列中的队列信息try {VoucherOrder voucherOrder = orderTasks.take();handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("处理订单异常",e);}}}private void handleVoucherOrder(VoucherOrder voucherOrder) {// 因为是全新开启一个线程,所以需要在订单中拿到用户idLong userId = voucherOrder.getUserId();// 因为只对同一个用户加锁,所以用 order:+userId 作为锁的keyRLock lock = redissonClient.getLock("lock:order:" + userId);boolean tryLock = lock.tryLock();if (!tryLock) {// 获取锁失败log.error("不允许重复下单");return;}try {proxy.getResult(voucherOrder);} finally {// 拿到锁的释放锁lock.unlock();}}}
总结
秒杀业务的优化思路是什么?
- 先利用Redis完成库存余量、一人一单判断,完成抢单业务
- 再将下单业务放入阻塞队列,利用独立线程异步下单
基于阻塞队列的异步秒杀存在哪些问题?
- 内存限制问题
- 数据安全问题
Redis消息队列实现异步秒杀
消息队列,字面意思就是存放消息队列。最简单的消息队列模型包括3个角色
- 消息队列:存储和管理消息,也被称为消息代理
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
基于List结构模拟消息队列
基于List的消息队列由哪些优缺点?
优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失
- 只支持单消费者
基于PubSub的消息队列
基于PubSub的消息队列由哪些优缺点?
优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
基于Stream的消息队列
基于STREAM的消息队列由哪些优缺点?
优点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
缺点:
- 有消息漏读的风险
基于Stream的消息队列-消费者组
创建消费者组:
XGROUP CREATE key groupName ID [MKSTREAM]
- key:队列名称
- groupName:消费者组名称
- ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
- MKSTREAM:队列不存在时自动创建队列
其它常见命令:
# 删除指定的消费者组
XGROUP DESTORY key groupName# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
Stream类型消息队列的XREADGROUP命令特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读风险
- 有消息确认机制,保证消息至少被消费一次
实战篇
优惠券秒杀
全局唯一ID
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:
- id的规律性太明显
- 受单表数据量限制
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足以下列特性:
- 唯一性
- 高可用
- 高性能
- 递增性
- 安全性
ID的组成部分:
- 符号位:1bit,永远为0
- 时间戳:31bit,以秒为单位,可以使用69年
- 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
工具类编写代码实现
package com.hmdp.utils;import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;/*** @author xc* @date 2023/4/26 14:59*/
@Component
public class RedisIdWorker {/*** 开始时间戳*/private static final long BEGIN_TIMESTAMP = 1640995200L;/*** 左移位数,防止以后需要修改*/private static final int COUNT_BITS = 32;private StringRedisTemplate stringRedisTemplate;public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public long nextId(String keyPrefix) {// 1.生成时间戳LocalDateTime now = LocalDateTime.now();long end = now.toEpochSecond(ZoneOffset.UTC);long timestamp = end - BEGIN_TIMESTAMP;// 2.生成序列号// 2.1获取当前日期,精确到天String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));Long increment = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);// 3.拼接返回return timestamp << COUNT_BITS | increment;}
}
实现优惠券秒杀下单
下单时需要判断两点:
- 秒杀是否开始或者结束,如果尚未开始或已经结束则无法下单
- 库存是否充足
流程:
根据流程实现具体业务
@Resourceprivate ISeckillVoucherService iSeckillVoucherService;@Resourceprivate IVoucherService iVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Override@Transactionalpublic Result seckillVoucher(Long voucherId) {if (voucherId == null || voucherId < 0) {return Result.fail("请求id错误");}Voucher voucher = iVoucherService.getById(voucherId);if (voucher == null) {return Result.fail("当前优惠券不存在");}SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);if (seckillVoucher == null) {return Result.fail("秒杀优惠券不存在");}LocalDateTime beginTime = seckillVoucher.getBeginTime();if (LocalDateTime.now().isBefore(beginTime)) {return Result.fail("秒杀未开始");}LocalDateTime endTime = seckillVoucher.getEndTime();if (LocalDateTime.now().isAfter(endTime)) {return Result.fail("秒杀已结束");}int leftStock = seckillVoucher.getStock();if (leftStock <= 0) {return Result.fail("优惠券已被抢空");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).update();if (!update) {return Result.fail("服务器内部错误");}VoucherOrder voucherOrder = new VoucherOrder();long orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(UserHolder.getUser().getId());voucherOrder.setVoucherId(voucherId);voucherOrder.setPayType(voucher.getType());boolean save = this.save(voucherOrder);if (!save) {return Result.fail("服务器内部错误");}return Result.ok(orderId);}
超卖问题
乐观锁
- 版本号法
使用CAS方法:
int leftStock = seckillVoucher.getStock();if (leftStock <= 0) {return Result.fail("优惠券已被抢空");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId)// 更新的时候判断当前剩余库存量是否跟开始查询的时候相等.eq("stock",leftStock).update();
弊端:
成功率很低
库存改为大于0
int leftStock = seckillVoucher.getStock();if (leftStock <= 0) {return Result.fail("优惠券已被抢空");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId)// 更新的时候判断当前剩余库存量是否大于0.gt("stock",0).update();
一人一单
在抢购前判断数据库是否存在已经的订单
// 查询秒杀优惠券,该用户是否已经抢到int count = query().eq("user_id", userId).eq("voucher_id",voucherId).count();if (count > 0) {return Result.fail("您已经抢过了");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).eq("stock",leftStock).update();if (!update) {return Result.fail("服务器内部错误");}
问题:
在多线程上会出现都走到代码第6行,然后再一起执行更新操作,就会出现一人多单情况
解决:
对操作进行加锁
@Overridepublic Result seckillVoucher(Long voucherId) {if (voucherId == null || voucherId < 0) {return Result.fail("请求id错误");}Voucher voucher = iVoucherService.getById(voucherId);if (voucher == null) {return Result.fail("当前优惠券不存在");}SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);if (seckillVoucher == null) {return Result.fail("秒杀优惠券不存在");}LocalDateTime beginTime = seckillVoucher.getBeginTime();if (LocalDateTime.now().isBefore(beginTime)) {return Result.fail("秒杀未开始");}LocalDateTime endTime = seckillVoucher.getEndTime();if (LocalDateTime.now().isAfter(endTime)) {return Result.fail("秒杀已结束");}int leftStock = seckillVoucher.getStock();if (leftStock <= 0) {return Result.fail("优惠券已被抢空");}return getResult(voucherId, voucher, leftStock);}@Transactionalpublic Result getResult(Long voucherId, Voucher voucher, int leftStock) {Long userId = UserHolder.getUser().getId();// 查询秒杀优惠券,该用户是否已经抢到long orderId;// 对相同用户并发请求加锁 new// String类型也可能出现不同对象 new // intern()的作用是返回字符串常量池中对象的地址 new synchronized (userId.toString().intern()) { int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("您已经抢过了");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).eq("stock", leftStock).update();if (!update) {return Result.fail("服务器内部错误");}VoucherOrder voucherOrder = new VoucherOrder();orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);voucherOrder.setPayType(voucher.getType());boolean save = this.save(voucherOrder);if (!save) {return Result.fail("服务器内部错误");}}return Result.ok(orderId);}
}
此时还会出现一个问题:
- 当一个线程释放锁后,但是事务还没提交,那么还是会出现一人多单的情况,所以需要对整个方法调用进行加锁
@Overridepublic Result seckillVoucher(Long voucherId) {if (voucherId == null || voucherId < 0) {return Result.fail("请求id错误");}Voucher voucher = iVoucherService.getById(voucherId);if (voucher == null) {return Result.fail("当前优惠券不存在");}SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);if (seckillVoucher == null) {return Result.fail("秒杀优惠券不存在");}LocalDateTime beginTime = seckillVoucher.getBeginTime();if (LocalDateTime.now().isBefore(beginTime)) {return Result.fail("秒杀未开始");}LocalDateTime endTime = seckillVoucher.getEndTime();if (LocalDateTime.now().isAfter(endTime)) {return Result.fail("秒杀已结束");}int leftStock = seckillVoucher.getStock();if (leftStock <= 0) {return Result.fail("优惠券已被抢空");}Long userId = UserHolder.getUser().getId();// new synchronized (userId.toString().intern()) {return getResult(voucherId, voucher, leftStock);}}@Transactionalpublic Result getResult(Long voucherId, Voucher voucher, int leftStock) {Long userId = UserHolder.getUser().getId();// 查询秒杀优惠券,该用户是否已经抢到long orderId;// 对相同用户并发请求加锁// String类型也可能出现不同对象// intern()的作用是返回字符串常量池中对象的地址int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("您已经抢过了");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).eq("stock", leftStock).update();if (!update) {return Result.fail("服务器内部错误");}VoucherOrder voucherOrder = new VoucherOrder();orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);voucherOrder.setPayType(voucher.getType());boolean save = this.save(voucherOrder);if (!save) {return Result.fail("服务器内部错误");}return Result.ok(orderId);}
对于spring事务管理熟悉的话,在seckillVoucher方法中调用有事务的getResult这个方法,会出现事务失效。因为相当于this.getResult,用的不是代理类。
解决方法:
@Overridepublic Result seckillVoucher(Long voucherId) {if (voucherId == null || voucherId < 0) {return Result.fail("请求id错误");}Voucher voucher = iVoucherService.getById(voucherId);if (voucher == null) {return Result.fail("当前优惠券不存在");}SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);if (seckillVoucher == null) {return Result.fail("秒杀优惠券不存在");}LocalDateTime beginTime = seckillVoucher.getBeginTime();if (LocalDateTime.now().isBefore(beginTime)) {return Result.fail("秒杀未开始");}LocalDateTime endTime = seckillVoucher.getEndTime();if (LocalDateTime.now().isAfter(endTime)) {return Result.fail("秒杀已结束");}int leftStock = seckillVoucher.getStock();if (leftStock <= 0) {return Result.fail("优惠券已被抢空");}Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) {// 获取当前对象的代理对象 newIVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.getResult(voucherId, voucher, leftStock);}}/*** xc* @param voucherId* @param voucher* @param leftStock* @return*/@Override@Transactionalpublic Result getResult(Long voucherId, Voucher voucher, int leftStock) {Long userId = UserHolder.getUser().getId();// 查询秒杀优惠券,该用户是否已经抢到long orderId;// 对相同用户并发请求加锁// String类型也可能出现不同对象// intern()的作用是返回字符串常量池中对象的地址int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("您已经抢过了");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).eq("stock", leftStock).update();if (!update) {return Result.fail("服务器内部错误");}VoucherOrder voucherOrder = new VoucherOrder();orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);voucherOrder.setPayType(voucher.getType());boolean save = this.save(voucherOrder);if (!save) {return Result.fail("服务器内部错误");}return Result.ok(orderId);}
需要导入aspectj的依赖
<dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency>
然后在主启动类上暴露代理对象
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
@EnableTransactionManagement
@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {public static void main(String[] args) {SpringApplication.run(HmDianPingApplication.class, args);}}
分布式锁
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了
模拟集群效果:
怎么添加idea的Serivces
在Service中添加SpringBoot项目,通过不同的端口启动
# 在VM options中添加此段代码,以指定端口启动
-Dserver.port=8082
前端nginx通过负载均衡访问后端接口
在集群模式下:
有多个JVM实例的存在,所以又会出现超卖问题
使用分布式锁解决:
流程:
基于Redis实现分布式锁初级版本:
需求:定义一个类,实现下面接口,利用Redis实现分布式锁功能。
public interface Ilock {/*** 尝试获取锁* @param timeoutSec 过期时间* @return 获取锁是否成功*/boolean tryLock(long timeoutSec);void unlock();
}
简单锁实现类
package com.hmdp.utils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;/*** @author: xc* @date: 2023/4/27 20:46*/public class SimpleRedisLock implements ILock {/*** 锁的统一前缀*/private static final String KEY_PREFIX = "lock:";/*** 锁的名称*/private String name;// 因为不是spring管理的bean所以需要构造方法private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(long timeoutSec) {// 当前线程idLong threadId = Thread.currentThread().getId();// setIfAbsent:如果不存在就设置Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId+"", timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {// 删除keystringRedisTemplate.delete(KEY_PREFIX + name);}
}
实现初级redis分布式锁版本
Long userId = UserHolder.getUser().getId();// 因为只对同一个用户加锁,所以用 order:+userId 作为锁的keySimpleRedisLock lock = new SimpleRedisLock("order:"+userId, stringRedisTemplate);// 获取锁boolean tryLock = lock.tryLock(LOCK_TIMEOUT);if (!tryLock) {// 获取锁失败return Result.fail("不允许重复下单");}try {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.getResult(voucherId, voucher, leftStock);} finally {// 拿到锁的释放锁lock.unlock();}
会出现的问题:
会释放别人的锁
解决方案:释放锁的时候先看一下是不是自己的锁
流程:
改进Redis的分布式锁:
-
在获取锁时存入线程表示(可以用UUID表示)
-
在释放锁时获取线程ID,判断是否与当前线程标示一致
-
- 如果一致则释放锁
- 如果不一致则不释放锁
修改获取锁和释放锁的逻辑
package com.hmdp.utils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
import java.util.UUID;/*** @author: xc* @date: 2023/4/27 20:46*/public class SimpleRedisLock implements ILock {/*** 锁的统一前缀*/private static final String KEY_PREFIX = "lock:";/*** 随机生成线程uuid的前缀*/private static final String ID_PREFIX = UUID.randomUUID() +"-";/*** 锁的名称*/private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(long timeoutSec) {long threadId = Thread.currentThread().getId();// 标识位 ID_PREFIX+threadIdBoolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, ID_PREFIX+threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {long threadId = Thread.currentThread().getId();// 判断标识是否一致if ((ID_PREFIX+threadId).equals(stringRedisTemplate.opsForValue().get(KEY_PREFIX + name))) {stringRedisTemplate.delete(KEY_PREFIX + name);}}
}
出现问题:
- 线程1如果判断完是自己的锁后,出现gc阻塞线程知道锁过期,此时线程2过来获取到锁执行自己的业务,然后线程1又阻塞完毕回到删除锁,就会将线程2的锁删除。然而又有线程3来过来获取锁没获取到,就会出现线程2和线程3同时执行代码。
解决办法: 保证判断锁和释放锁的原子性
使用Redis的Lua脚本:
关于redis的基本语法
执行Lua脚本
再次改进Redis的分布式锁
总结
基于Redis的分布式锁实现思路:
- 利用set nx ex获取锁,并设置过期时间,保存线程标示
- 释放锁时先判断线程标示是否与自己一致,一致则删除锁
特性:
- 利用set nx满足互斥性
- 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
- 利用Redis集群保证高可用和高并发特性
基于Redis的分布式锁的优化:
基于setnx实现的分布式锁存在下面的问题:
Redisson入门
引入依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>
配置Redisson客户端
/*** @author xc* @date 2023/4/28 9:16*/
@Configuration
public class RedissonConfig {public RedissonClient redissonClient() {// 配置Config config = new Config();config.setTransportMode(TransportMode.EPOLL);config.useSingleServer().setAddress("redis://127.0.0.1:6379");return Redisson.create(config);}}
使用Redisson的分布式锁
Redisson可重入锁的原理
流程图:
获取锁Lua脚本:
释放锁Lua脚本:
Redisson底层源码讲解(P66、P67)
Redisson分布式锁原理:
- 可重入:利用hash结构记录线程id和重入次数
- 可重试:利用信号量和PubSub功能实现等待、唤醒、获取锁失败的重试机制
- 超时续约:利用watchDog,每个一段时间(releaseTime/3),重置超时时间
解决主从一致(P68)
Redis优化秒杀
改进秒杀业务,提高并发性能
需求:
- 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
// 引入redis@Resourceprivate StringRedisTemplate stringRedisTemplate;// 在保存秒杀优惠券的时候,也将优惠券的id和库存保存到redis中stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
- 基于Lua脚本,判断秒杀库存、一人一单、决定用户是否抢购成功
lua脚本
---
--- Generated by Luanalysis
--- Created by xc.
--- DateTime: 2023/4/28 11:48
---
local voucherId = ARGV[1]
local userId = ARGV[2]local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherIdif(tonumber(redis.call('get',stockKey)) <= 0) thenreturn 1
endif(redis.call('sismember',orderKey,userId) == 1) thenreturn 2
end
redis.call('incrby',stockKey,-1)
redis.call('sadd',orderKey,userId)
return 0
java代码
private static final DefaultRedisScript<Long> SECKILL_SCIPT;static {SECKILL_SCIPT = new DefaultRedisScript<>();ClassPathResource pathResource = new ClassPathResource("seckill.lua");SECKILL_SCIPT.setLocation(pathResource);SECKILL_SCIPT.setResultType(Long.class);}@Overridepublic Result seckillVoucher(Long voucherId) {// 1.执行lua脚本Long userId = UserHolder.getUser().getId();long execute = stringRedisTemplate.execute(SECKILL_SCIPT,Collections.EMPTY_LIST,voucherId.toString(), userId.toString());// 2.判断结果是否为0if (execute != 0) {// 2.1 不为0,代表没有购买资格// 为1时库存不足,2时重复下单return Result.fail(execute == 1 ? "库存不足" : "重复下单");}// 2.2 为0 ,有购买资格,把下单信息保存到阻塞队列long orderId = redisIdWorker.nextId("order");
// new ArrayBlockingQueue<>()// 3.返回订单idreturn Result.ok(orderId);}
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
// 阻塞队列private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);@Overridepublic Result seckillVoucher(Long voucherId) {// 1.执行lua脚本Long userId = UserHolder.getUser().getId();long execute = stringRedisTemplate.execute(SECKILL_SCIPT,Collections.EMPTY_LIST,voucherId.toString(), userId.toString());// 2.判断结果是否为0if (execute != 0) {// 2.1 不为0,代表没有购买资格// 为1时库存不足,2时重复下单return Result.fail(execute == 1 ? "库存不足" : "重复下单");}VoucherOrder voucherOrder = new VoucherOrder();// 2.2 为0 ,有购买资格,把下单信息保存到阻塞队列long orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);// 加入到阻塞队列 orderTasks.add(voucherOrder);proxy = (IVoucherOrderService) AopContext.currentProxy();// 3.返回订单idreturn Result.ok(orderId);}
- 开启线程任务,不断从阻塞队列中回去信息,实现异步下单功能
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();/*** 在类初始化完后会执行该方法*/@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable{@Overridepublic void run() {while (true){// 获取队列中的队列信息try {VoucherOrder voucherOrder = orderTasks.take();handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("处理订单异常",e);}}}private void handleVoucherOrder(VoucherOrder voucherOrder) {// 因为是全新开启一个线程,所以需要在订单中拿到用户idLong userId = voucherOrder.getUserId();// 因为只对同一个用户加锁,所以用 order:+userId 作为锁的keyRLock lock = redissonClient.getLock("lock:order:" + userId);boolean tryLock = lock.tryLock();if (!tryLock) {// 获取锁失败log.error("不允许重复下单");return;}try {proxy.getResult(voucherOrder);} finally {// 拿到锁的释放锁lock.unlock();}}}
总结
秒杀业务的优化思路是什么?
- 先利用Redis完成库存余量、一人一单判断,完成抢单业务
- 再将下单业务放入阻塞队列,利用独立线程异步下单
基于阻塞队列的异步秒杀存在哪些问题?
- 内存限制问题
- 数据安全问题
Redis消息队列实现异步秒杀
消息队列,字面意思就是存放消息队列。最简单的消息队列模型包括3个角色
- 消息队列:存储和管理消息,也被称为消息代理
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
基于List结构模拟消息队列
基于List的消息队列由哪些优缺点?
优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失
- 只支持单消费者
基于PubSub的消息队列
基于PubSub的消息队列由哪些优缺点?
优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
基于Stream的消息队列
基于STREAM的消息队列由哪些优缺点?
优点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
缺点:
- 有消息漏读的风险
基于Stream的消息队列-消费者组
创建消费者组:
XGROUP CREATE key groupName ID [MKSTREAM]
- key:队列名称
- groupName:消费者组名称
- ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
- MKSTREAM:队列不存在时自动创建队列
其它常见命令:
# 删除指定的消费者组
XGROUP DESTORY key groupName# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
Stream类型消息队列的XREADGROUP命令特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读风险
- 有消息确认机制,保证消息至少被消费一次