Подтвердить что ты не робот

Создание временной привязки Enumeratee

Я хочу создать Play 2 Enumeratee, который принимает значения и выводит их, разбивая вместе, каждые x секунды/миллисекунды. Таким образом, в многопользовательской среде websocket с большим количеством пользовательского ввода можно было ограничить количество принимаемых кадров в секунду.

Я знаю, что можно группировать множество элементов вместе следующим образом:

val chunker = Enumeratee.grouped(
  Traversable.take[Array[Double]](5000) &>> Iteratee.consume()
)

Есть ли встроенный способ сделать это на основе времени, а не на основе количества элементов?

Я думал об этом как-то с запланированной работой Akka, но на первый взгляд это кажется неэффективным, и я не уверен, что возникнут проблемы с соглашением.

4b9b3361

Ответ 1

Как насчет этого? Надеюсь, это поможет вам.

 package controllers

 import play.api._
 import play.api.Play.current
 import play.api.mvc._
 import play.api.libs.iteratee._
 import play.api.libs.concurrent.Akka
 import play.api.libs.concurrent.Promise

 object Application extends Controller {

   def index = Action {
     val queue = new scala.collection.mutable.Queue[String]
     Akka.future {
       while( true ){
         Logger.info("hogehogehoge")
         queue += System.currentTimeMillis.toString
         Thread.sleep(100)
       }
     }
     val timeStream = Enumerator.fromCallback { () =>
       Promise.timeout(Some(queue), 200)
     }
     Ok.stream(timeStream.through(Enumeratee.map[scala.collection.mutable.Queue[String]]({ queue =>
       var str = ""
       while(queue.nonEmpty){
         str += queue.dequeue + ", "
       }
       str
     })))
   }

 }

И этот документ также полезен для вас. http://www.playframework.com/documentation/2.0/Enumerators

UPDATE Это для версии play2.1.

 package controllers

 import play.api._
 import play.api.Play.current
 import play.api.mvc._
 import play.api.libs.iteratee._
 import play.api.libs.concurrent.Akka
 import play.api.libs.concurrent.Promise
 import scala.concurrent._
 import ExecutionContext.Implicits.global

 object Application extends Controller {

   def index = Action {
     val queue = new scala.collection.mutable.Queue[String]
     Akka.future {
       while( true ){
         Logger.info("hogehogehoge")
         queue += System.currentTimeMillis.toString
         Thread.sleep(100)
       }
     }
     val timeStream = Enumerator.repeatM{
       Promise.timeout(queue, 200)
     }
     Ok.stream(timeStream.through(Enumeratee.map[scala.collection.mutable.Queue[String]]({ queue =>
       var str = ""
       while(queue.nonEmpty){
         str += queue.dequeue + ", "
       }
       str
     })))
   }

 }

Ответ 2

Здесь я быстро определил итерацию, которая будет принимать значения из ввода для фиксированной длины времени t, измеренной в миллисекундах, и счетчика, который позволит вам группировать и дополнительно обрабатывать входной поток, разделенный на сегменты, построенные в такой длине t, Он полагается на JodaTime, чтобы отслеживать, сколько времени прошло с момента начала итерации.

def throttledTakeIteratee[E](timeInMillis: Long): Iteratee[E, List[E]] = {
  var startTime = new Instant()

  def step(state: List[E])(input: Input[E]): Iteratee[E, List[E]] = {
    val timePassed = new Interval(startTime, new Instant()).toDurationMillis

    input match {
      case Input.EOF => { startTime = new Instant; Done(state, Input.EOF) }
      case Input.Empty => Cont[E, List[E]](i => step(state)(i))
      case Input.El(e) =>
        if (timePassed >= timeInMillis) { startTime = new Instant; Done(e::state, Input.Empty) }
        else Cont[E, List[E]](i => step(e::state)(i))
    }
  }

  Cont(step(List[E]()))
}

def throttledTake[T](timeInMillis: Long) = Enumeratee.grouped(throttledTakeIteratee[T](timeInMillis))