using System.Collections.Concurrent;
using LootDumpProcessor.Logger;
using LootDumpProcessor.Model.Input;
using LootDumpProcessor.Process.Collector;
using LootDumpProcessor.Process.Processor.DumpProcessor;
using LootDumpProcessor.Process.Processor.FileProcessor;
using LootDumpProcessor.Process.Reader.Filters;
using LootDumpProcessor.Process.Reader.Intake;
using LootDumpProcessor.Process.Reader.PreProcess;
using LootDumpProcessor.Process.Writer;
using LootDumpProcessor.Serializers.Json;
using LootDumpProcessor.Storage;
using LootDumpProcessor.Utils;

namespace LootDumpProcessor.Process;

public class QueuePipeline(IFileProcessor fileProcessor, IDumpProcessor dumpProcessor) : IPipeline
{
    private readonly IFileProcessor _fileProcessor =
        fileProcessor ?? throw new ArgumentNullException(nameof(fileProcessor));

    private readonly IDumpProcessor _dumpProcessor =
        dumpProcessor ?? throw new ArgumentNullException(nameof(dumpProcessor));

    private readonly List<string> _filesToRename = new();
    private readonly BlockingCollection<string> _filesToProcess = new();
    private readonly List<string> _mapNames = LootDumpProcessorContext.GetConfig().MapsToProcess;

    private static readonly Dictionary<string, IPreProcessReader> _preProcessReaders;

    static QueuePipeline()
    {
        _preProcessReaders = LootDumpProcessorContext.GetConfig()
            .ReaderConfig
            .PreProcessorConfig
            ?.PreProcessors
            ?.ToDictionary(
                t => PreProcessReaderFactory.GetInstance(t).GetHandleExtension().ToLower(),
                PreProcessReaderFactory.GetInstance
            ) ?? new Dictionary<string, IPreProcessReader>();
    }

    public async Task DoProcess()
    {
        // Single collector instance to collect results
        var collector = CollectorFactory.GetInstance();
        collector.Setup();

        if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
        {
            LoggerFactory.GetInstance().Log("Gathering files to begin processing", LogLevel.Info);
        }

        try
        {
            await FixFilesFromDumps();
            foreach (var mapName in _mapNames)
            {
                ProcessFilesFromDumpsPerMap(collector, mapName);
            }
        }
        finally
        {
            // Dispose the pre-process readers to clean up any temporary files they generated
            foreach (var (_, value) in _preProcessReaders)
            {
                value.Dispose();
            }
        }
    }

    private List<string> GatherFiles()
    {
        // Read locations
        var inputPath = LootDumpProcessorContext.GetConfig().ReaderConfig.DumpFilesLocation;
        if (inputPath == null || inputPath.Count == 0)
        {
            throw new Exception("Reader dumpFilesLocations must be set to a valid value");
        }

        // Gather all files into a queue
        var queuedFilesToProcess = GetFileQueue(inputPath);

        // Then pre-process the queued files to get them ready for processing
        return PreProcessQueuedFiles(queuedFilesToProcess);
    }

    private List<string> PreProcessQueuedFiles(Queue<string> queuedFilesToProcess)
    {
        var gatheredFiles = new List<string>();
        if (queuedFilesToProcess.Count == 0)
        {
            throw new Exception("No files matched accepted extension types in configs");
        }

        var fileFilters = GetFileFilters() ?? new Dictionary<string, IFileFilter>();

        while (queuedFilesToProcess.TryDequeue(out var file))
        {
            var extensionFull = Path.GetExtension(file);
            if (extensionFull.Length > 1)
            {
                var extension = extensionFull[1..].ToLower();

                // If there is a pre-processor for this extension, run it on the file
                if (_preProcessReaders.TryGetValue(extension, out var preProcessor))
                {
                    // If the pre-processor generated new files or directories, add them to the queue
                    if (preProcessor.TryPreProcess(file, out var outputFiles, out var outputDirectories))
                    {
                        // All new directories need to be processed as well
                        GetFileQueue(outputDirectories).ToList().ForEach(queuedFilesToProcess.Enqueue);
                        // All output files need to be queued as well
                        outputFiles.ForEach(queuedFilesToProcess.Enqueue);
                    }
                }
                else
                {
                    // No pre-processor for this file means it's ready to be filtered or accepted
                    if (fileFilters.TryGetValue(extension, out var filter))
                    {
                        if (filter.Accept(file))
                        {
                            gatheredFiles.Add(file);
                        }
                    }
                    else
                    {
                        gatheredFiles.Add(file);
                    }
                }
            }
            else
            {
                // Handle invalid extension
            }
        }

        return gatheredFiles;
    }

    private Queue<string> GetFileQueue(List<string> inputPath)
    {
        var queuedPathsToProcess = new Queue<string>();
        var queuedFilesToProcess = new Queue<string>();

        // Accepted file extensions on raw files
        var acceptedFileExtension = LootDumpProcessorContext.GetConfig()
            .ReaderConfig
            .AcceptedFileExtensions
            .Select(ex => ex.ToLower());
        inputPath.ForEach(p => queuedPathsToProcess.Enqueue(p));

        while (queuedPathsToProcess.TryDequeue(out var path))
        {
            // Make sure the input directory exists before reading from it
            if (!Directory.Exists(path))
            {
                throw new Exception($"Input directory \"{path}\" could not be found");
            }

            // If subfolders should be processed, queue them up as well
            if (LootDumpProcessorContext.GetConfig().ReaderConfig.ProcessSubFolders)
            {
                foreach (var directory in Directory.GetDirectories(path))
                {
                    queuedPathsToProcess.Enqueue(directory);
                }
            }

            var files = Directory.GetFiles(path);
            foreach (var file in files)
            {
                if (acceptedFileExtension.Contains(Path.GetExtension(file)[1..].ToLower()))
                {
                    queuedFilesToProcess.Enqueue(file);
                }
            }
        }

        return queuedFilesToProcess;
    }

    private Dictionary<string, IFileFilter>? GetFileFilters()
    {
        return LootDumpProcessorContext.GetConfig()
            .ReaderConfig
            .FileFilters
            ?.ToDictionary(
                t => FileFilterFactory.GetInstance(t).GetExtension(),
                FileFilterFactory.GetInstance
            );
    }

    private void ProcessFilesFromDumpsPerMap(ICollector collector, string mapName)
    {
        // Gather all files for this map, sort them by date descending and add them to the processing queue
        GatherFiles()
            .FindAll(f => f.ToLower().Contains($"{mapName}--"))
            .OrderByDescending(f =>
            {
                FileDateParser.TryParseFileDate(f, out var date);
                return date;
            })
            .ToList()
            .ForEach(f => _filesToProcess.Add(f));

        if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
        {
            LoggerFactory.GetInstance().Log("Files sorted and ready to begin pre-processing", LogLevel.Info);
        }

        Parallel.ForEach(_filesToProcess, file =>
        {
            try
            {
                var reader = IntakeReaderFactory.GetInstance();
                if (reader.Read(file, out var basicInfo))
                {
                    collector.Hold(_fileProcessor.Process(basicInfo));
                }
            }
            catch (Exception e)
            {
                if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error))
                {
                    LoggerFactory.GetInstance().Log(
                        $"Error occurred while processing file {file}\n{e.Message}\n{e.StackTrace}",
                        LogLevel.Error);
                }
            }
        });

        if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
        {
            LoggerFactory.GetInstance().Log("Pre-processing finished", LogLevel.Info);
        }

        // Single writer instance to collect results
        var writer = WriterFactory.GetInstance();
        writer.WriteAll(_dumpProcessor.ProcessDumps(collector.Retrieve()));

        // Clear the collector and data storage as we now process per map
        collector.Clear();
        DataStorageFactory.GetInstance().Clear();
    }

    /// <summary>
    /// Adds the map name to dump file names that don't have it already.
    /// </summary>
    private async Task FixFilesFromDumps()
    {
        var inputPath = LootDumpProcessorContext.GetConfig().ReaderConfig.DumpFilesLocation;
        if (inputPath == null || inputPath.Count == 0)
        {
            throw new Exception("Reader dumpFilesLocations must be set to a valid value");
        }

        GetFileQueue(inputPath).ToList().ForEach(f => _filesToRename.Add(f));

        var jsonUtil = JsonSerializerFactory.GetInstance(JsonSerializerTypes.DotNet);

        await Parallel.ForEachAsync(_filesToRename, async (file, cancellationToken) =>
        {
            // Skip files that already contain a map name
            if (_mapNames.Any(file.Contains))
            {
                return;
            }

            try
            {
                var data = await File.ReadAllTextAsync(file, cancellationToken);
                var fileData = jsonUtil.Deserialize<RootData>(data);
                var newPath = file.Replace("resp", $"{fileData.Data.LocationLoot.Id.ToLower()}--resp");
                File.Move(file, newPath);
            }
            catch (Exception e)
            {
                if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error))
                {
                    LoggerFactory.GetInstance().Log(
                        $"Error occurred while processing file {file}\n{e.Message}\n{e.StackTrace}",
                        LogLevel.Error);
                }
            }
        });

        if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
        {
            LoggerFactory.GetInstance().Log("File-processing finished", LogLevel.Info);
        }
    }
}