最近看了一下分布式锁的知识,分布式锁在实际中用的还是比较多的,因为在高并发的情况下,不适用分布式锁的话会导致数据肯定是有问题的,例如电商平台的秒杀商品库存的问题。
选用redis的原因就是:
1. Redis有很高的性能。
2. Redis本身就是单线程的所以不存在并发的问题。
3. 以及Redis命令对此支持较好,实现起来比较方便。
话不多说,直接看代码,关于一些细节问题和解释我都已经在代码注释中标出来。
在pom.xml文件中添加redis官方推荐的redis连接客户端jedis
pom.xml
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.2.0</version> </dependency>
Reids配置实体类
package com.lds.springbootdemo.redis; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; /** * @program: core * @description: redis连接配置实体 * @author: lidongsheng * @createData: 2020-03-13 11:46 * @updateAuthor: lidongsheng * @updateData: 2020-03-13 11:46 * @updateContent: redis连接配置实体 * @Version: 1.0.0 * @email: lidongshenglife@163.com * @blog: www.b0c0.com * ************************************************ * Copyright@李东升2020.Allrightsreserved * ************************************************ */ @Component @ConfigurationProperties(prefix = "spring.redis") public class RedisConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private int port; @Value("${spring.redis.password}") private String password; @Value("${spring.redis.timeout}") private int timeout; //最大连接数 @Value("${spring.redis.lettuce.pool.max-active}") private int poolMaxActive; //最大等待连接数 @Value("${spring.redis.lettuce.pool.max-idle}") private int poolMaxIdle; //最大连接等待时间 ms @Value("${spring.redis.lettuce.pool.max-wait}") private int poolMaxWait; public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public int getTimeout() { return timeout; } public void setTimeout(int timeout) { this.timeout = timeout; } public int getPoolMaxActive() { return poolMaxActive; } public void setPoolMaxActive(int poolMaxActive) { this.poolMaxActive = poolMaxActive; } public int getPoolMaxIdle() { return poolMaxIdle; } public void setPoolMaxIdle(int poolMaxIdle) { this.poolMaxIdle = poolMaxIdle; } public int getPoolMaxWait() { return poolMaxWait; } public void setPoolMaxWait(int poolMaxWait) { this.poolMaxWait = poolMaxWait; } }
添加Jedis的连接池工厂类
package com.lds.springbootdemo.redis; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; /** * @program: core * @description: Jedis连接池 * @author: lidongsheng * @createData: 2020-03-13 12:03 * @updateAuthor: lidongsheng * @updateData: 2020-03-13 12:03 * @updateContent: Jedis连接池 * @Version: 1.0.0 * @email: lidongshenglife@163.com * @blog: www.b0c0.com * ************************************************ * Copyright@李东升2020.Allrightsreserved * ************************************************ */ @Component public class RedisPoolFactory { @Autowired private RedisConfig redisConfig; @Bean public JedisPool jedisPoolFactory() { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxIdle(redisConfig.getPoolMaxIdle()); poolConfig.setMaxTotal(redisConfig.getPoolMaxActive()); poolConfig.setTestOnBorrow(true); poolConfig.setMaxWaitMillis(redisConfig.getPoolMaxWait()); JedisPool jp = new JedisPool(poolConfig, redisConfig.getHost(), redisConfig.getPort(), redisConfig.getTimeout(), redisConfig.getPassword(), 0); return jp; } }
为了方便redis中进行数据的管理,所以加的一个key的统一前缀类,一个模块下的可以为统一的前缀。此项也可不要
package com.lds.springbootdemo.redis; /** * @program: core * @description: key前缀类(为了方便redis中进行数据的管理,所以加的一个前缀类,一个模块下的可以为统一的前缀) * @author: lidongsheng * @createData: 2020-03-13 15:39 * @updateAuthor: lidongsheng * @updateData: 2020-03-13 15:39 * @updateContent: key前缀类 * @Version: 1.0.0 * @email: lidongshenglife@163.com * @blog: www.b0c0.com * ************************************************ * Copyright@李东升2020.Allrightsreserved * ************************************************ */ public interface KeyPrefix { Long expireMillisecond(); String getPrefix(); }
package com.lds.springbootdemo.redis.keyPrefix; import com.lds.springbootdemo.redis.KeyPrefix; /** * @program: core * @description: 抽象key前缀类,理论所有都需继承 * @author: lidongsheng * @createData: 2020-03-13 15:40 * @updateAuthor: lidongsheng * @updateData: 2020-03-13 15:40 * @updateContent: * @Version: 1.0.0 * @email: lidongshenglife@163.com * @blog: www.b0c0.com * ************************************************ * Copyright@李东升2020.Allrightsreserved * ************************************************ */ public abstract class BasePrefix implements KeyPrefix { private Long expireMillisecond; private String prefix; public BasePrefix(Long expireMillisecond, String prefix) { this.expireMillisecond = expireMillisecond; this.prefix = prefix; } public BasePrefix(String prefix) { this(0L, prefix); } @Override public Long expireMillisecond() { return expireMillisecond; } @Override public String getPrefix() { String className = getClass().getSimpleName(); return className + ":" + prefix; } }
例如:在商品售卖的一系列redis操作,我们可以创建一个商品售卖类的一致性key前缀类
package com.lds.springbootdemo.redis.keyPrefix; /** * @program: core * @description: 商品售卖类的一致性key前缀 * @author: lidongsheng * @createData: 2020-03-13 15:46 * @updateAuthor: lidongsheng * @updateData: 2020-03-13 15:46 * @updateContent: redis * @Version: 1.0.0 * @email: lidongshenglife@163.com * @blog: www.b0c0.com * ************************************************ * Copyright@李东升2020.Allrightsreserved * ************************************************ */ public class GoodsoldPrefix extends BasePrefix { public GoodsoldPrefix(Long expireMillisecond, String prefix) { super(expireMillisecond, prefix); } public GoodsoldPrefix(String prefix) { super(prefix); } }
接下里就是关键的去实现分布式锁了。关于一切点我已经在注释中说明。
package com.lds.springbootdemo.redis; import com.alibaba.druid.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; /** * @program: core * @description: * @author: lidongsheng * @createData: 2020-03-13 12:53 * @updateAuthor: lidongsheng * @updateData: 2020-03-13 12:53 * @updateContent: * @Version: 1.0.0 * @email: lidongshenglife@163.com * @blog: www.b0c0.com * ************************************************ * Copyright@李东升2020.Allrightsreserved * ************************************************ */ @Component public class JedisUtil { private static JedisPool jedisPool; private static Logger logger = LoggerFactory.getLogger(JedisUtil.class); @Autowired public void setJedisPool(JedisPool jedisPool) { JedisUtil.jedisPool = jedisPool; } /** * 获得锁 * * @param keyPrefix 为了方便redis中进行数据的管理,所以加的一个前缀类(一个模块下的可以为统一的前缀,方便管理,此项也可去除) * @param key 所有客户端的key都是一致的 * @param value 锁的过期时间戳(采用过期时间戳防止某一个节点一直阻塞无法释放锁,其他节点一直无法获得锁) * @param lockWaitTimeOut 获取锁的等待超时时间 毫秒(防止大量请求一直去不断获得锁占用资源) * @return */ public static boolean lock(KeyPrefix keyPrefix, String key, String value, Long lockWaitTimeOut) { Jedis jedis = null; try { jedis = jedisPool.getResource(); //所有key都是一样的 String realKey = keyPrefix.getPrefix() + key; //获得锁的超时时间戳 Long deadTime = System.currentTimeMillis() + lockWaitTimeOut; //for循环就是为了在获得锁失败的情况下,在lockWaitTimeOut内不断重新去尝试获得锁 for (; ; ) { //setnx是当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。 //这时候我们可以根据setNx的返回值来判断是否获得锁成功,返回1表示没有人占有锁,返回0则表示现在别的正在拿着锁 if (jedis.setnx(realKey, value) == 1) { return true; } String currentValue = jedis.get(realKey); //判断当前其他客户端拥有的锁是否过期 if (!StringUtils.isEmpty(currentValue) && Long.valueOf(currentValue) < System.currentTimeMillis()) { //其他客户端拥有的锁如果过期了就进入方法内 //getSet方法为设置新的value并返回旧的value //所以此时这个oldValue就为之前过期锁的value String oldValue = jedis.getSet(realKey, value); //如果oldValue==currentValue,就成功获得锁 //为什么加了一个oldValue==currentValue,就是防止在高并发的情况下多个客户端同时获得锁 //因为在高并发情况下,不进行value的验证就会很可能造成多个客户端同时执行到这里都拿到了锁。 //因此必须对value的前后一致性判断。因为只有一个客户端getSet操作之后返回的value肯定是和currentValue相等的 if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) { return true; } } lockWaitTimeOut = deadTime - System.currentTimeMillis(); //在lockWaitTimeOut时间内仍没有成功获得锁就返回false,在高并发情况下不能让一直占资源。 if (lockWaitTimeOut <= 0L) { return false; } } } catch (Exception ex) { logger.error(ex.getMessage()); return false; } finally { returnResource(jedis); } } /** * 释放锁 * * @param keyPrefix * @param key * @param value */ public static void unLock(KeyPrefix keyPrefix, String key, String value) { Jedis jedis = null; try { jedis = jedisPool.getResource(); String realKey = keyPrefix.getPrefix() + key; String currentValue = jedis.get(realKey); /* 比较释放锁的value和redis中的Value是否相等,防止释放掉其他客户端的锁 * 比如A客户端拿到了锁,A的执行时间比较长,超过了锁的过期时间,这时候B或者C就可以拿到锁了 * B拿到锁之后执行B的业务逻辑,在B的业务逻辑执行过程中,A这时候不再阻塞,执行完要释放锁了,如果要是不比较value,直接释放锁的话,就会释放掉B的锁,所以这是肯定不正确的。 * 所以必须要比较value */ if (!StringUtils.isEmpty(currentValue) && value.equals(currentValue)) { jedis.del(realKey); } } catch (Exception ex) { } finally { returnResource(jedis); } } /** * 将连接池的资源返回给连接池 * * @param jedis */ public static void returnResource(Jedis jedis) { if (jedis != null) { jedis.close(); } } /** * 添加键值对 * * @param keyPrefix * @param key * @param value */ public static void set(KeyPrefix keyPrefix, String key, String value) { Jedis jedis = null; try { jedis = jedisPool.getResource(); String realKey = keyPrefix.getPrefix() + key; jedis.set(realKey, value); } catch (Exception ex) { logger.error(ex.getMessage()); } finally { returnResource(jedis); } } /** * 得到键值对 * * @param keyPrefix * @param key * @return */ public static String get(KeyPrefix keyPrefix, String key) { Jedis jedis = null; try { jedis = jedisPool.getResource(); String realKey = keyPrefix.getPrefix() + key; return jedis.get(realKey); } catch (Exception ex) { return null; } finally { returnResource(jedis); } } /** * 将给定的key的value减1操作 * * @param keyPrefix * @param key */ public static void decr(KeyPrefix keyPrefix, String key) { Jedis jedis = null; try { jedis = jedisPool.getResource(); String realKey = keyPrefix.getPrefix() + key; jedis.decr(realKey); } catch (Exception ex) { } finally { returnResource(jedis); } } }
我自己使用测试工具测试了一下,就拿秒杀商品减库存的例子来说。
测试例子Controller:
package com.lds.springbootdemo.controller.test; import com.lds.springbootdemo.domain.vo.ResponseBO; import com.lds.springbootdemo.redis.JedisUtil; import com.lds.springbootdemo.redis.keyPrefix.GoodsoldPrefix; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.models.auth.In; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @program: core * @description: 商品售卖controller * @author: lidongsheng * @createData: 2020-03-13 15:51 * @updateAuthor: lidongsheng * @updateData: 2020-03-13 15:51 * @updateContent: 商品售卖controller * @Version: 1.0.0 * @email: lidongshenglife@163.com * @blog: www.b0c0.com * ************************************************ * Copyright@李东升2020.Allrightsreserved * ************************************************ */ @RestController @RequestMapping("/goodsoldController") @Api(tags = "高并发商品售卖测试接口") public class GoodsoldController { private Logger logger = LoggerFactory.getLogger(this.getClass()); //锁过期时长(防止死锁的情况,一般来说锁的过期时长应该稍微大于具体业务处理逻辑的那段代码的执行时长) private static Long LOCK_EXPIRE_TIME = 300L; private static Long stock = 1000L; //获取锁等待超时时间(一般来说要小于锁的过期时长,原因在后面) private static Long lockWaitTimeOut = 200L; @ApiOperation(value = "售卖减库存") @GetMapping(value = "/sell") public ResponseBO sell() { try { String stockTemp = JedisUtil.get(new GoodsoldPrefix("sell"), "stock"); //先判断库存,第一次过滤 if (stockTemp == null || Integer.parseInt(stockTemp) <= 0) { return ResponseBO.responseFail("被抢购光了!"); } //锁得value =当前系统时间毫秒数+锁得过期时间毫秒数 Long time = System.currentTimeMillis() + LOCK_EXPIRE_TIME; //拿锁操作 if (!JedisUtil.lock(new GoodsoldPrefix("sell"), "lock", String.valueOf(time), lockWaitTimeOut)) { return ResponseBO.responseFail("抢购的人太多了,请稍后重试"); } stockTemp = JedisUtil.get(new GoodsoldPrefix("sell"), "stock"); //再判断库存,因为有可能第一次判断库存时没有加锁,在并发条件下读的很有可能为脏数据 if (stockTemp == null || Integer.parseInt(stockTemp) <= 0) { return ResponseBO.responseFail("被抢购光了!"); } //减库存 JedisUtil.decr(new GoodsoldPrefix("sell"), "stock"); //记录减库存操作次数 JedisUtil.incr(new GoodsoldPrefix("sell"), "test"); JedisUtil.unLock(new GoodsoldPrefix("sell"), "lock", String.valueOf(time)); return ResponseBO.Builder.init().setReasonMessage("抢购成功").build(); } catch (Exception ex) { logger.error(ex.getMessage()); return ResponseBO.responseFail("当前接口异常:" + ex.getMessage()); } } @ApiOperation(value = "售卖加库存") @GetMapping(value = "/addStock") public ResponseBO addStock(Integer value) { try { String oldValue = JedisUtil.get(new GoodsoldPrefix("sell"), "stock"); Integer newValue = oldValue == null ? value : Integer.valueOf(oldValue) + value; JedisUtil.set(new GoodsoldPrefix("sell"), "stock", newValue.toString()); return ResponseBO.Builder.init().setReasonMessage("加库存成功").build(); } catch (Exception ex) { return ResponseBO.responseFail("加库存异常:" + ex.getMessage()); } } }
redis为了方便操作。windows可以使用redisDesktopManager客户端。本站下载地址: redisDesktopManager点击下载。
测试工具用的是Apache的Jmeter,感觉还是挺好用的。本站下载地址: Jmeter点击下载。
10000的并发量,设置为100的库存。
我设置模拟10000的用户同时进行秒杀商品减库存。
结果可以看到,最终成功减库存的次数为100次,redis中的库存数量也为0了。在高并发的情况下没有出现多减库存的情况。
但是经过多次的测试,最后发现库存在大多数情况下还是有问题的,有时候会出现负数的情况,也就是超卖了,总是会超卖几个。
说明获得锁和释放锁的逻辑还是存在一定问题的。
下面我们来具体分析一下获得锁和释放锁的逻辑存在哪些问题。
package com.lds.springbootdemo.redis; import com.alibaba.druid.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; /** * @program: core * @description: * @author: lidongsheng * @createData: 2020-03-13 12:53 * @updateAuthor: lidongsheng * @updateData: 2020-03-13 12:53 * @updateContent: * @Version: 1.0.0 * @email: lidongshenglife@163.com * @blog: www.b0c0.com * ************************************************ * Copyright@李东升2020.Allrightsreserved * ************************************************ */ @Component public class JedisUtil { private static JedisPool jedisPool; private static Logger logger = LoggerFactory.getLogger(JedisUtil.class); @Autowired public void setJedisPool(JedisPool jedisPool) { JedisUtil.jedisPool = jedisPool; } /** * 获得锁 * * @param keyPrefix 为了方便redis中进行数据的管理,所以加的一个前缀类(一个模块下的可以为统一的前缀,方便管理,此项也可去除) * @param key 所有客户端的key都是一致的 * @param value 锁的过期时间戳(采用过期时间戳防止某一个节点一直阻塞无法释放锁,其他节点一直无法获得锁) * @param lockWaitTimeOut 获取锁的等待超时时间 毫秒(防止大量请求一直去不断获得锁占用资源) * @return */ public static boolean lock(KeyPrefix keyPrefix, String key, String value, Long lockWaitTimeOut) { Jedis jedis = null; try { jedis = jedisPool.getResource(); //所有key都是一样的 String realKey = keyPrefix.getPrefix() + key; //获得锁的超时时间戳 Long deadTime = System.currentTimeMillis() + lockWaitTimeOut; //for循环就是为了在获得锁失败的情况下,在lockWaitTimeOut内不断重新去尝试获得锁 for (; ; ) { //setnx是当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。 //这时候我们可以根据setNx的返回值来判断是否获得锁成功,返回1表示没有人占有锁,返回0则表示现在别的正在拿着锁 if (jedis.setnx(realKey, value) == 1) { return true; } String currentValue = jedis.get(realKey); /* 在这里会有一个并发的问题,就是当C拿到这个锁,但是过期的时候,在并发的情况下,有很多客户端都在循环等待判断拿锁,假设A和B同时进行了这个判断语句 * 在这里注意一下:A和B是分布式的,不是在一个Jvm中。currentValue假设这时为 CT */ if (!StringUtils.isEmpty(currentValue) && Long.valueOf(currentValue) < System.currentTimeMillis()) { /* 由于C的锁已经过期,A和B同时进行了判断语句已经执行到这里, * 是不是同时执行getSet是无关的。因为redis单线程的,所以不会有并发的问题,AgetSet和BgetSet在redis中一定会有个先后顺序, * 所以最终的是否拿到锁结果和value值有关, * 关于value在高并发的情况下,确实是会出现相等的,因为value=当前系统时间毫秒数+锁得过期时间毫秒数 * 在高并发的情况下,确实是有可能,A、B、C客户端的当时的当前系统时间毫秒数是相同的,所以这就导致了value有可能是相同的,有下列几种情况: * 1:A和B的value都是相同的 * 2:A和B的value不相同 * 3:A、B、C的value全部相同的 * * * 第1种情况分析: * 结果: * 正常运行,A或者B只有一个拿到了锁,并且拿到锁的能正常释放锁。 * 原因: * 因为当A和B的value相等时(AT=BT),这时候进行A和B进行getSet, * 假设A的getSet操作在redis中先行执行了,现在value为AT,返回的oldValue为C过期的那个value:CT, * B的getSet操作在A之后执行了,这时候B就把AT覆盖成了BT,返回的oldValue为AT, * A的oldValue(CT) = currentValue(CT)。A就会拿到锁,也能正常的释放锁(因为即使B把A的value覆盖了,但是BT=AT) * B的oldValue(AT) != currentValue(CT)。B没有拿到所,结束。 * * 第2种情况分析: * 结果: * A或者B拿到了锁,但是拿到锁的(A或B)无法释放锁。 * 原因: * 假设A的getSet操作在redis中先行执行了,现在value为AT,返回的oldValue为C过期的那个value:CT, * B的getSet操作在A之后执行了,这时候B就把AT覆盖成了BT,返回的oldValue为AT, * 所以这时候A的 oldValue(CT) = currentValue(CT)。A就会拿到锁 * B的 oldValue(AT) != currentValue(CT),B没有拿到锁,。但是很不幸,B在这个判断之前这家伙这时候已经把value(AT)更新为了BT。 * 所以最终就会导致一个情况:A拿到锁执行完业务之后,释放锁的时候发现value已经不是AT了,被改成BT了,所以就会导致无法释放锁,直观的就是我们在redis中看到会有锁的键值对没有删除。 * * 第3种情况分析: * 结果: * A和B都拿到了锁,但是并且释放锁的时候还会释放掉不是自己的锁 * 原因: * 由于A、B、C的value在高并发情况下,可能会同一时间进入到lock方法来获得锁,传进来的value是相同,然后肯定是有某个客户端会获得锁, * 而其他客户端会在lockWaitTimeOut时间内,继续尝试获得锁,假设C获得了锁,A和B在lockWaitTimeOut时间内会不断重试获得锁, * 当C阻塞时间过长,C的锁过期之后,这时候A和B还在尝试获得锁的lockWaitTimeOut时间内,刚好都执行完搜是否过期的判断,进入到getSet * 这时就会有一个问题,当A和B都执行完getSet时, * currentValue = CT * 假设A的getSet操作在redis中先行执行了,现在value为AT,返回的oldValue为C过期的那个value:CT, * B的getSet操作在A之后执行了,这时候B就把AT覆盖成了BT,返回的oldValue为AT, * 由于AT=BT=CT,所以这时候无论时A还是B的 oldValue==currentValue。 * 所以A和B最终都会拿到锁,并且在释放锁的时候,假设A先执行完,B这时候还在执行业务,A释放锁的时候,由于AT=BT,会把B的锁也给释放 * 这种情况是很严重的,在秒杀中会导致超卖的情况,在多次测试的下,库存多数情况最终会是负数。 */ String oldValue = jedis.getSet(realKey, value); if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) { return true; } } lockWaitTimeOut = deadTime - System.currentTimeMillis(); //在lockWaitTimeOut时间内仍没有成功获得锁就返回false,在高并发情况下不能让一直占资源。 if (lockWaitTimeOut <= 0L) { return false; } } } catch (Exception ex) { logger.error(ex.getMessage()); return false; } finally { returnResource(jedis); } } /** * 释放锁 * * @param keyPrefix * @param key * @param value */ public static void unLock(KeyPrefix keyPrefix, String key, String value) { Jedis jedis = null; try { jedis = jedisPool.getResource(); String realKey = keyPrefix.getPrefix() + key; String currentValue = jedis.get(realKey); /* 比较释放锁的value和redis中的Value是否相等,防止释放掉其他客户端的锁 * 比如A客户端拿到了锁,A的执行时间比较长,超过了锁的过期时间,这时候B或者C就可以拿到锁了 * B拿到锁之后执行B的业务逻辑,在B的业务逻辑执行过程中,A这时候不再阻塞,执行完要释放锁了,如果要是不比较value,直接释放锁的话,就会释放掉B的锁,所以这是肯定不正确的。 * 所以必须要比较value */ if (!StringUtils.isEmpty(currentValue) && value.equals(currentValue)) { jedis.del(realKey); } } catch (Exception ex) { } finally { returnResource(jedis); } } /** * 将连接池的资源返回给连接池 * * @param jedis */ public static void returnResource(Jedis jedis) { if (jedis != null) { jedis.close(); } } }
那我们怎么来避免这种的超卖情况呢?
其实这个超卖的情况问题就是出现在这个过期时间我们不应该由我们的逻辑来处理,我们应该交给redis,在redis中可以设置某个key的过期时间,所以我们应该把setnx和设置过期时间(setex)放在一个操作命令中,保证原子性的执行setnx和setex。锁的value的话必须要保证是唯一的。因为释放锁的时候依旧要判断value,避免释放掉其他客户端拥有的锁。
所以这时候就可以用到lua脚本来执行setnx和setex来保证两个命令为原子性操作。
下面就是对JedisUtil类的改造:
这个才是正确的!!
package com.lds.springbootdemo.redis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** * @program: core * @description: * @author: lidongsheng * @createData: 2020-03-13 12:53 * @updateAuthor: lidongsheng * @updateData: 2020-03-13 12:53 * @updateContent: * @Version: 1.0.0 * @email: lidongshenglife@163.com * @blog: www.b0c0.com * ************************************************ * Copyright@李东升2020.Allrightsreserved * ************************************************ */ @Component public class NewJedisUtil { private static JedisPool jedisPool; private static Logger logger = LoggerFactory.getLogger(NewJedisUtil.class); //setNx和setEx 加锁原子性操作lua脚本 private static final String SET_NX_EX_LOCK = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then return redis.call('pexpire',KEYS[1],tonumber(ARGV[2])) else return 0 end"; //删除锁lua脚本 private static final String DEL_LOCK = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; @Autowired public void setJedisPool(JedisPool jedisPool) { NewJedisUtil.jedisPool = jedisPool; } /** * 获得锁 * * @param keyPrefix 为了方便redis中进行数据的管理,所以加的一个前缀类(一个模块下的可以为统一的前缀,方便管理,此项也可去除) * @param key 所有客户端的key都是一致的 * @param value 锁的value,必须保证其唯一性 * @param lockWaitTimeOut 获取锁的等待超时时间 毫秒(防止大量请求一直去不断获得锁占用资源) * @return */ public static boolean lock(KeyPrefix keyPrefix, String key, String value, Long lockWaitTimeOut) { Jedis jedis = null; try { jedis = jedisPool.getResource(); //所有key都是一样的 String realKey = keyPrefix.getPrefix() + key; //获得锁的超时时间戳 Long deadTime = System.currentTimeMillis() + lockWaitTimeOut; List<String> arvgs = new ArrayList<>(2); arvgs.add(value); arvgs.add(keyPrefix.expireMillisecond().toString()); //jedis.eval返回的数字类型的为Long,转换成Integer的话会报错 Long result = 0L; //for循环就是为了在获得锁失败的情况下,在lockWaitTimeOut内不断重新去尝试获得锁 for (; ; ) { result = (Long) jedis.eval(SET_NX_EX_LOCK, Collections.singletonList(realKey), arvgs); if (1 == result) { return true; } lockWaitTimeOut = deadTime - System.currentTimeMillis(); //在lockWaitTimeOut时间内仍没有成功获得锁就返回false,在高并发情况下不能让一直占资源。 if (lockWaitTimeOut <= 0L) { return false; } } } catch ( Exception ex) { logger.error(ex.getMessage()); return false; } finally { returnResource(jedis); } } /** * 释放锁 * * @param keyPrefix * @param key * @param value */ public static boolean unLock(KeyPrefix keyPrefix, String key, String value) { Jedis jedis = null; try { jedis = jedisPool.getResource(); String realKey = keyPrefix.getPrefix() + key; Long result = (Long) jedis.eval(DEL_LOCK, Collections.singletonList(realKey), Collections.singletonList(value)); if (1 == result) { return true; } } catch (Exception ex) { logger.error(ex.getMessage()); return false; } finally { returnResource(jedis); } return false; } /** * 将连接池的资源返回给连接池 * * @param jedis */ public static void returnResource(Jedis jedis) { if (jedis != null) { jedis.close(); } } ...省略部分代码 }
测试方法:
@ApiOperation(value = "售卖减库存") @GetMapping(value = "/newSell") public ResponseBO newSell() { try { String stockTemp = NewJedisUtil.get(new GoodsoldPrefix("sell"), "stock"); //先判断库存,第一次过滤 if (stockTemp == null || Integer.parseInt(stockTemp) <= 0) { return ResponseBO.responseFail("被抢购光了!"); } String value = UUID.randomUUID().toString(); //拿锁操作 if (!NewJedisUtil.lock(new GoodsoldPrefix(LOCK_EXPIRE_TIME, "sell"), "lock", value, lockWaitTimeOut)) { return ResponseBO.responseFail("抢购的人太多了,请稍后重试"); } stockTemp = NewJedisUtil.get(new GoodsoldPrefix("sell"), "stock"); //再判断库存,因为有可能第一次判断库存时没有加锁,在并发条件下读的很有可能为脏数据 if (stockTemp == null || Integer.parseInt(stockTemp) <= 0) { return ResponseBO.responseFail("被抢购光了!"); } //减库存 NewJedisUtil.decr(new GoodsoldPrefix("sell"), "stock"); //记录减库存操作次数 NewJedisUtil.incr(new GoodsoldPrefix("sell"), "test"); NewJedisUtil.unLock(new GoodsoldPrefix("sell"), "lock", value); return ResponseBO.Builder.init().setReasonMessage("抢购成功").build(); } catch (Exception ex) { logger.error(ex.getMessage()); return ResponseBO.responseFail("当前接口异常:" + ex.getMessage()); } }
然后进行了多次测试,就不会再出现多减库存的情况了,而且锁的键也正常删除了,不会出现锁最后没有删除的情况。
2 条评论 “Redis分布式锁实现以及错误案例分析”
I get very useful information on your page, I feel lucky Xena Austen Rodrique
Nice response in return of this question with solid arguments and describing everything about that. Shawn Hanan Corabelle Vanya Gannon Legge