Подтвердить что ты не робот

Объединение асинхронного вычисления в синхронное (блокирующее) вычисление

похожие вопросы:

У меня есть объект с методом, который я хотел бы представить клиентам библиотеки (особенно клиентам сценариев) как что-то вроде:

interface MyNiceInterface
{
    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg);
    public Future<Baz> doSomething(Foo fooArg, Bar barArg);
    // doSomethingAndBlock is the straightforward way;
    // doSomething has more control but deals with
    // a Future and that might be too much hassle for
    // scripting clients
}

но примитивный "материал", который у меня есть, представляет собой набор управляемых событиями классов:

interface BazComputationSink
{
    public void onBazResult(Baz result);
}

class ImplementingThing
{
    public void doSomethingAsync(Foo fooArg, Bar barArg, BazComputationSink sink);
}

где ImplementingThing принимает входные данные, выполняет некоторые непонятные вещи, такие как постановка в очередь задач в очереди задач, а затем, когда возникает результат, sink.onBazResult() в потоке, который может быть или не быть тем же потоком, что ImplementingThing.doSomethingAsync() назывался.

Есть ли способ, которым я могу использовать функции, управляемые событиями, которые у меня есть, наряду с примитивами параллелизма, для реализации MyNiceInterface, чтобы клиенты сценариев могли счастливо ждать в блокирующем потоке?

редактировать: я могу использовать FutureTask для этого?

4b9b3361

Ответ 1

Использование вашей собственной будущей реализации:

public class BazComputationFuture implements Future<Baz>, BazComputationSink {

    private volatile Baz result = null;
    private volatile boolean cancelled = false;
    private final CountDownLatch countDownLatch;

    public BazComputationFuture() {
        countDownLatch = new CountDownLatch(1);
    }

    @Override
    public boolean cancel(final boolean mayInterruptIfRunning) {
        if (isDone()) {
            return false;
        } else {
            countDownLatch.countDown();
            cancelled = true;
            return !isDone();
        }
    }

    @Override
    public Baz get() throws InterruptedException, ExecutionException {
        countDownLatch.await();
        return result;
    }

    @Override
    public Baz get(final long timeout, final TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        countDownLatch.await(timeout, unit);
        return result;
    }

    @Override
    public boolean isCancelled() {
        return cancelled;
    }

    @Override
    public boolean isDone() {
        return countDownLatch.getCount() == 0;
    }

    public void onBazResult(final Baz result) {
        this.result = result;
        countDownLatch.countDown();
    }

}

public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
    BazComputationFuture future = new BazComputationFuture();
    doSomethingAsync(fooArg, barArg, future);
    return future;
}

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
    return doSomething(fooArg, barArg).get();
}

Решение создает встроенный CountDownLatch, который очищается после получения обратного вызова. Если пользователь вызывает get, CountDownLatch используется для блокировки вызывающего потока до завершения вычисления и вызова обратного вызова onBazResult. CountDownLatch гарантирует, что если обратный вызов возникает до того, как get() вызывается, метод get() немедленно вернется с результатом.

Ответ 2

Ну, есть простое решение сделать что-то вроде:

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
  final AtomicReference<Baz> notifier = new AtomicReference();
  doSomethingAsync(fooArg, barArg, new BazComputationSink() {
    public void onBazResult(Baz result) {
      synchronized (notifier) {
        notifier.set(result);
        notifier.notify();
      }
    }
  });
  synchronized (notifier) {
    while (notifier.get() == null)
      notifier.wait();
  }
  return notifier.get();
}

Конечно, это предполагает, что ваш результат Baz никогда не будет равен нулю...

Ответ 3

В google guava library есть простой в использовании SettableFuture, который делает эту проблему очень простой (около 10 строк кода).

public class ImplementingThing {

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
    try {
        return doSomething(fooArg, barArg).get();
    } catch (Exception e) {
        throw new RuntimeException("Oh dear");
    }
};

public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
    final SettableFuture<Baz> future = new SettableFuture<Baz>();
    doSomethingAsync(fooArg, barArg, new BazComputationSink() {
        @Override
        public void onBazResult(Baz result) {
            future.set(result);
        }
    });
    return future;
};

// Everything below here is just mock stuff to make the example work,
// so you can copy it into your IDE and see it run.

public static class Baz {}
public static class Foo {}
public static class Bar {}

public static interface BazComputationSink {
    public void onBazResult(Baz result);
}

public void doSomethingAsync(Foo fooArg, Bar barArg, final BazComputationSink sink) {
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Baz baz = new Baz();
            sink.onBazResult(baz);
        }
    }).start();
};

public static void main(String[] args) {
    System.err.println("Starting Main");
    System.err.println((new ImplementingThing()).doSomethingAndBlock(null, null));
    System.err.println("Ending Main");
}

Ответ 4

Очень простой пример: просто понять CountDownLatch без каких-либо дополнительный код.

A java.util.concurrent.CountDownLatch представляет собой конструкцию concurrency, которая позволяет одному или нескольким потокам ждать завершения заданного набора операций.

A CountDownLatch инициализируется данным подсчетом. Этот счет уменьшается с помощью вызовов метода countDown(). Темы, ожидающие, что этот счет достигнет нуля, могут вызвать один из методов await(). Вызов await() блокирует поток до тех пор, пока счетчик не достигнет нуля.

Ниже приведен простой пример. После того, как Decrementer вызвал countDown() 3 раза на CountDownLatch, ожидающий официант освобождается от вызова await().

Вы также можете упомянуть некоторые TimeOut для ожидания.

CountDownLatch latch = new CountDownLatch(3);

Waiter      waiter      = new Waiter(latch);
Decrementer decrementer = new Decrementer(latch);

new Thread(waiter)     .start();
new Thread(decrementer).start();

Thread.sleep(4000);
public class Waiter implements Runnable{

    CountDownLatch latch = null;

    public Waiter(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Waiter Released");
    }
}

//--------------

public class Decrementer implements Runnable {

    CountDownLatch latch = null;

    public Decrementer(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {

        try {
            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Ссылка

Если вы не хотите использовать CountDownLatch, или ваше требование - это то же самое, что и Facebook, и в отличие от функциональности. Значит, если вызывается один метод, не вызывайте другой метод.

В этом случае вы можете объявить

private volatile Boolean isInprocessOfLikeOrUnLike = false;

а затем вы можете проверить начало вызова метода, если он false, тогда метод вызова иначе возвращает.. зависит от вашей реализации.

Ответ 5

Это очень просто с RxJava 2.x:

try {
    Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
            doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
            .toFuture().get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

Или без лямбда-нотации:

Baz baz = Single.create(new SingleOnSubscribe<Baz>() {
                @Override
                public void subscribe(SingleEmitter<Baz> emitter) {
                    doSomethingAsync(fooArg, barArg, new BazComputationSink() {
                        @Override
                        public void onBazResult(Baz result) {
                            emitter.onSuccess(result);
                        }
                    });
                }
            }).toFuture().get();

Еще проще:

Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
                doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
                .blockingGet();

Ответ 6

Здесь более общее решение, основанное на Поле Вагланде, отвечает:

public abstract class AsyncRunnable<T> {
    protected abstract void run(AtomicReference<T> notifier);

    protected final void finish(AtomicReference<T> notifier, T result) {
        synchronized (notifier) {
            notifier.set(result);
            notifier.notify();
        }
    }

    public static <T> T wait(AsyncRunnable<T> runnable) {
        final AtomicReference<T> notifier = new AtomicReference<>();

        // run the asynchronous code
        runnable.run(notifier);

        // wait for the asynchronous code to finish
        synchronized (notifier) {
            while (notifier.get() == null) {
                try {
                    notifier.wait();
                } catch (InterruptedException ignore) {}
            }
        }

        // return the result of the asynchronous code
        return notifier.get();
    }
}

Вот пример того, как его использовать::

    String result = AsyncRunnable.wait(new AsyncRunnable<String>() {
        @Override
        public void run(final AtomicReference<String> notifier) {
            // here goes your async code, e.g.:
            new Thread(new Runnable() {
                @Override
                public void run() {
                    finish(notifier, "This was a asynchronous call!");
                }
            }).start();
        }
    });

Более подробную версию кода можно найти здесь: http://pastebin.com/hKHJUBqE

EDIT: Пример, связанный с вопросом:

public Baz doSomethingAndBlock(final Foo fooArg, final Bar barArg) {
    return AsyncRunnable.wait(new AsyncRunnable<Baz>() {
        @Override
        protected void run(final AtomicReference<Baz> notifier) {
            doSomethingAsync(fooArg, barArg, new BazComputationSink() {
                public void onBazResult(Baz result) {
                    synchronized (notifier) {
                        notifier.set(result);
                        notifier.notify();
                    }
                }
            });
        }
    });
}

Ответ 7

Самый простой способ (который работает для меня) заключается в

  1. Создать очередь блокировки
  2. Вызовите асинхронный метод - используйте обработчик, который предлагает результат в эту очередь блокировки.
  3. Опрос очереди (то, где вы блокируете) для результата.

    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) throws InterruptedException {
        final BlockingQueue<Baz> blocker = new LinkedBlockingQueue();
        doSomethingAsync(fooArg, barArg, blocker::offer);
        // Now block until response or timeout
        return blocker.poll(30, TimeUnit.SECONDS);
    }