Redis分布式锁
1. 分布式锁设计要求
- 独占性:任何时刻只能有且仅有一个线程持有
- 高可用:若redis集群环境下,不能因为某一个节点挂了而出现获取锁和释放锁失败的情况
- 防死锁:杜绝死锁,必须有超时控制机制或者撤销操作,有个兜底终止跳出方案
- 不乱抢:防止张冠李戴,不能私下unlock别人的锁,只能自己加锁自己释放
- 重入性: 同一个节点的同一个线程如果获得锁之后,它也可以再次获取这个锁。
2. 编写分布式锁
2.1 使用Lua
Redis调用Lua脚本通过eval命令保证代码执行的原子性,直接用return返回脚本执行后的结果值。程序继承Lock接口,使得使用更加规范。
java
public class RedisDistributedLock implements Lock {
private StringRedisTemplate stringRedisTemplate;
private String lockName; //KEYS[1]
private String uuidValue; //ARGV[1]
private long expireTime; //ARGV[2]
public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName, String uuidValue) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockName = lockName;
this.uuidValue = uuidValue + ":" + Thread.currentThread().getId();
this.expireTime = 30L;
}
@Override
public void lock() {
tryLock();
}
@Override
public boolean tryLock() {
try {
tryLock(-1L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
// 干活的,实现加锁功能,实现这一个干活的就OK,全盘通用
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (time != -1L) {
this.expireTime = unit.toSeconds(time);
}
String script =
"if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +
"redis.call('hincrby',KEYS[1],ARGV[1],1) " +
"redis.call('expire',KEYS[1],ARGV[2]) " +
"return 1 " +
"else " +
"return 0 " +
"end";
System.out.println("script: " + script);
System.out.println("lockName: " + lockName);
System.out.println("uuidValue: " + uuidValue);
System.out.println("expireTime: " + expireTime);
while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))) {
TimeUnit.MILLISECONDS.sleep(50);
}
this.renewExpire();
return true;
}
// 干活的,实现解锁功能
@Override
public void unlock() {
String script =
"if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " +
" return nil " +
"elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " +
" return redis.call('del',KEYS[1]) " +
"else " +
" return 0 " +
"end";
// nil = false 1 = true 0 = false
System.out.println("lockName: " + lockName);
System.out.println("uuidValue: " + uuidValue);
System.out.println("expireTime: " + expireTime);
Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));
if (flag == null) {
throw new RuntimeException("This lock doesn't EXIST");
}
}
private void renewExpire() {
String script =
"if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then " +
"return redis.call('expire',KEYS[1],ARGV[2]) " +
"else " +
"return 0 " +
"end";
// 定时任务续期
new Timer().schedule(new TimerTask() {
@Override
public void run() {
if (stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))) {
renewExpire();
}
}
}, (this.expireTime * 1000) / 3);
}
//===下面的redis分布式锁暂时用不到=======================================
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public Condition newCondition() {
return null;
}
}
2. Redlock算法设计理念
Redis集群是AP。Redis提供了Redlock算法,用来实现基于多个实例的分布式锁。锁变量由多个实例维护,即使有实例发生了故障,锁变量仍然是存在的,客户端还是可以完成锁操作。
3. 基本使用Redisson框架
Redisson是java的redis客户端之一,提供了一些api方便操作redis。Redisson提供了Redlock算法的落地
3.1 使用单机
xml
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.4</version>
</dependency>
3.2 配置RedisConfig
java
//单Redis节点模式
@Bean
public Redisson redisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.111.175:6379").setDatabase(0).setPassword("111111");
return (Redisson) Redisson.create(config);
}
3.3 使用Redisson
java
@Service
public class InventoryService2 {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String port;
@Autowired
private Redisson redisson;
public String saleByRedisson() {
String retMessage = "";
String key = "zzyyRedisLock";
RLock redissonLock = redisson.getLock(key);
redissonLock.lock();
try {
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if (inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber;
System.out.println(retMessage);
} else {
retMessage = "商品卖完了,o(╥﹏╥)o";
}
} finally {
// 防止多线程并发,被其他线程解锁
if (redissonLock.isLocked() && redissonLock.isHeldByCurrentThread()) {
redissonLock.unlock();
}
}
return retMessage + "\t" + "服务端口号:" + port;
}
}
4. 使用分布式锁
这个锁的算法实现了多redis实例的情况,相对于单redis节点来说,优点在于防止了单节点故障造成整个服务停止运行的情况且在多节点中锁的设计,及多节点同时崩溃等各种意外情况有自己独特的设计方法。
Redisson分布式锁支持MultiLock机制可以将多个锁合并为一个大锁,对一个大锁进行统一的申请加锁以及释放锁。
java
@Configuration
@EnableConfigurationProperties(RedisProperties.class)
public class CacheConfiguration {
@Autowired
RedisProperties redisProperties;
@Bean
RedissonClient redissonClient1() {
Config config = new Config();
String node = redisProperties.getSingle().getAddress1();
node = node.startsWith("redis://") ? node : "redis://" + node;
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(node)
.setTimeout(redisProperties.getPool().getConnTimeout())
.setConnectionPoolSize(redisProperties.getPool().getSize())
.setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());
if (StringUtils.isNotBlank(redisProperties.getPassword())) {
serverConfig.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}
@Bean
RedissonClient redissonClient2() {
Config config = new Config();
String node = redisProperties.getSingle().getAddress2();
node = node.startsWith("redis://") ? node : "redis://" + node;
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(node)
.setTimeout(redisProperties.getPool().getConnTimeout())
.setConnectionPoolSize(redisProperties.getPool().getSize())
.setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());
if (StringUtils.isNotBlank(redisProperties.getPassword())) {
serverConfig.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}
@Bean
RedissonClient redissonClient3() {
Config config = new Config();
String node = redisProperties.getSingle().getAddress3();
node = node.startsWith("redis://") ? node : "redis://" + node;
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(node)
.setTimeout(redisProperties.getPool().getConnTimeout())
.setConnectionPoolSize(redisProperties.getPool().getSize())
.setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());
if (StringUtils.isNotBlank(redisProperties.getPassword())) {
serverConfig.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}
}
java
@Data
public class RedisPoolProperties {
private int maxIdle;
private int minIdle;
private int maxActive;
private int maxWait;
private int connTimeout;
private int soTimeout;
// 池大小
private int size;
}
java
@ConfigurationProperties(prefix = "spring.redis", ignoreUnknownFields = false)
@Data
public class RedisProperties {
private int database;
/**
* 等待节点回复命令的时间。该时间从命令发送成功时开始计时
*/
private int timeout;
private String password;
private String mode;
// 池配置
private RedisPoolProperties pool;
// 单机信息配置
private RedisSingleProperties single;
}
java
@Data
public class RedisSingleProperties {
private String address1;
private String address2;
private String address3;
}
编写RedLockController:
java
@RestController
public class RedLockController {
public static final String CACHE_KEY_REDLOCK = "ROCKET_REDLOCK";
@Autowired
RedissonClient redissonClient1;
@Autowired
RedissonClient redissonClient2;
@Autowired
RedissonClient redissonClient3;
boolean isLockBoolean;
@GetMapping(value = "/multiLock")
public String getMultiLock() throws InterruptedException {
String uuid = IdUtil.simpleUUID();
String uuidValue = uuid+":"+Thread.currentThread().getId();
RLock lock1 = redissonClient1.getLock(CACHE_KEY_REDLOCK);
RLock lock2 = redissonClient2.getLock(CACHE_KEY_REDLOCK);
RLock lock3 = redissonClient3.getLock(CACHE_KEY_REDLOCK);
RedissonMultiLock redLock = new RedissonMultiLock(lock1, lock2, lock3);
redLock.lock();
try {
System.out.println(uuidValue+"\t"+"---come in biz multiLock");
try { TimeUnit.SECONDS.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(uuidValue+"\t"+"---task is over multiLock");
} catch (Exception e) {
e.printStackTrace();
log.error("multiLock exception ",e);
} finally {
redLock.unlock();
log.info("释放分布式锁成功key:{}", CACHE_KEY_REDLOCK);
}
return "multiLock task is over "+uuidValue;
}
}