异步LINQ查询怎么写?

How to write Asynchronous LINQ query?

提问人:Morgan Cheng 提问时间:10/31/2008 最后编辑:John SaundersMorgan Cheng 更新时间:12/28/2017 访问量:52586

问:

在我看了一堆 LINQ 相关的东西后,我突然意识到没有文章介绍如何编写异步 LINQ 查询。

假设我们使用 LINQ to SQL,下面的语句很清楚。但是,如果 SQL 数据库响应缓慢,则使用此代码块的线程将受到阻碍。

var result = from item in Products where item.Price > 3 select item.Name;
foreach (var name in result)
{
    Console.WriteLine(name);
}

似乎当前的LINQ查询规范不支持这一点。

有没有办法对 LINQ 进行异步编程?它的工作方式就像有回调一样 当结果可供使用时发出通知,在 I/O 上没有任何阻塞延迟。

C# LINQ-to-SQL 异步

评论

0赞 TheSoftwareJedi 10/31/2008
下面的答案对你来说还行吗?
0赞 Richard Anthony Hein 3/24/2010
查看 msdn.microsoft.com/en-us/devlabs/ee794896.aspx 上的 Reactive Extensions for .NET,它是为异步 Linq 查询而设计的。
2赞 Tom Lianza 1/17/2012
确实如此,但不是作者要问的 Linq to SQL 查询。
1赞 Christian C. Salvadó 10/31/2008
这是一篇有趣的文章: LINQ to SQL:异步执行查询 这是关于使用 ThreadPool 以及使用 DataContext 的 GetCommand 方法
0赞 Alexei Levenkov 9/11/2014
如果要查找非 Linq-to-SQL 的就绪版本,请参阅 stackoverflow.com/a/25789815/477420async.Select

答:

38赞 TheSoftwareJedi 10/31/2008 #1

虽然 LINQ 本身并没有真正做到这一点,但框架本身确实......您可以轻松地在 30 行左右的时间内滚动自己的异步查询执行器...事实上,我只是把这个放在一起给你:)

编辑:通过写这篇文章,我发现了为什么他们没有实现它。它无法处理匿名类型,因为它们的作用域是本地的。因此,您无法定义回调函数。这是一件非常重要的事情,因为很多 linq to sql 的东西都在 select 子句中创建它们。以下任何一个建议都遭受了同样的命运,所以我仍然认为这个是最容易使用的!

编辑:唯一的解决方案是不使用匿名类型。您可以将回调声明为仅采用 IEnumerable(无类型参数),并使用反射来访问字段 (ICK!!)。另一种方法是将回调声明为“动态”......哦。。。等。。。那还没出来。:)这是如何使用动态的另一个不错的例子。有些人可能称之为虐待。

把它扔到你的实用程序库中:

public static class AsynchronousQueryExecutor
{
    public static void Call<T>(IEnumerable<T> query, Action<IEnumerable<T>> callback, Action<Exception> errorCallback)
    {
        Func<IEnumerable<T>, IEnumerable<T>> func =
            new Func<IEnumerable<T>, IEnumerable<T>>(InnerEnumerate<T>);
        IEnumerable<T> result = null;
        IAsyncResult ar = func.BeginInvoke(
                            query,
                            new AsyncCallback(delegate(IAsyncResult arr)
                            {
                                try
                                {
                                    result = ((Func<IEnumerable<T>, IEnumerable<T>>)((AsyncResult)arr).AsyncDelegate).EndInvoke(arr);
                                }
                                catch (Exception ex)
                                {
                                    if (errorCallback != null)
                                    {
                                        errorCallback(ex);
                                    }
                                    return;
                                }
                                //errors from inside here are the callbacks problem
                                //I think it would be confusing to report them
                                callback(result);
                            }),
                            null);
    }
    private static IEnumerable<T> InnerEnumerate<T>(IEnumerable<T> query)
    {
        foreach (var item in query) //the method hangs here while the query executes
        {
            yield return item;
        }
    }
}

你可以这样使用它:

class Program
{

    public static void Main(string[] args)
    {
        //this could be your linq query
        var qry = TestSlowLoadingEnumerable();

        //We begin the call and give it our callback delegate
        //and a delegate to an error handler
        AsynchronousQueryExecutor.Call(qry, HandleResults, HandleError);

        Console.WriteLine("Call began on seperate thread, execution continued");
        Console.ReadLine();
    }

    public static void HandleResults(IEnumerable<int> results)
    {
        //the results are available in here
        foreach (var item in results)
        {
            Console.WriteLine(item);
        }
    }

    public static void HandleError(Exception ex)
    {
        Console.WriteLine("error");
    }

    //just a sample lazy loading enumerable
    public static IEnumerable<int> TestSlowLoadingEnumerable()
    {
        Thread.Sleep(5000);
        foreach (var i in new int[] { 1, 2, 3, 4, 5, 6 })
        {
            yield return i;
        }
    }

}

现在要把它放在我的博客上,非常方便。

评论

0赞 Richard Nienaber 10/31/2008
难道不能加入某种铸造吗?像这个链接:tomasp.net/articles/....
3赞 Andrew Harry 7/18/2009
实际上,您只是将块传递给线程池中的另一个线程?还是这里有什么魔力在发生?
26赞 Chris Moschini 1/24/2011
这是 .Net 中的一个常见错误 - 在同步代码之上应用 IAsync 模式,而不是委托给利用 Windows I/O 完成端口来传递等待时间的基础异步调用,因此它不会阻塞任何线程。正如 Harry 所指出的,这会将块交给另一个 ThreadPool 线程。这只有一个小好处,即启动线程可以自由地执行其他辅助工作。如果有足够多的并发请求传入导致长查询,则此方法可能会阻塞所有 ThreadPool 线程。
4赞 Guillaume86 8/16/2012
我没有看到任何解决Linq-to-sql特定问题(非线程安全连接)的内容
16赞 Michael Freidgeim 7/3/2011 #2

TheSoftwareJedi 和 ulrikb(又名 user316318)解决方案适用于任何 LINQ 类型,但(正如 Chris Moschini 所指出的)不会委托给利用 Windows I/O 完成端口的基础异步调用。

Wesley Bakker 的异步 DataContext 文章(由 Scott Hanselman 的博客文章触发)描述了使用 sqlCommand.BeginExecuteReader/sqlCommand.EndExecuteReader 的 LINQ to SQL 类,该类利用 Windows I/O 完成端口。

I/O 完成端口为在多处理器系统上处理多个异步 I/O 请求提供了高效的线程模型。

评论

0赞 Søren Boisen 5/13/2015
这应该是公认的答案,因为它实际上指定了如何以适当的异步方式执行操作,这将使程序受益,因为它不会占用等待操作完成的线程。
4赞 James Dunne 8/17/2011 #3

我启动了一个名为 Asynq 的简单 github 项目来执行异步 LINQ-to-SQL 查询。这个想法很简单,尽管在这个阶段是“脆弱的”(截至 2011 年 8 月 16 日):

  1. 让 LINQ-to-SQL 完成“繁重”的工作,将 .IQueryableDbCommandDataContext.GetCommand()
  2. 对于 SQL 200[058],从你从中获取的抽象实例进行强制转换,以获得一个 .如果您使用的是 SQL CE,那么您就不走运了,因为它没有公开 和 的异步模式。DbCommandGetCommand()SqlCommandSqlCeCommandBeginExecuteReaderEndExecuteReader
  3. 使用和关闭使用标准 .NET Framework 异步 I/O 模式,让自己在传递给该方法的完成回调委托中获取一个委托。BeginExecuteReaderEndExecuteReaderSqlCommandDbDataReaderBeginExecuteReader
  4. 现在我们有一个 它,我们不知道它包含哪些列,也不知道如何将这些值映射回 's(在连接的情况下很可能是匿名类型)。当然,在这一点上,你可以手写你自己的列映射器,将其结果具体化到你的匿名类型或其他类型中。您必须为每个查询结果类型编写一个新的查询结果类型,具体取决于 LINQ-to-SQL 如何处理 IQueryable 以及它生成的 SQL 代码。这是一个非常讨厌的选择,我不推荐它,因为它不可维护,也不会总是正确的。LINQ-to-SQL 可以根据传入的参数值更改查询窗体,例如,生成与 不同的 SQL,并且可能生成不同的结果集架构。最好的办法是以编程方式处理此具体化问题:DbDataReaderIQueryableElementTypequery.Take(10).Skip(0)query.Take(10).Skip(10)
  5. “重新实现”一个简单的运行时对象具体化器,该化器根据 Type 的 LINQ-to-SQL 映射属性按定义的顺序将列从 中提取出来。正确实现这一点可能是该解决方案中最具挑战性的部分。DbDataReaderElementTypeIQueryable

正如其他人所发现的,该方法不处理匿名类型,只能直接映射到正确属性化的 LINQ-to-SQL 代理对象。由于大多数值得在 LINQ 中编写的查询都将涉及复杂的联接,这些联接不可避免地最终需要匿名类型作为最终的 select 子句,因此无论如何,使用这种提供的淡化方法都是毫无意义的。DataContext.Translate()DbDataReaderDataContext.Translate()

在利用现有的成熟 LINQ-to-SQL IQueryable 提供程序时,此解决方案存在一些小缺点:

  1. 不能将单个对象实例映射到 ,在 的最后一个 select 子句中将多个匿名类型属性映射到 ,例如 .LINQ-to-SQL 在内部跟踪哪个列序号映射到哪些属性;它不会向最终用户公开此信息,因此您不知道 中的哪些列是重用的,哪些是“不同的”。IQueryablefrom x in db.Table1 select new { a = x, b = x }DbDataReader
  2. 您不能在最终的 select 子句中包含常量值 - 这些值不会被翻译成 SQL,并且不会出现在 因此,您必须构建自定义逻辑才能从 的树中提取这些常量值,这将是相当麻烦的,而且根本不合理。DbDataReaderIQueryableExpression

我确信还有其他查询模式可能会中断,但这是我能想到的可能导致现有 LINQ-to-SQL 数据访问层出现问题的两种最大的查询模式。

这些问题很容易解决 - 只是不要在查询中执行它们,因为这两种模式都不会为查询的最终结果提供任何好处。希望这个建议适用于所有可能导致对象具体化问题的查询模式:-P。无法访问 LINQ-to-SQL 的列映射信息是一个很难解决的问题。

解决该问题的更“完整”的方法是有效地重新实现几乎所有的 LINQ-to-SQL,这有点耗时:-P。从高质量的开源 LINQ-to-SQL 提供程序实现开始将是一个很好的方法。您需要重新实现它的原因是,您可以访问用于将结果具体化到对象实例的所有列映射信息,而不会丢失任何信息。DbDataReader

评论

1赞 James Dunne 8/17/2011
忘了提到这个 github 项目是一项正在进行的工作。我积极劝阻您不要在任何生产代码中使用它。在这个阶段,这是一个简单的研究项目,用来证明它是可以做到的。
0赞 Chris Moschini 1/26/2012
具有类似解决方案的代码示例。hanselman.com/blog/......
7赞 Nenad 7/31/2016 #4

根据 Michael Freidgeim 的回答Scott Hansellman 提到的博客文章以及您可以使用 / 的事实,您可以实现可重用的方法,该方法异步执行底层:asyncawaitExecuteAsync<T>(...)SqlCommand

protected static async Task<IEnumerable<T>> ExecuteAsync<T>(IQueryable<T> query,
    DataContext ctx,
    CancellationToken token = default(CancellationToken))
{
    var cmd = (SqlCommand)ctx.GetCommand(query);

    if (cmd.Connection.State == ConnectionState.Closed)
        await cmd.Connection.OpenAsync(token);
    var reader = await cmd.ExecuteReaderAsync(token);

    return ctx.Translate<T>(reader);
}

然后你可以像这样(重新)使用它:

public async Task WriteNamesToConsoleAsync(string connectionString, CancellationToken token = default(CancellationToken))
{
    using (var ctx = new DataContext(connectionString))
    {
        var query = from item in Products where item.Price > 3 select item.Name;
        var result = await ExecuteAsync(query, ctx, token);
        foreach (var name in result)
        {
            Console.WriteLine(name);
        }
    }
}

评论

0赞 ogggre 10/28/2017
这是迄今为止查询对象的最干净的解决方案,它涵盖了大多数场景,而且非常简洁。但是有没有办法实现异步 SubmitChanges?例如,为了将新记录插入到表中。