redis相关基础文章

开始正文, 请睁大你的双眼,有任何疑问都可以在评论区留言,以laravel5.8框架为基础来编写业务逻辑。

带你一步步用php实现redis分布式、高并发库存问题

例1

<?php    
       
     $redis = \Illuminate\Support\Facades\Redis::connection();  //步骤1:   redis实例
     
     $stockKey = 'stock';                                       //步骤2:   库存key
   //$redis->set($stockKey, 50);                                //步骤3:   第一次运行,初始化库存(注意:初次执行)
     $stock = $redis->get('stock');                             //步骤4:   获取库存值
     if ($stock > 0) {                                          //步骤5:   库存大于0
          $stock = $stock - 1;                                  //步骤6:   减库存
          $redis->set('lock', $stock);                          //步骤7:   重新设置到缓存 
          echo true;                                            //步骤8:   减库存成功返回true 
     } else {
          echo false;                                           //步骤9:   减库存失败返false
     }
?>
对于以上代码我们指出不足以及改进(如下图所示)

1.png

  • 在高并发情况下,步骤4(获取库存值)可能会被多个线程在同一时间获取到相同的值,假如获取到相同的值为30,同时执行$stock = $stock - 1操作,值都为29,设置缓存成功,这样就会引起超卖现象.
  • 我们可以给程序加一把锁,只有获得锁的线程(用户)可以继续执行下面程序操作减库存.

例2 加锁

<?php    
       
     $redis = \Illuminate\Support\Facades\Redis::connection();  //步骤1:   redis实例
     $lockKey = 'lockKey';                                      //步骤2:   线程锁键key
     $isLock = $redis->setnx($lockKey, 1);                      //步骤3:   加锁
     if (!$isLock) {                                            //步骤4:未获得锁的线程(用户)直接返回,稍后再试~
         return '服务器繁忙,请稍后再试~';
     }      
     $stockKey = 'stock';                                       //步骤5:   库存键key     
   //$redis->set($stockKey, 50);                                //步骤6:   第一次运行,初始化库存(注意:初次执行)
     $stock = $redis->get('stock');                             //步骤7:   获取库存值
     if ($stock > 0) {                                          //步骤8:   库存大于0
          $stock = $stock - 1;                                  //步骤9:   减库存
          $redis->set('lock', $stock);                          //步骤10:   重新设置到缓存 
          echo true;                                            //步骤11:  减库存成功返回true 
     } else {
          echo false;                                           //步骤12:  减库存失败返false
     }
     $redis->del($isLock);                                      //步骤13:  删除当前锁  
?>
对于以上代码我们指出不足以及改进
  • 假如 步骤5-12过程中程序抛出异常,锁并未删除,依然造成锁死,我们可以加一个try{}finally{}去优先执行删除锁.

例3 异常捕获优先删除锁

<?php    
       
     $redis = \Illuminate\Support\Facades\Redis::connection();  //步骤1:   redis实例
     $lockKey = 'lockKey';                                      //步骤2:   线程锁键key
     $isLock = $redis->setnx($lockKey, 1);                      //步骤3:   加锁
     if (!$isLock) {                                            //步骤4:未获得锁的线程(用户)直接返回,稍后再试~
         return '服务器繁忙,请稍后再试~';
     }      
     try{
       $stockKey = 'stock';                                       //步骤5:   库存键key     
      //$redis->set($stockKey, 50);                               //步骤6:   第一次运行,初始化库存(注意:初次执行)
        $stock = $redis->get('stock');                            //步骤7:   获取库存值
        if ($stock > 0) {                                         //步骤8:   库存大于0
            $stock = $stock - 1;                                  //步骤9:   减库存
            $redis->set('lock', $stock);                          //步骤10:   重新设置到缓存 
            echo true;                                            //步骤11:  减库存成功返回true 
        } else {
            echo false;                                           //步骤12:  减库存成功返false
        }   
     }finally{
        $redis->del($isLock);                                     //步骤13:  删除当前锁  
     }
  
?>
对于以上代码我们指出不足以及改进
  • 假如 步骤3-4过程中服务器直接宕机,下面程序没办法执行,依然出现锁死问题,我加锁时设置一个过期时间。

例4 加完锁直接宕机了,我们给锁加一个超时时间

<?php    
       
     $redis = \Illuminate\Support\Facades\Redis::connection();  //步骤1:   redis实例
     $lockKey = 'lockKey';                                      //步骤2:   线程锁键key
     $isLock = $redis->setnx($lockKey, 1);                      //步骤3:   加锁
     $redis->expire($lockKey, 10);                              //步骤4:   给锁设置超时时间              
     if (!$isLock) {                                            //步骤5:未获得锁的线程(用户)直接返回,稍后再试~
         return '服务器繁忙,请稍后再试~';
     }      
     try{
       $stockKey = 'stock';                                      //步骤6:   库存键key     
      //$redis->set($stockKey, 50);                              //步骤7:   第一次运行,初始化库存(注意:初次执行)
        $stock = $redis->get('stock');                           //步骤8:   获取库存值
        if ($stock > 0) {                                        //步骤9:   库存大于0
            $stock = $stock - 1;                                 //步骤10:   减库存
            $redis->set('lock', $stock);                         //步骤11:   重新设置到缓存 
            echo true;                                           //步骤12:  减库存成功返回true 
        } else {
            echo false;                                          //步骤13:  减库存成功返false
        }   
     }finally{
        $redis->del($isLock);                                    //步骤14:  删除当前锁  
     }
  
?>
对于以上代码我们指出不足以及改进
  • 假如 步骤3-4过程中加锁时设置一个过期时间。但是两个程序依然不是原子块执行,步骤3直接宕机依然存在以上问题。
  • redis 从2.6.12版本开始,redis为SET命令可以保证加锁和设置一个过期时间在一个原子块内操作。

例5 保证加锁和设置一个过期时间在一个原子块内操作,使用Set(2.6.12版本)开始。

<?php    
       
     $redis = \Illuminate\Support\Facades\Redis::connection();  //步骤1:   redis实例
     $lockKey = 'lockKey';                                      //步骤2:   线程锁键key
     $isLock = $redis->set($lockKey, 1, 'ex', 10, 'nx');        //步骤3:   加锁并设置超时时间           
     if (!$isLock) {                                            //步骤4:未获得锁的线程(用户)直接返回,稍后再试~
         return '服务器繁忙,请稍后再试~';
     }      
     try{
       $stockKey = 'stock';                                      //步骤5:   库存键key     
      //$redis->set($stockKey, 50);                              //步骤6:   第一次运行,初始化库存(注意:初次执行)
        $stock = $redis->get('stock');                           //步骤7:   获取库存值
        if ($stock > 0) {                                        //步骤8:   库存大于0
            $stock = $stock - 1;                                 //步骤9:   减库存
            $redis->set('lock', $stock);                         //步骤10:   重新设置到缓存 
            echo true;                                           //步骤11:  减库存成功返回true 
        } else {
            echo false;                                          //步骤12:  减库存成功返false
        }   
     }finally{
        $redis->del($isLock);                                    //步骤13:  删除当前锁  
     }
  
?>
对于以上代码我们指出不足以及改进

6.png

  • 假设线程1(用户1)执行总时间为15s,假设线程2(用户2)执行总时间为8s,假设线程3(用户3)执行总时间为5s
  • 线程1(用户1)加完锁在步骤4-12执行10s业务逻辑redis释放了锁,用户2获得锁,用户2在步骤4-12执行了5s, 这时用户1释放了用户2的锁(用户1结束),用户3获得锁,用户3在步骤4-12执行了3s,用户2释放了用户3的锁,以此类推,就会造成锁永久失效。

例6 保证用户1只能释放自己的锁,改进代码

<?php    
       
     $redis = \Illuminate\Support\Facades\Redis::connection();  //步骤1:   redis实例
     $datacenterId = 123124354;                                 //指定数据中心ID 
     $workerId = 1122435;                                       //计算机ID
     $uuid  = new \Godruoyi\Snowflake\Snowflake($datacenterId, $workerId);              ////步骤2:分布式生成唯一uuid(https://github.com/godruoyi/php-snowflake)  
     $lockKey = 'lockKey';                                      //步骤3:   线程锁键key
     $isLock = $redis->set($lockKey, $uuid, 'ex', 10, 'nx');    //步骤4:   加锁并设置超时时间,设置值为uuid           
     if (!$isLock) {                                            //步骤5:未获得锁的线程(用户)直接返回,稍后再试~
         return '服务器繁忙,请稍后再试~';
     }      
     try{
       $stockKey = 'stock';                                      //步骤6:   库存键key     
      //$redis->set($stockKey, 50);                              //步骤7:   第一次运行,初始化库存(注意:初次执行)
        $stock = $redis->get('stock');                           //步骤8:   获取库存值
        if ($stock > 0) {                                        //步骤9:   库存大于0
            $stock = $stock - 1;                                 //步骤10:   减库存
            $redis->set('lock', $stock);                         //步骤11:   重新设置到缓存 
            echo true;                                           //步骤12:  减库存成功返回true 
        } else {
            echo false;                                          //步骤13:  减库存成功返false
        }   
     }finally{
        //这一步不是原子性操作,还是会有问题,我们用lua原子性去处理
        if ($uuid === $redis->get($lockKey)) {                   //步骤14:  保证用户删除的是自己的锁    
             $redis->del($lockKey);                              //步骤15:  删除当前锁 
         }
                                           
     }
?>
解决获取和删除不是原子性操作
<?php
class RedisLock
{
    /**
     * @var 当前锁标识,用于解锁
     */
    private $_lockFlag;

    private $_redis;

    public function __construct($host = '127.0.0.1', $port = '6379', $passwd = '')
    {
        $this->_redis = new Redis();
        $this->_redis->connect($host, $port);
        if ($passwd) {
            $this->_redis->auth($passwd);
        }
    }

    public function lock($key, $expire = 5)
    {
        $now= time();
        $expireTime = $expire + $now;
        if ($this->_redis->setnx($key, $expireTime)) {
            $this->_lockFlag = $expireTime;
            return true;
        }

        // 获取上一个锁的到期时间
        $currentLockTime = $this->_redis->get($key);
        if ($currentLockTime < $now) {
            /* 用于解决
            C0超时了,还持有锁,加入C1/C2/...同时请求进入了方法里面
            C1/C2都执行了getset方法(由于getset方法的原子性,
            所以两个请求返回的值必定不相等保证了C1/C2只有一个获取了锁) */
            $oldLockTime = $this->_redis->getset($key, $expireTime);
            if ($currentLockTime == $oldLockTime) {
                $this->_lockFlag = $expireTime;
                return true;
            }
        }

        return false;
    }

    public function lockByLua($key, $expire = 5)
    {
        $script = <<<EOF

            local key = KEYS[1]
            local value = ARGV[1]
            local ttl = ARGV[2]

            if (redis.call('setnx', key, value) == 1) then
                return redis.call('expire', key, ttl)
            elseif (redis.call('ttl', key) == -1) then
                return redis.call('expire', key, ttl)
            end

            return 0
EOF;

        $this->_lockFlag = md5(microtime(true));
        return $this->_eval($script, [$key, $this->_lockFlag, $expire]);
    }

    public function unlock($key)
    {
        $script = <<<EOF

            local key = KEYS[1]
            local value = ARGV[1]

            if (redis.call('exists', key) == 1 and redis.call('get', key) == value) 
            then
                return redis.call('del', key)
            end

            return 0

EOF;

        if ($this->_lockFlag) {
            return $this->_eval($script, [$key, $this->_lockFlag]);
        }
    }

    private function _eval($script, array $params, $keyNum = 1)
    {
        $hash = $this->_redis->script('load', $script);
        return $this->_redis->evalSha($hash, $params, $keyNum);
    }

}

$redisLock = new RedisLock();

$key = 'lock';
if ($redisLock->lockByLua($key)) {
    // to do...
    $redisLock->unlock($key);
}

 
?>

目前并发情况下还有一些问题,当某个进程执行时间大于锁过期时间,这时候还是会有2个进程同时执行减库存代码。

实现思路:可以在加锁的时候开一个子进程去监控 主进程是否完成,未完成则给主进程延时,目前未实现代码。
流程图

关注友儿不迷路

Last modification:October 14th, 2021 at 04:19 pm
如果觉得我的文章对你有用,请随意赞赏