File: Utility\FindPackagesByIdNupkgDownloader.cs
Web Access
Project: src\src\nuget-client\src\NuGet.Core\NuGet.Protocol\NuGet.Protocol.csproj (NuGet.Protocol)
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.


using System;
using System.Collections.Concurrent;
using System.Globalization;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using NuGet.Common;
using NuGet.Packaging;
using NuGet.Packaging.Core;
using NuGet.Protocol.Core.Types;
using NuGet.Protocol.Events;

namespace NuGet.Protocol
{
    public class FindPackagesByIdNupkgDownloader
    {
        // In-memory cache of download results, keyed by the .nupkg URL.
        private readonly TaskResultCache<string, CacheEntry> _cacheEntries = new();

        // Taken around the lookups and writes against _nuspecReaders in GetNuspecReaderFromNupkgAsync.
        private readonly object _nuspecReadersLock = new object();
        // .nuspec readers that have already been extracted from a .nupkg, keyed by the .nupkg URL.
        private readonly ConcurrentDictionary<string, NuspecReader> _nuspecReaders =
            new ConcurrentDictionary<string, NuspecReader>();

        // The HTTP source that .nupkg requests are sent through.
        private readonly HttpSource _httpSource;
        // Supplies the retry count and the delay between retries used by the download loop.
        private readonly EnhancedHttpRetryHelper _enhancedHttpRetryHelper;

        /// <summary>
        /// Creates a downloader that fetches packages through <paramref name="httpSource"/>, reading retry
        /// settings through <see cref="EnvironmentVariableWrapper.Instance"/>.
        /// </summary>
        /// <param name="httpSource">The HTTP source used to fetch .nupkg files.</param>
        /// <exception cref="ArgumentNullException"><paramref name="httpSource"/> is null.</exception>
        public FindPackagesByIdNupkgDownloader(HttpSource httpSource)
            : this(httpSource, EnvironmentVariableWrapper.Instance)
        {
        }

        /// <summary>
        /// Creates a downloader that fetches packages through <paramref name="httpSource"/>, with an explicit
        /// environment variable reader for the retry helper.
        /// </summary>
        /// <param name="httpSource">The HTTP source used to fetch .nupkg files.</param>
        /// <param name="environmentVariableReader">The reader handed to the retry helper.</param>
        /// <exception cref="ArgumentNullException"><paramref name="httpSource"/> is null.</exception>
        internal FindPackagesByIdNupkgDownloader(HttpSource httpSource, IEnvironmentVariableReader environmentVariableReader)
        {
            // The null check runs before the retry helper is constructed.
            _httpSource = httpSource ?? throw new ArgumentNullException(nameof(httpSource));
            _enhancedHttpRetryHelper = new EnhancedHttpRetryHelper(environmentVariableReader);
        }

        /// <summary>
        /// Gets a <see cref="NuspecReader"/> from a .nupkg. If the URL cannot be fetched or there is a problem
        /// processing the .nuspec, an exception is thrown. This method uses HTTP caching to avoid downloading the
        /// package over and over (unless <see cref="SourceCacheContext.DirectDownload"/> is specified). Readers
        /// are also remembered per URL, so a later call for the same URL returns the reader created earlier.
        /// </summary>
        /// <param name="identity">The package identity.</param>
        /// <param name="url">The URL of the .nupkg.</param>
        /// <param name="cacheContext">The cache context.</param>
        /// <param name="logger">The logger passed to the .nuspec extraction and to the download.</param>
        /// <param name="token">The cancellation token.</param>
        /// <returns>The .nuspec reader.</returns>
        /// <exception cref="PackageNotFoundProtocolException">
        /// No reader was produced because the .nupkg stream was never processed.
        /// </exception>
        public async Task<NuspecReader> GetNuspecReaderFromNupkgAsync(
            PackageIdentity identity,
            string url,
            SourceCacheContext cacheContext,
            ILogger logger,
            CancellationToken token)
        {
            NuspecReader? nuspecReader = null;

            // Fast path: a reader for this URL was already extracted by an earlier call.
            lock (_nuspecReadersLock)
            {
                if (_nuspecReaders.TryGetValue(url, out nuspecReader))
                {
                    return nuspecReader;
                }
            }

            // Invoked with the .nupkg stream once one is available; stores the reader in the captured local.
            Func<Stream, Task> readNuspecAsync = nupkgStream =>
            {
                nuspecReader = PackageUtilities.OpenNuspecFromNupkg(identity.Id, nupkgStream, logger);

                return TaskResult.True;
            };

            await ProcessNupkgStreamAsync(identity, url, readNuspecAsync, cacheContext, logger, token);

            if (nuspecReader == null)
            {
                // The callback never ran, so the package was not found on the feed. Typically the feed
                // listed the package but then returned 404 for the nupkg. The cache needs to be
                // invalidated and the download call made again.
                throw new PackageNotFoundProtocolException(identity);
            }

            // Remember the reader so the next call for this URL takes the fast path.
            lock (_nuspecReadersLock)
            {
                _nuspecReaders[url] = nuspecReader;
            }

            return nuspecReader;
        }

        /// <summary>
        /// Copies a .nupkg stream to the <paramref name="destination"/> stream. If the .nupkg cannot be found or if
        /// there is a network problem, no stream copy occurs.
        /// </summary>
        /// <param name="identity">The package identity.</param>
        /// <param name="url">The URL of the .nupkg.</param>
        /// <param name="destination">The destination stream. The .nupkg will be copied to this stream.</param>
        /// <param name="cacheContext">The cache context.</param>
        /// <param name="logger">The logger passed through to the download.</param>
        /// <param name="token">The cancellation token.</param>
        /// <returns>Returns true if the stream was copied, false otherwise.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="destination"/> is null.</exception>
        public async Task<bool> CopyNupkgToStreamAsync(
            PackageIdentity identity,
            string url,
            Stream destination,
            SourceCacheContext cacheContext,
            ILogger logger,
            CancellationToken token)
        {
            // Validate up front so a null destination is reported by name instead of surfacing as a
            // NullReferenceException from the CanSeek check below.
            if (destination == null)
            {
                throw new ArgumentNullException(nameof(destination));
            }

            if (!destination.CanSeek)
            {
                // In order to handle retries, we need to write to a temporary file, then copy to destination in one pass.
                // The temporary file is removed when the stream is closed (FileOptions.DeleteOnClose).
                string tempFilePath = Path.GetTempFileName();
                using Stream tempFile = new FileStream(tempFilePath, FileMode.Open, FileAccess.ReadWrite, FileShare.None, bufferSize: 4096, FileOptions.DeleteOnClose);

                // The temporary file is seekable, so this recursive call takes the branch below.
                bool result = await CopyNupkgToStreamAsync(identity, url, tempFile, cacheContext, logger, token);

                tempFile.Position = 0;
                await tempFile.CopyToAsync(destination, token);

                return result;
            }
            else
            {
                return await ProcessNupkgStreamAsync(
                    identity,
                    url,
                    async stream =>
                    {
                        try
                        {
                            await stream.CopyToAsync(destination, token);
                            ProtocolDiagnostics.RaiseEvent(new ProtocolDiagnosticNupkgCopiedEvent(_httpSource.PackageSource, destination.Length, identity.Id));
                        }
                        catch when (!token.IsCancellationRequested)
                        {
                            // The copy failed for a reason other than cancellation: rewind and empty the
                            // destination so no partial content is left in it, then let the failure propagate.
                            destination.Position = 0;
                            destination.SetLength(0);
                            throw;
                        }
                    },
                    cacheContext,
                    logger,
                    token);
            }
        }

        /// <summary>
        /// Manages the different ways of getting a .nupkg stream when using the global HTTP cache. When a stream is
        /// found, the <paramref name="processStreamAsync"/> method is invoked on said stream. This deals with the
        /// complexity of <see cref="SourceCacheContext.DirectDownload"/>.
        /// </summary>
        /// <param name="identity">The package identity.</param>
        /// <param name="url">The URL of the .nupkg to fetch.</param>
        /// <param name="processStreamAsync">The method to process the stream.</param>
        /// <param name="cacheContext">The cache context.</param>
        /// <param name="logger">The logger passed through to the download.</param>
        /// <param name="token">The cancellation token.</param>
        /// <returns>
        /// Returns true if the stream was processed, false if the stream could not be fetched (either from the
        /// HTTP cache or from the network).
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="identity"/>, <paramref name="url"/> or <paramref name="cacheContext"/> is null.
        /// </exception>
        private async Task<bool> ProcessNupkgStreamAsync(
            PackageIdentity identity,
            string url,
            Func<Stream, Task> processStreamAsync,
            SourceCacheContext cacheContext,
            ILogger logger,
            CancellationToken token)
        {
            if (identity == null)
            {
                throw new ArgumentNullException(nameof(identity));
            }

            if (url == null)
            {
                throw new ArgumentNullException(nameof(url));
            }

            if (cacheContext == null)
            {
                throw new ArgumentNullException(nameof(cacheContext));
            }

            token.ThrowIfCancellationRequested();

            // Everything the factory below needs travels in this tuple, so the lambda can stay static
            // (no captured variables).
            var factoryState = (downloader: this, identity, url, processStreamAsync, cacheContext, logger, token);

            // Look the URL up in the in-memory cache. On a hit the cache file can be opened and used as the
            // source stream instead of going to the package source. A direct download asks the cache to
            // refresh, i.e. not to serve the in-memory entry.
            CacheEntry entry = await _cacheEntries.GetOrAddAsync(
                url,
                refresh: cacheContext.DirectDownload,
                static s => s.downloader.ProcessStreamAndGetCacheEntryAsync(
                    s.identity,
                    s.url,
                    s.processStreamAsync,
                    s.cacheContext,
                    s.logger,
                    s.token),
                factoryState,
                token);

            // Turn the cache entry into the final result, processing the cache file if that is still pending.
            bool processed = await ProcessCacheEntryAsync(entry, processStreamAsync, token);

            return processed;
        }

        /// <summary>
        /// Fetches the .nupkg and describes the outcome as a <see cref="CacheEntry"/>. When the result is backed
        /// by a cache file, only the file name is recorded and the stream is left unprocessed. When the stream
        /// has no cache file, <paramref name="processStreamAsync"/> is run on it immediately.
        /// </summary>
        /// <param name="identity">The package identity.</param>
        /// <param name="url">The URL of the .nupkg to fetch.</param>
        /// <param name="processStreamAsync">The method to process a stream that has no cache file.</param>
        /// <param name="cacheContext">The cache context.</param>
        /// <param name="logger">The logger passed through to the download.</param>
        /// <param name="token">The cancellation token.</param>
        /// <returns>The cache entry describing what was fetched and whether it was already processed.</returns>
        private async Task<CacheEntry> ProcessStreamAndGetCacheEntryAsync(
            PackageIdentity identity,
            string url,
            Func<Stream, Task> processStreamAsync,
            SourceCacheContext cacheContext,
            ILogger logger,
            CancellationToken token)
        {
            CacheEntry entry = await ProcessHttpSourceResultAsync(
                identity,
                url,
                async result =>
                {
                    // Nothing was fetched: no cache file and nothing processed.
                    if (result?.Stream == null)
                    {
                        return new CacheEntry(cacheFile: null, alreadyProcessed: false);
                    }

                    // Hand back the cache file name so that the caller can open the cache file directly
                    // and process it itself.
                    if (result.CacheFile != null)
                    {
                        return new CacheEntry(result.CacheFile, alreadyProcessed: false);
                    }

                    // The stream came from the network directly, so there is no cache file name. This
                    // happens when the caller enables DirectDownload. Process the stream right away.
                    await processStreamAsync(result.Stream);

                    return new CacheEntry(cacheFile: null, alreadyProcessed: true);
                },
                cacheContext,
                logger,
                token);

            return entry;
        }

        /// <summary>
        /// Requests the .nupkg at <paramref name="url"/> from the HTTP source and passes the result to
        /// <paramref name="processAsync"/>. The request is attempted up to the retry count reported by the retry
        /// helper. Failures are logged rather than thrown, and once no attempt has succeeded
        /// <paramref name="processAsync"/> is invoked with <c>null</c>.
        /// </summary>
        /// <typeparam name="T">The type produced by <paramref name="processAsync"/>.</typeparam>
        /// <param name="identity">The package identity; its ID is validated before any request is made.</param>
        /// <param name="url">The URL of the .nupkg to fetch.</param>
        /// <param name="processAsync">The method that turns the HTTP result (or <c>null</c>) into a value.</param>
        /// <param name="cacheContext">The cache context.</param>
        /// <param name="logger">The logger that retry and failure messages are written to.</param>
        /// <param name="token">The cancellation token.</param>
        /// <returns>The value produced by <paramref name="processAsync"/>.</returns>
        private async Task<T> ProcessHttpSourceResultAsync<T>(
            PackageIdentity identity,
            string url,
            Func<HttpSourceResult?, Task<T>> processAsync,
            SourceCacheContext cacheContext,
            ILogger logger,
            CancellationToken token)
        {
            PackageIdValidator.Validate(identity.Id);

            int maxRetries = _enhancedHttpRetryHelper.RetryCountOrDefault;

            // Builds the log text shared by the "will retry" and "giving up" failure paths.
            string DescribeDownloadFailure(Exception failure)
            {
                string summary = string.Format(
                    CultureInfo.CurrentCulture,
                    Strings.Log_FailedToDownloadPackage,
                    identity,
                    url);

                return summary + Environment.NewLine + ExceptionUtilities.DisplayMessage(failure);
            }

            for (int attempt = 1; attempt <= maxRetries; attempt++)
            {
                bool isFirstAttempt = attempt == 1;
                bool isLastAttempt = attempt == maxRetries;

                var httpSourceCacheContext = HttpSourceCacheContext.Create(cacheContext, isFirstAttempt: isFirstAttempt);

                try
                {
                    // The request is built inside the try so that a failure while composing it is handled
                    // by the same catch blocks as a failed download.
                    var request = new HttpSourceCachedRequest(
                        url,
                        $"nupkg_{identity.Id.ToLowerInvariant()}.{identity.Version.ToNormalizedString()}",
                        httpSourceCacheContext)
                    {
                        EnsureValidContents = stream => HttpStreamValidation.ValidatePackageIdentity(url, stream, identity),
                        IgnoreNotFounds = true,
                        // Retrying is driven by this loop, so each request gets a single try.
                        MaxTries = 1,
                        IsRetry = !isFirstAttempt,
                        IsLastAttempt = isLastAttempt
                    };

                    return await _httpSource.GetAsync(
                        request,
                        async httpSourceResult => await processAsync(httpSourceResult),
                        logger,
                        token);
                }
                catch (TaskCanceledException) when (!isLastAttempt)
                {
                    // Requests can get cancelled if we got the data from elsewhere, no reason to warn.
                    logger.LogMinimal(
                        string.Format(CultureInfo.CurrentCulture, Strings.Log_CanceledNupkgDownload, url));
                }
                catch (Exception ex) when (!isLastAttempt)
                {
                    logger.LogMinimal(DescribeDownloadFailure(ex));

                    // An IOException wrapping a SocketException indicates server hangup ("Connection reset
                    // by peer"). Azure DevOps feeds sporadically do this due to mandatory connection
                    // cycling. Delaying gives Azure more of a chance to recover.
                    if (ex.InnerException is IOException { InnerException: System.Net.Sockets.SocketException })
                    {
                        logger.LogVerbose("Enhanced retry: Encountered SocketException, delaying between tries to allow recovery");

                        TimeSpan recoveryDelay = TimeSpan.FromMilliseconds(_enhancedHttpRetryHelper.DelayInMillisecondsOrDefault);
                        await Task.Delay(recoveryDelay, token);
                    }
                }
                catch (Exception ex) when (isLastAttempt)
                {
                    // Out of attempts: report as an error and fall through to the null result below.
                    logger.LogError(DescribeDownloadFailure(ex));
                }
            }

            // No attempt returned a result; let the callback decide what "nothing fetched" means.
            return await processAsync(null);
        }

        /// <summary>
        /// Completes the processing described by <paramref name="cacheEntry"/>. If the entry is not yet
        /// processed and names a cache file, that file is opened and passed to
        /// <paramref name="processStreamAsync"/>.
        /// </summary>
        /// <param name="cacheEntry">The outcome of the fetch.</param>
        /// <param name="processStreamAsync">The method to process the cache file stream.</param>
        /// <param name="token">The cancellation token.</param>
        /// <returns>
        /// True if the stream was processed (now or earlier); false if the entry has no cache file and was not
        /// already processed.
        /// </returns>
        private async Task<bool> ProcessCacheEntryAsync(
            CacheEntry cacheEntry,
            Func<Stream, Task> processStreamAsync,
            CancellationToken token)
        {
            if (cacheEntry.AlreadyProcessed)
            {
                return true;
            }

            string? cacheFilePath = cacheEntry.CacheFile;
            if (cacheFilePath == null)
            {
                return false;
            }

            // Open the file while holding its lock, to prevent this process from opening a file
            // deleted by another HTTP request.
            using FileStream cacheStream = await ConcurrencyUtilities.ExecuteWithFileLockedAsync(
                cacheFilePath,
                _ => Task.FromResult(new FileStream(
                    cacheFilePath,
                    FileMode.Open,
                    FileAccess.Read,
                    FileShare.ReadWrite | FileShare.Delete,
                    StreamExtensions.BufferSize)),
                token);

            await processStreamAsync(cacheStream);

            return true;
        }

        /// <summary>
        /// Describes the outcome of fetching a .nupkg: an optional cache file name and a flag telling whether
        /// the stream has already been processed.
        /// </summary>
        private class CacheEntry
        {
            /// <summary>True when the stream was processed at fetch time and needs no further work.</summary>
            public bool AlreadyProcessed { get; }

            /// <summary>The cache file holding the .nupkg, or null when there is none.</summary>
            public string? CacheFile { get; }

            public CacheEntry(string? cacheFile, bool alreadyProcessed)
            {
                AlreadyProcessed = alreadyProcessed;
                CacheFile = cacheFile;
            }
        }
    }
}