Подтвердить что ты не робот

Akka Dead Letters с шаблоном Ask

Я заранее извиняюсь, если это кажется совсем запутанным, так как я здесь немного сваливаю. В принципе, у меня есть небольшой сервис, захватывающий Json, разбор и извлечение его в класс case, а затем его запись в базу данных. Эта служба должна запускаться по расписанию, которое хорошо обрабатывается планировщиком Akka. Моя база данных не нравится, когда Slick пытается запросить новый идентификатор AutoInc одновременно, поэтому я построил в Await.result, чтобы это не произошло. Все это работает очень хорошо, но моя проблема начинается здесь: есть 7 из этих служб, поэтому я хотел бы заблокировать их, используя аналогичную систему Await.result. Каждый раз, когда я пытаюсь отправить время окончания запроса обратно в ответ (в конце блока else), он отправляется на мертвые буквы, а не на Дистрибьютора. В принципе: почему sender ! time переходит к мертвым буквам, а не к Дистрибьютору. Это долгий вопрос для простой проблемы, но то, как развитие идет...

ClickActor.scala

    import java.text.SimpleDateFormat
    import java.util.Date
    import Message._
    import akka.actor.{Actor, ActorLogging, Props}
    import akka.util.Timeout
    import com.typesafe.config.ConfigFactory
    import net.liftweb.json._
    import spray.client.pipelining._
    import spray.http.{BasicHttpCredentials, HttpRequest, HttpResponse, Uri}
    import akka.pattern.ask
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

case class ClickData(recipient : String, geolocation : Geolocation, tags : Array[String],
                     url : String, timestamp : Double, campaigns : Array[String],
                     `user-variables` : JObject, ip : String,
                     `client-info` : ClientInfo, message : ClickedMessage, event : String)
  case class Geolocation(city : String, region : String, country : String)
  case class ClientInfo(`client-name`: String, `client-os`: String, `user-agent`: String,
                      `device-type`: String, `client-type`: String)
  case class ClickedMessage(headers : ClickHeaders)
    case class ClickHeaders(`message-id` : String)

class ClickActor extends Actor with ActorLogging{

  implicit val formats = DefaultFormats
  implicit val timeout = new Timeout(3 minutes)
  import context.dispatcher

  val con = ConfigFactory.load("connection.conf")
  val countries = ConfigFactory.load("country.conf")
  val regions = ConfigFactory.load("region.conf")

  val df = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss -0000")
  var time = System.currentTimeMillis()
  var begin = new Date(time - (12 hours).toMillis)
  var end = new Date(time)

  val pipeline : HttpRequest => Future[HttpResponse] = (
    addCredentials(BasicHttpCredentials("api", con.getString("mailgun.key")))
      ~> sendReceive
    )

  def get(lastrun : Long): Future[String] = {

    if(lastrun != 0) {
      begin = new Date(lastrun)
      end = new Date(time)
    }

    val uri = Uri(con.getString("mailgun.uri")) withQuery("begin" -> df.format(begin), "end" -> df.format(end),
      "ascending" -> "yes", "limit" -> "100", "pretty" -> "yes", "event" -> "clicked")
    val request = Get(uri)
    val futureResponse = pipeline(request)
    return futureResponse.map(_.entity.asString)
  }

  def receive = {
    case lastrun : Long => {
      val start = System.currentTimeMillis()
      val responseFuture = get(lastrun)
      responseFuture.onSuccess {
        case payload: String => val json = parse(payload)
          //println(pretty(render(json)))
          val elements = (json \\ "items").children
          if (elements.length == 0) {
            log.info("[ClickActor: " + this.hashCode() + "] did not find new events between " +
              begin.toString + " and " + end.toString)
            sender ! time
            context.stop(self)
          }
          else {
            for (item <- elements) {
              val data = item.extract[ClickData]
              var tags = ""
              if (data.tags.length != 0) {
                for (tag <- data.tags)
                  tags += (tag + ", ")
              }
              var campaigns = ""
              if (data.campaigns.length != 0) {
                for (campaign <- data.campaigns)
                  campaigns += (campaign + ", ")
              }
              val timestamp = (data.timestamp * 1000).toLong
              val msg = new ClickMessage(
                data.recipient, data.geolocation.city,
                regions.getString(data.geolocation.country + "." + data.geolocation.region),
                countries.getString(data.geolocation.country), tags, data.url, timestamp,
                campaigns, data.ip, data.`client-info`.`client-name`,
                data.`client-info`.`client-os`, data.`client-info`.`user-agent`,
                data.`client-info`.`device-type`, data.`client-info`.`client-type`,
                data.message.headers.`message-id`, data.event, compactRender(item))
              val csqla = context.actorOf(Props[ClickSQLActor])
              val future = csqla.ask(msg)
              val result = Await.result(future, timeout.duration).asInstanceOf[Int]
              if (result == 1) {
                log.error("[ClickSQLActor: " + csqla.hashCode() + "] shutting down due to lack of system environment variables")
                context.stop(csqla)
              }
              else if(result == 0) {
                log.info("[ClickSQLActor: " + csqla.hashCode() + "] successfully wrote to the DB")
              }
            }
            sender ! time
            log.info("[ClickActor: " + this.hashCode() + "] processed |" + elements.length + "| new events in " +
              (System.currentTimeMillis() - start) + " ms")
          }
      }
    }
  }
}

Distributor.scala

import akka.actor.{Props, ActorSystem}
import akka.event.Logging
import akka.util.Timeout
import akka.pattern.ask
import scala.concurrent.duration._
import scala.concurrent.Await

class Distributor {

  implicit val timeout = new Timeout(10 minutes)
  var lastClick : Long = 0

  def distribute(system : ActorSystem) = {
    val log = Logging(system, getClass)

    val clickFuture = (system.actorOf(Props[ClickActor]) ? lastClick)
    lastClick = Await.result(clickFuture, timeout.duration).asInstanceOf[Long]
    log.info(lastClick.toString)

    //repeat process with other events (open, unsub, etc)
  }
}
4b9b3361

Ответ 1

Причина в том, что значение 'sender' (которое является методом, который извлекает значение) больше не действует после выхода из блока приема, но будущее, которое используется в приведенном выше примере, все равно будет запущено и время, когда он закончит актер, оставит блок приема и удар; неверный отправитель приводит к сообщению, отправляющемуся в очередь мертвой буквы.

Исправление состоит в том, чтобы либо не использовать будущее, либо при объединении фьючерсов, действующих лиц и отправителя, затем фиксировать значение отправителя перед запуском будущего.

val s = sender

val responseFuture = get(lastrun)
    responseFuture.onSuccess {    
    ....
    s ! time
}