mirror of
https://github.com/sp-tarkov/loot-dump-processor.git
synced 2025-02-12 15:10:44 -05:00
Per map process (#1)
* Seperate Tasks into their own methods, Add method to add map name to file name * Formatting and added a few comments * Add Clear method to collectors * Change to local var, so data isnt stored longer than needed * add Clear method to Storages * Add changes for Per Map Processing and file i forgot to commit last time * Update config to have mapNames in it, removed unused yaml file * update comment * changed to not throw so Filemode still can be used
This commit is contained in:
parent
468871b62c
commit
39ac387aae
@ -60,5 +60,19 @@
|
|||||||
"container_City_SW_04_DesignStuff_00002",
|
"container_City_SW_04_DesignStuff_00002",
|
||||||
"Lootable_00063"
|
"Lootable_00063"
|
||||||
]
|
]
|
||||||
}
|
},
|
||||||
|
"mapsToProcess": [
|
||||||
|
"bigmap",
|
||||||
|
"factory4_day",
|
||||||
|
"factory4_night",
|
||||||
|
"interchange",
|
||||||
|
"laboratory",
|
||||||
|
"lighthouse",
|
||||||
|
"rezervbase",
|
||||||
|
"sandbox",
|
||||||
|
"sandbox_high",
|
||||||
|
"shorline",
|
||||||
|
"tarkovstreets",
|
||||||
|
"woods"
|
||||||
|
]
|
||||||
}
|
}
|
@ -1,35 +0,0 @@
|
|||||||
---
|
|
||||||
Customs:
|
|
||||||
name:
|
|
||||||
- bigmap
|
|
||||||
Factory:
|
|
||||||
name:
|
|
||||||
- factory4_day
|
|
||||||
- factory4_night
|
|
||||||
Interchange:
|
|
||||||
name:
|
|
||||||
- interchange
|
|
||||||
Laboratory:
|
|
||||||
name:
|
|
||||||
- laboratory
|
|
||||||
Lighthouse:
|
|
||||||
name:
|
|
||||||
- lighthouse
|
|
||||||
ReserveBase:
|
|
||||||
name:
|
|
||||||
- rezervbase
|
|
||||||
Shoreline:
|
|
||||||
name:
|
|
||||||
- shoreline
|
|
||||||
Woods:
|
|
||||||
name:
|
|
||||||
- woods
|
|
||||||
Streets of Tarkov:
|
|
||||||
name:
|
|
||||||
- tarkovstreets
|
|
||||||
Sandbox:
|
|
||||||
name:
|
|
||||||
- Sandbox
|
|
||||||
SandboxHigh:
|
|
||||||
name:
|
|
||||||
- Sandbox_high
|
|
@ -26,9 +26,6 @@
|
|||||||
<None Update="Config\forced_static.yaml">
|
<None Update="Config\forced_static.yaml">
|
||||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
|
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
|
||||||
</None>
|
</None>
|
||||||
<None Update="Config\map_directory_mapping.yaml">
|
|
||||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
|
|
||||||
</None>
|
|
||||||
<None Update="Config\config.json">
|
<None Update="Config\config.json">
|
||||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
|
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
|
||||||
</None>
|
</None>
|
||||||
|
@ -54,6 +54,10 @@ public static class LootDumpProcessorContext
|
|||||||
return _forcedStatic;
|
return _forcedStatic;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Not Used
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
public static Dictionary<string, MapDirectoryMapping> GetDirectoryMappings()
|
public static Dictionary<string, MapDirectoryMapping> GetDirectoryMappings()
|
||||||
{
|
{
|
||||||
lock (_mapDirectoryMappingsLock)
|
lock (_mapDirectoryMappingsLock)
|
||||||
|
@ -57,4 +57,8 @@ public class Config
|
|||||||
[JsonProperty("containerIgnoreList")]
|
[JsonProperty("containerIgnoreList")]
|
||||||
[JsonPropertyName("containerIgnoreList")]
|
[JsonPropertyName("containerIgnoreList")]
|
||||||
public Dictionary<string, string[]> ContainerIgnoreList { get; set; }
|
public Dictionary<string, string[]> ContainerIgnoreList { get; set; }
|
||||||
|
|
||||||
|
[JsonProperty("mapsToProcess")]
|
||||||
|
[JsonPropertyName("mapsToProcess")]
|
||||||
|
public List<string> MapsToProcess { get; set; }
|
||||||
}
|
}
|
@ -42,4 +42,16 @@ public class DumpCollector : ICollector
|
|||||||
|
|
||||||
return processedDumps;
|
return processedDumps;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void Clear()
|
||||||
|
{
|
||||||
|
lock (lockObject)
|
||||||
|
{
|
||||||
|
foreach (var file in Directory.GetFiles(DumpLocation))
|
||||||
|
{
|
||||||
|
File.Delete(file);
|
||||||
|
}
|
||||||
|
processedDumps.Clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
@ -23,4 +23,9 @@ public class HashSetCollector : ICollector
|
|||||||
{
|
{
|
||||||
return processedDumps.ToList();
|
return processedDumps.ToList();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void Clear()
|
||||||
|
{
|
||||||
|
processedDumps.Clear();
|
||||||
|
}
|
||||||
}
|
}
|
@ -6,6 +6,7 @@ public interface ICollector
|
|||||||
{
|
{
|
||||||
void Setup();
|
void Setup();
|
||||||
void Hold(PartialData parsedDump);
|
void Hold(PartialData parsedDump);
|
||||||
|
void Clear();
|
||||||
|
|
||||||
List<PartialData> Retrieve();
|
List<PartialData> Retrieve();
|
||||||
}
|
}
|
@ -17,8 +17,6 @@ public class MultithreadSteppedDumpProcessor : IDumpProcessor
|
|||||||
|
|
||||||
private static readonly List<Task> Runners = new();
|
private static readonly List<Task> Runners = new();
|
||||||
|
|
||||||
private static readonly BlockingCollection<PartialData> _partialDataToProcess = new();
|
|
||||||
|
|
||||||
// if we need to, this variable can be moved to use the factory, but since the factory
|
// if we need to, this variable can be moved to use the factory, but since the factory
|
||||||
// needs a locking mechanism to prevent dictionary access exceptions, its better to keep
|
// needs a locking mechanism to prevent dictionary access exceptions, its better to keep
|
||||||
// a reference to use here
|
// a reference to use here
|
||||||
@ -255,6 +253,9 @@ public class MultithreadSteppedDumpProcessor : IDumpProcessor
|
|||||||
looseLootCounts.ItemProperties = actualDictionaryItemProperties.GetKey();
|
looseLootCounts.ItemProperties = actualDictionaryItemProperties.GetKey();
|
||||||
|
|
||||||
dumpProcessData.LooseLootCounts.Add(mapName, looseLootCounts.GetKey());
|
dumpProcessData.LooseLootCounts.Add(mapName, looseLootCounts.GetKey());
|
||||||
|
|
||||||
|
BlockingCollection<PartialData> _partialDataToProcess = new();
|
||||||
|
|
||||||
// add the items to the queue
|
// add the items to the queue
|
||||||
foreach (var partialData in partialFileMetaData)
|
foreach (var partialData in partialFileMetaData)
|
||||||
{
|
{
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using LootDumpProcessor.Logger;
|
using LootDumpProcessor.Logger;
|
||||||
|
using LootDumpProcessor.Model.Input;
|
||||||
using LootDumpProcessor.Process.Collector;
|
using LootDumpProcessor.Process.Collector;
|
||||||
using LootDumpProcessor.Process.Processor;
|
using LootDumpProcessor.Process.Processor;
|
||||||
using LootDumpProcessor.Process.Processor.DumpProcessor;
|
using LootDumpProcessor.Process.Processor.DumpProcessor;
|
||||||
@ -9,14 +10,20 @@ using LootDumpProcessor.Process.Reader.Filters;
|
|||||||
using LootDumpProcessor.Process.Reader.Intake;
|
using LootDumpProcessor.Process.Reader.Intake;
|
||||||
using LootDumpProcessor.Process.Reader.PreProcess;
|
using LootDumpProcessor.Process.Reader.PreProcess;
|
||||||
using LootDumpProcessor.Process.Writer;
|
using LootDumpProcessor.Process.Writer;
|
||||||
|
using LootDumpProcessor.Serializers.Json;
|
||||||
|
using LootDumpProcessor.Storage;
|
||||||
using LootDumpProcessor.Utils;
|
using LootDumpProcessor.Utils;
|
||||||
|
|
||||||
namespace LootDumpProcessor.Process;
|
namespace LootDumpProcessor.Process;
|
||||||
|
|
||||||
public class QueuePipeline : IPipeline
|
public class QueuePipeline : IPipeline
|
||||||
{
|
{
|
||||||
|
private static readonly BlockingCollection<string> _filesToRename = new();
|
||||||
private static readonly BlockingCollection<string> _filesToProcess = new();
|
private static readonly BlockingCollection<string> _filesToProcess = new();
|
||||||
private static readonly List<Task> Runners = new();
|
private static readonly List<Task> Runners = new();
|
||||||
|
private static readonly List<Task> Renamers = new();
|
||||||
|
|
||||||
|
private readonly List<string> _mapNames = LootDumpProcessorContext.GetConfig().MapsToProcess;
|
||||||
|
|
||||||
private static readonly Dictionary<string, IPreProcessReader> _preProcessReaders;
|
private static readonly Dictionary<string, IPreProcessReader> _preProcessReaders;
|
||||||
|
|
||||||
@ -41,68 +48,19 @@ public class QueuePipeline : IPipeline
|
|||||||
// We add 2 more threads to the total count to account for subprocesses and others
|
// We add 2 more threads to the total count to account for subprocesses and others
|
||||||
int threads = LootDumpProcessorContext.GetConfig().Threads;
|
int threads = LootDumpProcessorContext.GetConfig().Threads;
|
||||||
ThreadPool.SetMaxThreads(threads + 2, threads + 2);
|
ThreadPool.SetMaxThreads(threads + 2, threads + 2);
|
||||||
|
|
||||||
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
|
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
|
||||||
|
{
|
||||||
LoggerFactory.GetInstance().Log("Gathering files to begin processing", LogLevel.Info);
|
LoggerFactory.GetInstance().Log("Gathering files to begin processing", LogLevel.Info);
|
||||||
|
}
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// Gather all files, sort them by date descending and then add them into the processing queue
|
FixFilesFromDumps(threads);
|
||||||
GatherFiles().OrderByDescending(f =>
|
foreach (var mapName in _mapNames)
|
||||||
{
|
|
||||||
FileDateParser.TryParseFileDate(f, out var date);
|
|
||||||
return date;
|
|
||||||
}
|
|
||||||
).ToList().ForEach(f => _filesToProcess.Add(f));
|
|
||||||
|
|
||||||
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
|
|
||||||
LoggerFactory.GetInstance().Log("Files sorted and ready to begin pre-processing", LogLevel.Info);
|
|
||||||
|
|
||||||
// We startup all the threads and collect them into a runners list
|
|
||||||
for (int i = 0; i < threads; i++)
|
|
||||||
{
|
{
|
||||||
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
|
ProcessFilesFromDumpsPerMap(threads, collector, mapName);
|
||||||
LoggerFactory.GetInstance().Log("Creating pre-processing threads", LogLevel.Info);
|
|
||||||
Runners.Add(
|
|
||||||
Task.Factory.StartNew(
|
|
||||||
() =>
|
|
||||||
{
|
|
||||||
while (_filesToProcess.TryTake(out var file, TimeSpan.FromMilliseconds(5000)))
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var reader = IntakeReaderFactory.GetInstance();
|
|
||||||
var processor = FileProcessorFactory.GetInstance();
|
|
||||||
if (reader.Read(file, out var basicInfo))
|
|
||||||
collector.Hold(processor.Process(basicInfo));
|
|
||||||
}
|
|
||||||
catch (Exception e)
|
|
||||||
{
|
|
||||||
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error))
|
|
||||||
LoggerFactory.GetInstance().Log(
|
|
||||||
$"Error occurred while processing file {file}\n{e.Message}\n{e.StackTrace}",
|
|
||||||
LogLevel.Error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
TaskCreationOptions.LongRunning)
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait until all runners are done processing
|
|
||||||
while (!Runners.All(r => r.IsCompleted))
|
|
||||||
{
|
|
||||||
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
|
|
||||||
LoggerFactory.GetInstance().Log(
|
|
||||||
$"One or more file processors are still processing files. Waiting {LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs}ms before checking again",
|
|
||||||
LogLevel.Info);
|
|
||||||
Thread.Sleep(TimeSpan.FromMilliseconds(LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs));
|
|
||||||
}
|
|
||||||
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
|
|
||||||
LoggerFactory.GetInstance().Log("Pre-processing finished", LogLevel.Info);
|
|
||||||
// Single writer instance to collect results
|
|
||||||
var writer = WriterFactory.GetInstance();
|
|
||||||
// Single collector instance to collect results
|
|
||||||
var dumpProcessor = DumpProcessorFactory.GetInstance();
|
|
||||||
writer.WriteAll(dumpProcessor.ProcessDumps(collector.Retrieve()));
|
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
@ -120,7 +78,9 @@ public class QueuePipeline : IPipeline
|
|||||||
var inputPath = LootDumpProcessorContext.GetConfig().ReaderConfig.DumpFilesLocation;
|
var inputPath = LootDumpProcessorContext.GetConfig().ReaderConfig.DumpFilesLocation;
|
||||||
|
|
||||||
if (inputPath == null || inputPath.Count == 0)
|
if (inputPath == null || inputPath.Count == 0)
|
||||||
|
{
|
||||||
throw new Exception("Reader dumpFilesLocations must be set to a valid value");
|
throw new Exception("Reader dumpFilesLocations must be set to a valid value");
|
||||||
|
}
|
||||||
|
|
||||||
// We gather up all files into a queue
|
// We gather up all files into a queue
|
||||||
var queuedFilesToProcess = GetFileQueue(inputPath);
|
var queuedFilesToProcess = GetFileQueue(inputPath);
|
||||||
@ -133,7 +93,9 @@ public class QueuePipeline : IPipeline
|
|||||||
var gatheredFiles = new List<string>();
|
var gatheredFiles = new List<string>();
|
||||||
|
|
||||||
if (queuedFilesToProcess.Count == 0)
|
if (queuedFilesToProcess.Count == 0)
|
||||||
|
{
|
||||||
throw new Exception("No files matched accepted extension types in configs");
|
throw new Exception("No files matched accepted extension types in configs");
|
||||||
|
}
|
||||||
|
|
||||||
var fileFilters = GetFileFilters() ?? new Dictionary<string, IFileFilter>();
|
var fileFilters = GetFileFilters() ?? new Dictionary<string, IFileFilter>();
|
||||||
|
|
||||||
@ -147,8 +109,7 @@ public class QueuePipeline : IPipeline
|
|||||||
if (preProcessor.TryPreProcess(file, out var outputFiles, out var outputDirectories))
|
if (preProcessor.TryPreProcess(file, out var outputFiles, out var outputDirectories))
|
||||||
{
|
{
|
||||||
// all new directories need to be processed as well
|
// all new directories need to be processed as well
|
||||||
GetFileQueue(outputDirectories).ToList()
|
GetFileQueue(outputDirectories).ToList().ForEach(queuedFilesToProcess.Enqueue);
|
||||||
.ForEach(queuedFilesToProcess.Enqueue);
|
|
||||||
// all output files need to be queued as well
|
// all output files need to be queued as well
|
||||||
outputFiles.ForEach(queuedFilesToProcess.Enqueue);
|
outputFiles.ForEach(queuedFilesToProcess.Enqueue);
|
||||||
}
|
}
|
||||||
@ -159,7 +120,9 @@ public class QueuePipeline : IPipeline
|
|||||||
if (fileFilters.TryGetValue(extension, out var filter))
|
if (fileFilters.TryGetValue(extension, out var filter))
|
||||||
{
|
{
|
||||||
if (filter.Accept(file))
|
if (filter.Accept(file))
|
||||||
|
{
|
||||||
gatheredFiles.Add(file);
|
gatheredFiles.Add(file);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -188,17 +151,28 @@ public class QueuePipeline : IPipeline
|
|||||||
{
|
{
|
||||||
// Check the input file to be sure its going to have data on it.
|
// Check the input file to be sure its going to have data on it.
|
||||||
if (!Directory.Exists(path))
|
if (!Directory.Exists(path))
|
||||||
|
{
|
||||||
throw new Exception($"Input directory \"{inputPath}\" could not be found");
|
throw new Exception($"Input directory \"{inputPath}\" could not be found");
|
||||||
|
}
|
||||||
|
|
||||||
// If we should process subfolder, queue them up as well
|
// If we should process subfolder, queue them up as well
|
||||||
if (LootDumpProcessorContext.GetConfig().ReaderConfig.ProcessSubFolders)
|
if (LootDumpProcessorContext.GetConfig().ReaderConfig.ProcessSubFolders)
|
||||||
|
{
|
||||||
foreach (var directory in Directory.GetDirectories(path))
|
foreach (var directory in Directory.GetDirectories(path))
|
||||||
|
{
|
||||||
queuedPathsToProcess.Enqueue(directory);
|
queuedPathsToProcess.Enqueue(directory);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var files = Directory.GetFiles(path);
|
var files = Directory.GetFiles(path);
|
||||||
|
|
||||||
foreach (var file in files)
|
foreach (var file in files)
|
||||||
|
{
|
||||||
if (acceptedFileExtension.Contains(Path.GetExtension(file)[1..].ToLower()))
|
if (acceptedFileExtension.Contains(Path.GetExtension(file)[1..].ToLower()))
|
||||||
|
{
|
||||||
queuedFilesToProcess.Enqueue(file);
|
queuedFilesToProcess.Enqueue(file);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return queuedFilesToProcess;
|
return queuedFilesToProcess;
|
||||||
@ -214,4 +188,240 @@ public class QueuePipeline : IPipeline
|
|||||||
FileFilterFactory.GetInstance
|
FileFilterFactory.GetInstance
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets all files and adds them to the processor.
|
||||||
|
/// use per map version now
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="threads"></param>
|
||||||
|
/// <param name="collector"></param>
|
||||||
|
private void ProcessFilesFromDumps(int threads, ICollector collector)
|
||||||
|
{
|
||||||
|
// Gather all files, sort them by date descending and then add them into the processing queue
|
||||||
|
GatherFiles().OrderByDescending(f =>
|
||||||
|
{
|
||||||
|
FileDateParser.TryParseFileDate(f, out var date);
|
||||||
|
return date;
|
||||||
|
}
|
||||||
|
).ToList().ForEach(f => _filesToProcess.Add(f));
|
||||||
|
|
||||||
|
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
|
||||||
|
{
|
||||||
|
LoggerFactory.GetInstance().Log("Files sorted and ready to begin pre-processing", LogLevel.Info);
|
||||||
|
}
|
||||||
|
|
||||||
|
// We startup all the threads and collect them into a runners list
|
||||||
|
for (int i = 0; i < threads; i++)
|
||||||
|
{
|
||||||
|
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
|
||||||
|
{
|
||||||
|
LoggerFactory.GetInstance().Log("Creating pre-processing threads", LogLevel.Info);
|
||||||
|
}
|
||||||
|
|
||||||
|
Runners.Add(
|
||||||
|
Task.Factory.StartNew(
|
||||||
|
() =>
|
||||||
|
{
|
||||||
|
while (_filesToProcess.TryTake(out var file, TimeSpan.FromMilliseconds(5000)))
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var reader = IntakeReaderFactory.GetInstance();
|
||||||
|
var processor = FileProcessorFactory.GetInstance();
|
||||||
|
if (reader.Read(file, out var basicInfo))
|
||||||
|
{
|
||||||
|
collector.Hold(processor.Process(basicInfo));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error))
|
||||||
|
{
|
||||||
|
LoggerFactory.GetInstance().Log(
|
||||||
|
$"Error occurred while processing file {file}\n{e.Message}\n{e.StackTrace}",
|
||||||
|
LogLevel.Error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
TaskCreationOptions.LongRunning)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait until all runners are done processing
|
||||||
|
while (!Runners.All(r => r.IsCompleted))
|
||||||
|
{
|
||||||
|
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
|
||||||
|
{
|
||||||
|
LoggerFactory.GetInstance().Log(
|
||||||
|
$"One or more file processors are still processing files. Waiting {LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs}ms before checking again",
|
||||||
|
LogLevel.Info);
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.Sleep(TimeSpan.FromMilliseconds(LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
|
||||||
|
{
|
||||||
|
LoggerFactory.GetInstance().Log("Pre-processing finished", LogLevel.Info);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Single writer instance to collect results
|
||||||
|
var writer = WriterFactory.GetInstance();
|
||||||
|
// Single collector instance to collect results
|
||||||
|
var dumpProcessor = DumpProcessorFactory.GetInstance();
|
||||||
|
writer.WriteAll(dumpProcessor.ProcessDumps(collector.Retrieve()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ProcessFilesFromDumpsPerMap(int threads, ICollector collector, string mapName)
|
||||||
|
{
|
||||||
|
// Gather all files, sort them by date descending and then add them into the processing queue
|
||||||
|
GatherFiles().FindAll(f => f.ToLower().Contains($"{mapName}--")).OrderByDescending(f =>
|
||||||
|
{
|
||||||
|
FileDateParser.TryParseFileDate(f, out var date);
|
||||||
|
return date;
|
||||||
|
}
|
||||||
|
).ToList().ForEach(f => _filesToProcess.Add(f));
|
||||||
|
|
||||||
|
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
|
||||||
|
{
|
||||||
|
LoggerFactory.GetInstance().Log("Files sorted and ready to begin pre-processing", LogLevel.Info);
|
||||||
|
}
|
||||||
|
|
||||||
|
// We startup all the threads and collect them into a runners list
|
||||||
|
for (int i = 0; i < threads; i++)
|
||||||
|
{
|
||||||
|
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
|
||||||
|
{
|
||||||
|
LoggerFactory.GetInstance().Log("Creating pre-processing threads", LogLevel.Info);
|
||||||
|
}
|
||||||
|
|
||||||
|
Runners.Add(
|
||||||
|
Task.Factory.StartNew(
|
||||||
|
() =>
|
||||||
|
{
|
||||||
|
while (_filesToProcess.TryTake(out var file, TimeSpan.FromMilliseconds(5000)))
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var reader = IntakeReaderFactory.GetInstance();
|
||||||
|
var processor = FileProcessorFactory.GetInstance();
|
||||||
|
if (reader.Read(file, out var basicInfo))
|
||||||
|
{
|
||||||
|
collector.Hold(processor.Process(basicInfo));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error))
|
||||||
|
{
|
||||||
|
LoggerFactory.GetInstance().Log(
|
||||||
|
$"Error occurred while processing file {file}\n{e.Message}\n{e.StackTrace}",
|
||||||
|
LogLevel.Error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
TaskCreationOptions.LongRunning)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait until all runners are done processing
|
||||||
|
while (!Runners.All(r => r.IsCompleted))
|
||||||
|
{
|
||||||
|
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
|
||||||
|
{
|
||||||
|
LoggerFactory.GetInstance().Log(
|
||||||
|
$"One or more file processors are still processing files. Waiting {LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs}ms before checking again",
|
||||||
|
LogLevel.Info);
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.Sleep(TimeSpan.FromMilliseconds(LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
|
||||||
|
{
|
||||||
|
LoggerFactory.GetInstance().Log("Pre-processing finished", LogLevel.Info);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Single writer instance to collect results
|
||||||
|
var writer = WriterFactory.GetInstance();
|
||||||
|
// Single collector instance to collect results
|
||||||
|
var dumpProcessor = DumpProcessorFactory.GetInstance();
|
||||||
|
writer.WriteAll(dumpProcessor.ProcessDumps(collector.Retrieve()));
|
||||||
|
|
||||||
|
// clear collector and datastorage as we process per map now
|
||||||
|
collector.Clear();
|
||||||
|
DataStorageFactory.GetInstance().Clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Adds map name to file if they dont have it already.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="threads">Number of threads to use</param>
|
||||||
|
private void FixFilesFromDumps(int threads)
|
||||||
|
{
|
||||||
|
var inputPath = LootDumpProcessorContext.GetConfig().ReaderConfig.DumpFilesLocation;
|
||||||
|
|
||||||
|
if (inputPath == null || inputPath.Count == 0)
|
||||||
|
{
|
||||||
|
throw new Exception("Reader dumpFilesLocations must be set to a valid value");
|
||||||
|
}
|
||||||
|
|
||||||
|
GetFileQueue(inputPath).ToList().ForEach(f => _filesToRename.Add(f));
|
||||||
|
|
||||||
|
var jsonUtil = JsonSerializerFactory.GetInstance(JsonSerializerTypes.DotNet);
|
||||||
|
|
||||||
|
for (var i = 0; i < threads; i++)
|
||||||
|
{
|
||||||
|
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
|
||||||
|
{
|
||||||
|
LoggerFactory.GetInstance().Log("Creating file-processing threads", LogLevel.Info);
|
||||||
|
}
|
||||||
|
|
||||||
|
Renamers.Add(Task.Factory.StartNew(() =>
|
||||||
|
{
|
||||||
|
while (_filesToRename.TryTake(out var file, TimeSpan.FromMilliseconds(5000)))
|
||||||
|
{
|
||||||
|
if (_mapNames.Any(x => file.Contains(x)))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var data = File.ReadAllText(file);
|
||||||
|
var fileData = jsonUtil.Deserialize<RootData>(data);
|
||||||
|
var newpath = file.Replace("resp", $"{fileData.Data.LocationLoot.Id.ToLower()}--resp");
|
||||||
|
File.Move(file, newpath);
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Error))
|
||||||
|
{
|
||||||
|
LoggerFactory.GetInstance().Log(
|
||||||
|
$"Error occurred while processing file {file}\n{e.Message}\n{e.StackTrace}",
|
||||||
|
LogLevel.Error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, TaskCreationOptions.LongRunning));
|
||||||
|
}
|
||||||
|
|
||||||
|
while (!Renamers.All(r => r.IsCompleted))
|
||||||
|
{
|
||||||
|
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
|
||||||
|
{
|
||||||
|
LoggerFactory.GetInstance()
|
||||||
|
.Log($"one or more files are being processed. Waiting {LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs} ms", LogLevel.Info);
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.Sleep(TimeSpan.FromMilliseconds(LootDumpProcessorContext.GetConfig().ThreadPoolingTimeoutMs));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LoggerFactory.GetInstance().CanBeLogged(LogLevel.Info))
|
||||||
|
{
|
||||||
|
LoggerFactory.GetInstance().Log("File-processing finished", LogLevel.Info);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
@ -7,4 +7,5 @@ public interface IDataStorage
|
|||||||
bool Exists(IKey t);
|
bool Exists(IKey t);
|
||||||
T GetItem<T>(IKey key) where T : IKeyable;
|
T GetItem<T>(IKey key) where T : IKeyable;
|
||||||
List<T> GetAll<T>();
|
List<T> GetAll<T>();
|
||||||
|
void Clear();
|
||||||
}
|
}
|
@ -25,4 +25,9 @@ public class FileDataStorage : IDataStorage
|
|||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void Clear()
|
||||||
|
{
|
||||||
|
// leaving empty so this the File version can still be used it needed
|
||||||
|
}
|
||||||
}
|
}
|
@ -47,4 +47,12 @@ public class MemoryDataStorage : IDataStorage
|
|||||||
{
|
{
|
||||||
return string.Join("-", key.GetLookupIndex());
|
return string.Join("-", key.GetLookupIndex());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void Clear()
|
||||||
|
{
|
||||||
|
lock (_cacheObjectLock)
|
||||||
|
{
|
||||||
|
CachedObjects.Clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user