开始正文, 有任何疑问都可以在评论区留言,以laravel5.8框架为基础来编写业务逻辑。
普通减库存(使用redis简单模拟减库存操作)
<?php
use \Illuminate\Support\Facades\Redis;
$redis = Redis::connection(); //步骤1: redis实例
$stockKey = 'stock'; //步骤2: 库存key
//$redis->set($stockKey, 50); //步骤3: 模拟初始化库存50
$stock = $redis->get('stock'); //步骤4: 获取库存值
if ($stock > 0) { //步骤5: 库存大于0
$stock = $stock - 1; //步骤6: 减库存
$redis->set('lock', $stock); //步骤7: 重新设置到缓存
echo true; //步骤8: 减库存成功返回true
} else {
echo false; //步骤9: 减库存失败返false
}
?>
并发用户在同一时间点到达步骤4(获取库存值)得到同一库存值并进行库存减一操作即会引起
超卖现象
。加锁
用setnx命令,给当前活动加一把锁(value的话,这里的话,我们暂且设置为1)。
<?php use \Illuminate\Support\Facades\Redis; $redis = Redis::connection(); //步骤1: redis实例 $lockKey = 'lockKey'; //步骤2: 线程锁键key $isLock = $redis->setnx($lockKey, 1); //步骤3: 加锁 if (!$isLock) { //步骤4:未获得锁的线程(用户)直接返回,稍后再试~ return '服务器繁忙,请稍后再试~'; } $stockKey = 'stock'; //步骤5: 库存key //$redis->set($stockKey, 50); //步骤6: 模拟初始化库存50 $stock = $redis->get('stock'); //步骤7: 获取库存值 if ($stock > 0) { //步骤8: 库存大于0 $stock = $stock - 1; //步骤9: 减库存 $redis->set('lock', $stock); //步骤10: 重新设置到缓存 echo true; //步骤11: 减库存成功返回true } else { echo false; //步骤12: 减库存失败返false } $redis->del($isLock); //步骤13: 删除当前锁 ?>
- 步骤3加锁如果执行setnx返回1,说明lockKey不存在,获取锁成功;当返回结果为0,说明lockKey已经存在,获取锁失败。
- 如果一个拿到锁的线程,在执行任务的过程中挂掉了,来不及显示的释放锁,则会一直占用着资源,导致其他线程无法拿到锁, 没法执行任务。所以在执行setnx命令之后,需要给锁显示设置一个锁超时时间,以保证即使拿到锁的线程挂掉了,也能在超过一定时间自动释放锁,让出资源。而setnx不支持设置超时参数,所以需要其他命令来执行。
如果执行完setnx之后,节点1宕机了,还没来得及执行expire命令:(即步骤3-4过程中加锁时设置一个过期时间,但是两个 程序依然不是原子块执行,步骤3直接宕机依然存在以上问题),这时候我们就需要添加异常捕获优先删除锁
try{}finally{}
,redis 从2.6.12版本
开始,redis为SET命令可以保证加锁和设置一个过期时间在一个原子块内操作。设置锁超时并且添加异常捕获优先删除锁
<?php use \Illuminate\Support\Facades\Redis; $redis = Redis::connection(); //步骤1: redis实例 $lockKey = 'lockKey'; //步骤2: 线程锁键key $isLock = $redis->setnx($lockKey, 1); //步骤3: 加锁 $redis->expire($lockKey, 10); //步骤4: 给锁设置超时时间 //2.6.12版本可用,如版本低于2.6.12请使用lua脚本执行保证原子性操作 $isLock = $redis->set($lockKey, 1, 'ex', 10, 'nx'); if (!$isLock) { //步骤5:未获得锁的线程(用户)直接返回,稍后再试~ return '服务器繁忙,请稍后再试~'; } try{ $stockKey = 'stock'; //步骤6: 库存键key //$redis->set($stockKey, 50); //步骤7: 模拟初始化库存50 $stock = $redis->get('stock'); //步骤8: 获取库存值 if ($stock > 0) { //步骤9: 库存大于0 $stock = $stock - 1; //步骤10: 减库存 $redis->set('lock', $stock); //步骤11: 重新设置到缓存 echo true; //步骤12: 减库存成功返回true } else { echo false; //步骤13: 减库存成功返false } }finally{ $redis->del($isLock); //步骤14: 删除当前锁 } ?>
DEL误删
- 又是一个极端场景,假设节点1的线程A通过set拿到了锁,并设置了过期时间30秒。
- 由于某些原因,导致线程A执行的很慢,超时时间30秒过去了,但线程A还没执行完,这个时候锁自动释放,线程B得到了锁。
- 随后,线程A任务执行完,进行del操作释放锁,这个时候线程B还没执行完,线程A实际上删除的是线程B加的锁。
如何解决这个问题呢?
每个线程在set操作的时候,可以给value设置一个唯一的值,然后在del释放锁之前加一个判断,验证当前的锁是不是自身加的锁。
<?php use \Illuminate\Support\Facades\Redis; use \Godruoyi\Snowflake\Snowflake; $redis = Redis::connection(); //步骤1: redis实例 $datacenterId = 123124354; //指定数据中心ID $workerId = 1122435; //计算机ID $uuid = new Snowflake($datacenterId, $workerId); //步骤2:分布式生成唯一uuid(https://github.com/godruoyi/php-snowflake) //$uuid = session_create_id() $lockKey = 'lockKey'; //步骤3: 线程锁键key $isLock = $redis->set($lockKey, $uuid, 'ex', 10, 'nx'); //步骤4: 加锁并设置超时时间,设置值为uuid if (!$isLock) { //步骤5:未获得锁的线程(用户)直接返回,稍后再试~ return '服务器繁忙,请稍后再试~'; } try { $stockKey = 'stock'; //步骤6: 库存键key //$redis->set($stockKey, 50); //步骤7: 第一次运行,初始化库存(注意:初次执行) $stock = $redis->get('stock'); //步骤8: 获取库存值 if ($stock > 0) { //步骤9: 库存大于0 $stock = $stock - 1; //步骤10: 减库存 $redis->set('lock', $stock); //步骤11: 重新设置到缓存 echo true; //步骤12: 减库存成功返回true } else { echo false; //步骤13: 减库存成功返false } } finally { //这一步不是原子性操作,还是会有问题,我们用lua原子性去处理 if ($uuid === $redis->get($lockKey)) { //步骤14: 保证用户删除的是自己的锁 $redis->del($lockKey); //步骤15: 删除当前锁 } //lua原子性去处理如下 $script = <<<EOF local key = KEYS[1] local value = ARGV[1] if (redis.call('exists', key) == 1 and redis.call('get', key) == value) then return redis.call('del', key) end return 0 EOF; $redis->eval($script, [$lockKey,$uuid]); } ?>
依然存在get和del非原子性操作(步骤14和步骤15),需要通过lua脚本进行原子性处理。
<?php class RedisLock { /** * @var 当前锁标识,用于解锁 */ private $_lockFlag; private $_redis; public function __construct($host = '127.0.0.1', $port = '6379', $passwd = '') { $this->_redis = new Redis(); $this->_redis->connect($host, $port); if ($passwd) { $this->_redis->auth($passwd); } } public function lock($key, $expire = 5) { $now= time(); $expireTime = $expire + $now; if ($this->_redis->setnx($key, $expireTime)) { $this->_lockFlag = $expireTime; return true; } // 获取上一个锁的到期时间 $currentLockTime = $this->_redis->get($key); if ($currentLockTime < $now) { /* 用于解决 C0超时了,还持有锁,加入C1/C2/...同时请求进入了方法里面 C1/C2都执行了getset方法(由于getset方法的原子性, 所以两个请求返回的值必定不相等保证了C1/C2只有一个获取了锁) */ $oldLockTime = $this->_redis->getset($key, $expireTime); if ($currentLockTime == $oldLockTime) { $this->_lockFlag = $expireTime; return true; } } return false; } public function lockByLua($key, $expire = 5) { $script = <<<EOF local key = KEYS[1] local value = ARGV[1] local ttl = ARGV[2] if (redis.call('setnx', key, value) == 1) then return redis.call('expire', key, ttl) elseif (redis.call('ttl', key) == -1) then return redis.call('expire', key, ttl) end return 0 EOF; $this->_lockFlag = md5(microtime(true)); return $this->_eval($script, [$key, $this->_lockFlag, $expire]); } public function unlock($key) { $script = <<<EOF local key = KEYS[1] local value = ARGV[1] if (redis.call('exists', key) == 1 and redis.call('get', key) == value) then return redis.call('del', key) end return 0 EOF; if ($this->_lockFlag) { return $this->_eval($script, [$key, $this->_lockFlag]); } } private function _eval($script, array $params, $keyNum = 1) { $hash = $this->_redis->script('load', $script); return $this->_redis->evalSha($hash, $params, $keyNum); } } $redisLock = new RedisLock(); $key = 'lock'; if ($redisLock->lockByLua($key)) { // to do... $redisLock->unlock($key); } ?>
目前并发情况下还有一些问题,当某个进程执行时间大于锁过期时间,进行延时。
- 可以在加锁的时候开一个子进程去监控 主进程是否完成,未完成则给主进程延时,目前未实现代码。
One comment
https://github.com/ronnylt/redlock-php