Skip to content

Commit

Permalink
Fixes for the resumable downloads feature (#2345)
Browse files Browse the repository at this point in the history
* Fix handle of hashed file not closing

* Limit concurrent downloads to a maximum of 8

* Dynamically increase job size to avoid downloads appearing stalled

* Set downloader settings to avoid RAM and timeout issues

* Improve logging around downloads

* Adds more logging when starting stopping downloads
* Improves error message when GameFileSource download fails
* Stops logging errors when archive isn't available on WJ CDN

* Add retry mechanism to SingleThreadedDownloader

* Update CHANGELOG.md

* Remove hard limit for download threads

---------

Co-authored-by: UrbanCMC <UrbanCMC@web.de>
  • Loading branch information
EzioTheDeadPoet and UrbanCMC authored Jun 27, 2023
1 parent cbc87f8 commit 00faee4
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 26 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
### Changelog

#### Version TBD
* Fixed issues related to high RAM usage
* The resumable downloads now reserve drive space to write to in advance instead of being managed in system RAM

#### Version - 3.1.0.0 - 5/7/2023
* Fixed Readme opening twice
* Updated Text in the UI to better describe current app behavior
Expand Down
18 changes: 14 additions & 4 deletions Wabbajack.Downloaders.Dispatcher/DownloadDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Wabbajack.Common;
using Wabbajack.Downloaders.Interfaces;
using Wabbajack.Downloaders.VerificationCache;
using Wabbajack.DTOs;
Expand Down Expand Up @@ -50,7 +51,9 @@ public async Task<Hash> Download(Archive a, AbsolutePath dest, CancellationToken

using var downloadScope = _logger.BeginScope("Downloading {Name}", a.Name);
using var job = await _limiter.Begin("Downloading " + a.Name, a.Size, token);
return await Download(a, dest, job, token, proxy);
var hash = await Download(a, dest, job, token, proxy);
_logger.LogInformation("Finished downloading {name}. Hash: {hash}; Size: {size}/{expectedSize}", a.Name, hash, dest.Size().ToFileSizeString(), a.Size.ToFileSizeString());
return hash;
}

public async Task<Archive> MaybeProxy(Archive a, CancellationToken token)
Expand Down Expand Up @@ -153,8 +156,15 @@ public async Task<bool> Verify(Archive a, CancellationToken token)
if (downloadedHash != default && (downloadedHash == archive.Hash || archive.Hash == default))
return (DownloadResult.Success, downloadedHash);

downloadedHash = await DownloadFromMirror(archive, destination, token);
if (downloadedHash != default) return (DownloadResult.Mirror, downloadedHash);
try
{
downloadedHash = await DownloadFromMirror(archive, destination, token);
if (downloadedHash != default) return (DownloadResult.Mirror, downloadedHash);
}
catch (NotSupportedException)
{
// Thrown if downloading from mirror is not supported for archive, keep original hash
}

return (DownloadResult.Failure, downloadedHash);

Expand Down Expand Up @@ -234,7 +244,7 @@ private async Task<Hash> DownloadFromMirror(Archive archive, AbsolutePath destin

return await Download(newArchive, destination, token);
}
catch (Exception ex)
catch (Exception ex) when (ex is not NotSupportedException)
{
_logger.LogCritical(ex, "While finding mirror for {hash}", archive.Hash);
return default;
Expand Down
9 changes: 8 additions & 1 deletion Wabbajack.Installer/AInstaller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -422,10 +422,17 @@ public async Task<bool> DownloadArchive(Archive archive, bool download, Cancella
await destination.Value.MoveToAsync(destination.Value.Parent.Combine(archive.Hash.ToHex()), true,
token);
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
// No actual error. User canceled downloads.
}
catch (NotImplementedException) when (archive.State is GameFileSource)
{
_logger.LogError("Missing game file {name}. This could be caused by missing DLC or a modified installation.", archive.Name);
}
catch (Exception ex)
{
_logger.LogError(ex, "Download error for file {name}", archive.Name);
return false;
}

return false;
Expand Down
41 changes: 35 additions & 6 deletions Wabbajack.Networking.Http/ResumableDownloader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading;
using System.Threading.Tasks;
using Downloader;
using Microsoft.Extensions.Logging;
using Wabbajack.Hashing.xxHash64;
using Wabbajack.Paths;
using Wabbajack.Paths.IO;
Expand All @@ -19,15 +20,18 @@ internal class ResumableDownloader
private readonly HttpRequestMessage _msg;
private readonly AbsolutePath _outputPath;
private readonly AbsolutePath _packagePath;
private readonly ILogger<SingleThreadedDownloader> _logger;
private CancellationToken _token;
private Exception? _error;

public ResumableDownloader(HttpRequestMessage msg, AbsolutePath outputPath, IJob job)

public ResumableDownloader(HttpRequestMessage msg, AbsolutePath outputPath, IJob job, ILogger<SingleThreadedDownloader> logger)
{
_job = job;
_msg = msg;
_outputPath = outputPath;
_packagePath = outputPath.WithExtension(Extension.FromPath(".download_package"));
_logger = logger;
}

public async Task<Hash> Download(CancellationToken token)
Expand All @@ -46,24 +50,37 @@ public async Task<Hash> Download(CancellationToken token)
// Resume with different Uri in case old one is no longer valid
downloadPackage.Address = _msg.RequestUri!.AbsoluteUri;

_logger.LogDebug("Download for {name} is resuming...", _outputPath.FileName.ToString());
await downloader.DownloadFileTaskAsync(downloadPackage, token);
}
else
{
_logger.LogDebug("Download for '{name}' is starting from scratch...", _outputPath.FileName.ToString());
_outputPath.Delete();
await downloader.DownloadFileTaskAsync(_msg.RequestUri!.AbsoluteUri, _outputPath.ToString(), token);
}

// Save progress if download isn't completed yet
if (downloader.Status is DownloadStatus.Stopped or DownloadStatus.Failed)
{
_logger.LogDebug("Download for '{name}' stopped before completion. Saving package...", _outputPath.FileName.ToString());
SavePackage(downloader.Package);
if (_error != null && _error.GetType() != typeof(TaskCanceledException))
if (_error == null || _error.GetType() == typeof(TaskCanceledException))
{
throw _error;
return new Hash();
}

return new Hash();
if (_error.GetType() == typeof(NotSupportedException))
{
_logger.LogWarning("Download for '{name}' doesn't support resuming. Deleting package...", _outputPath.FileName.ToString());
DeletePackage();
}
else
{
_logger.LogError(_error,"Download for '{name}' encountered error. Throwing...", _outputPath.FileName.ToString());
}

throw _error;
}

if (downloader.Status == DownloadStatus.Completed)
Expand All @@ -76,13 +93,16 @@ public async Task<Hash> Download(CancellationToken token)
return new Hash();
}

return await _outputPath.Open(FileMode.Open).Hash(token);
await using var file = _outputPath.Open(FileMode.Open);
return await file.Hash(token);
}

private DownloadConfiguration CreateConfiguration(HttpRequestMessage message)
{
var configuration = new DownloadConfiguration
{
Timeout = (int)TimeSpan.FromSeconds(120).TotalMilliseconds,
ReserveStorageSpaceBeforeStartingDownload = true,
RequestConfiguration = new RequestConfiguration
{
Headers = message.Headers.ToWebHeaderCollection(),
Expand All @@ -109,12 +129,21 @@ private async void OnDownloadProgressChanged(object? sender, DownloadProgressCha
}

await _job.Report(processedSize, _token);
if (_job.Current > _job.Size)
{
// Increase job size so progress doesn't appear stalled
_job.Size = (long)Math.Floor(_job.Current * 1.1);
}
}

private void OnDownloadStarted(object? sender, DownloadStartedEventArgs e)
{
_job.ResetProgress();
_job.Size = e.TotalBytesToReceive;

if (_job.Size < e.TotalBytesToReceive)
{
_job.Size = e.TotalBytesToReceive;
}

// Get rid of package, since we can't use it to resume anymore
DeletePackage();
Expand Down
19 changes: 17 additions & 2 deletions Wabbajack.Networking.Http/SingleThreadedDownloader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,23 @@ public SingleThreadedDownloader(ILogger<SingleThreadedDownloader> logger, HttpCl
public async Task<Hash> Download(HttpRequestMessage message, AbsolutePath outputPath, IJob job,
CancellationToken token)
{
var downloader = new ResumableDownloader(message, outputPath, job);
return await downloader.Download(token);
Exception downloadError = null!;
var downloader = new ResumableDownloader(message, outputPath, job, _logger);
for (var i = 0; i < 3; i++)
{
try
{
return await downloader.Download(token);
}
catch (Exception ex)
{
downloadError = ex;
_logger.LogDebug("Download for '{name}' failed. Retrying...", outputPath.FileName.ToString());
}
}

_logger.LogError(downloadError, "Failed to download '{name}' after 3 tries.", outputPath.FileName.ToString());
return new Hash();

// using var response = await _client.SendAsync(message, HttpCompletionOption.ResponseHeadersRead, token);
// if (!response.IsSuccessStatusCode)
Expand Down
26 changes: 13 additions & 13 deletions Wabbajack.Services.OSIntegrated/ResourceSettingsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,20 @@ public async Task<ResourceSetting> GetSettings(string name)
try
{
_settings ??= await _manager.Load<Dictionary<string, ResourceSetting>>("resource_settings");

if (_settings.TryGetValue(name, out var found)) return found;

var newSetting = new ResourceSetting
if (!_settings.ContainsKey(name))
{
MaxTasks = Environment.ProcessorCount,
MaxThroughput = 0
};

_settings.Add(name, newSetting);

await _manager.Save("resource_settings", _settings);

return _settings[name];
var newSetting = new ResourceSetting
{
MaxTasks = Environment.ProcessorCount,
MaxThroughput = 0
};

_settings.Add(name, newSetting);
await SaveSettings(_settings);
}

var setting = _settings[name];
return setting;
}
finally
{
Expand Down

0 comments on commit 00faee4

Please sign in to comment.