Я хотел бы использовать SourceQueue для динамического перемещения элементов в источник потока Akka.
Play контроллер нуждается в источнике, чтобы иметь возможность передавать результат с помощью метода chuncked
.
Поскольку Play использует свою собственную Stream Stream Sink под капотом, я не могу материализовать исходную очередь самостоятельно, используя Sink, потому что источник будет потребляться до того, как он будет использоваться методом chunked
(за исключением случаев, когда я использую следующий хак).
Я могу заставить его работать, если я предварительно создаю исходную очередь, используя издатель реактивных потоков, но это своего рода "грязный взлом":
def sourceQueueAction = Action{
val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()
//stupid example to push elements dynamically
val tick = Source.tick(0 second, 1 second, "tick")
tick.runForeach(t => queue.offer(t))
Ok.chunked(Source.fromPublisher(pub))
}
Есть ли более простой способ использования SourceQueue потока Akka с PlayFramework?
Спасибо