Подтвердить что ты не робот

Как добавить элементы в источник динамически?

У меня есть пример кода для генерации несвязанного источника и работы с ним:

object Main {

 def main(args : Array[String]): Unit = {

  implicit val system = ActorSystem("Sys")
  import system.dispatcher

  implicit val materializer = ActorFlowMaterializer()

  val source: Source[String] = Source(() => {
     Iterator.continually({ "message:" + ThreadLocalRandom.current().nextInt(10000)})
    })

  source.runForeach((item:String) => { println(item) })
  .onComplete{ _ => system.shutdown() }
 }

}

Я хочу создать класс, который реализует:

trait MySources {
    def addToSource(item: String)
    def getSource() : Source[String]
}

И мне нужно использовать его с несколькими потоками, например:

class MyThread(mySources: MySources) extends Thread {
  override def run(): Unit = {
    for(i <- 1 to 1000000) { // here will be infinite loop
        mySources.addToSource(i.toString)
    }
  }
} 

И ожидаемый полный код:

object Main {
  def main(args : Array[String]): Unit = {
    implicit val system = ActorSystem("Sys")
    import system.dispatcher

    implicit val materializer = ActorFlowMaterializer()

    val sources = new MySourcesImplementation()

    for(i <- 1 to 100) {
      (new MyThread(sources)).start()
    }

    val source = sources.getSource()

    source.runForeach((item:String) => { println(item) })
    .onComplete{ _ => system.shutdown() }
  }
}

Как реализовать MySources?

4b9b3361

Ответ 1

Один из способов иметь неограниченный источник - использовать в качестве источника особый вид актера, который смешивается с признаком ActorPublisher. Если вы создаете один из этих видов участников, а затем завершите вызовом ActorPublisher.apply, вы получите экземпляр "Реактивные потоки" Publisher, и с ним вы можете использовать apply из Source, чтобы создать Source от него. После этого вам просто нужно убедиться, что ваш класс ActorPublisher правильно обрабатывает протокол Reactive Streams для отправки элементов вниз по течению, и вам хорошо идти. Очень тривиальный пример:

import akka.actor._
import akka.stream.actor._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._

object DynamicSourceExample extends App{

  implicit val system = ActorSystem("test")
  implicit val materializer = ActorFlowMaterializer()

  val actorRef = system.actorOf(Props[ActorBasedSource])
  val pub = ActorPublisher[Int](actorRef)

  Source(pub).
    map(_ * 2).
    runWith(Sink.foreach(println))

  for(i <- 1 until 20){
    actorRef ! i.toString
    Thread.sleep(1000)
  }

}

class ActorBasedSource extends Actor with ActorPublisher[Int]{
  import ActorPublisherMessage._
  var items:List[Int] = List.empty

  def receive = {
    case s:String =>
      if (totalDemand == 0) 
        items = items :+ s.toInt
      else
        onNext(s.toInt)    

    case Request(demand) =>  
      if (demand > items.size){
        items foreach (onNext)
        items = List.empty
      }
      else{
        val (send, keep) = items.splitAt(demand.toInt)
        items = keep
        send foreach (onNext)
      }


    case other =>
      println(s"got other $other")
  }


}

Ответ 3

Как я уже упоминал в этом ответе, SourceQueue - это путь, и начиная с Akka 2.5 существует удобный метод preMaterialize, который устраняет необходимость создания составного источника. первый.

Я привожу пример в своем другом ответе.