Redis中提供了原子性命令SETEX或SET来写入STRING类型数据并设置Key的过时时间:git
> SET key value EX 60 NX ok > SETEX key 60 value ok
但对于HASH结构则没有这样的命令,只能先写入数据而后设置过时时间:github
> HSET key field value ok > EXPIRE key 60 ok
这样就带了一个问题:HSET命令执行成功而EXPIRE命令执行失败(如命令未能成功发送到Redis服务器),那么数据将不会过时。针对这个问题,本文提供了几种解决方案:redis
向Redis中写入HASH结构的Lua脚本以下:服务器
local fieldIndex=3 local valueIndex=4 local key=KEYS[1] local fieldCount=ARGV[1] local expired=ARGV[2] for i=1,fieldCount,1 do redis.pcall('HSET',key,ARGV[fieldIndex],ARGV[valueIndex]) fieldIndex=fieldIndex+2 valueIndex=valueIndex+2 end redis.pcall('EXPIRE',key,expired)
使用Redis命令行工具执行Lua脚本,须要将脚本内容单行化,并以分号间隔不一样的命令:async
> SCRIPT LOAD "local fieldIndex=3;local valueIndex=4;local key=KEYS[1];local fieldCount=ARGV[1];local expired=ARGV[2];for i=1,fieldCount,1 do redis.pcall('HSET',key,ARGV[fieldIndex],ARGV[valueIndex]) fieldIndex=fieldIndex+2 valueIndex=valueIndex+2 end;redis.pcall('EXPIRE',key,expired);" "e03e7868920b7669d1c8c8b16dcee86ebfac650d" > evalsha e03e7868920b7669d1c8c8b16dcee86ebfac650d 1 key 2 1000 field1 value1 field2 value2 nil
写入结果:函数
使用StackExchange.Redis执行Lua脚本:工具
public async Task WriteAsync(string key, IDictionary<string, string> valueDict, TimeSpan expiry) { async Task func() { if (valueDict.Empty()) { return; } var luaScriptPath = $"{AppDomain.CurrentDomain.BaseDirectory}/Lua/HSET.lua"; var script = File.ReadAllText(luaScriptPath); var seconds = (int)Math.Ceiling(expiry.TotalSeconds); var fieldCount = valueDict.Count; var redisValues = new RedisValue[fieldCount * 2 + 2]; redisValues[0] = fieldCount; redisValues[1] = seconds; var i = 2; foreach (var item in valueDict) { redisValues[i] = item.Key; redisValues[i + 1] = item.Value; i += 2; } //await Database.ScriptEvaluateAsync(script, new RedisKey[] { key, fieldCount.ToString(), seconds.ToString() }, redisValues); await Database.ScriptEvaluateAsync(script, new RedisKey[] { key }, redisValues); } await ExecuteCommandAsync(func, $"redisError:hashWrite:{key}"); }
Redis官方文档在事务一节中指出:Redis命令只会在有语法错误或对Key使用了错误的数据类型时执行失败。所以,只要咱们保证将正确的写数据和设置过时时间的命令做为一个总体发送到服务器端便可,使用Lua脚本正式基于此。lua
StackExchange.Redis官方文档中关于事务的说明,参见:Transactionsspa
如下是代码实现:命令行
public async Task<bool> WriteAsync(string key, IDictionary<string, string> valueDict, TimeSpan expiry) { var tranc = Database.CreateTransaction(); foreach (var item in valueDict) { tranc.HashSetAsync(key, item.Key, item.Value); } tranc.KeyExpireAsync(key, expiry); return await tranc.ExecuteAsync(); }
这种方案比较差,思路以下,共分为4步,每一步都有可能失败:
在读取Hash的值时,判断读到的field的值是不是Nil,如果则删除并忽略,若不是则处理。
代码以下:
namespace RedisClient.Imples { public class RedisHashOperator : RedisCommandExecutor, IRedisHashOperator { private readonly string KeyExpiryPlaceHolder = "expiryPlaceHolder"; public RedisHashOperator(ILogger<RedisHashOperator> logger, IRedisConnection redisConnection) : base(logger, redisConnection) { } public async Task WriteAsync(string key, IDictionary<string, string> valueDict, TimeSpan expiry) { async Task action() { if (valueDict.Empty()) { return; } var hashList = new List<HashEntry>(); foreach (var value in valueDict) { hashList.Add(new HashEntry(value.Key, value.Value)); } await Database.HashSetAsync(key, hashList.ToArray()); } async Task successed() { await ExecuteCommandAsync(action, $"redisEorror:hashWrite:{key}"); } await SetKeyExpireAsync(key, expiry, successed); } public async Task<RedisReadResult<IDictionary<string, string>>> ReadAllFieldsAsync(string key) { async Task<RedisReadResult<IDictionary<string, string>>> func() { var redisReadResult = new RedisReadResult<IDictionary<string, string>>(); if (Database.KeyExists(key) == false) { return redisReadResult.Failed(); } var resultList = await Database.HashGetAllAsync(key); if (resultList == null) { return redisReadResult.Failed(); } var dict = new Dictionary<string, string>(); if (resultList.Any()) { foreach (var result in resultList) { if (result.Name == KeyExpiryPlaceHolder || result.Value == KeyExpiryPlaceHolder) { await RemoveKeyExpiryPlaceHolderAsync(key); continue; } dict[result.Name] = result.Value; } } return redisReadResult.Success(dict); } return await ExecuteCommandAsync(func, $"redisError:hashReadAll:{key}"); } #region private /// <summary> /// 设置HASH结构KEY的过时时间 /// </summary> /// <param name="successed">设置过时时间成功以后的回调函数</param> private async Task SetKeyExpireAsync(string key, TimeSpan expiry, Func<Task> successed) { // 确保KEY的过时时间写入成功以后再执其它的操做 await Database.HashSetAsync(key, new HashEntry[] { new HashEntry(KeyExpiryPlaceHolder, KeyExpiryPlaceHolder) }); if (Database.KeyExpire(key, expiry)) { await successed(); } await Database.HashDeleteAsync(key, KeyExpiryPlaceHolder); } private async Task RemoveKeyExpiryPlaceHolderAsync(string key) { await Database.HashDeleteAsync(key, KeyExpiryPlaceHolder); } #endregion } }
文中屡次出现的ExecuteCommandAsync方法主要目的是实现针对异常状况的统一处理,实现以下:
namespace RedisClient.Imples { public class RedisCommandExecutor { private readonly ILogger Logger; protected readonly IDatabase Database; public RedisCommandExecutor(ILogger<RedisCommandExecutor> logger, IRedisConnection redisConnection) { Logger = logger; Database = redisConnection.GetDatabase(); } protected async Task ExecuteCommandAsync(Func<Task> func, string errorMessage = null) { try { await func(); } catch (Exception ex) { if (string.IsNullOrEmpty(errorMessage)) { errorMessage = ex.Message; } Logger.LogError(errorMessage, ex); } } protected async Task<T> ExecuteCommandAsync<T>(Func<Task<T>> func, string errorMessage = null) { try { return await func(); } catch (Exception ex) { if (string.IsNullOrEmpty(errorMessage)) { errorMessage = ex.Message; } Logger.LogError(errorMessage, ex); return default(T); } } } }