Подтвердить что ты не робот

Будущее с таймаутом в Scala

Предположим, что у меня есть функция, которая вызывает блокирующую прерывистую операцию. Я хотел бы запустить его асинхронно с таймаутом. То есть, я хотел бы прервать функцию, когда время ожидания истекло.

Поэтому я пытаюсь сделать что-то вроде этого:

import scala.util.Try
import scala.concurrent.Future

def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {

  val aref = new java.util.concurrent.atomic.AtomicReference[Thread]() 

  import ExecutionContext.Implicits.global
  Future {Thread.sleep(timeout); aref.get().interrupt} // 1
  Future {aref.set(Thread.currentThread); Try(f())}    // 2
}

Проблема заключается в том, что aref в (1) может быть нулевым, потому что (2) еще не установил его в текущий поток. В этом случае я хотел бы подождать, пока не будет установлен aref. Каков наилучший способ сделать это?

4b9b3361

Ответ 1

Если вы добавите CountDownLatch, вы сможете достичь желаемого поведения. (Обратите внимание, что блокирование (т.е. Застревание в await) во многих партиях Future может привести к голоданию пулов потоков.)

import scala.util.Try
import scala.concurrent.Future

def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {

  val aref = new java.util.concurrent.atomic.AtomicReference[Thread]()
  val cdl = new java.util.concurrent.CountDownLatch(1)

  import ExecutionContext.Implicits.global
  Future {Thread.sleep(timeout); cdl.await(); aref.get().interrupt}   // 1
  Future {aref.set(Thread.currentThread); cdl.countDown(); Try(f())}  // 2
}

Ответ 2

Вы можете немного упростить подход, используя Ожидание. Метод Await.result принимает продолжительность тайм-аута в качестве второго параметра и выдает TimeoutException время ожидания.

try {
  import scala.concurrent.duration._
  Await.result(aref, 10 seconds);
} catch {
    case e: TimeoutException => // whatever you want to do.
}

Ответ 3

Мне тоже нужно было такое же поведение, поэтому я решил это. Я в основном создал объект, который создает таймер и не выполняет обещание с исключением TimeoutException, если будущее не завершено в указанную продолжительность.

package mypackage

import scala.concurrent.{Promise, Future}
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem
import scala.concurrent.ExecutionContext.Implicits.global

object TimeoutFuture {

  val actorSystem = ActorSystem("myActorSystem")
  def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {
    val promise = Promise[A]()
    actorSystem.scheduler.scheduleOnce(timeout) {
      promise tryFailure new java.util.concurrent.TimeoutException
    }

    Future {
      try {
        promise success block
      }
      catch {
        case e:Throwable => promise failure e
      } 
    }

    promise.future
  }
}

Ответ 4

Хотя у вас уже есть ответы на вопрос о том, как добиться этого, блокируя дополнительный поток для обработки тайм-аута, я предлагаю вам попробовать по-другому, по причине, которую дал Рекс Керр. Я точно не знаю, что вы делаете в f(), но если это привязка ввода/вывода, я бы предложил вам просто использовать асинхронную библиотеку ввода-вывода. Если это какой-то цикл, вы можете передать значение тайм-аута непосредственно в эту функцию и выбросить TimeoutException там, если оно превышает таймаут. Пример:

import scala.concurrent.duration._
import java.util.concurrent.TimeoutException

def doSth(timeout: Deadline) = {
  for {
    i <- 0 to 10
  } yield {
    Thread.sleep(1000)
    if (timeout.isOverdue)
      throw new TimeoutException("Operation timed out.")

    i
  }
}

scala> future { doSth(12.seconds.fromNow) }
res3: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] = 
  [email protected]

scala> Await.result(res3, Duration.Inf)
res6: scala.collection.immutable.IndexedSeq[Int] =
  Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> future { doSth(2.seconds.fromNow) }
res7: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] = 
  [email protected]

scala> Await.result(res7, Duration.Inf)
java.util.concurrent.TimeoutException: Operation timed out.
    at $anonfun$doSth$1.apply$mcII$sp(<console>:17)
    at $anonfun$doSth$1.apply(<console>:13)
    at $anonfun$doSth$1.apply(<console>:13)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    ...

scala> res7.value
res10: Option[scala.util.Try[scala.collection.immutable.IndexedSeq[Int]]] =
  Some(Failure(java.util.concurrent.TimeoutException: Operation timed out.))

В этом случае будет использоваться только один поток, который будет завершен после тайм-аута + времени выполнения всего одного шага.

Ответ 5

Вы также можете попробовать использовать CountDownLatch следующим образом:

def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {

  val aref = new java.util.concurrent.atomic.AtomicReference[Thread]() 

  import ExecutionContext.Implicits.global
  val latch = new CountDownLatch(1)
  Future {
    latch.await()
    aref.get().interrupt
  }

  Future {
    aref.set(Thread.currentThread) 
    latch.countDown()
    Try(f())
  }
}

Теперь я жду всегда с моим вызовом latch.await(), но вы можете, конечно, изменить это на:

latch.await(1, TimeUnit.SECONDS)

а затем оберните его Try для обработки, если когда/если он истечет.