Подтвердить что ты не робот

Как использовать SourceQueue потоков Akka с помощью PlayFramework

Я хотел бы использовать SourceQueue для динамического перемещения элементов в источник потока Akka. Play контроллер нуждается в источнике, чтобы иметь возможность передавать результат с помощью метода chuncked.
Поскольку Play использует свою собственную Stream Stream Sink под капотом, я не могу материализовать исходную очередь самостоятельно, используя Sink, потому что источник будет потребляться до того, как он будет использоваться методом chunked (за исключением случаев, когда я использую следующий хак).

Я могу заставить его работать, если я предварительно создаю исходную очередь, используя издатель реактивных потоков, но это своего рода "грязный взлом":

def sourceQueueAction = Action{

    val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()

    //stupid example to push elements dynamically
    val tick = Source.tick(0 second, 1 second, "tick")
    tick.runForeach(t => queue.offer(t))

    Ok.chunked(Source.fromPublisher(pub))
  }

Есть ли более простой способ использования SourceQueue потока Akka с PlayFramework?

Спасибо

4b9b3361

Ответ 1

Решение состоит в том, чтобы использовать mapMaterializedValue в источнике, чтобы получить будущее материализации своей очереди:

def sourceQueueAction = Action {
    val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))

    futureQueue.map { queue =>
      Source.tick(0.second, 1.second, "tick")
        .runForeach (t => queue.offer(t))
    }
    Ok.chunked(queueSource)

  }

  //T is the source type, here String
  //M is the materialization type, here a SourceQueue[String]
  def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
    val p = Promise[M]
    val s = src.mapMaterializedValue { m =>
      p.trySuccess(m)
      m
    }
    (s, p.future)
  }