mirror of https://github.com/sp-tarkov/loot-dump-processor.git synced 2025-02-13 03:30:45 -05:00

Refactored QueuePipeline to use async/await and parallel processing

bluextx 2025-01-11 08:24:04 +03:00
parent e7dfc4349d
commit 04a371ce1a
2 changed files with 67 additions and 122 deletions

File 1 of 2

@@ -2,5 +2,5 @@ namespace LootDumpProcessor.Process;
 public interface IPipeline
 {
-    void DoProcess();
+    Task DoProcess();
 }
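With DoProcess() now returning a Task, callers can no longer invoke the pipeline synchronously. A minimal sketch of an awaiting call site (PipelineRunner and RunAsync are illustrative names, not part of this commit):

    using System.Threading.Tasks;
    using LootDumpProcessor.Process;

    internal static class PipelineRunner
    {
        // Hypothetical helper: awaits the refactored pipeline end to end.
        public static async Task RunAsync(IPipeline pipeline)
        {
            await pipeline.DoProcess();
        }
    }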

File 2 of 2

@@ -24,10 +24,8 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc
     private readonly IDumpProcessor _dumpProcessor =
         dumpProcessor ?? throw new ArgumentNullException(nameof(dumpProcessor));
 
-    private static readonly BlockingCollection<string> _filesToRename = new();
-    private static readonly BlockingCollection<string> _filesToProcess = new();
-    private static readonly List<Task> Runners = new();
-    private static readonly List<Task> Renamers = new();
+    private readonly List<string> _filesToRename = new();
+    private readonly BlockingCollection<string> _filesToProcess = new();
 
     private readonly List<string> _mapNames = LootDumpProcessorContext.GetConfig().MapsToProcess;
@@ -45,16 +43,12 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc
         ) ?? new Dictionary<string, IPreProcessReader>();
     }
 
-    public void DoProcess()
+    public async Task DoProcess()
     {
         // Single collector instance to collect results
         var collector = CollectorFactory.GetInstance();
         collector.Setup();
 
-        // We add 2 more threads to the total count to account for subprocesses and others
-        int threads = LootDumpProcessorContext.GetConfig().Threads;
-        ThreadPool.SetMaxThreads(threads + 2, threads + 2);
-
         if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
         {
             LoggerFactory.GetInstance().Log("Gathering files to begin processing", LogLevel.Info);
@@ -62,10 +56,10 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc
         try
         {
-            FixFilesFromDumps(threads);
+            await FixFilesFromDumps();
 
             foreach (var mapName in _mapNames)
            {
-                ProcessFilesFromDumpsPerMap(threads, collector, mapName);
+                ProcessFilesFromDumpsPerMap(collector, mapName);
             }
         }
         finally
@@ -107,34 +101,42 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc
         while (queuedFilesToProcess.TryDequeue(out var file))
         {
-            var extension = Path.GetExtension(file)[1..].ToLower();
-            // if there is a preprocessor, call it and preprocess the file, then add them to the queue
-            if (_preProcessReaders.TryGetValue(extension, out var preProcessor))
-            {
-                // if the preprocessor found something new to process or generated new files, add them to the queue
-                if (preProcessor.TryPreProcess(file, out var outputFiles, out var outputDirectories))
-                {
-                    // all new directories need to be processed as well
-                    GetFileQueue(outputDirectories).ToList().ForEach(queuedFilesToProcess.Enqueue);
-                    // all output files need to be queued as well
-                    outputFiles.ForEach(queuedFilesToProcess.Enqueue);
-                }
-            }
-            else
-            {
-                // if there is no preprocessor for the file, means its ready to filter or accept
-                if (fileFilters.TryGetValue(extension, out var filter))
-                {
-                    if (filter.Accept(file))
-                    {
-                        gatheredFiles.Add(file);
-                    }
-                }
-                else
-                {
-                    gatheredFiles.Add(file);
-                }
-            }
+            var extensionFull = Path.GetExtension(file);
+            if (extensionFull.Length > 1)
+            {
+                var extension = extensionFull[1..].ToLower();
+                // if there is a preprocessor, call it and preprocess the file, then add them to the queue
+                if (_preProcessReaders.TryGetValue(extension, out var preProcessor))
+                {
+                    // if the preprocessor found something new to process or generated new files, add them to the queue
+                    if (preProcessor.TryPreProcess(file, out var outputFiles, out var outputDirectories))
+                    {
+                        // all new directories need to be processed as well
+                        GetFileQueue(outputDirectories).ToList().ForEach(queuedFilesToProcess.Enqueue);
+                        // all output files need to be queued as well
+                        outputFiles.ForEach(queuedFilesToProcess.Enqueue);
+                    }
+                }
+                else
+                {
+                    // if there is no preprocessor for the file, means its ready to filter or accept
+                    if (fileFilters.TryGetValue(extension, out var filter))
+                    {
+                        if (filter.Accept(file))
+                        {
+                            gatheredFiles.Add(file);
+                        }
+                    }
+                    else
+                    {
+                        gatheredFiles.Add(file);
+                    }
+                }
+            }
+            else
+            {
+                // Handle invalid extension
+            }
         }
 
         return gatheredFiles;
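The added Length > 1 check guards against files that have no extension: Path.GetExtension returns an empty string in that case, so the old unconditional [1..] slice would throw. A small standalone illustration of the failure mode (hypothetical paths, not repository code):

    using System;
    using System.IO;

    internal static class ExtensionGuardDemo
    {
        public static void Main()
        {
            Console.WriteLine(Path.GetExtension("dumps/resp.json")); // ".json"
            Console.WriteLine(Path.GetExtension("dumps/resp"));      // "" (no extension)

            var ext = Path.GetExtension("dumps/resp");
            if (ext.Length > 1)
            {
                // Only reached when a real extension is present; slicing "" with [1..]
                // would throw ArgumentOutOfRangeException.
                Console.WriteLine(ext[1..].ToLower());
            }
        }
    }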
@@ -149,8 +151,7 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc
         var acceptedFileExtension = LootDumpProcessorContext.GetConfig()
             .ReaderConfig
             .AcceptedFileExtensions
-            .Select(ex => ex.ToLower())
-            .ToHashSet();
+            .Select(ex => ex.ToLower());
 
         inputPath.ForEach(p => queuedPathsToProcess.Enqueue(p));
         while (queuedPathsToProcess.TryDequeue(out var path))
@@ -158,7 +159,7 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc
         // Check the input file to be sure its going to have data on it.
         if (!Directory.Exists(path))
         {
-            throw new Exception($"Input directory \"{inputPath}\" could not be found");
+            throw new Exception($"Input directory \"{path}\" could not be found");
         }
 
         // If we should process subfolder, queue them up as well
@@ -195,7 +196,7 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc
         );
     }
 
-    private void ProcessFilesFromDumpsPerMap(int threads, ICollector collector, string mapName)
+    private void ProcessFilesFromDumpsPerMap(ICollector collector, string mapName)
     {
         // Gather all files, sort them by date descending and then add them into the processing queue
         GatherFiles().FindAll(f => f.ToLower().Contains($"{mapName}--")).OrderByDescending(f =>
@@ -210,55 +211,26 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc
             LoggerFactory.GetInstance().Log("Files sorted and ready to begin pre-processing", LogLevel.Info);
         }
 
-        // We startup all the threads and collect them into a runners list
-        for (int i = 0; i < threads; i++)
-        {
-            if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
-            {
-                LoggerFactory.GetInstance().Log("Creating pre-processing threads", LogLevel.Info);
-            }
-
-            Runners.Add(
-                Task.Factory.StartNew(
-                    () =>
-                    {
-                        while (_filesToProcess.TryTake(out var file, TimeSpan.FromMilliseconds(5000)))
-                        {
-                            try
-                            {
-                                var reader = IntakeReaderFactory.GetInstance();
-                                if (reader.Read(file, out var basicInfo))
-                                {
-                                    collector.Hold(_fileProcessor.Process(basicInfo));
-                                }
-                            }
-                            catch (Exception e)
-                            {
-                                if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error))
-                                {
-                                    LoggerFactory.GetInstance().Log(
-                                        $"Error occurred while processing file {file}\n{e.Message}\n{e.StackTrace}",
-                                        LogLevel.Error);
-                                }
-                            }
-                        }
-                    },
-                    TaskCreationOptions.LongRunning)
-            );
-        }
-
-        // Wait until all runners are done processing
-        while (!Runners.All(r => r.IsCompleted))
-        {
-            if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
-            {
-                LoggerFactory.GetInstance().Log(
-                    $"One or more file processors are still processing files. Waiting {LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs}ms before checking again",
-                    LogLevel.Info);
-            }
-
-            Thread.Sleep(TimeSpan.FromMilliseconds(LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs));
-        }
+        Parallel.ForEach(_filesToProcess, file =>
+        {
+            try
+            {
+                var reader = IntakeReaderFactory.GetInstance();
+                if (reader.Read(file, out var basicInfo))
+                {
+                    collector.Hold(_fileProcessor.Process(basicInfo));
+                }
+            }
+            catch (Exception e)
+            {
+                if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error))
+                {
+                    LoggerFactory.GetInstance().Log(
+                        $"Error occurred while processing file {file}\n{e.Message}\n{e.StackTrace}",
+                        LogLevel.Error);
+                }
+            }
+        });
 
         if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
         {
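One consequence of this hunk is that Parallel.ForEach now picks its own degree of parallelism, since the configured thread count is no longer consulted here. If the existing Threads setting should still bound concurrency, a hypothetical variant (not part of this commit, sketched against the class's own fields) could pass ParallelOptions:

    // Hypothetical alternative: cap the worker count with the existing Threads
    // configuration value instead of relying on the TPL default.
    var options = new ParallelOptions
    {
        MaxDegreeOfParallelism = LootDumpProcessorContext.GetConfig().Threads
    };

    Parallel.ForEach(_filesToProcess, options, file =>
    {
        // ... same per-file intake/collect body as in the hunk above ...
    });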
@ -279,7 +251,7 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc
/// Adds map name to file if they dont have it already. /// Adds map name to file if they dont have it already.
/// </summary> /// </summary>
/// <param name="threads">Number of threads to use</param> /// <param name="threads">Number of threads to use</param>
private void FixFilesFromDumps(int threads) private async Task FixFilesFromDumps()
{ {
var inputPath = LootDumpProcessorContext.GetConfig().ReaderConfig.DumpFilesLocation; var inputPath = LootDumpProcessorContext.GetConfig().ReaderConfig.DumpFilesLocation;
@@ -292,54 +264,27 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc
         var jsonUtil = JsonSerializerFactory.GetInstance(JsonSerializerTypes.DotNet);
 
-        for (var i = 0; i < threads; i++)
-        {
-            if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
-            {
-                LoggerFactory.GetInstance().Log("Creating file-processing threads", LogLevel.Info);
-            }
-
-            Renamers.Add(Task.Factory.StartNew(() =>
-            {
-                while (_filesToRename.TryTake(out var file, TimeSpan.FromMilliseconds(5000)))
-                {
-                    if (_mapNames.Any(x => file.Contains(x)))
-                    {
-                        continue;
-                    }
-
-                    try
-                    {
-                        var data = File.ReadAllText(file);
-                        var fileData = jsonUtil.Deserialize<RootData>(data);
-                        var newpath = file.Replace("resp", $"{fileData.Data.LocationLoot.Id.ToLower()}--resp");
-                        File.Move(file, newpath);
-                    }
-                    catch (Exception e)
-                    {
-                        if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error))
-                        {
-                            LoggerFactory.GetInstance().Log(
-                                $"Error occurred while processing file {file}\n{e.Message}\n{e.StackTrace}",
-                                LogLevel.Error);
-                        }
-                    }
-                }
-            }, TaskCreationOptions.LongRunning));
-        }
-
-        while (!Renamers.All(r => r.IsCompleted))
-        {
-            if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
-            {
-                LoggerFactory.GetInstance()
-                    .Log(
-                        $"one or more files are being processed. Waiting {LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs} ms",
-                        LogLevel.Info);
-            }
-
-            Thread.Sleep(TimeSpan.FromMilliseconds(LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs));
-        }
+        await Parallel.ForEachAsync(_filesToRename, async (file, cancellationToken) =>
+        {
+            if (_mapNames.Any(file.Contains)) return;
+
+            try
+            {
+                var data = await File.ReadAllTextAsync(file, cancellationToken);
+                var fileData = jsonUtil.Deserialize<RootData>(data);
+                var newPath = file.Replace("resp", $"{fileData.Data.LocationLoot.Id.ToLower()}--resp");
+                File.Move(file, newPath);
+            }
+            catch (Exception e)
+            {
+                if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error))
+                {
+                    LoggerFactory.GetInstance().Log(
+                        $"Error occurred while processing file {file}\n{e.Message}\n{e.StackTrace}",
+                        LogLevel.Error);
+                }
+            }
+        });
 
         if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
         {