From e7dfc4349d89e1c7f4e7a68002aaa07cfd087baf Mon Sep 17 00:00:00 2001 From: bluextx Date: Sat, 11 Jan 2025 08:12:58 +0300 Subject: [PATCH] Refactored MultithreadSteppedDumpProcessor to improve concurrency and thread safety --- .../MultithreadSteppedDumpProcessor.cs | 412 +++++++++--------- .../Process/Writer/FileWriter.cs | 5 +- 2 files changed, 199 insertions(+), 218 deletions(-) diff --git a/source/LootDumpProcessor/Process/Processor/DumpProcessor/MultithreadSteppedDumpProcessor.cs b/source/LootDumpProcessor/Process/Processor/DumpProcessor/MultithreadSteppedDumpProcessor.cs index 663971a..05bd92c 100644 --- a/source/LootDumpProcessor/Process/Processor/DumpProcessor/MultithreadSteppedDumpProcessor.cs +++ b/source/LootDumpProcessor/Process/Processor/DumpProcessor/MultithreadSteppedDumpProcessor.cs @@ -34,9 +34,7 @@ public class MultithreadSteppedDumpProcessor( private readonly ILooseLootProcessor _looseLootProcessor = looseLootProcessor ?? throw new ArgumentNullException(nameof(looseLootProcessor)); - private static IJsonSerializer _jsonSerializer = JsonSerializerFactory.GetInstance(); - - private static readonly List Runners = new(); + private static readonly IJsonSerializer _jsonSerializer = JsonSerializerFactory.GetInstance(); // if we need to, this variable can be moved to use the factory, but since the factory // needs a locking mechanism to prevent dictionary access exceptions, its better to keep @@ -53,123 +51,24 @@ public class MultithreadSteppedDumpProcessor( if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info)) LoggerFactory.GetInstance().Log("Heavy processing done!", LogLevel.Info); - var staticContainers = new Dictionary(); - var staticContainersLock = new object(); + var staticContainers = new ConcurrentDictionary(); // We need to count how many dumps we have for each map - var mapDumpCounter = new Dictionary(); - var mapDumpCounterLock = new object(); + var mapDumpCounter = new ConcurrentDictionary(); // dictionary of maps, that has a dictionary of template and hit count - var mapStaticContainersAggregated = new Dictionary>(); - var mapStaticContainersAggregatedLock = new object(); + var mapStaticContainersAggregated = new ConcurrentDictionary>(); - Runners.Clear(); // BSG changed the map data so static containers are now dynamic, so we need to scan all dumps for the static containers. if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info)) LoggerFactory.GetInstance().Log("Queuing dumps for static data processing", LogLevel.Info); - foreach (var dumped in dumps) + + var parallelOptions = new ParallelOptions { - Runners.Add( - Task.Factory.StartNew(() => - { - if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Debug)) - LoggerFactory.GetInstance().Log($"Processing static data for file {dumped.BasicInfo.FileName}", - LogLevel.Debug); + MaxDegreeOfParallelism = Environment.ProcessorCount + }; + Parallel.ForEachAsync(dumps, parallelOptions, + async (partialData, cancellationToken) => + await Process(partialData, staticContainers, mapStaticContainersAggregated, mapDumpCounter)); - var dataDump = _jsonSerializer.Deserialize(File.ReadAllText(dumped.BasicInfo.FileName)); - - if (dataDump == null) - { - if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error)) - LoggerFactory.GetInstance() - .Log($"Failed to deserialize data from file {dumped.BasicInfo.FileName}", - LogLevel.Error); - return; // Skip processing this dump - } - - var mapId = dataDump.Data.LocationLoot.Id.ToLower(); - - // Because we may use multiple version dumps for map data, merge the static loot between dumps - lock (staticContainersLock) - { - if (!staticContainers.TryGetValue(mapId, out var mapStaticLoot)) - { - if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info)) - LoggerFactory.GetInstance() - .Log($"Doing first time process for map {mapId} of real static data", - LogLevel.Info); - - staticContainers[mapId] = new MapStaticLoot - { - StaticWeapons = new List