处理异步调用序列Process Asynchronous Calls in Sequence

- 此内容更新于:2015-01-06
主题:

原文:

I am making a bunch or asynchronous calls to Azure Table Storage. For obvious reasons insertion of these records are not in the same order as they were invoked.

I am planning to introduce ConcurrentQueue to ensure sequence. Following sample code written as a POC seems to achieve desired result.

I am wondering is this the best way I can ensure asynchronous calls will be completed in sequence?

public class ProductService
{
    ConcurrentQueue<string> ordersQueue = new ConcurrentQueue<string>();
    //Place make calls here
    public void PlaceOrder()
    {
        Task.Run(() =>
        {
            Parallel.For(0, 100, (i) =>
            {
                string item = "Product " + i;
                ordersQueue.Enqueue(item);
                Console.WriteLine("Placed Order: " + item);
                Task.Delay(2000).Wait();
            });

        });

    }

    //Process calls in sequence, I am hoping concurrentQueue will be consistent.
    public void Deliver()
    {
        Task.Run(() =>
        {
            while(true)
            {
                string productId;
                ordersQueue.TryDequeue(out productId);
                if (!string.IsNullOrEmpty(productId))
                {
                    Console.WriteLine("Delivered: " + productId);
                }
            }
        });
    }
}
Richard的回复:1。如果你想做的事情,然后使用api并发、并行执行是永远不会成为一个好的开始。2。异步方法是好当你避免阻塞操作,但前提是你把实际的阻塞操作是异步:ie。不要# 39;t块的执行线程锁(ConcurrentQueue<T> .Enqueue)。

(原文:1. If you want to do things in order, then using APIs for concurrent, Parallel, execution is never going to be a good start. 2. Asynchronous approaches are good when you avoid blocking operations, but only if you convert the actual blocking operations to be asynchronous: ie. don't block the thread of execution on a lock (ConcurrentQueue<T>.Enqueue).)

Vimal CK的回复:首先的任务。延迟不是一个好的选择....和数百项队列,这意味着现在你有100件插入,每将一个异步请求Azure。假设您有100核心机和你有任务,不能保证时将完成

(原文:First of All Task.Delay is not a good choice.... and your hundred items will De-Queue one by one, that means now you have 100 items to insert and each will be an async request to Azure. Suppose you have 8 core machine and you have 100 Tasks, you cannot assure which will complete when)

Enigmativity的回复:你能描述一下你# 39;再保险试图在更抽象的级别吗?例如,你只是想异步试图调用一系列顺序的azure插入吗?

(原文:Can you please describe what you're trying to do at a slightly more abstract level? For example, are you just trying to asynchronously trying to call an in-order series of azure inserts?)

解决方案:
如果你想按顺序处理异步记录,这听起来像是一个完美的适合TPL Dataflow ActionBlock。简单地创建一个块的行动来执行和发布记录。它支持异步行为和保持秩序: 它还支持并行处理和有界的能力如果你需要。
原文:

If you want to process records asynchronously and sequentially this sounds like a perfect fit for TPL Dataflow's ActionBlock. Simply create a block with the action to execute and post records to it. It supports async actions and keeps order:

var block = new ActionBlock<Product>(async product =>
{
    await product.ExecuteAsync();
});

block.Post(new Product());

It also supports processing in parallel and bounded capacity if you need.

解决方案:
原文:

Try using Microsoft's Reactive Framework.

This worked for me:

IObservable<Task<string>> query =
    from i in Observable.Range(0, 100, Scheduler.Default)
    let item = "Product " + i
    select AzureAsyncCall(item);

query
    .Subscribe(async x =>
    {
        var result = await x;
        /* do something with result */
    });

The AzureAsyncCall call signature I used was public Task<string> AzureAsyncCall(string x).

I dropped in a bunch of Console.WriteLine(Thread.CurrentThread.ManagedThreadId); calls to ensure I was getting the right async behaviour in my test code. It worked well.

All the calls were asynchronous and serialized one after the other.