前言:随着互联网的发展,单体web应用已无法满足业务的需求,随之而来的是微服务,再加上分布式部署,带来的是各种问题,个个服务在不同的进程,当对同一资源进行修改时就会发生线程安全问题,特别是在电商活动(抢优惠券、下单等业务场景),记录一下自己探索分布式锁的过程
一、单机redis下基于jedis分布式锁
1、环境: redis-server: redis-6.0.3 、 redis-client: jedis 2.9.1、springboot :2.1.3
2、创建redis连接池
public class RedisPool {
private static JedisPool pool;//jedis连接池
private static Jedis jedis;
private static int maxTotal = 20;//最大连接数
private static int maxIdle = 10;//最大空闲连接数
private static int minIdle = 5;//最小空闲连接数
private static boolean testOnBorrow = true;//在取连接时测试连接的可用性
private static boolean testOnReturn = false;//再还连接时不测试连接的可用性
static {
initPool();//初始化连接池
}
public static Jedis getJedis(){
return pool.getResource();
}
public static void close(Jedis jedis){
jedis.close();
}
private static void initPool(){
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(maxTotal);
config.setMaxIdle(maxIdle);
config.setMinIdle(minIdle);
config.setTestOnBorrow(testOnBorrow);
config.setTestOnReturn(testOnReturn);
config.setBlockWhenExhausted(true);
pool = new JedisPool(config, "192.168.106.120", 6379, 5000, "redis");
}
}
3、创建RedisClient,实现分布式锁工具类
public class RedisClient {
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final Long RELEASE_SUCCESS = 1L;
public boolean tryGetLock( String lockKey, String requestId, int expireTime) {
Jedis jedis = RedisPool.getJedis();
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
RedisPool.close(jedis);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
public boolean releaseLock( String lockKey, String requestId) {
Jedis jedis = RedisPool.getJedis();
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey),
Collections.singletonList(requestId));
RedisPool.close(jedis);
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}
public boolean setKey( String key, String value) {
Jedis jedis = RedisPool.getJedis();
String result = jedis.set(key.getBytes(), value.getBytes());
RedisPool.close(jedis);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
public String getKey( String key) {
Jedis jedis = RedisPool.getJedis();
String result = jedis.get(key);
RedisPool.close(jedis);
return result;
}
}
4、装配RedisClient到spring ioc容器
@Configuration
public class RedisClientConf {
@Bean
public RedisClient redisClient() {
return new RedisClient();
}
}
5 、编写service,用CountDownLatch结合ExecutorService模拟并发测试分布式锁
@Service
@SuppressWarnings("all")
public class RedisService {
// 商品锁 key 值
private String lockKey = "computer_key";
private Logger logger = LoggerFactory.getLogger(RedisService.class);
// 线程池
ExecutorService executorService = new ThreadPoolExecutor(4, 4, 1L,
TimeUnit.MICROSECONDS, new LinkedBlockingDeque<Runnable>());
@Autowired
private RedisClient redisClient;
public List<String> testRedisLock() {
CountDownLatch countDownLatch = new CountDownLatch(200);
// 抢到商品的用户
List<String> shopUser = new ArrayList<>();
// 模拟用户数据
List<String> userArray = new ArrayList<>();
for (int i = 0; i < 200; i++) {
userArray.add(UUID.randomUUID().toString());
}
// 模拟抢单
userArray.stream().parallel().forEach(userId -> {
executorService.execute(() -> {
String user = takeOrder(userId);
if (!StringUtils.isEmpty(user)) {
shopUser.add(user);
}
countDownLatch.countDown();
});
});
// executorService.shutdown(); shutdown后,后续再调用会触发决绝策略
System.out.println("wait start");
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("wait end ! 成功抢单用户数量:" + shopUser.size());
return shopUser;
}
private String takeOrder(String b) {
while (true) {
if (redisClient.tryGetLock(lockKey, b, 500)) { // 设置锁的过期时间,避免宕机或者其他情况,导致死锁
// cumputer_stock 为redis中提前设置好的库存
String stockStr = redisClient.getKey("cumputer_stock");
int stock = Integer.parseInt(stockStr);
logger.info("用户:{} 获取锁", b);
try {
if (stock <= 0) {// 检查库存
logger.info("已售罄");
break;
}
try {
// 模拟业务操作
Thread.sleep(new Random().nextInt(300));
} catch (InterruptedException e) {
e.printStackTrace();
}
// 扣减库存
redisClient.setKey("cumputer_stock", (stock - 1) + "");
logger.info("用户:{},抢单成功, 剩余库存: {}", b, stock -1);
} finally {
// 释放锁, 必须放在finally,确保锁能释放
if (b.equals(redisClient.getKey(lockKey))) { // 避免误删,导致锁失效
boolean flag = redisClient.releaseLock(lockKey, b.toString());
logger.info("用户:{},释放锁: {}", b, flag);
}
}
return b;
}
}
return null;
}
}
6、测试分布式锁效果:初始化cumputer_stock 库存为100,启动多个应用访问接口,在查看结果
测试结果:
8080端口下:成功抢单用户数量:41
8081端口下:成功抢单用户数量:30
8082端口下:成功抢单用户数量:29
结论:经过多次并发测试,不会导致超卖现象,单机下redis分布式锁已成功完成(这里还有一个问题,就是程序无法判断业务代码的执行时间,超时时间设置多少都不合适,解决方案是开一个子线程来为延长redis key 的超时时间,以后成熟的框架,后面会见到Redission,来实现分布式锁会很简单)
二、 单机redis下基于Spring Data Redis分布式锁
1、环境: redis-server: redis-6.0.3 ,pom文件引入spring-boot-starter-data-redis
2、模拟并发测试代码,测试结果一样,只不过是更简单,只需要在配置文件中配置redis的信息即可
@Service
@SuppressWarnings("all")
public class SpringDataRedisLock {
沈略部分代码,和上面service代码一样,只不过是把操作redis的工具类换成了RtringRedisTemplate
private String takeOrder(String b) {
while (true) {
Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, b, 500, TimeUnit.MICROSECONDS);
if (lockFlag) { // 设置锁的过期时间,避免宕机或者其他情况,导致死锁
// cumputer_stock 为redis中提前设置好的库存
String stockStr = stringRedisTemplate.opsForValue().get("cumputer_stock");
int stock = Integer.parseInt(stockStr);
logger.info("用户:{} 获取锁", b);
try {
if (stock <= 0) {// 检查库存
logger.info("已售罄");
break;
}
try {
// 模拟业务操作
Thread.sleep(new Random().nextInt(300));
} catch (InterruptedException e) {
e.printStackTrace();
}
// 扣减库存
stringRedisTemplate.opsForValue().set("cumputer_stock", (stock - 1) + "");
logger.info("用户:{},抢单成功, 剩余库存: {}", b, stock -1);
} finally {
// 释放锁, 必须放在finally,确保锁能释放
if (b.equals(stringRedisTemplate.opsForValue().get(lockKey))) { // 避免误删,导致锁失效
boolean flag = stringRedisTemplate.delete(lockKey);
logger.info("用户:{},释放锁: {}", b, flag);
}
}
return b;
}
}
return null;
}
}
三、初探Redission分布式锁框架
1、pom引入相关依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.4</version> </dependency>
2、配置RedissionClient
@Configuration
public class RedissionClientConf {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.setTransportMode(TransportMode.NIO);
config.useSingleServer().setAddress("redis://192.168.106.120:6379").setPassword("****");
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
3、RedisssionClient 锁的使用
@Service
@SuppressWarnings("all")
public class RedissionService {
// 商品锁 key 值
private String lockKey = "computer_key";
private Logger logger = LoggerFactory.getLogger(RedissionService.class);
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedissonClient redissonClient;
public String takeOrder() {
String userID = UUID.randomUUID().toString();
RLock rLock = redissonClient.getLock(lockKey);
while (true) {
rLock.lock();
// cumputer_stock 为redis中提前设置好的库存
String stockStr = stringRedisTemplate.opsForValue().get("cumputer_stock");
int stock = Integer.parseInt(stockStr);
logger.info("用户:{} 获取锁", userID);
try {
if (stock <= 0) {// 检查库存
logger.info("已售罄");
break;
}
try {
// 模拟业务操作
Thread.sleep(new Random().nextInt(3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
// 扣减库存
stringRedisTemplate.opsForValue().set("cumputer_stock", (stock - 1) + "");
logger.info("用户:{},抢单成功, 剩余库存: {}", userID, stock - 1);
} finally {
// 释放锁, 必须放在finally,确保锁能释放
rLock.unlock();
}
return "用户:" + userID + ",抢单成功";
}
return "已售罄";
}
}
4、编写Controller,用jemeter对接口进行压测,设置500并发,redis中配置100库存,测试结果如下,发现也不会出现超卖现象,陈序运行稳定。
至此对redis单机环境下分布式锁算是入门了,欢迎指正。
文章详情请查看(redis集群环境下分布式锁解决方案): http://www.xiaoyuge.com.cn/#/article/detail?articleId=64