2024-04-16 18:29:40 +00:00
|
|
|
using System.Collections.Concurrent;
|
2025-01-11 09:17:49 +03:00
|
|
|
using System.Diagnostics;
|
2025-01-11 10:52:23 +03:00
|
|
|
using System.Text.Json;
|
2025-01-13 20:05:36 +03:00
|
|
|
using LootDumpProcessor.Model.Config;
|
2024-12-31 09:12:48 +00:00
|
|
|
using LootDumpProcessor.Model.Input;
|
2023-08-12 19:08:38 +01:00
|
|
|
using LootDumpProcessor.Process.Collector;
|
|
|
|
using LootDumpProcessor.Process.Processor.DumpProcessor;
|
2024-04-16 18:29:40 +00:00
|
|
|
using LootDumpProcessor.Process.Processor.FileProcessor;
|
2023-08-12 19:08:38 +01:00
|
|
|
using LootDumpProcessor.Process.Reader.Filters;
|
2024-04-16 18:29:40 +00:00
|
|
|
using LootDumpProcessor.Process.Reader.Intake;
|
2023-08-12 19:08:38 +01:00
|
|
|
using LootDumpProcessor.Process.Writer;
|
2024-12-31 09:12:48 +00:00
|
|
|
using LootDumpProcessor.Serializers.Json;
|
|
|
|
using LootDumpProcessor.Storage;
|
2024-04-16 18:29:40 +00:00
|
|
|
using LootDumpProcessor.Utils;
|
2025-01-11 09:12:21 +03:00
|
|
|
using Microsoft.Extensions.Logging;
|
2025-01-13 20:05:36 +03:00
|
|
|
using Microsoft.Extensions.Options;
|
2023-08-12 19:08:38 +01:00
|
|
|
|
|
|
|
namespace LootDumpProcessor.Process;
|
|
|
|
|
2025-01-11 09:12:21 +03:00
|
|
|
/// <summary>
/// Pipeline that first tags raw dump files with the map they belong to and then, one configured
/// map at a time, reads the matching files, processes them, and writes the combined result.
/// </summary>
public class QueuePipeline(
    IFileProcessor fileProcessor, IDumpProcessor dumpProcessor, ILogger<QueuePipeline> logger, IFileFilter fileFilter,
    IIntakeReader intakeReader, ICollector collector, IDataStorage dataStorage, IOptions<Config> config, IWriter writer
)
    : IPipeline
{
    private readonly IFileProcessor _fileProcessor =
        fileProcessor ?? throw new ArgumentNullException(nameof(fileProcessor));

    private readonly IDumpProcessor _dumpProcessor =
        dumpProcessor ?? throw new ArgumentNullException(nameof(dumpProcessor));

    private readonly ILogger<QueuePipeline> _logger = logger ?? throw new ArgumentNullException(nameof(logger));

    private readonly IFileFilter _fileFilter = fileFilter ?? throw new ArgumentNullException(nameof(fileFilter));

    private readonly IIntakeReader
        _intakeReader = intakeReader ?? throw new ArgumentNullException(nameof(intakeReader));

    private readonly ICollector _collector = collector ?? throw new ArgumentNullException(nameof(collector));

    private readonly IDataStorage _dataStorage = dataStorage ?? throw new ArgumentNullException(nameof(dataStorage));

    private readonly IWriter _writer = writer ?? throw new ArgumentNullException(nameof(writer));

    private readonly Config _config = (config ?? throw new ArgumentNullException(nameof(config))).Value;

    /// <summary>Names of the maps to process, taken from the configuration.</summary>
    private IReadOnlyList<string> MapNames => _config.MapsToProcess;

    /// <summary>
    /// Runs the whole pipeline: tags dump files with their map name, then processes every
    /// configured map in sequence. The elapsed time is logged even when processing throws.
    /// </summary>
    public async Task Execute()
    {
        var stopwatch = Stopwatch.StartNew();

        // Single collector instance to collect results
        _collector.Setup();

        _logger.LogInformation("Gathering files to begin processing");

        try
        {
            await FixFilesFromDumps();

            // Maps are handled one after another; only the files inside a map run in parallel.
            foreach (var mapName in MapNames)
            {
                await ProcessFilesFromDumpsPerMap(_collector, mapName);
            }
        }
        finally
        {
            stopwatch.Stop();
            _logger.LogInformation("Dumps processed in {@Time}", stopwatch.Elapsed);
        }
    }

    /// <summary>
    /// Returns the configured dump locations after checking that at least one is set.
    /// </summary>
    /// <exception cref="InvalidOperationException">No dump location is configured.</exception>
    private IReadOnlyList<string> GetConfiguredInputPaths()
    {
        var inputPath = _config.ReaderConfig.DumpFilesLocation;

        if (inputPath == null || inputPath.Count == 0)
        {
            throw new InvalidOperationException("Reader dumpFilesLocations must be set to a valid value");
        }

        return inputPath;
    }

    /// <summary>
    /// Collects every file found under the configured dump locations and returns the ones that
    /// pass pre-processing.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// No dump location is configured, or no files were found.
    /// </exception>
    private List<string> GatherFiles()
    {
        // We gather up all files into a queue
        var queuedFilesToProcess = GetFileQueue(GetConfiguredInputPaths());

        // Then we preprocess the files in the queue and get them ready for processing
        return PreProcessQueuedFiles(queuedFilesToProcess);
    }

    /// <summary>
    /// Drains <paramref name="queuedFilesToProcess"/> and keeps the files that have an extension
    /// and are accepted by the file filter.
    /// </summary>
    /// <param name="queuedFilesToProcess">Candidate file paths; emptied by this call.</param>
    /// <returns>The accepted file paths, in queue order.</returns>
    /// <exception cref="InvalidOperationException">The queue is empty.</exception>
    private List<string> PreProcessQueuedFiles(Queue<string> queuedFilesToProcess)
    {
        if (queuedFilesToProcess.Count == 0)
        {
            throw new InvalidOperationException("No files matched accepted extension types in configs");
        }

        var gatheredFiles = new List<string>();

        while (queuedFilesToProcess.TryDequeue(out var file))
        {
            // GetExtension returns "" (no extension) or "." (trailing dot); neither is usable.
            if (Path.GetExtension(file).Length <= 1)
            {
                _logger.LogWarning("File '{File}' has an invalid extension.", file);
                continue;
            }

            // Only keep files the filter accepts. Previously both branches added the file,
            // which made the filter a no-op.
            if (_fileFilter.Accept(file))
            {
                gatheredFiles.Add(file);
            }
            else
            {
                _logger.LogDebug("File '{File}' was rejected by the file filter.", file);
            }
        }

        return gatheredFiles;
    }

    /// <summary>
    /// Lists all files in the given directories, descending into sub-directories when
    /// <c>ReaderConfig.ProcessSubFolders</c> is enabled.
    /// </summary>
    /// <param name="inputPath">Root directories to scan.</param>
    /// <returns>A queue with every file path that was found.</returns>
    /// <exception cref="DirectoryNotFoundException">A directory to scan does not exist.</exception>
    private Queue<string> GetFileQueue(IReadOnlyList<string> inputPath)
    {
        var queuedPathsToProcess = new Queue<string>(inputPath);
        var queuedFilesToProcess = new Queue<string>();

        while (queuedPathsToProcess.TryDequeue(out var path))
        {
            // Check the input file to be sure its going to have data on it.
            if (!Directory.Exists(path))
            {
                throw new DirectoryNotFoundException($"Input directory \"{path}\" could not be found");
            }

            // If we should process subfolder, queue them up as well
            if (_config.ReaderConfig.ProcessSubFolders)
            {
                foreach (var directory in Directory.GetDirectories(path))
                {
                    queuedPathsToProcess.Enqueue(directory);
                }
            }

            foreach (var file in Directory.GetFiles(path))
            {
                queuedFilesToProcess.Enqueue(file);
            }
        }

        return queuedFilesToProcess;
    }

    /// <summary>
    /// Processes all dump files that belong to <paramref name="mapName"/>: reads and processes
    /// them in parallel, hands the partial results to <paramref name="collector"/>, then runs the
    /// dump processor on the collected data and writes the output.
    /// </summary>
    /// <param name="collector">Collector that holds partial results; cleared before returning.</param>
    /// <param name="mapName">Map whose files (path contains "<c>{mapName}--</c>") are processed.</param>
    private async Task ProcessFilesFromDumpsPerMap(ICollector collector, string mapName)
    {
        // Gather all files for this map and sort them by date descending.
        // The list is local on purpose: a shared, never-drained collection would make every
        // subsequent map re-process the files of all previous maps.
        var mapMarker = $"{mapName}--";
        var filesToProcess = GatherFiles()
            .Where(f => f.Contains(mapMarker, StringComparison.OrdinalIgnoreCase))
            .OrderByDescending(f =>
            {
                FileDateParser.TryParseFileDate(f, out var date);
                return date;
            })
            .ToList();

        _logger.LogInformation("Files sorted and ready to begin pre-processing");

        await Parallel.ForEachAsync(filesToProcess, async (file, _) =>
        {
            try
            {
                if (!_intakeReader.Read(file, out var basicInfo)) return;

                var processedFile = await _fileProcessor.Process(basicInfo);
                collector.Hold(processedFile);
            }
            catch (Exception e)
            {
                // A single broken file must not abort the whole map.
                _logger.LogError(e, "Error occurred while processing file {File}", file);
            }
        });

        _logger.LogInformation("Pre-processing finished");

        var partialData = collector.Retrieve();
        var processedDumps = await _dumpProcessor.ProcessDumps(partialData);
        _writer.WriteAll(processedDumps);

        // clear collector and datastorage as we process per map now
        collector.Clear();
        _dataStorage.Clear();
    }

    /// <summary>
    /// Adds the map name to dump file names that don't have it already. Each such file is read,
    /// its location id is taken from the JSON content, and "resp" in the file name is replaced
    /// with "<c>{id}--resp</c>". Failures are logged per file and do not stop the run.
    /// </summary>
    /// <exception cref="InvalidOperationException">No dump location is configured.</exception>
    private async Task FixFilesFromDumps()
    {
        var filesToRename = GetFileQueue(GetConfiguredInputPaths());

        await Parallel.ForEachAsync(filesToRename, async (file, cancellationToken) =>
        {
            // Path already mentions one of the configured maps: nothing to do.
            if (MapNames.Any(file.Contains)) return;

            try
            {
                var data = await File.ReadAllTextAsync(file, cancellationToken);
                var fileData = JsonSerializer.Deserialize<RootData>(data, JsonSerializerSettings.Default);
                var mapId = fileData.Data.LocationLoot.Id.ToLowerInvariant();

                // Only touch the file name; a "resp" inside a directory name must stay as it is,
                // otherwise the move target would point into a directory that does not exist.
                var fileName = Path.GetFileName(file);
                var taggedMarker = $"{mapId}--resp";

                // Already tagged with a map that is not in the configured list: don't stack prefixes.
                if (fileName.Contains(taggedMarker, StringComparison.OrdinalIgnoreCase)) return;

                var newFileName = fileName.Replace("resp", taggedMarker);
                if (newFileName == fileName)
                {
                    _logger.LogWarning("File '{File}' does not contain \"resp\" and cannot be tagged with its map name.", file);
                    return;
                }

                var directory = Path.GetDirectoryName(file) ?? string.Empty;
                File.Move(file, Path.Combine(directory, newFileName));
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Error occurred while processing file {File}", file);
            }
        });

        _logger.LogInformation("File-processing finished");
    }
}
|