|
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
#nullable disable
using System;
using System.Collections.Concurrent;
using System.Globalization;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;
using NuGet.Common;
using NuGet.Configuration;
using NuGet.Protocol.Core.Types;
using NuGet.Protocol.Model;
using NuGet.Protocol.Utility;
using NuGet.Shared;
using NuGet.Versioning;
namespace NuGet.Protocol
{
/// <summary>
/// Retrieves and caches service index.json files
/// ServiceIndexResourceV3 stores the json, all work is done in the provider
/// </summary>
public class ServiceIndexResourceV3Provider : ResourceProvider
{
private static readonly TimeSpan DefaultCacheDuration = TimeSpan.FromMinutes(40);
private readonly ConcurrentDictionary<string, ServiceIndexCacheInfo> _cache;
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly EnhancedHttpRetryHelper _enhancedHttpRetryHelper;
private readonly IEnvironmentVariableReader _environmentVariableReader;
/// <summary>
/// Maximum amount of time to store index.json
/// </summary>
public TimeSpan MaxCacheDuration { get; protected set; }
public ServiceIndexResourceV3Provider() : this(environmentVariableReader: null) { }
internal ServiceIndexResourceV3Provider(IEnvironmentVariableReader environmentVariableReader)
: base(typeof(ServiceIndexResourceV3),
nameof(ServiceIndexResourceV3Provider),
NuGetResourceProviderPositions.Last)
{
_cache = new ConcurrentDictionary<string, ServiceIndexCacheInfo>(StringComparer.OrdinalIgnoreCase);
MaxCacheDuration = DefaultCacheDuration;
_environmentVariableReader = environmentVariableReader;
_enhancedHttpRetryHelper = new EnhancedHttpRetryHelper(environmentVariableReader ?? EnvironmentVariableWrapper.Instance);
}
public override async Task<Tuple<bool, INuGetResource>> TryCreate(SourceRepository source, CancellationToken token)
{
ServiceIndexResourceV3 index = null;
ServiceIndexCacheInfo cacheInfo = null;
var url = source.PackageSource.Source;
// the file type can easily rule out if we need to request the url
if (source.PackageSource.ProtocolVersion == 3 ||
(source.PackageSource.IsHttp &&
url.EndsWith(".json", StringComparison.OrdinalIgnoreCase)))
{
var utcNow = DateTime.UtcNow;
var entryValidCutoff = utcNow.Subtract(MaxCacheDuration);
// check the cache before downloading the file
if (!_cache.TryGetValue(url, out cacheInfo) ||
entryValidCutoff > cacheInfo.CachedTime)
{
await _semaphore.WaitAsync(token);
try
{
// check the cache again, another thread may have finished this one waited for the lock
if (!_cache.TryGetValue(url, out cacheInfo) ||
entryValidCutoff > cacheInfo.CachedTime)
{
index = await GetServiceIndexResourceV3(source, utcNow, NullLogger.Instance, token);
// cache the value even if it is null to avoid checking it again later
var cacheEntry = new ServiceIndexCacheInfo
{
CachedTime = utcNow,
Index = index
};
// If the cache entry has expired it will already exist
_cache.AddOrUpdate(url, cacheEntry, (key, value) => cacheEntry);
}
}
finally
{
_semaphore.Release();
}
}
}
if (index == null && cacheInfo != null)
{
index = cacheInfo.Index;
}
return new Tuple<bool, INuGetResource>(index != null, index);
}
/// <summary>
/// Read the source's end point to get the index json.
/// Retries are logged to any provided <paramref name="log"/> as LogMinimal.
/// </summary>
/// <param name="source"></param>
/// <param name="utcNow"></param>
/// <param name="log"></param>
/// <param name="token"></param>
/// <exception cref="OperationCanceledException">Logged to any provided <paramref name="log"/> as LogMinimal prior to throwing.</exception>
/// <exception cref="FatalProtocolException">Encapsulates all other exceptions.</exception>
/// <returns></returns>
private async Task<ServiceIndexResourceV3> GetServiceIndexResourceV3(
SourceRepository source,
DateTime utcNow,
ILogger log,
CancellationToken token)
{
var url = source.PackageSource.Source;
var httpSourceResource = await source.GetResourceAsync<HttpSourceResource>(token);
var client = httpSourceResource.HttpSource;
int maxRetries = _enhancedHttpRetryHelper.RetryCountOrDefault;
for (var retry = 1; retry <= maxRetries; retry++)
{
using (var sourceCacheContext = new SourceCacheContext())
{
var cacheContext = HttpSourceCacheContext.Create(sourceCacheContext, isFirstAttempt: retry == 1);
try
{
return await client.GetAsync(
new HttpSourceCachedRequest(
url,
"service_index",
cacheContext)
{
EnsureValidContents = stream => HttpStreamValidation.ValidateJObject(url, stream),
MaxTries = 1,
IsRetry = retry > 1,
IsLastAttempt = retry == maxRetries
},
async httpSourceResult =>
{
var result = await ConsumeServiceIndexStreamAsync(httpSourceResult.Stream, utcNow, source.PackageSource, token);
return result;
},
log,
token);
}
catch (OperationCanceledException ex)
{
var message = ExceptionUtilities.DisplayMessage(ex);
log.LogMinimal(message);
throw;
}
catch (Exception ex) when (retry < maxRetries)
{
var message = string.Format(CultureInfo.CurrentCulture, Strings.Log_RetryingServiceIndex, url)
+ Environment.NewLine
+ ExceptionUtilities.DisplayMessage(ex);
log.LogMinimal(message);
if (ex.InnerException != null &&
ex.InnerException is IOException &&
ex.InnerException.InnerException != null &&
ex.InnerException.InnerException is System.Net.Sockets.SocketException)
{
// An IO Exception with inner SocketException indicates server hangup ("Connection reset by peer").
// Azure DevOps feeds sporadically do this due to mandatory connection cycling.
// Stalling an extra <ExperimentalRetryDelayMilliseconds> gives Azure more of a chance to recover.
log.LogVerbose("Enhanced retry: Encountered SocketException, delaying between tries to allow recovery");
await Task.Delay(TimeSpan.FromMilliseconds(_enhancedHttpRetryHelper.DelayInMillisecondsOrDefault), token);
}
}
catch (Exception ex) when (retry == maxRetries)
{
var message = string.Format(CultureInfo.CurrentCulture, Strings.Log_FailedToReadServiceIndex, url);
throw new FatalProtocolException(message, ex);
}
}
}
return null;
}
private async Task<ServiceIndexResourceV3> ConsumeServiceIndexStreamAsync(Stream stream, DateTime utcNow, PackageSource source, CancellationToken token)
{
if (NuGetFeatureFlags.UseSystemTextJsonDeserializationFeatureSwitch
|| NuGetFeatureFlags.IsSystemTextJsonDeserializationEnabledByEnvironment(_environmentVariableReader))
{
return await ConsumeServiceIndexStreamStjAsync(stream, utcNow, source, token);
}
else
{
return await ConsumeServiceIndexStreamNsjAsync(stream, utcNow, source, token);
}
}
private static async Task<ServiceIndexResourceV3> ConsumeServiceIndexStreamStjAsync(Stream stream, DateTime utcNow, PackageSource source, CancellationToken token)
{
ServiceIndexModel index;
try
{
index = await JsonSerializer.DeserializeAsync(stream, JsonContext.Default.ServiceIndexModel, token);
}
catch (JsonException ex)
{
throw new InvalidDataException(string.Format(
CultureInfo.CurrentCulture,
Strings.Protocol_InvalidJsonObject,
source.Source), ex);
}
if (index is null)
{
throw new InvalidDataException(string.Format(
CultureInfo.CurrentCulture,
Strings.Protocol_InvalidJsonObject,
source.Source));
}
// Use SemVer instead of NuGetVersion; the service index should always be in strict SemVer format.
if (!SemanticVersion.TryParse(index.Version, out SemanticVersion version) || version.Major != 3)
{
throw new InvalidDataException(string.Format(
CultureInfo.CurrentCulture,
Strings.Protocol_UnsupportedVersion,
index.Version));
}
return new ServiceIndexResourceV3(index, utcNow, source);
}
private static async Task<ServiceIndexResourceV3> ConsumeServiceIndexStreamNsjAsync(Stream stream, DateTime utcNow, PackageSource source, CancellationToken token)
{
// Parse the JSON
JObject json = await stream.AsJObjectAsync(token);
// Use SemVer instead of NuGetVersion, the service index should always be
// in strict SemVer format
JToken versionToken;
if (json.TryGetValue("version", out versionToken) &&
versionToken.Type == JTokenType.String)
{
SemanticVersion version;
if (SemanticVersion.TryParse((string)versionToken, out version) &&
version.Major == 3)
{
return new ServiceIndexResourceV3(json, utcNow, source);
}
else
{
string errorMessage = string.Format(
CultureInfo.CurrentCulture,
Strings.Protocol_UnsupportedVersion,
(string)versionToken);
throw new InvalidDataException(errorMessage);
}
}
else
{
throw new InvalidDataException(Strings.Protocol_MissingVersion);
}
}
protected class ServiceIndexCacheInfo
{
public ServiceIndexResourceV3 Index { get; set; }
public DateTime CachedTime { get; set; }
}
}
}
|