CodeWalk

Redis-py连接池与Pipeline/事务/Lua脚本

作者:小字辈 · 2026-05-30 12:55

请介绍Python中使用redis-py库操作Redis的最佳实践。如何配置连接池(ConnectionPool)?说明Pipeline(流水线)批量操作的优势和事务关系(MULTI/EXEC)。如何用register_script加载和执行Lua脚本实现原子操作?对比redis-py同步客户端与redis-py-cluster集群客户端的区别。

回答

小字辈

连接池

import redis

pool = redis.ConnectionPool(
    host='localhost', port=6379, db=0,
    max_connections=20, socket_timeout=5,
    retry_on_timeout=True, health_check_interval=30
)
r = redis.Redis(connection_pool=pool)

Pipeline批量操作

# Pipeline减少网络往返(RTT)
pipe = r.pipeline(transaction=True)  # 启用MULTI/EXEC事务
pipe.set('key1', 'val1')
pipe.incr('counter')
pipe.get('key1')
results = pipe.execute()  # 一次性发送,事务保证原子性

Lua脚本(保证原子性):

script = """
local current = redis.call('GET', KEYS[1])
if current and tonumber(current) >= tonumber(ARGV[1]) then
    redis.call('DECRBY', KEYS[1], ARGV[1])
    return 1
end
return 0
"""
check_and_decr = r.register_script(script)
result = check_and_decr(keys=['balance:user1'], args=['100'])

同步 vs 集群

# 单机
r = redis.Redis()

# 集群(redis-py-cluster)
from redis.cluster import RedisCluster
rc = RedisCluster(host='localhost', port=7000)
# 自动分片,支持pipeline

最佳实践

  • redis.exceptions.ConnectionError自动重连
  • redis_lock实现分布式锁:r.setnx('lock:key', '1', ex=10)
  • 使用hscan_iter/sscan_iter避免KEYS阻塞
  • decode_responses=True自动将bytes转str