0
0
mirror of https://github.com/sp-tarkov/loot-dump-processor.git synced 2025-02-13 09:50:44 -05:00

Refactored MultithreadSteppedDumpProcessor to improve concurrency and thread safety

This commit is contained in:
bluextx 2025-01-11 08:12:58 +03:00
parent 1a4003595a
commit e7dfc4349d
2 changed files with 199 additions and 218 deletions

View File

@ -34,9 +34,7 @@ public class MultithreadSteppedDumpProcessor(
private readonly ILooseLootProcessor _looseLootProcessor = private readonly ILooseLootProcessor _looseLootProcessor =
looseLootProcessor ?? throw new ArgumentNullException(nameof(looseLootProcessor)); looseLootProcessor ?? throw new ArgumentNullException(nameof(looseLootProcessor));
private static IJsonSerializer _jsonSerializer = JsonSerializerFactory.GetInstance(); private static readonly IJsonSerializer _jsonSerializer = JsonSerializerFactory.GetInstance();
private static readonly List<Task> Runners = new();
// if we need to, this variable can be moved to use the factory, but since the factory // if we need to, this variable can be moved to use the factory, but since the factory
// needs a locking mechanism to prevent dictionary access exceptions, its better to keep // needs a locking mechanism to prevent dictionary access exceptions, its better to keep
@ -53,123 +51,24 @@ public class MultithreadSteppedDumpProcessor(
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info)) if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
LoggerFactory.GetInstance().Log("Heavy processing done!", LogLevel.Info); LoggerFactory.GetInstance().Log("Heavy processing done!", LogLevel.Info);
var staticContainers = new Dictionary<string, MapStaticLoot>(); var staticContainers = new ConcurrentDictionary<string, MapStaticLoot>();
var staticContainersLock = new object();
// We need to count how many dumps we have for each map // We need to count how many dumps we have for each map
var mapDumpCounter = new Dictionary<string, int>(); var mapDumpCounter = new ConcurrentDictionary<string, int>();
var mapDumpCounterLock = new object();
// dictionary of maps, that has a dictionary of template and hit count // dictionary of maps, that has a dictionary of template and hit count
var mapStaticContainersAggregated = new Dictionary<string, Dictionary<Template, int>>(); var mapStaticContainersAggregated = new ConcurrentDictionary<string, ConcurrentDictionary<Template, int>>();
var mapStaticContainersAggregatedLock = new object();
Runners.Clear();
// BSG changed the map data so static containers are now dynamic, so we need to scan all dumps for the static containers. // BSG changed the map data so static containers are now dynamic, so we need to scan all dumps for the static containers.
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info)) if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
LoggerFactory.GetInstance().Log("Queuing dumps for static data processing", LogLevel.Info); LoggerFactory.GetInstance().Log("Queuing dumps for static data processing", LogLevel.Info);
foreach (var dumped in dumps)
var parallelOptions = new ParallelOptions
{ {
Runners.Add( MaxDegreeOfParallelism = Environment.ProcessorCount
Task.Factory.StartNew(() => };
{ Parallel.ForEachAsync(dumps, parallelOptions,
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Debug)) async (partialData, cancellationToken) =>
LoggerFactory.GetInstance().Log($"Processing static data for file {dumped.BasicInfo.FileName}", await Process(partialData, staticContainers, mapStaticContainersAggregated, mapDumpCounter));
LogLevel.Debug);
var dataDump = _jsonSerializer.Deserialize<RootData>(File.ReadAllText(dumped.BasicInfo.FileName));
if (dataDump == null)
{
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error))
LoggerFactory.GetInstance()
.Log($"Failed to deserialize data from file {dumped.BasicInfo.FileName}",
LogLevel.Error);
return; // Skip processing this dump
}
var mapId = dataDump.Data.LocationLoot.Id.ToLower();
// Because we may use multiple version dumps for map data, merge the static loot between dumps
lock (staticContainersLock)
{
if (!staticContainers.TryGetValue(mapId, out var mapStaticLoot))
{
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
LoggerFactory.GetInstance()
.Log($"Doing first time process for map {mapId} of real static data",
LogLevel.Info);
staticContainers[mapId] = new MapStaticLoot
{
StaticWeapons = new List<Template>(),
StaticForced = new List<StaticForced>()
};
}
else
{
var mapStaticContainers =
_staticContainersProcessor.CreateStaticWeaponsAndForcedContainers(dataDump);
var newStaticWeapons = mapStaticContainers.StaticWeapons.Where(x =>
!mapStaticLoot.StaticWeapons.Exists(y => y.Id == x.Id));
var newStaticForced = mapStaticContainers.StaticForced.Where(x =>
!mapStaticLoot.StaticForced.Exists(y => y.ContainerId == x.ContainerId));
mapStaticLoot.StaticWeapons.AddRange(newStaticWeapons);
mapStaticLoot.StaticForced.AddRange(newStaticForced);
}
}
// Takes care of finding how many "dynamic static containers" we have on the map
Dictionary<Template, int> mapAggregatedDataDict;
lock (mapStaticContainersAggregatedLock)
{
// Init dict if map key doesnt exist
if (!mapStaticContainersAggregated.TryGetValue(mapId, out mapAggregatedDataDict))
{
mapAggregatedDataDict = new Dictionary<Template, int>();
mapStaticContainersAggregated.Add(mapId, mapAggregatedDataDict);
}
}
// Only process the dump file if the date is higher (after) the configuration date
if (!DumpWasMadeAfterConfigThresholdDate(dumped))
{
return;
}
// Keep track of how many dumps we have for each map
lock (mapDumpCounterLock)
{
IncrementMapCounterDictionaryValue(mapDumpCounter, mapId);
}
var containerIgnoreListExists = LootDumpProcessorContext.GetConfig().ContainerIgnoreList
.TryGetValue(mapId, out string[]? ignoreListForMap);
foreach (var dynamicStaticContainer in _staticContainersProcessor.CreateDynamicStaticContainers(
dataDump))
{
lock (mapStaticContainersAggregatedLock)
{
if (containerIgnoreListExists && ignoreListForMap.Contains(dynamicStaticContainer.Id))
{
// Skip adding containers to aggregated data if container id is in ignore list
continue;
}
// Increment times container seen in dump by 1
if (!mapAggregatedDataDict.TryAdd(dynamicStaticContainer, 1))
{
mapAggregatedDataDict[dynamicStaticContainer] += 1;
}
}
}
GCHandler.Collect();
})
);
}
Task.WaitAll(Runners.ToArray());
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info)) if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
LoggerFactory.GetInstance().Log("All static data processing threads finished", LogLevel.Info); LoggerFactory.GetInstance().Log("All static data processing threads finished", LogLevel.Info);
// Aggregate and calculate the probability of a static container // Aggregate and calculate the probability of a static container
@ -192,7 +91,7 @@ public class MultithreadSteppedDumpProcessor(
LoggerFactory.GetInstance().Log("Processing ammo distribution", LogLevel.Info); LoggerFactory.GetInstance().Log("Processing ammo distribution", LogLevel.Info);
var staticAmmo = new ConcurrentDictionary<string, IReadOnlyDictionary<string, List<AmmoDistribution>>>(); var staticAmmo = new ConcurrentDictionary<string, IReadOnlyDictionary<string, List<AmmoDistribution>>>();
Parallel.ForEach(dumpProcessData.ContainerCounts.Keys, mapId => Parallel.ForEach(dumpProcessData.ContainerCounts.Keys, parallelOptions: parallelOptions, mapId =>
{ {
var preProcessedStaticLoots = dumpProcessData.ContainerCounts[mapId]; var preProcessedStaticLoots = dumpProcessData.ContainerCounts[mapId];
var ammoDistribution = _ammoProcessor.CreateAmmoDistribution(mapId, preProcessedStaticLoots); var ammoDistribution = _ammoProcessor.CreateAmmoDistribution(mapId, preProcessedStaticLoots);
@ -208,7 +107,7 @@ public class MultithreadSteppedDumpProcessor(
LoggerFactory.GetInstance().Log("Processing static loot distribution", LogLevel.Info); LoggerFactory.GetInstance().Log("Processing static loot distribution", LogLevel.Info);
var staticLoot = new ConcurrentDictionary<string, IReadOnlyDictionary<string, StaticItemDistribution>>(); var staticLoot = new ConcurrentDictionary<string, IReadOnlyDictionary<string, StaticItemDistribution>>();
Parallel.ForEach(dumpProcessData.ContainerCounts.Keys, mapId => Parallel.ForEach(dumpProcessData.ContainerCounts.Keys, parallelOptions: parallelOptions, mapId =>
{ {
var preProcessedStaticLoots = dumpProcessData.ContainerCounts[mapId]; var preProcessedStaticLoots = dumpProcessData.ContainerCounts[mapId];
var staticLootDistribution = var staticLootDistribution =
@ -223,10 +122,10 @@ public class MultithreadSteppedDumpProcessor(
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info)) if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
LoggerFactory.GetInstance().Log("Processing loose loot distribution", LogLevel.Info); LoggerFactory.GetInstance().Log("Processing loose loot distribution", LogLevel.Info);
// Loose loot distribution // Loose loot distribution
var looseLoot = new ConcurrentDictionary<string, LooseLootRoot>(); var looseLoot = new ConcurrentDictionary<string, LooseLootRoot>();
Parallel.ForEach(dumpProcessData.MapCounts.Keys, mapId => Parallel.ForEach(dumpProcessData.MapCounts.Keys, parallelOptions: parallelOptions, mapId =>
{ {
var mapCount = dumpProcessData.MapCounts[mapId]; var mapCount = dumpProcessData.MapCounts[mapId];
var looseLootCount = dumpProcessData.LooseLootCounts[mapId]; var looseLootCount = dumpProcessData.LooseLootCounts[mapId];
@ -246,6 +145,102 @@ public class MultithreadSteppedDumpProcessor(
return output; return output;
} }
private async Task Process(PartialData partialData,
ConcurrentDictionary<string, MapStaticLoot> staticContainers,
ConcurrentDictionary<string, ConcurrentDictionary<Template, int>> mapStaticContainersAggregated,
ConcurrentDictionary<string, int> mapDumpCounter)
{
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Debug))
LoggerFactory.GetInstance().Log($"Processing static data for file {partialData.BasicInfo.FileName}",
LogLevel.Debug);
var fileContent = await File.ReadAllTextAsync(partialData.BasicInfo.FileName);
var dataDump = _jsonSerializer.Deserialize<RootData>(fileContent);
if (dataDump == null)
{
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error))
LoggerFactory.GetInstance()
.Log($"Failed to deserialize data from file {partialData.BasicInfo.FileName}",
LogLevel.Error);
return;
}
var mapId = dataDump.Data.LocationLoot.Id.ToLower();
// Because we may use multiple version dumps for map data, merge the static loot between dumps
if (!staticContainers.TryGetValue(mapId, out var mapStaticLoot))
{
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
LoggerFactory.GetInstance()
.Log($"Doing first time process for map {mapId} of real static data",
LogLevel.Info);
staticContainers[mapId] = new MapStaticLoot
{
StaticWeapons = new List<Template>(),
StaticForced = new List<StaticForced>()
};
}
else
{
var mapStaticContainers =
_staticContainersProcessor.CreateStaticWeaponsAndForcedContainers(dataDump);
var newStaticWeapons = mapStaticContainers.StaticWeapons.Where(x =>
!mapStaticLoot.StaticWeapons.Exists(y => y.Id == x.Id));
var newStaticForced = mapStaticContainers.StaticForced.Where(x =>
!mapStaticLoot.StaticForced.Exists(y => y.ContainerId == x.ContainerId));
mapStaticLoot.StaticWeapons.AddRange(newStaticWeapons);
mapStaticLoot.StaticForced.AddRange(newStaticForced);
}
// Takes care of finding how many "dynamic static containers" we have on the map
ConcurrentDictionary<Template, int> mapAggregatedDataDict;
// Init dict if map key doesnt exist
if (!mapStaticContainersAggregated.TryGetValue(mapId, out mapAggregatedDataDict))
{
mapAggregatedDataDict = new ConcurrentDictionary<Template, int>();
mapStaticContainersAggregated.TryAdd(mapId, mapAggregatedDataDict);
}
// Only process the dump file if the date is higher (after) the configuration date
if (!DumpWasMadeAfterConfigThresholdDate(partialData))
{
return;
}
// Keep track of how many dumps we have for each map
IncrementMapCounterDictionaryValue(mapDumpCounter, mapId);
var containerIgnoreListExists = LootDumpProcessorContext.GetConfig().ContainerIgnoreList
.TryGetValue(mapId, out var ignoreListForMap);
foreach (var dynamicStaticContainer in _staticContainersProcessor.CreateDynamicStaticContainers(
dataDump))
{
if (containerIgnoreListExists && ignoreListForMap.Contains(dynamicStaticContainer.Id))
{
// Skip adding containers to aggregated data if container id is in ignore list
continue;
}
// Increment times container seen in dump by 1
if (!mapAggregatedDataDict.TryAdd(dynamicStaticContainer, 1))
{
mapAggregatedDataDict[dynamicStaticContainer] += 1;
}
}
GCHandler.Collect();
}
private static bool DumpWasMadeAfterConfigThresholdDate(PartialData dataDump) private static bool DumpWasMadeAfterConfigThresholdDate(PartialData dataDump)
{ {
return FileDateParser.TryParseFileDate(dataDump.BasicInfo.FileName, out var fileDate) && return FileDateParser.TryParseFileDate(dataDump.BasicInfo.FileName, out var fileDate) &&
@ -254,7 +249,7 @@ public class MultithreadSteppedDumpProcessor(
.SpawnContainerChanceIncludeAfterDate; .SpawnContainerChanceIncludeAfterDate;
} }
private static void IncrementMapCounterDictionaryValue(Dictionary<string, int> mapDumpCounter, string mapName) private static void IncrementMapCounterDictionaryValue(IDictionary<string, int> mapDumpCounter, string mapName)
{ {
if (!mapDumpCounter.TryAdd(mapName, 1)) if (!mapDumpCounter.TryAdd(mapName, 1))
{ {
@ -264,7 +259,7 @@ public class MultithreadSteppedDumpProcessor(
} }
private static double GetStaticContainerProbability(string mapName, KeyValuePair<Template, int> td, private static double GetStaticContainerProbability(string mapName, KeyValuePair<Template, int> td,
Dictionary<string, int> mapDumpCounter) IReadOnlyDictionary<string, int> mapDumpCounter)
{ {
return Math.Round((double)((decimal)td.Value / (decimal)mapDumpCounter[mapName]), 2); return Math.Round((double)((decimal)td.Value / (decimal)mapDumpCounter[mapName]), 2);
} }
@ -321,105 +316,18 @@ public class MultithreadSteppedDumpProcessor(
tuple = null; tuple = null;
GCHandler.Collect(); GCHandler.Collect();
// The data storage factory has a lock, we dont want the locks to occur when multithreading var parallelOptions = new ParallelOptions
for (int i = 0; i < LootDumpProcessorContext.GetConfig().Threads; i++)
{ {
Runners.Add( MaxDegreeOfParallelism = Environment.ProcessorCount
Task.Factory.StartNew( };
() => Parallel.ForEach(_partialDataToProcess, parallelOptions,
{ partialData =>
while (_partialDataToProcess.TryTake(out var partialData, {
TimeSpan.FromMilliseconds(5000))) ProcessPartialData(partialData, lockObjectContainerCounts, dumpProcessData, mapName,
{ lockObjectDictionaryCounts, dictionaryCounts, lockObjectDictionaryItemProperties,
try dictionaryItemProperties, actualDictionaryItemProperties, lockObjectCounts,
{ looseLootCounts);
var dumpData = _dataStorage.GetItem<ParsedDump>(partialData.ParsedDumpKey); });
// Static containers
lock (lockObjectContainerCounts)
{
if (!dumpProcessData.ContainerCounts.ContainsKey(mapName))
{
dumpProcessData.ContainerCounts.Add(mapName,
dumpData.Containers.ToList());
}
else
{
dumpProcessData.ContainerCounts[mapName].AddRange(dumpData.Containers);
}
}
// Loose loot into ids on files
var loadedDictionary =
_dataStorage
.GetItem<SubdivisionedKeyableDictionary<string, List<Template>>>(
dumpData.LooseLoot.ItemProperties
);
foreach (var (uniqueKey, containerTemplate) in loadedDictionary)
{
var count = dumpData.LooseLoot.Counts[uniqueKey];
lock (lockObjectDictionaryCounts)
{
if (dictionaryCounts.ContainsKey(uniqueKey))
dictionaryCounts[uniqueKey] += count;
else
dictionaryCounts[uniqueKey] = count;
}
/*
var itemList = dumpData.LooseLoot.Items[k];
if (!dictionaryItemCounts.TryGetValue(k, out var itemCounts))
{
itemCounts = new List<string>();
dictionaryItemCounts.Add(k, itemCounts);
}
itemCounts.AddRange(itemList);
*/
lock (lockObjectDictionaryItemProperties)
{
if (!dictionaryItemProperties.TryGetValue(uniqueKey, out var values))
{
values = new FlatKeyableList<Template>();
dictionaryItemProperties.Add(uniqueKey, values);
actualDictionaryItemProperties.Add(uniqueKey, values.GetKey());
}
values.AddRange(containerTemplate);
}
}
lock (lockObjectCounts)
{
looseLootCounts.MapSpawnpointCount.Add(
dumpData.LooseLoot.MapSpawnpointCount);
}
}
catch (Exception e)
{
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error))
LoggerFactory.GetInstance().Log(
$"ERROR OCCURRED:{e.Message}\n{e.StackTrace}",
LogLevel.Error
);
}
}
},
TaskCreationOptions.LongRunning)
);
}
// Wait until all runners are done processing
while (!Runners.All(r => r.IsCompleted))
{
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
LoggerFactory.GetInstance().Log(
$"One or more file processors are still processing files. Waiting {LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs}ms before checking again",
LogLevel.Info
);
Thread.Sleep(
TimeSpan.FromMilliseconds(LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs));
}
foreach (var (_, value) in dictionaryItemProperties) foreach (var (_, value) in dictionaryItemProperties)
{ {
@ -443,4 +351,76 @@ public class MultithreadSteppedDumpProcessor(
}); });
return dumpProcessData; return dumpProcessData;
} }
private static void ProcessPartialData(PartialData partialDataToProcess,
object lockObjectContainerCounts,
DumpProcessData dumpProcessData, string mapName, object lockObjectDictionaryCounts,
FlatKeyableDictionary<string, int>? dictionaryCounts, object lockObjectDictionaryItemProperties,
FlatKeyableDictionary<string, FlatKeyableList<Template>> dictionaryItemProperties,
FlatKeyableDictionary<string, IKey>? actualDictionaryItemProperties,
object lockObjectCounts, LooseLootCounts? looseLootCounts)
{
try
{
var dumpData = _dataStorage.GetItem<ParsedDump>(partialDataToProcess.ParsedDumpKey);
// Static containers
lock (lockObjectContainerCounts)
{
if (!dumpProcessData.ContainerCounts.ContainsKey(mapName))
{
dumpProcessData.ContainerCounts.Add(mapName,
dumpData.Containers.ToList());
}
else
{
dumpProcessData.ContainerCounts[mapName].AddRange(dumpData.Containers);
}
}
// Loose loot into ids on files
var loadedDictionary =
_dataStorage
.GetItem<SubdivisionedKeyableDictionary<string, List<Template>>>(
dumpData.LooseLoot.ItemProperties
);
foreach (var (uniqueKey, containerTemplate) in loadedDictionary)
{
var count = dumpData.LooseLoot.Counts[uniqueKey];
lock (lockObjectDictionaryCounts)
{
if (dictionaryCounts.ContainsKey(uniqueKey))
dictionaryCounts[uniqueKey] += count;
else
dictionaryCounts[uniqueKey] = count;
}
lock (lockObjectDictionaryItemProperties)
{
if (!dictionaryItemProperties.TryGetValue(uniqueKey, out var values))
{
values = new FlatKeyableList<Template>();
dictionaryItemProperties.Add(uniqueKey, values);
actualDictionaryItemProperties.Add(uniqueKey, values.GetKey());
}
values.AddRange(containerTemplate);
}
}
lock (lockObjectCounts)
{
looseLootCounts.MapSpawnpointCount.Add(
dumpData.LooseLoot.MapSpawnpointCount);
}
}
catch (Exception e)
{
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error))
LoggerFactory.GetInstance().Log(
$"ERROR OCCURRED:{e.Message}\n{e.StackTrace}",
LogLevel.Error
);
}
}
} }

View File

@ -1,3 +1,4 @@
using System.Collections.Concurrent;
using LootDumpProcessor.Model.Output; using LootDumpProcessor.Model.Output;
using LootDumpProcessor.Model.Output.LooseLoot; using LootDumpProcessor.Model.Output.LooseLoot;
using LootDumpProcessor.Model.Output.StaticContainer; using LootDumpProcessor.Model.Output.StaticContainer;
@ -44,7 +45,7 @@ public class FileWriter : IWriter
switch (type) switch (type)
{ {
case OutputFileType.LooseLoot: case OutputFileType.LooseLoot:
var looseLootData = (Dictionary<string, LooseLootRoot>)data; var looseLootData = (IReadOnlyDictionary<string, LooseLootRoot>)data;
foreach (var (key, value) in looseLootData) foreach (var (key, value) in looseLootData)
{ {
if (!Directory.Exists($@"{_outputPath}\locations\{key}")) if (!Directory.Exists($@"{_outputPath}\locations\{key}"))
@ -55,7 +56,7 @@ public class FileWriter : IWriter
break; break;
case OutputFileType.StaticContainer: case OutputFileType.StaticContainer:
var staticContainer = (Dictionary<string, MapStaticLoot>)data; var staticContainer = (IReadOnlyDictionary<string, MapStaticLoot>)data;
foreach (var (key, value) in staticContainer) foreach (var (key, value) in staticContainer)
{ {
if (!Directory.Exists($@"{_outputPath}\locations\{key}")) if (!Directory.Exists($@"{_outputPath}\locations\{key}"))