从源码分析基于Redis的分布式锁

分布式锁

分布式锁是控制分布式系统之间同时操作一个数据的一种方式,通过互斥来保证数据的一致性。

安全和可靠性保证:

  • 一致性: 互斥,不管任何时候,只有一个客户端能持有同一个锁。
  • 分区可容忍性: 不会死锁,最终一定会得到锁,就算一个持有锁的客户端宕掉或者发生网络分区。
  • 可用性: 只要大多数Redis节点正常工作,客户端应该都能获取和释放锁。

Redlock算法

在分布式版本的算法里我们假设我们有NRedis master节点,这些节点都是完全独立的,我们不用任何复制或者其他隐含的分布式协调算法。我们已经描述了如何在单节点环境下安全地获取和释放锁。因此我们理所当然地应当用这个方法在每个单节点里来获取和释放锁。在我们的例子里面我们把N设成5,这个数字是一个相对比较合理的数值,因此我们需要在不同的计算机或者虚拟机上运行5master节点来保证他们大多数情况下都不会同时宕机。一个客户端需要做如下操作来获取锁:

  • 获取当前时间(单位是毫秒)。

  • 轮流用相同的key和随机值在N个节点上请求锁,在这一步里,客户端在每个master上请求锁时,会有一个和总的锁释放时间相比小的多的超时时间。比如如果锁自动释放时间是10秒钟,那每个节点锁请求的超时时间可能是5-50毫秒的范围,这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间,如果一个master节点不可用了,我们应该尽快尝试下一个master节点。

  • 客户端计算第二步中获取锁所花的时间,只有当客户端在大多数master节点上成功获取了锁(在这里是3个),而且总共消耗的时间不超过锁释放时间,这个锁就认为是获取成功了。

  • 如果锁获取成功了,那现在锁自动释放时间就是最初的锁释放时间减去之前获取锁所消耗的时间。

  • 如果锁获取失败了,不管是因为获取成功的锁不超过一半(N/2+1)还是因为总消耗时间超过了锁释放时间,客户端都会到每个master节点上释放锁,即便是那些他认为没有获取成功的锁。

redlock-py

redlock-py 是一个python基于redis实现的分布式锁

我们先来看看redlock-py的使用, 这里使用多进程来操作获取锁和释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from multiprocessing import Pool
from redlock import Redlock
dlm = Redlock([{"host": "127.0.0.1", "port": 6379, "db": 0}], retry_count=3, retry_delay=0.2)
def test_pool():
my_lock = dlm.lock("my_resource_name", 1000) //获取锁
print(my_lock)
a = dlm.unlock(my_lock) //释放锁
print(a)
if __name__ == '__main__':
pool = Pool(2)
for i in range(0, 5):
pool.apply_async(test_pool)
pool.close()
pool.join()

redlock-py源码分析

类变量

1
2
3
4
5
6
7
8
9
default_retry_count = 3
default_retry_delay = 0.2
clock_drift_factor = 0.01
unlock_script = """
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end"""

类中定义的变量:

  • default_retry_count: 默认重试次数, (3次)
  • default_retry_delay: 默认重试延时, (0.2毫秒)
  • clock_drift_factor: 过期时间精度, (0.01秒)
  • unlock_script: 释放锁的lua脚本,获取锁的value(value为随机数)是否和传入参数相同,相同既释放锁(删除key)

lock_instance函数

获取锁, 通过redisset命令实现

1
2
3
4
5
6
def lock_instance(self, server, resource, val, ttl):
try:
assert isinstance(ttl, int), 'ttl {} is not an integer'.format(ttl)
except AssertionError as e:
raise ValueError(str(e))
return server.set(resource, val, nx=True, px=ttl)

Redis 2.6.12 版本开始, SET 命令的行为可以通过一系列参数来修改:

  • EX second: 设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value

  • PX millisecond: 设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value

  • NX: 只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value

  • XX :只在键已经存在时,才对键进行设置操作

unlock_instance函数

释放锁, 通过redislua脚本实现

1
2
3
4
5
def unlock_instance(self, server, resource, val):
try:
server.eval(self.unlock_script, 1, resource, val)
except Exception as e:
logging.exception("Error unlocking resource %s in server %s", resource, str(server))

Redis 2.6.0 版本开始,通过内置的 Lua 解释器,可以使用 EVAL 命令对 Lua 脚本进行求值。

  • script 参数是一段 Lua 5.1 脚本程序,它会被运行在 Redis 服务器上下文中,这段脚本不必(也不应该)定义为一个 Lua 函数。

  • numkeys 参数用于指定键名参数的个数。

get_unique_id函数

生成随机数, 随机值是为了以安全的方式释放锁

例如:如果不使用随机数,A客户端获取了锁,在业务操作中的时间超过了锁的有效时间(过期时间), B客户端就可以获取锁, 此时A客户端业务操作完成想要去释放锁,就会导致将B客户端的释放了。

1
2
3
def get_unique_id(self):
CHARACTERS = string.ascii_letters + string.digits
return ''.join(random.choice(CHARACTERS) for _ in range(22)).encode()

lock函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def lock(self, resource, ttl):
retry = 0
val = self.get_unique_id() // 生产随机数
# Add 2 milliseconds to the drift to account for Redis expires
# precision, which is 1 millisecond, plus 1 millisecond min
# drift for small TTLs.
drift = int(ttl * self.clock_drift_factor) + 2
redis_errors = list()
while retry < self.retry_count: //重试机制
n = 0
start_time = int(time.time() * 1000)
del redis_errors[:]
for server in self.servers:
try:
if self.lock_instance(server, resource, val, ttl): //获取锁
n += 1
except RedisError as e:
redis_errors.append(e)
elapsed_time = int(time.time() * 1000) - start_time
validity = int(ttl - elapsed_time - drift) //通过获取开始时间和结束时间计算剩余ttl
if validity > 0 and n >= self.quorum:
if redis_errors:
raise MultipleRedlockException(redis_errors)
return Lock(validity, resource, val) //返回锁信息
else:
for server in self.servers:
try:
self.unlock_instance(server, resource, val) //获取锁失败,尝试释放锁
except:
pass
retry += 1
time.sleep(self.retry_delay) //重试延时
return False

unlock函数

1
2
3
4
5
6
7
8
9
def unlock(self, lock):
redis_errors = []
for server in self.servers:
try:
self.unlock_instance(server, lock.resource, lock.key) // 释放锁
except RedisError as e:
redis_errors.append(e)
if redis_errors:
raise MultipleRedlockException(redis_errors)

参考文章: https://redis.io/topics/distlock

当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器