Skip to content

Instantly share code, notes, and snippets.

@MarshalOfficial
Created January 5, 2021 07:49
Show Gist options
  • Save MarshalOfficial/89c14cfaed7bbfa4f7c3f9eec16fa96a to your computer and use it in GitHub Desktop.
Save MarshalOfficial/89c14cfaed7bbfa4f7c3f9eec16fa96a to your computer and use it in GitHub Desktop.
Batch insert to Redis via MSET using the StackExchange.Redis C# library
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace YOURPROJECTNAMESPACE
{
/// <summary>
/// Loads the "*.txt" files found in the "Storage" folder next to the assembly into Redis,
/// writing the records in batches through <c>IDatabase.StringSet</c> (MSET).
/// </summary>
/// <remarks>
/// Every file must start with a metadata line: the value of the "MetaDataSignature" setting
/// followed by the JSON of a <c>RedisConfig</c>. Each following line is one JSON record, which
/// is stored verbatim as the Redis value. A file is deleted once it has been loaded completely.
/// </remarks>
public class RedisEngine
{
    /// <summary>Number of key/value pairs sent to Redis in a single StringSet call.</summary>
    private const int BatchSize = 50000;

    /// <summary>Pause between two passes over the storage folder in auto mode.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(5000);

    /// <summary>Folder scanned for files; stays null when the constructor failed to resolve it.</summary>
    private readonly string filesPath;

    /// <summary>
    /// Resolves the storage folder path. Failures are logged instead of thrown.
    /// </summary>
    /// <param name="dBRepository">Not used by this class; kept so existing callers keep compiling.</param>
    public RedisEngine(DBRepository dBRepository)
    {
        try
        {
            filesPath = Path.Combine(Extension.Extensions.AssemblyDirectory, "Storage");
            Program.Logger.Info("Target Folder Path: " + filesPath);
        }
        catch (Exception e)
        {
            Program.Logger.Error(e);
        }
    }

    /// <summary>
    /// Checks the Redis endpoint and the storage folder, then loads the files once, or
    /// repeatedly every five seconds when the "IsAutoRedisInserter" setting is "1".
    /// </summary>
    /// <returns>
    /// A task that completes after the single pass, or (in auto mode) only when a pass throws.
    /// Exceptions are logged, never propagated to the caller.
    /// </returns>
    public async Task ExecuteAsync()
    {
        try
        {
            var redisHost = Program.GetSettingValue("RedisServerIP");
            var redisPort = int.Parse(Program.GetSettingValue("RedisServerPort"));
            if (!Extension.Extensions.CheckConnection(redisHost, redisPort))
            {
                Program.Logger.Info("redis server is unavailable on ip & port that you put in the app config, please check the issue with your network administrator.");
                return;
            }

            // Directory.Exists(null) is false, so a failed constructor also ends up here.
            if (!Directory.Exists(filesPath))
            {
                Program.Logger.Info("there is no storage folder to processing for redis loader");
                return;
            }

            if (Program.GetSettingValue("IsAutoRedisInserter") != "1")
            {
                BodyProcess();
                return;
            }

            while (true)
            {
                BodyProcess();
                // Task.Delay releases the thread while waiting; Thread.Sleep would block it
                // for the whole interval inside an async method.
                await Task.Delay(PollInterval).ConfigureAwait(false);
            }
        }
        catch (Exception ex)
        {
            Program.Logger.Error(ex);
        }
    }

    /// <summary>
    /// Runs one pass: lists the "*.txt" files of the storage folder and loads them into Redis,
    /// at most "MaxWriteThread" files at a time. Blocks until every file has been handled.
    /// </summary>
    private void BodyProcess()
    {
        // Take one snapshot so the logged list and the processed list are the same files.
        var files = new DirectoryInfo(filesPath)
            .GetFiles("*.txt")
            .OrderBy(f => f.FullName)
            .ToList();

        Program.Logger.Info("*****************************************************");
        Program.Logger.Info("List of All Files in Storage Dir:");
        files.ForEach(f => Program.Logger.Info(f.FullName));
        Program.Logger.Info("*****************************************************");

        // A setting of zero or less would otherwise produce empty batches and never finish.
        var maxParallelWrite = Math.Max(1, int.Parse(Program.GetSettingValue("MaxWriteThread")));

        // Files are handled batch by batch, in name order: a batch only starts once the
        // previous one has completed.
        for (var offset = 0; offset < files.Count; offset += maxParallelWrite)
        {
            var running = files
                .Skip(offset)
                .Take(maxParallelWrite)
                .Select(file => Task.Run(() => InsertToRedis(file)))
                .ToArray();
            Task.WaitAll(running);
        }
    }

    /// <summary>
    /// Loads one file into Redis and deletes it afterwards. Empty files are deleted right away;
    /// files without a valid metadata line are logged and left in place.
    /// </summary>
    /// <param name="file">File to load; its first line must be the metadata line.</param>
    /// <remarks>
    /// Any exception is logged and swallowed, in which case the file is kept and will be
    /// picked up again by the next pass.
    /// </remarks>
    private void InsertToRedis(FileInfo file)
    {
        try
        {
            // FirstOrDefault closes the file again, so deleting it below is possible.
            var header = File.ReadLines(file.FullName).FirstOrDefault();
            if (header == null)
            {
                file.Delete();
                return;
            }

            var signature = Program.GetSettingValue("MetaDataSignature");
            if (!header.StartsWith(signature, StringComparison.Ordinal))
            {
                Program.Logger.Info($"this file: '{file.Name}' is not valid, first line must be MetaData information.");
                return;
            }

            // Strip only the leading signature; a Replace would also remove occurrences of the
            // signature inside the metadata JSON itself.
            var config = JsonConvert.DeserializeObject<RedisConfig>(header.Substring(signature.Length));
            if (config == null)
            {
                Program.Logger.Info($"can not deserialize config from MetaData information, in this file: {file.Name}");
                return;
            }

            // The key prefix is the same for every record of the file, so build it once.
            var separator = string.IsNullOrWhiteSpace(config.Seprator) ? "_" : config.Seprator;
            var keyPrefix = $"{config.RedisKeyTitle}{separator}";

            // NOTE(review): one connection is opened per file. The StackExchange.Redis
            // documentation recommends sharing a multiplexer; consider hoisting it.
            using (var redis = ConnectionMultiplexer.Connect($"{Program.GetSettingValue("RedisServerIP")}:{Program.GetSettingValue("RedisServerPort")},syncTimeout=150000,allowAdmin=true,password={Program.GetSettingValue("RedisServerPassword")}"))
            {
                var db = redis.GetDatabase();
                var batch = new List<KeyValuePair<RedisKey, RedisValue>>(BatchSize);

                foreach (var line in File.ReadLines(file.FullName).Skip(1))
                {
                    // A blank line carries no record; parsing it would fail and keep the
                    // whole file from ever being completed.
                    if (string.IsNullOrWhiteSpace(line))
                    {
                        continue;
                    }

                    batch.Add(new KeyValuePair<RedisKey, RedisValue>(keyPrefix + ExtractKey(line, config), line));
                    if (batch.Count == BatchSize)
                    {
                        FlushBatch(db, batch);
                    }
                }

                if (batch.Count != 0) // final, partially filled batch
                {
                    FlushBatch(db, batch);
                }
            }

            file.Delete();
            Program.Logger.Info($"{file.FullName} file has been deleted.");
        }
        catch (Exception ex)
        {
            Program.Logger.Error(ex);
        }
    }

    /// <summary>
    /// Reads the value of the property named by <c>config.ModelClassKey</c> from a JSON record.
    /// When <c>config.ArrayValue</c> is set the record is a JSON array and its first element is used.
    /// </summary>
    private static string ExtractKey(string line, RedisConfig config)
    {
        var token = (JToken)JsonConvert.DeserializeObject(line);
        if (config.ArrayValue)
        {
            token = token[0];
        }

        return ((JObject)token)[config.ModelClassKey].Value<string>();
    }

    /// <summary>Writes the collected pairs with one StringSet call and empties the batch.</summary>
    private static void FlushBatch(IDatabase db, List<KeyValuePair<RedisKey, RedisValue>> batch)
    {
        Program.Logger.Info("trying to add a new batch to redis ...");
        db.StringSet(batch.ToArray());
        Program.Logger.Info($"{batch.Count} record has been added to redis.");
        batch.Clear();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment