using System.Collections.Concurrent;
using LootDumpProcessor.Logger;
using LootDumpProcessor.Process.Collector;
using LootDumpProcessor.Process.Processor;
using LootDumpProcessor.Process.Processor.DumpProcessor;
using LootDumpProcessor.Process.Reader;
using LootDumpProcessor.Process.Reader.Filters;
using LootDumpProcessor.Process.Reader.PreProcess;
using LootDumpProcessor.Process.Writer;

namespace LootDumpProcessor.Process;

public class QueuePipeline : IPipeline
{
    private static readonly BlockingCollection<string> _filesToProcess = new();
    private static readonly List<Task> Runners = new();

    private static readonly Dictionary<string, IPreProcessReader> _preProcessReaders;

    static QueuePipeline()
    {
        _preProcessReaders = LootDumpProcessorContext.GetConfig()
            .ReaderConfig
            .PreProcessorConfig
            ?.PreProcessors
            ?.ToDictionary(
                t => PreProcessReaderFactory.GetInstance(t).GetHandleExtension().ToLower(),
                PreProcessReaderFactory.GetInstance
            ) ?? new Dictionary<string, IPreProcessReader>();
    }

    public void DoProcess()
    {
        // Single collector instance to hold results from all worker threads
        var collector = CollectorFactory.GetInstance();
        collector.Setup();

        // Add 2 extra threads to the configured count to account for subprocesses and other work
        int threads = LootDumpProcessorContext.GetConfig().Threads;
        ThreadPool.SetMaxThreads(threads + 2, threads + 2);
        try
        {
            // Gather all files, then add them into the processing queue
            GatherFiles().ForEach(f => _filesToProcess.Add(f));

            // Start all the worker threads and collect them into the runners list
            for (int i = 0; i < threads; i++)
            {
                Runners.Add(
                    Task.Factory.StartNew(
                        () =>
                        {
                            // Keep draining the queue until no file arrives within the 5s timeout
                            while (_filesToProcess.TryTake(out var file, TimeSpan.FromMilliseconds(5000)))
                            {
                                try
                                {
                                    var reader = IntakeReaderFactory.GetInstance();
                                    var processor = FileProcessorFactory.GetInstance();
                                    if (reader.Read(file, out var basicInfo))
                                        collector.Hold(processor.Process(basicInfo));
                                }
                                catch (Exception e)
                                {
                                    LoggerFactory.GetInstance().Log(
                                        $"Error occurred while processing file {file}\n{e.Message}\n{e.StackTrace}",
                                        LogLevel.Error);
                                }
                            }
                        },
                        TaskCreationOptions.LongRunning)
                );
            }

            // Wait until all runners are done processing
            while (!Runners.All(r => r.IsCompleted))
            {
                LoggerFactory.GetInstance().Log(
                    $"One or more file processors are still processing files. Waiting {LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs}ms before checking again",
                    LogLevel.Info);
                Thread.Sleep(TimeSpan.FromMilliseconds(LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs));
            }

            // Single writer instance to write out the final results
            var writer = WriterFactory.GetInstance();
            // Single dump processor instance to process the collected dumps
            var dumpProcessor = DumpProcessorFactory.GetInstance();
            writer.WriteAll(dumpProcessor.ProcessDumps(collector.Retrieve()));
        }
        finally
        {
            // Dispose the preprocess readers to clean up any temporary files they generated
            foreach (var (_, value) in _preProcessReaders)
            {
                value.Dispose();
            }
        }
    }

    private List<string> GatherFiles()
    {
        // Read locations
        var inputPath = LootDumpProcessorContext.GetConfig().ReaderConfig.DumpFilesLocation;
        if (inputPath == null || inputPath.Count == 0)
            throw new Exception("Reader dumpFilesLocations must be set to a valid value");

        // Gather up all files into a queue
        var queuedFilesToProcess = GetFileQueue(inputPath);

        // Then preprocess the queued files to get them ready for processing
        return PreProcessQueuedFiles(queuedFilesToProcess);
    }

    private List<string> PreProcessQueuedFiles(Queue<string> queuedFilesToProcess)
    {
        var gatheredFiles = new List<string>();
        if (queuedFilesToProcess.Count == 0)
            throw new Exception("No files matched accepted extension types in configs");

        var fileFilters = GetFileFilters() ?? new Dictionary<string, IFileFilter>();

        while (queuedFilesToProcess.TryDequeue(out var file))
        {
            // Strip the leading dot from the extension
            var extension = Path.GetExtension(file)[1..].ToLower();
            // If there is a preprocessor for this extension, preprocess the file
            if (_preProcessReaders.TryGetValue(extension, out var preProcessor))
            {
                // If the preprocessor found something new to process or generated new files, queue them
                if (preProcessor.TryPreProcess(file, out var outputFiles, out var outputDirectories))
                {
                    // All new directories need to be processed as well
                    GetFileQueue(outputDirectories).ToList()
                        .ForEach(queuedFilesToProcess.Enqueue);
                    // All output files need to be queued as well
                    outputFiles.ForEach(queuedFilesToProcess.Enqueue);
                }
            }
            else
            {
                // No preprocessor for the file means it is ready to be filtered or accepted
                if (fileFilters.TryGetValue(extension, out var filter))
                {
                    if (filter.Accept(file))
                        gatheredFiles.Add(file);
                }
                else
                {
                    gatheredFiles.Add(file);
                }
            }
        }

        return gatheredFiles;
    }

    private Queue<string> GetFileQueue(List<string> inputPath)
    {
        var queuedPathsToProcess = new Queue<string>();
        var queuedFilesToProcess = new Queue<string>();

        // Accepted file extensions on raw files
        var acceptedFileExtensions = LootDumpProcessorContext.GetConfig()
            .ReaderConfig
            .AcceptedFileExtensions
            .Select(ex => ex.ToLower())
            .ToHashSet();
        inputPath.ForEach(p => queuedPathsToProcess.Enqueue(p));

        while (queuedPathsToProcess.TryDequeue(out var path))
        {
            // Ensure the input directory exists before scanning it for data
            if (!Directory.Exists(path))
                throw new Exception($"Input directory \"{path}\" could not be found");

            // If subfolders should be processed, queue them up as well
            if (LootDumpProcessorContext.GetConfig().ReaderConfig.ProcessSubFolders)
                foreach (var directory in Directory.GetDirectories(path))
                    queuedPathsToProcess.Enqueue(directory);

            var files = Directory.GetFiles(path);
            foreach (var file in files)
                if (acceptedFileExtensions.Contains(Path.GetExtension(file)[1..].ToLower()))
                    queuedFilesToProcess.Enqueue(file);
        }

        return queuedFilesToProcess;
    }

    private Dictionary<string, IFileFilter>? GetFileFilters()
    {
        return LootDumpProcessorContext.GetConfig()
            .ReaderConfig
            .FileFilters
            ?.ToDictionary(
                t => FileFilterFactory.GetInstance(t).GetExtension(),
                FileFilterFactory.GetInstance
            );
    }
}
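
// Usage sketch (illustrative; not part of the original file). The pipeline only
// needs to be constructed and run, since all configuration is pulled from
// LootDumpProcessorContext.GetConfig() internally. How the context gets its
// config loaded is an assumption here and depends on the host application:
//
//     IPipeline pipeline = new QueuePipeline();
//     pipeline.DoProcess();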