Подтвердить что ты не робот

Ожидая наблюдаемого

Итак, в печальные дни С# 4.0 я создал следующий класс "WorkflowExecutor", который позволил асинхронным рабочим потокам в потоке графического интерфейса, взломав продолжения IEnumerable "yield return" для ожидания наблюдаемых. Таким образом, следующий код на кнопке buttonClick просто запустит простой рабочий процесс, который обновит текст, ждет, когда вы нажмете кнопку2, и петли через 1 секунду.

public sealed partial class Form1 : Form {
    readonly Subject<Unit> _button2Subject = new Subject<Unit>();
    readonly WorkflowExecutor _workflowExecutor = new WorkflowExecutor();

    public Form1() {
        InitializeComponent();
    }

    IEnumerable<IObservable<Unit>> CreateAsyncHandler() {
        Text = "Initializing";
        var scheduler = new ControlScheduler(this);
        while (true) {
            yield return scheduler.WaitTimer(1000);
            Text = "Waiting for Click";
            yield return _button2Subject;
            Text = "Click Detected!";
            yield return scheduler.WaitTimer(1000);
            Text = "Restarting";
        }
    }

    void button1_Click(object sender, EventArgs e) {
        _workflowExecutor.Run(CreateAsyncHandler());
    }

    void button2_Click(object sender, EventArgs e) {
        _button2Subject.OnNext(Unit.Default);
    }

    void button3_Click(object sender, EventArgs e) {
        _workflowExecutor.Stop();
    }
}

public static class TimerHelper {
    public static IObservable<Unit> WaitTimer(this IScheduler scheduler, double ms) {
        return Observable.Timer(TimeSpan.FromMilliseconds(ms), scheduler).Select(_ => Unit.Default);
    }
}

public sealed class WorkflowExecutor {
    IEnumerator<IObservable<Unit>> _observables;
    IDisposable _subscription;

    public void Run(IEnumerable<IObservable<Unit>> actions) {
        _observables = (actions ?? new IObservable<Unit>[0]).GetEnumerator();
        Continue();
    }

    void Continue() {
        if (_subscription != null) {
            _subscription.Dispose();
        }
        if (_observables.MoveNext()) {
            _subscription = _observables.Current.Subscribe(_ => Continue());
        }
    }

    public void Stop() {
        Run(null);
    }
}

Умная часть идеи, использующая продолжения "выхода" для выполнения асинхронной работы, была взята из идеи Даниэля Эрвикера AsyncIOPipe: http://smellegantcode.wordpress.com/2008/12/05/asynchronous-sockets-with-yield-return-of-lambdas/, тогда я добавил реактивную структуру поверх нее.

Теперь мне сложно переписать это, используя функцию async в С# 5.0, но похоже, что это должно быть просто. Когда я конвертирую наблюдаемые в задачи, они запускаются только один раз, а цикл while сбрасывается во второй раз. Любая помощь в исправлении будет отличной.

Все, что сказало/спросило, что делает механизм асинхронного/ожидающего, дает мне, что WorkflowExecutor не делает? Есть ли что-нибудь, что я могу сделать с async/await, что я не могу просто сделать (учитывая такой же код) с WorkflowExecutor?

4b9b3361

Ответ 1

Как вы заметили, задача - это одноразовая вещь, а не Наблюдаемый "поток событий". Хороший способ думать об этом (IMHO) - это диаграмма 2x2 на Rx team post about 2.0 Beta:

2x2 chart for task vs observable

В зависимости от обстоятельств (одноразовый против "потока" событий) сохранение Observable может иметь больше смысла.

Если вы можете прыгать до бета-версии Reactive 2.0, вы можете "подождать" наблюдаемых с этим. Например, моя собственная попытка "асинхронной/ожидающей" (приблизительной) версии вашего кода будет выглядеть следующим образом:

public sealed partial class Form1 : Form
{
    readonly Subject<Unit> _button2Subject = new Subject<Unit>();

    private bool shouldRun = false;

    public Form1()
    {
        InitializeComponent();
    }

    async Task CreateAsyncHandler()
    {
        Text = "Initializing";
        while (shouldRun)
        {
            await Task.Delay(1000);
            Text = "Waiting for Click";
            await _button2Subject.FirstAsync();
            Text = "Click Detected!";
            await Task.Delay(1000);
            Text = "Restarting";
        }
    }

    async void button1_Click(object sender, EventArgs e)
    {
        shouldRun = true;
        await CreateAsyncHandler();
    }

    void button2_Click(object sender, EventArgs e)
    {
        _button2Subject.OnNext(Unit.Default);
    }

    void button3_Click(object sender, EventArgs e)
    {
        shouldRun = false;
    }
}

Ответ 2

Как упоминал Джеймс, вы можете подождать IObservable <T> последовательность, начинающаяся с Rx v2.0 Beta. Поведение - вернуть последний элемент (до OnCompleted) или выбросить OnError, который был обнаружен. Если последовательность не содержит элементов, вы получите исключение InvalidOperationException.

Обратите внимание, используя это, вы можете получить все другие желаемые типы поведения:

  • Получить первый элемент, ожидая xs.FirstAsync()
  • Убедитесь, что существует только одно значение, ожидая xs.SingleAsync()
  • Когда вы в порядке с пустой последовательностью, ждите xs.DefaultIfEmpty()
  • Чтобы получить все элементы, ждите xs.ToArray() или ждите xs.ToList()

Вы можете делать еще более интересные вещи, например, вычислять результат агрегации, но наблюдать промежуточные значения с помощью Do и Scan:

var xs = Observable.Range(0, 10, Scheduler.Default);

var res = xs.Scan((x, y) => x + y)
            .Do(x => { Console.WriteLine("Busy. Current sum is {0}", x); });

Console.WriteLine("Done! The sum is {0}", await res);