锁和分布式锁

AldridgeMarguerite 发布于2年前

锁的由来

多线程环境中,经常遇到多个线程访问同一个 共享资源 ,这时候作为开发者必须考虑如何维护数据一致性,这就需要某种机制来保证只有满足某个条件(获取锁成功)的线程才能访问资源,而不满足条件(获取锁失败)的线程只能等待,在下一轮竞争中来获取锁才能访问资源。

两个知识点:

1.高级缓存Cache

锁和分布式锁

CPU为了提高处理速度,不和内存直接进行交互,而是使用Cache。

可能引发的问题:

锁和分布式锁

如果多个处理器同时对共享变量进行读改写操作 (i++就是经典的读改写操作),那么共享变量就会被多个处理器同时进行操作,这样读改写操作就不是原子的了,操作完之后共享变量的值会和期望的不一致。

造成此结果的原因:

多个处理器同时从各自的缓存中读取变量i,分别进行加1操作,然后分别写入 系统内存中。

处理器层面的解决方案:

处理器使用总线锁就是来解决这个问题的。所谓总线锁就是使用处理器提供的一个 LOCK#信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占共享内存。

2.CAS(Compare And Swap)+volatile

CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。执行CAS操作的时候,将内存位置的值与预期原值比较,如果相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。

java的Atomic以及一些它自带的类中的cas操作都是通过借助cmpxchg指令完成的。他保证同一时刻只能有一个线程cas成功。

举个例子

以AtomicIneger的源码为例来看看CAS操作:

锁和分布式锁

for(;;)表示循环,只有当if判断为true才退出。而if判断的内容就是是否CAS成功。

锁和分布式锁

锁和分布式锁

volatile的作用:

1)将当前处理器缓存行的数据写回到系统内存。

2)这个写回内存的操作会使在其他CPU里缓存了该内存地址的数据无效。

循环CAS+volatile是实现锁的关键。

Lock锁的部分细节

锁和分布式锁

锁和分布式锁

不同场景锁的表现不同:独占?共享?读写?

锁和分布式锁

分布式锁(redis的简单实现)

分布式锁实现的三个核心要素:

  • 1.加锁

最简单的方法是使用setnx命令。key是锁的唯一标识,按业务来决定命名。比如想要给一种商品的秒杀活动加锁,可以给key命名为 “lock_sale_商品ID” 。而value设置成什么呢?我们可以姑且设置成1。加锁的伪代码如下:

setnx(key,1)

SETNX key value

将 key 的值设为 value ,当且仅当 key 不存在。

若给定的 key 已经存在,则 SETNX 不做任何动作。

SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。
时间复杂度:
O(1)
返回值:
设置成功,返回 1 。
设置失败,返回 0 。

当一个线程执行setnx返回1,说明key原本不存在,该线程成功得到了锁;当一个线程执行setnx返回0,说明key已经存在,该线程抢锁失败。

  • 2.解锁

有加锁就得有解锁。当得到锁的线程执行完任务,需要释放锁,以便其他线程可以进入。释放锁的最简单方式是执行del指令,伪代码如下:

del(key)

释放锁之后,其他线程就可以继续执行setnx命令来获得锁。

  • 3.设置超时时间

如果一个得到锁的线程在执行任务的过程中挂掉,来不及显式地释放锁,这块资源将会永远被锁住,别的线程再也别想进来。

所以,setnx的key必须设置一个超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放。setnx不支持超时参数,所以需要额外的指令,伪代码如下:

expire(key, 30)

综合起来,我们分布式锁实现的第一版伪代码如下:

if(setnx(key,1) == 1){
    expire(key,30)
    do something ......
    del(key)
    }

上述代码的问题:

  • 1 setnx和expire的非原子性

锁和分布式锁

setnx刚执行成功,还未来得及执行expire指令,节点1 Duang的一声挂掉了。

锁和分布式锁

这样一来,这个锁就长生不死了。

解决方案:

Redis 2.6.12以上版本为set指令增加了可选参数,伪代码如下:

set(key,1,30,NX)
  • 2 del 导致误删

锁和分布式锁

锁和分布式锁

锁和分布式锁

可以在del释放锁之前做一个判断,验证当前的锁是不是自己加的锁

至于具体的实现,可以在加锁的时候把当前的线程ID当做value,并在删除之前验证key对应的value是不是自己线程的ID。

加锁:

String threadId = Thread.currentThread().getId()
set(key,threadId ,30,NX)

解锁:

if(threadId .equals(redisClient.get(key))){
    del(key)
}

这样做又隐含了一个新的问题,判断和释放锁是两个独立操作,不是原子性。

这一块要用Lua脚本来实现:

String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

redisClient.eval(luaScript , Collections.singletonList(key), Collections.singletonList(threadId));

redis官方说:eval命令在执行lua脚本时会当作一个命令去执行,并且直到命令执行完成redis才会去执行其他命令,所以就变成了一个原子操作。

  • 3出现并发的可能性

进程1在超时时间内未执行完代码,此时进程2是可以获取锁的,会出现两个进程同时访问一个资源的情况。

解决方案:可以在进程1所在的jvm环境中开一个线程专门用来“续命”,当需要解锁的时候,通知这个续命线程结束执行。

private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    /**
     * 尝试获取分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 线程Id
     * @param expireTime 超期时间
     * @return 是否获取成功
     */
    public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;

    }
private static final Long RELEASE_SUCCESS = 1L;

    /**
     * 释放分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @return 是否释放成功
     */
    public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {

        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

        if (RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        return false;

    }

查看原文: 锁和分布式锁

  • organicsnake