Подтвердить что ты не робот

Отмена с будущим и обещание в Scala

Это продолжение моего предыдущего вопроса.

Предположим, что у меня есть задача, которая выполняет прерывистый блокирующий вызов. Я хотел бы запустить его как Future и отменить его с помощью метода failure Promise.

Я хотел бы, чтобы отмена работала следующим образом:

  • Если кто-то отменил задачу до ее завершения, мне бы хотелось, чтобы задача завершилась "немедленно", прерывая блокирующий вызов, если он уже запущен, и я хотел бы, чтобы Future вызывал onFailure.

  • Если отменить задачу после завершения задачи, я хотел бы получить статус, заявляющий, что отмена завершилась с момента завершения задачи.

Имеет ли смысл? Можно ли реализовать в Scala? Есть ли примеры таких реализаций?

4b9b3361

Ответ 1

scala.concurrent.Future доступен только для чтения, поэтому один читатель не может испортить вещи другим читателям.

Похоже, что вы должны реализовать то, что хотите:

def cancellableFuture[T](fun: Future[T] => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
  val p = Promise[T]()
  val f = p.future
  p tryCompleteWith Future(fun(f))
  (f, () => p.tryFailure(new CancellationException))
}

val (f, cancel) = cancellableFuture( future => {
  while(!future.isCompleted) continueCalculation // isCompleted acts as our interrupted-flag

  result  // when we're done, return some result
})

val wasCancelled = cancel() // cancels the Future (sets its result to be a CancellationException conditionally)

Ответ 2

Вот прерванная версия кода Виктора на его комментарии (Виктор, пожалуйста, исправьте меня, если я неверно истолковал).

object CancellableFuture extends App {

  def interruptableFuture[T](fun: () => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
    val p = Promise[T]()
    val f = p.future
    val aref = new AtomicReference[Thread](null)
    p tryCompleteWith Future {
      val thread = Thread.currentThread
      aref.synchronized { aref.set(thread) }
      try fun() finally {
        val wasInterrupted = (aref.synchronized { aref getAndSet null }) ne thread
        //Deal with interrupted flag of this thread in desired
      }
    }

    (f, () => {
      aref.synchronized { Option(aref getAndSet null) foreach { _.interrupt() } }
      p.tryFailure(new CancellationException)
    })
  }

  val (f, cancel) = interruptableFuture[Int] { () =>
    val latch = new CountDownLatch(1)

    latch.await(5, TimeUnit.SECONDS)    // Blocks for 5 sec, is interruptable
    println("latch timed out")

    42  // Completed
  }

  f.onFailure { case ex => println(ex.getClass) }
  f.onSuccess { case i => println(i) }

  Thread.sleep(6000)   // Set to less than 5000 to cancel

  val wasCancelled = cancel()

  println("wasCancelled: " + wasCancelled)
}

С Thread.sleep(6000) вывод:

latch timed out
42
wasCancelled: false

С Thread.sleep(1000) вывод:

wasCancelled: true
class java.util.concurrent.CancellationException