From 04a371ce1a96e4a58f270cf2bb00218b58ebbc75 Mon Sep 17 00:00:00 2001 From: bluextx Date: Sat, 11 Jan 2025 08:24:04 +0300 Subject: [PATCH] Refactored QueuePipeline to use async/await and parallel processing --- source/LootDumpProcessor/Process/IPipeline.cs | 2 +- .../Process/QueuePipeline.cs | 187 +++++++----------- 2 files changed, 67 insertions(+), 122 deletions(-) diff --git a/source/LootDumpProcessor/Process/IPipeline.cs b/source/LootDumpProcessor/Process/IPipeline.cs index 1848ac3..70b796c 100644 --- a/source/LootDumpProcessor/Process/IPipeline.cs +++ b/source/LootDumpProcessor/Process/IPipeline.cs @@ -2,5 +2,5 @@ namespace LootDumpProcessor.Process; public interface IPipeline { - void DoProcess(); + Task DoProcess(); } \ No newline at end of file diff --git a/source/LootDumpProcessor/Process/QueuePipeline.cs b/source/LootDumpProcessor/Process/QueuePipeline.cs index d7f9d67..af3fbcb 100644 --- a/source/LootDumpProcessor/Process/QueuePipeline.cs +++ b/source/LootDumpProcessor/Process/QueuePipeline.cs @@ -24,10 +24,8 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc private readonly IDumpProcessor _dumpProcessor = dumpProcessor ?? throw new ArgumentNullException(nameof(dumpProcessor)); - private static readonly BlockingCollection _filesToRename = new(); - private static readonly BlockingCollection _filesToProcess = new(); - private static readonly List Runners = new(); - private static readonly List Renamers = new(); + private readonly List _filesToRename = new(); + private readonly BlockingCollection _filesToProcess = new(); private readonly List _mapNames = LootDumpProcessorContext.GetConfig().MapsToProcess; @@ -45,16 +43,12 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc ) ?? new Dictionary(); } - public void DoProcess() + public async Task DoProcess() { // Single collector instance to collect results var collector = CollectorFactory.GetInstance(); collector.Setup(); - // We add 2 more threads to the total count to account for subprocesses and others - int threads = LootDumpProcessorContext.GetConfig().Threads; - ThreadPool.SetMaxThreads(threads + 2, threads + 2); - if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info)) { LoggerFactory.GetInstance().Log("Gathering files to begin processing", LogLevel.Info); @@ -62,10 +56,10 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc try { - FixFilesFromDumps(threads); + await FixFilesFromDumps(); foreach (var mapName in _mapNames) { - ProcessFilesFromDumpsPerMap(threads, collector, mapName); + ProcessFilesFromDumpsPerMap(collector, mapName); } } finally @@ -107,34 +101,42 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc while (queuedFilesToProcess.TryDequeue(out var file)) { - var extension = Path.GetExtension(file)[1..].ToLower(); - // if there is a preprocessor, call it and preprocess the file, then add them to the queue - if (_preProcessReaders.TryGetValue(extension, out var preProcessor)) + var extensionFull = Path.GetExtension(file); + if (extensionFull.Length > 1) { - // if the preprocessor found something new to process or generated new files, add them to the queue - if (preProcessor.TryPreProcess(file, out var outputFiles, out var outputDirectories)) + var extension = extensionFull[1..].ToLower(); + // if there is a preprocessor, call it and preprocess the file, then add them to the queue + if (_preProcessReaders.TryGetValue(extension, out var preProcessor)) { - // all new directories need to be processed as well - GetFileQueue(outputDirectories).ToList().ForEach(queuedFilesToProcess.Enqueue); - // all output files need to be queued as well - outputFiles.ForEach(queuedFilesToProcess.Enqueue); - } - } - else - { - // if there is no preprocessor for the file, means its ready to filter or accept - if (fileFilters.TryGetValue(extension, out var filter)) - { - if (filter.Accept(file)) + // if the preprocessor found something new to process or generated new files, add them to the queue + if (preProcessor.TryPreProcess(file, out var outputFiles, out var outputDirectories)) { - gatheredFiles.Add(file); + // all new directories need to be processed as well + GetFileQueue(outputDirectories).ToList().ForEach(queuedFilesToProcess.Enqueue); + // all output files need to be queued as well + outputFiles.ForEach(queuedFilesToProcess.Enqueue); } } else { - gatheredFiles.Add(file); + // if there is no preprocessor for the file, means its ready to filter or accept + if (fileFilters.TryGetValue(extension, out var filter)) + { + if (filter.Accept(file)) + { + gatheredFiles.Add(file); + } + } + else + { + gatheredFiles.Add(file); + } } } + else + { + // Handle invalid extension + } } return gatheredFiles; @@ -149,8 +151,7 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc var acceptedFileExtension = LootDumpProcessorContext.GetConfig() .ReaderConfig .AcceptedFileExtensions - .Select(ex => ex.ToLower()) - .ToHashSet(); + .Select(ex => ex.ToLower()); inputPath.ForEach(p => queuedPathsToProcess.Enqueue(p)); while (queuedPathsToProcess.TryDequeue(out var path)) @@ -158,7 +159,7 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc // Check the input file to be sure its going to have data on it. if (!Directory.Exists(path)) { - throw new Exception($"Input directory \"{inputPath}\" could not be found"); + throw new Exception($"Input directory \"{path}\" could not be found"); } // If we should process subfolder, queue them up as well @@ -195,7 +196,7 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc ); } - private void ProcessFilesFromDumpsPerMap(int threads, ICollector collector, string mapName) + private void ProcessFilesFromDumpsPerMap(ICollector collector, string mapName) { // Gather all files, sort them by date descending and then add them into the processing queue GatherFiles().FindAll(f => f.ToLower().Contains($"{mapName}--")).OrderByDescending(f => @@ -210,55 +211,26 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc LoggerFactory.GetInstance().Log("Files sorted and ready to begin pre-processing", LogLevel.Info); } - // We startup all the threads and collect them into a runners list - for (int i = 0; i < threads; i++) + Parallel.ForEach(_filesToProcess, file => { - if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info)) + try { - LoggerFactory.GetInstance().Log("Creating pre-processing threads", LogLevel.Info); + var reader = IntakeReaderFactory.GetInstance(); + if (reader.Read(file, out var basicInfo)) + { + collector.Hold(_fileProcessor.Process(basicInfo)); + } } - - Runners.Add( - Task.Factory.StartNew( - () => - { - while (_filesToProcess.TryTake(out var file, TimeSpan.FromMilliseconds(5000))) - { - try - { - var reader = IntakeReaderFactory.GetInstance(); - if (reader.Read(file, out var basicInfo)) - { - collector.Hold(_fileProcessor.Process(basicInfo)); - } - } - catch (Exception e) - { - if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error)) - { - LoggerFactory.GetInstance().Log( - $"Error occurred while processing file {file}\n{e.Message}\n{e.StackTrace}", - LogLevel.Error); - } - } - } - }, - TaskCreationOptions.LongRunning) - ); - } - - // Wait until all runners are done processing - while (!Runners.All(r => r.IsCompleted)) - { - if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info)) + catch (Exception e) { - LoggerFactory.GetInstance().Log( - $"One or more file processors are still processing files. Waiting {LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs}ms before checking again", - LogLevel.Info); + if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error)) + { + LoggerFactory.GetInstance().Log( + $"Error occurred while processing file {file}\n{e.Message}\n{e.StackTrace}", + LogLevel.Error); + } } - - Thread.Sleep(TimeSpan.FromMilliseconds(LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs)); - } + }); if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info)) { @@ -279,7 +251,7 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc /// Adds map name to file if they dont have it already. /// /// Number of threads to use - private void FixFilesFromDumps(int threads) + private async Task FixFilesFromDumps() { var inputPath = LootDumpProcessorContext.GetConfig().ReaderConfig.DumpFilesLocation; @@ -292,54 +264,27 @@ public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProc var jsonUtil = JsonSerializerFactory.GetInstance(JsonSerializerTypes.DotNet); - for (var i = 0; i < threads; i++) + await Parallel.ForEachAsync(_filesToRename, async (file, cancellationToken) => { - if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info)) - { - LoggerFactory.GetInstance().Log("Creating file-processing threads", LogLevel.Info); - } + if (_mapNames.Any(file.Contains)) return; - Renamers.Add(Task.Factory.StartNew(() => + try { - while (_filesToRename.TryTake(out var file, TimeSpan.FromMilliseconds(5000))) + var data = await File.ReadAllTextAsync(file, cancellationToken); + var fileData = jsonUtil.Deserialize(data); + var newPath = file.Replace("resp", $"{fileData.Data.LocationLoot.Id.ToLower()}--resp"); + File.Move(file, newPath); + } + catch (Exception e) + { + if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error)) { - if (_mapNames.Any(x => file.Contains(x))) - { - continue; - } - - try - { - var data = File.ReadAllText(file); - var fileData = jsonUtil.Deserialize(data); - var newpath = file.Replace("resp", $"{fileData.Data.LocationLoot.Id.ToLower()}--resp"); - File.Move(file, newpath); - } - catch (Exception e) - { - if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error)) - { - LoggerFactory.GetInstance().Log( - $"Error occurred while processing file {file}\n{e.Message}\n{e.StackTrace}", - LogLevel.Error); - } - } + LoggerFactory.GetInstance().Log( + $"Error occurred while processing file {file}\n{e.Message}\n{e.StackTrace}", + LogLevel.Error); } - }, TaskCreationOptions.LongRunning)); - } - - while (!Renamers.All(r => r.IsCompleted)) - { - if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info)) - { - LoggerFactory.GetInstance() - .Log( - $"one or more files are being processed. Waiting {LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs} ms", - LogLevel.Info); } - - Thread.Sleep(TimeSpan.FromMilliseconds(LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs)); - } + }); if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info)) {