Подтвердить что ты не робот

Деятельность рабочего процесса сервисной шины

Я хотел бы получить доступ к очередям служебной шины и темам из Workflows с некоторыми конкретными действиями.

Я не смог найти ничего подходящего для этого сценария (эта статья MSDN и эта статья Романа Kiss) являются ближайшими.

Я хотел бы создать пользовательскую активность, которая использует QueueClient для асинхронного получения брокерских сообщений с использованием метода BeginReceive, реализованного с использованием шаблона async/await (см. мой вопрос об этом).

Прежде всего, я хотел бы спросить, есть ли у меня какие-либо причины, почему я предпочитаю предлагаемый подход (адаптированный WCF) вместо моего желаемого (используя QueueClient).

Тогда, я был бы признателен за помощь в его разработке настойчивым образом.

Update:

Это то, что я пробовал до сих пор:

public class AsyncReceiveBrokeredMessage : AsyncCodeActivity<BrokeredMessage>
{
    [RequiredArgument]
    public InArgument<string> ConnectionString { get; set; }

    [RequiredArgument]
    public InArgument<string> Path { get; set; }

    protected sealed override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
    {
        var connectionString = this.ConnectionString.Get(context);
        var path = this.Path.Get(context);
        var queueClient = QueueClient.CreateFromConnectionString(connectionString, path);
        var cts = new CancellationTokenSource();
        context.UserState = new ReceiveState
                                {
                                    CancellationTokenSource = cts,
                                    QueueClient = queueClient
                                };
        var task = ExecuteAsync(context, cts.Token);
        var tcs = new TaskCompletionSource<BrokeredMessage>(state);
        task.ContinueWith(
            t =>
                {
                    if (t.IsFaulted)
                    {
                        tcs.TrySetException(t.Exception.InnerExceptions);
                    }
                    else if (t.IsCanceled)
                    {
                        tcs.TrySetCanceled();
                    }
                    else
                    {
                        tcs.TrySetResult(t.Result);
                    }

                    if (callback != null)
                    {
                        callback(tcs.Task);
                    }
                });

        return tcs.Task;
    }

    protected sealed override BrokeredMessage EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
    {
        var task = (Task<BrokeredMessage>)result;
        try
        {
            return task.Result;
        }
        catch (OperationCanceledException)
        {
            if (context.IsCancellationRequested)
            {
                context.MarkCanceled();
            }
            else
            {
                throw;
            }

            return null; // or throw?
        }
        catch (AggregateException exception)
        {
            if (exception.InnerException is OperationCanceledException)
            {
                if (context.IsCancellationRequested)
                {
                    context.MarkCanceled();
                }
                else
                {
                    throw;
                }

                return null; // or throw?
            }

            ExceptionDispatchInfo.Capture(exception.InnerException).Throw();
            throw;
        }
    }

    protected override void Cancel(AsyncCodeActivityContext context)
    {
        var state = (ReceiveState)context.UserState;
        state.CancellationTokenSource.Cancel();
    }

    private async Task<BrokeredMessage> ExecuteAsync(
        AsyncCodeActivityContext context, CancellationToken cancellationToken)
    {
        var receiveState = context.UserState as ReceiveState;
        var receiveTask = Task<BrokeredMessage>.Factory.FromAsync(
            receiveState.QueueClient.BeginReceive, receiveState.QueueClient.EndReceive, null);
        var completionTask = receiveTask.ContinueWith(
             t =>
                 {
                     BrokeredMessage result;
                     if (t.IsCanceled)
                     {
                         context.MarkCanceled();
                         result = null;
                     }
                     else if (t.IsFaulted)
                     {
                         result = null;
                     }
                     else
                     {

                         t.Result.Complete();
                         result = t.Result;
                     }

                     receiveState.QueueClient.Close();
                     return result;
                 },
             cancellationToken);
        return await completionTask;
    }

    private class ReceiveState
    {
        public CancellationTokenSource CancellationTokenSource { get; set; }

        public QueueClient QueueClient { get; set; }
    }
}

И протестировал этот путь (используя локальную служебную шину Windows Server):

var connectionString = new Variable<string>
                                   {
                                       Default = connectionStringValue
                                   };
        var path = new Variable<string>
                       {
                           Default = pathValue
                       };
        var test = new While
                       {
                           Body =
                               new Pick
                                   {
                                       Branches =
                                           {
                                               new PickBranch
                                                   {
                                                       Trigger =
                                                           new AsyncReceiveBrokeredMessage
                                                               {
                                                                   ConnectionString = new InArgument<string>(connectionString),
                                                                   Path = new InArgument<string>(path)
                                                               },
                                                       Action =
                                                           new WriteLine
                                                               {
                                                                   Text =
                                                                       "Received message"
                                                               }
                                                   },
                                               new PickBranch
                                                   {
                                                       Trigger =
                                                           new Delay
                                                               {
                                                                   Duration = TimeSpan.FromSeconds(10)
                                                               },
                                                       Action =
                                                           new WriteLine
                                                               {
                                                                   Text =
                                                                       "Timeout!"
                                                               }
                                                   }
                                           }
                                   },
                           Condition = true,
                           Variables = { connectionString, path }
                       };
        WorkflowInvoker.Invoke(test);

Я получаю сообщения, как ожидалось, если я их постоянно отправляю. Проблемы возникают после первого таймаута, потому что тогда я больше не получаю никакого сообщения. Любое разъяснение оценивается.

4b9b3361

Ответ 1

Сначала вам нужно знать некоторые важные вещи: 1) Рабочие процессы - это длительные процессы, которые должны быть восстановлены и восстановлены позже. 2) То, как рабочие процессы пробуждаются и восстанавливаются, - это закладки. 3) Обычно люди, как и их рабочие процессы, сохраняются, а также приостанавливаются. (Если вы не заботитесь о стойкости, почему вы используете WF в любом случае - только для визуального дизайна?)

Логическая проблема:

Если все ваши рабочие процессы и их действия сохраняются и приостанавливаются, то ни один из ваших кодов активности не загружается, поэтому кто выполняет прослушивание? Ответ: что-то другое, а не активность, должно быть тем, что слушает очередь ServiceBus и берет на себя ответственность за возобновление закладок, чтобы разбудить ваши рабочие процессы.

Что-то является рабочим процессом "Хост" или некоторым его расширением. Вот несколько сообщений в блоге о том, как вы можете настроить хост для прослушивания сообщений [с помощью кнопки GUI] и просыпать активность рабочего процесса.

http://blogs.msdn.com/b/tilovell/archive/2011/02/26/wf4-workflow-4-0-hosting-extensions-redux.aspx

http://blogs.msdn.com/b/tilovell/archive/2010/06/08/wf4-workflow-4-0-hosting-extensions.aspx

Что вы можете сделать, это взять этот код и адаптировать его для прослушивания в очереди ServiceBus вместо кнопки GUI и просыпать свою собственную операцию ReceiveFromServiceBus, которая аналогична функции PageActivity - обратите внимание, что вы должны писать NativeActivity в порядке для правильной работы с закладками.

Все довольно громоздко... но я верю, что "правильный" способ сделать это с WF.

Ответ 2

Объекты очереди предоставляют следующие возможности:    "Возможность указать время, в которое сообщение будет добавлено в очередь".

После некоторого тайм-аута вы не можете получить из-за этого правила?

Разрешение мая:

Обнаружение дубликатов входящих сообщений, позволяющих клиентам отправлять одно и то же сообщение несколько раз без неблагоприятных последствий.

Ответ 3

Возможно, проблема заключается в свойства DefaultMessageTimeToLive от TimeToLive.

NamespaceManager.CreateSubscription(
        new SubscriptionDescription(TopicName, SubscriptionName)
            {
                LockDuration = TimeSpan.FromMinutes(5),
                DefaultMessageTimeToLive = TimeSpan.FromDays(7),
                EnableDeadLetteringOnMessageExpiration = true
            });