Asynchronously and parallelly downloading files

Asked by: aybe  Asked: 10/5/2013  Last edited by: aybe  Updated: 4/10/2023  Views: 15748

Question:

EDIT: I changed the title of the question to reflect the issue I had, and also provided an answer on how to achieve this easily.


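I am trying to make the second method return Task<TResult> instead of Task like the first method, but I am getting a cascade of errors as a result of trying to fix it.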

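  • I added return before await body(partition.Current);
  • In turn, it asked me to add a return statement below, so I added return null below
  • But now the select statement complains that it cannot infer the type arguments from the query
  • I changed Task.Run to Task.Run<TResult>, but without success.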

How can I fix it?

The first method comes from http://blogs.msdn.com/b/pfxteam/archive/2012/03/05/10278165.aspx; the second method is the overload I am trying to create.

public static class Extensions
{
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }

    public static Task ForEachAsync<T, TResult>(this IEnumerable<T> source, int dop, Func<T, Task<TResult>> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }
}
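For reference, a minimal sketch of an overload that returns Task<TResult[]> instead of Task, which is what the second method is aiming for. It keeps the partitioner approach from the pfxteam post but partitions over indices, so each worker writes its result into its own slot of a pre-sized array and the results stay in source order. The class name PartitionedExtensions is hypothetical, and the sketch assumes it is acceptable to materialize source into an array up front.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PartitionedExtensions // hypothetical class name
{
    public static async Task<TResult[]> ForEachAsync<T, TResult>(
        this IEnumerable<T> source, int dop, Func<T, Task<TResult>> body)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (body == null) throw new ArgumentNullException(nameof(body));

        // Materialize the source so the number of results is known in advance.
        T[] items = source.ToArray();
        var results = new TResult[items.Length];

        // Partition over indices instead of items: each worker only writes to
        // the slots it was handed, so no locking is needed and order is kept.
        await Task.WhenAll(
            from partition in Partitioner.Create(Enumerable.Range(0, items.Length)).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                    {
                        int i = partition.Current;
                        results[i] = await body(items[i]);
                    }
            }));

        // Task.WhenAll has completed, so every slot has been written.
        return results;
    }
}
```

With this overload in scope, the DownloadFiles method below could end with return await enumerable.ForEachAsync(5, s => DownloadFile(s)); and get a string[] back. The lambda returns Task<string>, so C# overload resolution prefers the Func<T, Task<TResult>> overload over the Func<T, Task> one.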

Usage example:

Using this method, I would like to download multiple files in parallel and asynchronously:

private async void MainWindow_Loaded(object sender, RoutedEventArgs e)
{
    Artist artist = await GetArtist();
    IEnumerable<string> enumerable = artist.Reviews.Select(s => s.ImageUrl);
    string[] downloadFile = await DownloadFiles(enumerable);
}

public static async Task<string[]> DownloadFiles(IEnumerable<string> enumerable)
{
    if (enumerable == null) throw new ArgumentNullException("enumerable");
    await enumerable.ForEachAsync(5, s => DownloadFile(s));
    // Incomplete, the above statement is void and can't be returned
}
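Whatever helper ends up being used, the key piece is that Task.WhenAll has an overload taking a sequence of Task<TResult> that returns Task<TResult[]>, with the results in the same order as the input. A minimal sketch of DownloadFiles built on that alone; note there is no concurrency limit, so every download starts at once. The downloadFile parameter and the DownloadSketch class are hypothetical additions so the sketch can be exercised without a network; you would pass the question's DownloadFile method to it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class DownloadSketch // hypothetical class name
{
    // Starts one download per address and waits for all of them.
    // No throttling here: all downloads run concurrently.
    public static Task<string[]> DownloadFiles(
        IEnumerable<string> enumerable, Func<string, Task<string>> downloadFile)
    {
        if (enumerable == null) throw new ArgumentNullException(nameof(enumerable));
        if (downloadFile == null) throw new ArgumentNullException(nameof(downloadFile));

        // Task.WhenAll(IEnumerable<Task<string>>) returns Task<string[]>,
        // whose elements are in the same order as 'enumerable'.
        return Task.WhenAll(enumerable.Select(downloadFile));
    }
}
```

Usage: string[] files = await DownloadSketch.DownloadFiles(enumerable, DownloadFile);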

public static async Task<string> DownloadFile(string address)
{
    /* Download a file from specified address, 
        * return destination file name on success or null on failure */

    if (address == null)
    {
        return null;
    }

    Uri result;
    if (!Uri.TryCreate(address, UriKind.Absolute, out result))
    {
        Debug.WriteLine(string.Format("Couldn't create URI from specified address: {0}", address));
        return null;
    }

    try
    {
        using (var client = new WebClient())
        {
            string fileName = Path.GetTempFileName();
            await client.DownloadFileTaskAsync(address, fileName);
            Debug.WriteLine(string.Format("Downloaded file saved to: {0} ({1})", fileName, address));
            return fileName;
        }
    }
    catch (WebException webException)
    {
        Debug.WriteLine(string.Format("Couldn't download file from specified address: {0}", webException.Message));
        return null;
    }
}
C# foreach task async-await

Comments

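1 vote Jon Skeet 10/5/2013
It's not clear what you expect the result to be. You're passing in a whole sequence of T values and executing the same function on each of them; what result would you expect to get from the returned Task<TResult>?
0 votes aybe 10/5/2013
I would like to get a Task<string> in this case; I've added an example to my question.
0 votes L.B 10/5/2013
"Using this method, I would like to download multiple files in parallel and asynchronously": isn't Parallel.ForEach enough?
0 votes Matt Smith 10/5/2013
@Aybe, do you want it to be Task<IEnumerable<string>> in your case? Or, if you really want Task<string>, what string would you return?
2 votes svick 10/5/2013
@Aybe I think you still don't get it. Imagine you're downloading two pages, one containing foo and the other containing bar. If ForEachAsync() were to return a Task<string>, what string would you expect it to contain? Given your code, it would make more sense if it returned Task<string[]>.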

Answers:

31 votes aybe 10/6/2013 #1

I solved it and I'm posting the solution here, as it may help anyone who runs into the same issue.

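I initially needed a small helper that would download images quickly but also drop the connection if the server doesn't respond quickly, all of it in parallel and asynchronously.

This helper returns a tuple containing the remote path, the local path and the exception, if one occurred; very useful, since it's always good to know why a failed download went wrong. I don't think I've forgotten any situation that can occur during a download, but comments are welcome.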

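  • Specify the list of URLs to download
  • Optionally, specify the local file name where each file is saved (if you don't, one is generated for you)
  • Optionally, specify a duration after which the download is cancelled (handy for slow or unreachable servers)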
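The code below implements the timeout with a System.Threading.Timer that calls WebClient.CancelAsync. A common alternative on newer .NET versions is a CancellationTokenSource that cancels itself after a delay, passed to any API that accepts a CancellationToken. A minimal sketch of that swapped-in technique; the TimeoutSketch class and WithTimeoutAsync helper are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutSketch // hypothetical class name
{
    // Runs an operation with a token that is cancelled after 'timeout'.
    // If the operation honours the token, it stops early and awaiting the
    // returned task throws an OperationCanceledException (or a derived type).
    public static async Task<T> WithTimeoutAsync<T>(
        Func<CancellationToken, Task<T>> operation, TimeSpan timeout)
    {
        using (var cts = new CancellationTokenSource(timeout))
        {
            return await operation(cts.Token);
        }
    }
}
```

For example, with HttpClient on .NET 5 or later: byte[] data = await TimeoutSketch.WithTimeoutAsync(ct => httpClient.GetByteArrayAsync(url, ct), TimeSpan.FromSeconds(3)); where httpClient and url are placeholders. Unlike the timer approach, nothing needs to check IsBusy; the token simply does nothing if the operation has already finished.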

You can use DownloadFileTaskAsync on its own, or use the ForEachAsync helper for parallel and asynchronous downloads.

Code, with an example of how to use it:

private async void MainWindow_Loaded(object sender, RoutedEventArgs e)
{
    IEnumerable<string> enumerable = new string[] { /* your URLs here */ };
    var results = new List<Tuple<string, string, Exception>>();
    await enumerable.ForEachAsync(s => DownloadFileTaskAsync(s, null, 1000), (url, t) => results.Add(t));
}

/// <summary>
///     Downloads a file from a specified Internet address.
/// </summary>
/// <param name="remotePath">Internet address of the file to download.</param>
/// <param name="localPath">
///     Local file name where to store the content of the download, if null a temporary file name will
///     be generated.
/// </param>
/// <param name="timeOut">Duration in milliseconds before cancelling the operation.</param>
/// <returns>A tuple containing the remote path, the local path and an exception if one occurred.</returns>
private static async Task<Tuple<string, string, Exception>> DownloadFileTaskAsync(string remotePath,
    string localPath = null, int timeOut = 3000)
{
    try
    {
        if (remotePath == null)
        {
            Debug.WriteLine("DownloadFileTaskAsync (null remote path): skipping");
            throw new ArgumentNullException("remotePath");
        }

        if (localPath == null)
        {
            Debug.WriteLine(
                string.Format(
                    "DownloadFileTaskAsync (null local path): generating a temporary file name for {0}",
                    remotePath));
            localPath = Path.GetTempFileName();
        }

        using (var client = new WebClient())
        {
            TimerCallback timerCallback = c =>
            {
                var webClient = (WebClient) c;
                if (!webClient.IsBusy) return;
                webClient.CancelAsync();
                Debug.WriteLine(string.Format("DownloadFileTaskAsync (time out due): {0}", remotePath));
            };
            using (var timer = new Timer(timerCallback, client, timeOut, Timeout.Infinite))
            {
                await client.DownloadFileTaskAsync(remotePath, localPath);
            }
            Debug.WriteLine(string.Format("DownloadFileTaskAsync (downloaded): {0}", remotePath));
            return new Tuple<string, string, Exception>(remotePath, localPath, null);
        }
    }
    catch (Exception ex)
    {
        return new Tuple<string, string, Exception>(remotePath, null, ex);
    }
}

public static class Extensions
{
    public static Task ForEachAsync<TSource, TResult>(
        this IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor)
    {
        var oneAtATime = new SemaphoreSlim(5, 10);
        return Task.WhenAll(
            from item in source
            select ProcessAsync(item, taskSelector, resultProcessor, oneAtATime));
    }

    private static async Task ProcessAsync<TSource, TResult>(
        TSource item,
        Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor,
        SemaphoreSlim oneAtATime)
    {
        TResult result = await taskSelector(item);
        await oneAtATime.WaitAsync();
        try
        {
            resultProcessor(item, result);
        }
        finally
        {
            oneAtATime.Release();
        }
    }
}

I didn't change the signature of ForEachAsync to let you choose the degree of parallelism; I'll let you adjust it as needed.
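A minimal sketch of adding the degree of parallelism as a parameter, assuming you want the limit to apply to the downloads themselves: the semaphore is created with equal initial and maximum counts, and it is acquired before taskSelector runs, so at most maxDegreeOfParallelism operations are in flight. The class name ThrottledExtensions and the parameter name maxDegreeOfParallelism are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledExtensions // hypothetical class name
{
    public static async Task ForEachAsync<TSource, TResult>(
        this IEnumerable<TSource> source, int maxDegreeOfParallelism,
        Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor)
    {
        if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));

        // Initial count == maximum count: exactly maxDegreeOfParallelism slots.
        using (var throttle = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism))
        {
            // All ProcessAsync calls have released their slot before WhenAll completes,
            // so disposing the semaphore afterwards is safe.
            await Task.WhenAll(source.Select(item => ProcessAsync(item, taskSelector, resultProcessor, throttle)));
        }
    }

    private static async Task ProcessAsync<TSource, TResult>(
        TSource item, Func<TSource, Task<TResult>> taskSelector,
        Action<TSource, TResult> resultProcessor, SemaphoreSlim throttle)
    {
        // Wait for a free slot before starting the operation, not after it finishes.
        await throttle.WaitAsync();
        try
        {
            TResult result = await taskSelector(item);
            resultProcessor(item, result); // may run concurrently for different items
        }
        finally
        {
            throttle.Release();
        }
    }
}
```

With this version the call above becomes await enumerable.ForEachAsync(5, s => DownloadFileTaskAsync(s, null, 1000), (url, t) => results.Add(t)); Because resultProcessor can now run for several items at once, it should be thread-safe; when called from a UI thread the continuations resume on that thread, but elsewhere a concurrent collection is the safer choice.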

Example output:

DownloadFileTaskAsync (null local path): generating a temporary file name for http://cache.thephoenix.com/secure/uploadedImages/The_Phoenix/Music/CD_Review/main_OTR_Britney480.jpg
DownloadFileTaskAsync (null local path): generating a temporary file name for http://ssimg.soundspike.com/artists/britneyspears_femmefatale_cd.jpg
DownloadFileTaskAsync (null local path): generating a temporary file name for http://a323.yahoofs.com/ymg/albumreviewsuk__1/albumreviewsuk-526650850-1301400550.jpg?ymm_1xEDE5bu0tMi
DownloadFileTaskAsync (null remote path): skipping
DownloadFileTaskAsync (time out due): http://hangout.altsounds.com/geek/gars/images/3/9/8/5/2375.jpg
DownloadFileTaskAsync (time out due): http://www.beat.com.au/sites/default/files/imagecache/630_315sr/images/article/header/2011/april/britney-spears-femme-fatale.jpg
DownloadFileTaskAsync (time out due): http://cache.thephoenix.com/secure/uploadedImages/The_Phoenix/Music/CD_Review/main_OTR_Britney480.jpg
DownloadFileTaskAsync (downloaded): http://newblog.thecmuwebsite.com/wp-content/uploads/2009/12/britneyspears1.jpg
DownloadFileTaskAsync (downloaded): http://newblog.thecmuwebsite.com/wp-content/uploads/2009/12/britneyspears1.jpg
DownloadFileTaskAsync (downloaded): http://static.guim.co.uk/sys-images/Music/Pix/site_furniture/2011/3/22/1300816812640/Femme-Fatale.jpg
DownloadFileTaskAsync (downloaded): http://www.sputnikmusic.com/images/albums/72328.jpg

What used to take up to 1 minute now takes only 10 seconds to get the same result :)

Many thanks to the authors of these 2 articles:

http://blogs.msdn.com/b/pfxteam/archive/2012/03/05/10278165.aspx

http://blogs.msdn.com/b/pfxteam/archive/2012/03/04/10277325.aspx

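Comments

1 vote nullable 12/5/2013
Great extension method!
2 votes BornToCode 9/24/2017
Shouldn't await oneAtATime.WaitAsync(); come before TResult result = await taskSelector(item);? Otherwise you just start a task for every item of 'source' without any limit, basically ignoring the semaphore limit (running only x tasks at a time)?
3 votes Jake H 5/3/2018
@BornToCode is absolutely right. As written, the code above will actually start as many tasks as the TPL would without the semaphore, and only limits resultProcessor to a maximum DOP. If you are going to cut and paste the code above, move the .WaitAsync() above the execution of the task selector. Also, without calling .Release(n) on the semaphore, only 5 (not 10) concurrent tasks will ever run. I suggest creating the semaphore as ... = new SemaphoreSlim(10, 10)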
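-2 votes alelom 4/10/2023 #2

For people like me arriving here recently: I just found out that Parallel.ForEachAsync was implemented in .NET 6: https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreachasync?view=net-6.0

Its usage is just as you would expect, and it has the advantage of letting you specify the degree of parallelism. For example: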

var someInputData = new []
{
    "someData1",
    "someData2",
    "someData3"
};

ParallelOptions parallelOptions = new()
{
    MaxDegreeOfParallelism = 3
};
 
await Parallel.ForEachAsync(someInputData, parallelOptions, 
    async (input, cancellationToken) => 
    {
       // Some async Func. 
       // The Func can make use of:
       // - the `input` variable, which will contain the element in the `someInputData` list;
       // - the `cancellationToken` variable, usable to cancel the async operation.
    });
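Parallel.ForEachAsync returns a plain Task, so on its own it does not hand back the results the question is about. A minimal sketch, assuming .NET 6 or later, that collects them into an array in source order by iterating over indices; the ParallelSketch class and SelectParallelAsync method are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelSketch // hypothetical class name
{
    public static async Task<TResult[]> SelectParallelAsync<TSource, TResult>(
        IEnumerable<TSource> source, int maxDegreeOfParallelism,
        Func<TSource, CancellationToken, Task<TResult>> body)
    {
        TSource[] items = source.ToArray();
        var results = new TResult[items.Length];
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        // Iterate over indices so each invocation writes to its own array slot.
        await Parallel.ForEachAsync(Enumerable.Range(0, items.Length), options,
            async (i, cancellationToken) =>
            {
                results[i] = await body(items[i], cancellationToken);
            });

        return results;
    }
}
```

For the question's case, string[] files = await ParallelSketch.SelectParallelAsync(enumerable, 5, (s, ct) => DownloadFile(s)); would work; the token is ignored there because the question's DownloadFile does not take one.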

See also https://www.hanselman.com/blog/parallelforeachasync-in-net-6

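Comments

0 votes Theodor Zoulias 4/10/2023
My understanding is that the OP wants to modify the ForEachAsync method they found on the internet so that it propagates the results of the asynchronous operations. Your answer doesn't address this requirement.
0 votes Theodor Zoulias 4/10/2023
@alelom Regarding my vote: I took into account that you posted the same answer to a different question. Your answer was on-topic in the other question, so I upvoted that answer and downvoted this one. Unfortunately the other answer has been deleted by a moderator. If you edit this answer to make it on-topic here as well, I might change my vote. FYI, I have also downvoted the question and the other existing answer because, IMHO, it is a confusing question with a poor self-answer.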