开始正文, 有任何疑问都可以在评论区留言,以laravel5.8框架为基础来编写业务逻辑。

普通减库存(使用redis简单模拟减库存操作)

<?php
           
use  \Illuminate\Support\Facades\Redis;
  
$redis = Redis::connection();                              //步骤1:   redis实例
$stockKey = 'stock';                                       //步骤2:   库存key
//$redis->set($stockKey, 50);                              //步骤3:   模拟初始化库存50                                     
$stock = $redis->get('stock');                             //步骤4:   获取库存值
if ($stock > 0) {                                          //步骤5:   库存大于0
      $stock = $stock - 1;                                  //步骤6:   减库存
      $redis->set('lock', $stock);                          //步骤7:   重新设置到缓存 
      echo true;                                            //步骤8:   减库存成功返回true 
   } else {
      echo false;                                           //步骤9:   减库存失败返false
}
?>
  • 并发用户在同一时间点到达步骤4(获取库存值)得到同一库存值并进行库存减一操作即会引起超卖现象

    加锁

  • 用setnx命令,给当前活动加一把锁(value的话,这里的话,我们暂且设置为1)。

    <?php    
    use  \Illuminate\Support\Facades\Redis;
    
    $redis = Redis::connection();                              //步骤1:   redis实例
    $lockKey = 'lockKey';                                      //步骤2:   线程锁键key
    $isLock = $redis->setnx($lockKey, 1);                      //步骤3:   加锁
    if (!$isLock) {                                            //步骤4:未获得锁的线程(用户)直接返回,稍后再试~
      return '服务器繁忙,请稍后再试~';
    }      
    $stockKey = 'stock';                                       //步骤5:   库存key
    //$redis->set($stockKey, 50);                              //步骤6:   模拟初始化库存50                                     
    $stock = $redis->get('stock');                             //步骤7:   获取库存值
    if ($stock > 0) {                                          //步骤8:   库存大于0
        $stock = $stock - 1;                                 //步骤9:   减库存
        $redis->set('lock', $stock);                         //步骤10:   重新设置到缓存 
        echo true;                                           //步骤11:   减库存成功返回true 
     } else {
        echo false;                                           //步骤12:   减库存失败返false
    }
    $redis->del($isLock);                                       //步骤13:  删除当前锁  
    ?>
  • 步骤3加锁如果执行setnx返回1,说明lockKey不存在,获取锁成功;当返回结果为0,说明lockKey已经存在,获取锁失败。
  • 如果一个拿到锁的线程,在执行任务的过程中挂掉了,来不及显示的释放锁,则会一直占用着资源,导致其他线程无法拿到锁, 没法执行任务。所以在执行setnx命令之后,需要给锁显示设置一个锁超时时间,以保证即使拿到锁的线程挂掉了,也能在超过一定时间自动释放锁,让出资源。而setnx不支持设置超时参数,所以需要其他命令来执行。
  • 如果执行完setnx之后,节点1宕机了,还没来得及执行expire命令:(即步骤3-4过程中加锁时设置一个过期时间,但是两个 程序依然不是原子块执行,步骤3直接宕机依然存在以上问题),这时候我们就需要添加异常捕获优先删除锁try{}finally{},redis 从2.6.12版本开始,redis为SET命令可以保证加锁和设置一个过期时间在一个原子块内操作。

    设置锁超时并且添加异常捕获优先删除锁

    <?php
    
    use \Illuminate\Support\Facades\Redis;
    
    $redis = Redis::connection();                              //步骤1:   redis实例
    $lockKey = 'lockKey';                                      //步骤2:   线程锁键key
    $isLock = $redis->setnx($lockKey, 1);                      //步骤3:   加锁
    $redis->expire($lockKey, 10);                              //步骤4:   给锁设置超时时间
    //2.6.12版本可用,如版本低于2.6.12请使用lua脚本执行保证原子性操作
    $isLock = $redis->set($lockKey, 1, 'ex', 10, 'nx');                                                                
    if (!$isLock) {                                            //步骤5:未获得锁的线程(用户)直接返回,稍后再试~
    return '服务器繁忙,请稍后再试~';
    }      
    try{
     $stockKey = 'stock';                                   //步骤6:   库存键key     
     //$redis->set($stockKey, 50);                          //步骤7:   模拟初始化库存50
     $stock = $redis->get('stock');                         //步骤8:   获取库存值
     if ($stock > 0) {                                      //步骤9:   库存大于0
           $stock = $stock - 1;                             //步骤10:   减库存
           $redis->set('lock', $stock);                     //步骤11:   重新设置到缓存 
           echo true;                                       //步骤12:  减库存成功返回true 
     } else {
           echo false;                                      //步骤13:  减库存成功返false
     }   
    }finally{
     $redis->del($isLock);                                  //步骤14:  删除当前锁  
    }
    ?>

    DEL误删

  • 又是一个极端场景,假设节点1的线程A通过set拿到了锁,并设置了过期时间30秒。
    1.png
  • 由于某些原因,导致线程A执行的很慢,超时时间30秒过去了,但线程A还没执行完,这个时候锁自动释放,线程B得到了锁。
    2.png
  • 随后,线程A任务执行完,进行del操作释放锁,这个时候线程B还没执行完,线程A实际上删除的是线程B加的锁。
    3.png
  • 如何解决这个问题呢?

    • 每个线程在set操作的时候,可以给value设置一个唯一的值,然后在del释放锁之前加一个判断,验证当前的锁是不是自身加的锁。

      <?php
      use \Illuminate\Support\Facades\Redis;
      use \Godruoyi\Snowflake\Snowflake;
      
      $redis = Redis::connection();                              //步骤1: redis实例
      $datacenterId = 123124354;                                 //指定数据中心ID 
      $workerId = 1122435;                                       //计算机ID
      $uuid = new Snowflake($datacenterId, $workerId);           //步骤2:分布式生成唯一uuid(https://github.com/godruoyi/php-snowflake)  
      //$uuid = session_create_id()
      $lockKey = 'lockKey';                                      //步骤3:   线程锁键key
      $isLock = $redis->set($lockKey, $uuid, 'ex', 10, 'nx');    //步骤4:   加锁并设置超时时间,设置值为uuid           
      if (!$isLock) {                                            //步骤5:未获得锁的线程(用户)直接返回,稍后再试~
      return '服务器繁忙,请稍后再试~';
      }
      try {
      $stockKey = 'stock';                                    //步骤6:   库存键key     
      //$redis->set($stockKey, 50);                           //步骤7:   第一次运行,初始化库存(注意:初次执行)
      $stock = $redis->get('stock');                          //步骤8:   获取库存值
      if ($stock > 0) {                                       //步骤9:   库存大于0
          $stock = $stock - 1;                                //步骤10:   减库存
          $redis->set('lock', $stock);                        //步骤11:   重新设置到缓存 
          echo true;                                          //步骤12:  减库存成功返回true 
      } else {
          echo false;                                         //步骤13:  减库存成功返false
      }
      } finally {
      //这一步不是原子性操作,还是会有问题,我们用lua原子性去处理
      if ($uuid === $redis->get($lockKey)) {                   //步骤14:  保证用户删除的是自己的锁    
          $redis->del($lockKey);                               //步骤15:  删除当前锁 
      }
      //lua原子性去处理如下
       $script = <<<EOF
                  local key = KEYS[1]
                  local value = ARGV[1]
                  if (redis.call('exists', key) == 1 and redis.call('get', key) == value) 
                  then
                      return redis.call('del', key)
                  end
                  return 0
      
      EOF;
      $redis->eval($script, [$lockKey,$uuid]);
      
      }
      ?>
  • 依然存在get和del非原子性操作(步骤14和步骤15),需要通过lua脚本进行原子性处理。

    <?php
    class RedisLock
    {
      /**
       * @var 当前锁标识,用于解锁
       */
      private $_lockFlag;
    
      private $_redis;
    
      public function __construct($host = '127.0.0.1', $port = '6379', $passwd = '')
      {
          $this->_redis = new Redis();
          $this->_redis->connect($host, $port);
          if ($passwd) {
              $this->_redis->auth($passwd);
          }
      }
    
      public function lock($key, $expire = 5)
      {
          $now= time();
          $expireTime = $expire + $now;
          if ($this->_redis->setnx($key, $expireTime)) {
              $this->_lockFlag = $expireTime;
              return true;
          }
          // 获取上一个锁的到期时间
          $currentLockTime = $this->_redis->get($key);
          if ($currentLockTime < $now) {
              /* 用于解决
              C0超时了,还持有锁,加入C1/C2/...同时请求进入了方法里面
              C1/C2都执行了getset方法(由于getset方法的原子性,
              所以两个请求返回的值必定不相等保证了C1/C2只有一个获取了锁) */
              $oldLockTime = $this->_redis->getset($key, $expireTime);
              if ($currentLockTime == $oldLockTime) {
                  $this->_lockFlag = $expireTime;
                  return true;
              }
          }
    
          return false;
      }
    
      public function lockByLua($key, $expire = 5)
      {
          $script = <<<EOF
    
              local key = KEYS[1]
              local value = ARGV[1]
              local ttl = ARGV[2]
    
              if (redis.call('setnx', key, value) == 1) then
                  return redis.call('expire', key, ttl)
              elseif (redis.call('ttl', key) == -1) then
                  return redis.call('expire', key, ttl)
              end
    
              return 0
    EOF;
    
          $this->_lockFlag = md5(microtime(true));
          return $this->_eval($script, [$key, $this->_lockFlag, $expire]);
      }
    
      public function unlock($key)
      {
          $script = <<<EOF
    
              local key = KEYS[1]
              local value = ARGV[1]
    
              if (redis.call('exists', key) == 1 and redis.call('get', key) == value) 
              then
                  return redis.call('del', key)
              end
    
              return 0
    
    EOF;
    
          if ($this->_lockFlag) {
              return $this->_eval($script, [$key, $this->_lockFlag]);
          }
      }
    
      private function _eval($script, array $params, $keyNum = 1)
      {
          $hash = $this->_redis->script('load', $script);
          return $this->_redis->evalSha($hash, $params, $keyNum);
      }
    
    }
    
    $redisLock = new RedisLock();
    
    $key = 'lock';
    if ($redisLock->lockByLua($key)) {
      // to do...
      $redisLock->unlock($key);
    }
    
     
    ?>

    目前并发情况下还有一些问题,当某个进程执行时间大于锁过期时间,进行延时。

  • 可以在加锁的时候开一个子进程去监控 主进程是否完成,未完成则给主进程延时,目前未实现代码。
    子进程去监控流程图

完整流程图

redis实现分布式锁.png

Last modification:May 29, 2024
如果觉得我的文章对你有用,请随意赞赏