Mirror of https://github.com/sp-tarkov/loot-dump-processor.git, synced 2025-02-13 02:30:45 -05:00
BlueXTX committed:

* Improved thread safety and async processing in dump processor components
* Removed unused _processedDumps field and simplified variable scope in QueuePipeline
* Refactored service registration into dedicated extension methods
* Added configuration binding and environment-variable support (see the binding sketch below)
* Refactored collector initialization to use dependency injection
* Refactored data storage to use dependency injection
* Refactored configuration models to use records and added validation
* Refactored static loot configuration to use dependency injection. The changes include:
  - Moved static weapon IDs and forced items from LootDumpProcessorContext to a ForcedStatic record
  - Added ForcedStatic configuration injection in StaticLootProcessor and StaticContainerProcessor
  - Improved immutability by using read-only collections in the ForcedStatic model
  - Simplified LootDumpProcessorContext by removing unused methods
* Refactored configuration access to use dependency injection consistently
* Fixed ForcedStatic configuration
* Refactored forced-items configuration to use an async provider pattern. The changes introduce a new `IForcedItemsProvider` abstraction to handle loading and caching of forced static and loose loot configurations (see the provider sketch below). This improves the code by:
  1. Making configuration loading asynchronous
  2. Caching loaded configurations
  3. Centralizing access to the forced-items configuration
  4. Removing direct file-system dependencies from processors
  5. Improving testability through dependency injection
  The change also updates the related processors and interfaces to use the async/await pattern consistently.
* Refactored the loose loot processor to use the async forced-items provider
* Reorganized processor and service components into dedicated namespaces
209 lines · 7.7 KiB · C#
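The configuration binding mentioned in the commit notes happens in the composition root, which is not part of this file. A minimal sketch of what that registration could look like, assuming a generic host, data-annotation validation, and a hypothetical `LOOT_` environment-variable prefix (none of these details are confirmed by this page):

```csharp
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// Hypothetical prefix: environment variables such as
// LOOT_ReaderConfig__ProcessSubFolders would override file-based settings.
builder.Configuration.AddEnvironmentVariables(prefix: "LOOT_");

// Bind the Config record consumed below via IOptions<Config>,
// validating data annotations once at startup.
builder.Services.AddOptions<Config>()
    .Bind(builder.Configuration)
    .ValidateDataAnnotations()
    .ValidateOnStart();
```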
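Likewise, the `IForcedItemsProvider` abstraction lives outside this file. A minimal sketch of an async, caching provider in the spirit the commit message describes, where `ForcedStatic`, its JSON shape, and the file path are illustrative assumptions rather than the repository's actual types:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;

// Illustrative stand-in for the repository's forced-static configuration record.
public sealed record ForcedStatic(IReadOnlyList<string> StaticWeaponIds, IReadOnlyList<string> ForcedItems);

public interface IForcedItemsProvider
{
    Task<ForcedStatic> GetForcedStaticAsync();
}

public sealed class JsonForcedItemsProvider(string configPath) : IForcedItemsProvider
{
    // Lazy<Task<T>> caches the load: the file is read once, and every
    // subsequent call awaits the same completed task.
    private readonly Lazy<Task<ForcedStatic>> _cached = new(() => LoadAsync(configPath));

    public Task<ForcedStatic> GetForcedStaticAsync() => _cached.Value;

    private static async Task<ForcedStatic> LoadAsync(string path)
    {
        await using var stream = File.OpenRead(path);
        return await JsonSerializer.DeserializeAsync<ForcedStatic>(stream)
               ?? throw new InvalidOperationException($"No forced items found in '{path}'");
    }
}
```

This keeps configuration loading asynchronous and cached while letting processors depend on the interface instead of the file system, matching the goals listed in the commit message.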
using System.Diagnostics;
using System.Text.Json;
using LootDumpProcessor.Model.Config;
using LootDumpProcessor.Model.Input;
using LootDumpProcessor.Process.Collector;
using LootDumpProcessor.Process.Processor.DumpProcessor;
using LootDumpProcessor.Process.Processor.FileProcessor;
using LootDumpProcessor.Process.Reader.Filters;
using LootDumpProcessor.Process.Reader.Intake;
using LootDumpProcessor.Process.Writer;
using LootDumpProcessor.Serializers.Json;
using LootDumpProcessor.Storage;
using LootDumpProcessor.Utils;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace LootDumpProcessor.Process;
public class QueuePipeline(
    IFileProcessor fileProcessor, IDumpProcessor dumpProcessor, ILogger<QueuePipeline> logger, IFileFilter fileFilter,
    IIntakeReader intakeReader, ICollector collector, IDataStorage dataStorage, IOptions<Config> config, IWriter writer
) : IPipeline
{
    private readonly IFileProcessor _fileProcessor =
        fileProcessor ?? throw new ArgumentNullException(nameof(fileProcessor));

    private readonly IDumpProcessor _dumpProcessor =
        dumpProcessor ?? throw new ArgumentNullException(nameof(dumpProcessor));

    private readonly ILogger<QueuePipeline> _logger = logger ?? throw new ArgumentNullException(nameof(logger));

    private readonly IFileFilter _fileFilter = fileFilter ?? throw new ArgumentNullException(nameof(fileFilter));

    private readonly IIntakeReader _intakeReader =
        intakeReader ?? throw new ArgumentNullException(nameof(intakeReader));

    private readonly ICollector _collector = collector ?? throw new ArgumentNullException(nameof(collector));

    private readonly IDataStorage _dataStorage = dataStorage ?? throw new ArgumentNullException(nameof(dataStorage));

    private readonly IWriter _writer = writer ?? throw new ArgumentNullException(nameof(writer));

    private readonly Config _config = (config ?? throw new ArgumentNullException(nameof(config))).Value;

    private readonly List<string> _filesToRename = new();

    private IReadOnlyList<string> MapNames => _config.MapsToProcess;
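
    /// <summary>
    /// Entry point: fixes dump file names that are missing a map name, then gathers,
    /// pre-processes, and writes dumps for each configured map, logging the total time.
    /// </summary>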
    public async Task Execute()
    {
        var stopwatch = Stopwatch.StartNew();

        // Single collector instance to collect results
        _collector.Setup();

        _logger.LogInformation("Gathering files to begin processing");

        try
        {
            await FixFilesFromDumps();
            foreach (var mapName in MapNames) await ProcessFilesFromDumpsPerMap(_collector, mapName);
        }
        finally
        {
            stopwatch.Stop();
            _logger.LogInformation("Dumps processed in {@Time}", stopwatch.Elapsed);
        }
    }
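
    /// <summary>
    /// Collects every dump file from the configured input locations and runs the result through pre-processing.
    /// </summary>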
    private List<string> GatherFiles()
    {
        // Read locations
        var inputPath = _config.ReaderConfig.DumpFilesLocation;

        if (inputPath == null || inputPath.Count == 0)
            throw new Exception("Reader dumpFilesLocations must be set to a valid value");

        // We gather up all files into a queue
        var queuedFilesToProcess = GetFileQueue(inputPath);
        // Then we preprocess the files in the queue and get them ready for processing
        return PreProcessQueuedFiles(queuedFilesToProcess);
    }
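
    /// <summary>
    /// Applies the configured file filter to the queued files; files without a usable extension are skipped with a warning.
    /// </summary>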
    private List<string> PreProcessQueuedFiles(Queue<string> queuedFilesToProcess)
    {
        var gatheredFiles = new List<string>();

        if (queuedFilesToProcess.Count == 0)
            throw new Exception("No files matched accepted extension types in configs");

        while (queuedFilesToProcess.TryDequeue(out var file))
        {
            var extensionFull = Path.GetExtension(file);
            if (extensionFull.Length > 1)
            {
                // Only keep files the configured filter accepts; previously both
                // branches added the file, which made the filter a no-op.
                if (_fileFilter.Accept(file)) gatheredFiles.Add(file);
            }
            else
            {
                // Handle invalid extension
                _logger.LogWarning("File '{File}' has an invalid extension.", file);
            }
        }

        return gatheredFiles;
    }
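
    /// <summary>
    /// Walks the input directories (recursing into subfolders when configured) and queues every file found.
    /// </summary>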
    private Queue<string> GetFileQueue(IReadOnlyList<string> inputPath)
    {
        var queuedPathsToProcess = new Queue<string>();
        var queuedFilesToProcess = new Queue<string>();

        foreach (var path in inputPath) queuedPathsToProcess.Enqueue(path);

        while (queuedPathsToProcess.TryDequeue(out var path))
        {
            // Check the input directory to be sure it's going to have data in it.
            if (!Directory.Exists(path)) throw new Exception($"Input directory \"{path}\" could not be found");

            // If we should process subfolders, queue them up as well
            if (_config.ReaderConfig.ProcessSubFolders)
                foreach (var directory in Directory.GetDirectories(path))
                    queuedPathsToProcess.Enqueue(directory);

            var files = Directory.GetFiles(path);

            foreach (var file in files) queuedFilesToProcess.Enqueue(file);
        }

        return queuedFilesToProcess;
    }
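
    /// <summary>
    /// Processes the dump files for a single map: newest first, read and pre-processed in parallel,
    /// then aggregated, written, and cleared before the next map runs.
    /// </summary>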
    private async Task ProcessFilesFromDumpsPerMap(ICollector collector, string mapName)
    {
        // Gather all files for this map, sorted by date descending. A local list is used
        // instead of a shared BlockingCollection so files from one map cannot leak into
        // the next map's run (enumerating a BlockingCollection does not consume its items).
        var filesToProcess = GatherFiles()
            .FindAll(f => f.ToLower().Contains($"{mapName}--"))
            .OrderByDescending(f =>
            {
                FileDateParser.TryParseFileDate(f, out var date);
                return date;
            })
            .ToList();

        _logger.LogInformation("Files sorted and ready to begin pre-processing");

        await Parallel.ForEachAsync(filesToProcess, async (file, _) =>
        {
            try
            {
                if (!_intakeReader.Read(file, out var basicInfo)) return;

                var partialData = await _fileProcessor.Process(basicInfo);
                collector.Hold(partialData);
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Error occurred while processing file {File}", file);
            }
        });

        _logger.LogInformation("Pre-processing finished");

        // Collect the partial results, process them into dumps, and write the output
        var collectedData = collector.Retrieve();
        var processedDumps = await _dumpProcessor.ProcessDumps(collectedData);
        _writer.WriteAll(processedDumps);

        // Clear collector and data storage, as we process per map now
        collector.Clear();
        _dataStorage.Clear();
    }

    /// <summary>
    /// Adds the map name to dump file names that don't include it already.
    /// </summary>
    private async Task FixFilesFromDumps()
    {
        var inputPath = _config.ReaderConfig.DumpFilesLocation;

        if (inputPath == null || inputPath.Count == 0)
            throw new Exception("Reader dumpFilesLocations must be set to a valid value");

        GetFileQueue(inputPath).ToList().ForEach(f => _filesToRename.Add(f));

        await Parallel.ForEachAsync(_filesToRename, async (file, cancellationToken) =>
        {
            if (MapNames.Any(file.Contains)) return;

            try
            {
                var data = await File.ReadAllTextAsync(file, cancellationToken);
                var fileData = JsonSerializer.Deserialize<RootData>(data, JsonSerializerSettings.Default);
                var newPath = file.Replace("resp", $"{fileData.Data.LocationLoot.Id.ToLower()}--resp");
                File.Move(file, newPath);
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Error occurred while renaming file {File}", file);
            }
        });

        _logger.LogInformation("File renaming finished");
    }
}