Подтвердить что ты не робот

Как регистрировать запросы клиента Akka HTTP

Мне нужно регистрировать запросы клиента akka http, а также их ответы. Несмотря на то, что, как представляется, существует API-интерфейс для регистрации этих запросов, нет четкой документации о том, как это сделать. Мой подход заключался в создании зарегистрированного запроса, который прозрачно обертывает Http().singleRequest(req) следующим образом:

def loggedRequest(req: HttpRequest)
                  (implicit system: ActorSystem, ctx: ExecutionContext, m: Materializer): Future[HttpResponse] = {

  Http().singleRequest(req).map { resp ⇒
    Unmarshal(resp.entity).to[String].foreach{s ⇒
      system.log.info(req.toString)
      system.log.info(resp.toString + "\n" + s)
    }
    resp
  }
}

К сожалению, я должен захватить будущее либо через немаршаль, либо просто запросить resp.entity.dataBytes, чтобы восстановить тело ответа. Я получаю регистрацию, но обещание завершается, и я больше не могу развязывать сущность с фактическими данными. Рабочее решение регистрирует запрос и ответ и передает этот тестовый пример без IllegalStateException с броском "Promise уже завершено":

describe("Logged rest requests") {

  it("deliver typed responses") {
    val foo = Rest.loggedRequest(Get(s"http://127.0.0.1:9000/some/path"))
    val resp = foo.futureValue(patience)
    resp.status shouldBe StatusCodes.OK
    val res = Unmarshal(resp.entity).to[MyClass].futureValue
  }
}

Идеи приветствуются.

4b9b3361

Ответ 1

Одним из решений, которое я нашел, является использование:

import akka.http.scaladsl.server.directives.DebuggingDirectives

val clientRouteLogged = DebuggingDirectives.logRequestResult("Client ReST", Logging.InfoLevel)(clientRoute)
Http().bindAndHandle(clientRouteLogged, interface, port)

Что может легко записать запрос и привести к необработанному (байтам) формату. Проблема в том, что эти журналы полностью нечитаемы. И вот место, где стало сложно.

Вот мой пример, который кодирует объект запроса/ответа и записывает его в регистратор.

Вы можете передать функцию:

DebuggingDirectives.logRequestResult

def logRequestResult(magnet: LoggingMagnet[HttpRequest ⇒ RouteResult ⇒ Unit])

Это функция, написанная с использованием магнитного шаблона:

LoggingMagnet[HttpRequest ⇒ RouteResult ⇒ Unit]

Где:

LoggingMagnet[T](f: LoggingAdapter ⇒ T)

Благодаря этому мы имеем доступ ко всем частям, которые нам нужны для регистрации запроса и результата. У нас есть LoggingAdapter, HttpRequest и RouteResult

В моем случае я создаю внутреннюю функцию. Я не хочу снова передавать все параметры.

def logRequestResult(level: LogLevel, route: Route)
                      (implicit m: Materializer, ex: ExecutionContext) = {
  def myLoggingFunction(logger: LoggingAdapter)(req: HttpRequest)(res: Any): Unit = {
    val entry = res match {
      case Complete(resp) =>
        entityAsString(resp.entity).map(data ⇒ LogEntry(s"${req.method} ${req.uri}: ${resp.status} \n entity: $data", level))
      case other =>
        Future.successful(LogEntry(s"$other", level))
    }
    entry.map(_.logTo(logger))
  }
  DebuggingDirectives.logRequestResult(LoggingMagnet(log => myLoggingFunction(log)))(route)
}

Самая важная часть - последняя строка, в которую я помещаю myLoggingFunction в logRequestResult.

Функция myLoggingFunction, простая совпадающая с результатом вычисления сервера и создающая на ней LogEntry.

Последнее, что есть метод, который позволяет декодировать объект результата из потока.

def entityAsString(entity: HttpEntity)
                   (implicit m: Materializer, ex: ExecutionContext): Future[String] = {
entity.dataBytes
  .map(_.decodeString(entity.contentType().charset().value))
  .runWith(Sink.head)
}

Этот метод можно легко добавить к любому маршруту akka-http.

val myLoggedRoute = logRequestResult(Logging.InfoLevel, clinetRoute)
Http().bindAndHandle(myLoggedRoute, interface, port)

Ответ 2

У Akka есть директивы, которые регистрируют запросы, ответы или оба, и позволяют вам создавать собственные регистраторы как DebugDirectives. Документация охватывает детали. TL; DR для регистрации запросов и ответов:

import akka.http.scaladsl.server.directives.DebuggingDirectives

DebuggingDirectives.logRequestResult("http", Logging.InfoLevel) {
  someRoute ~ someOtherRoute
}

Ответ 3

Для другого решения этот код регистрирует IP-адрес запроса и связывает случайное число с каждым запросом и ответом, чтобы они могли быть связаны в журналах. Он также записывает время ответа.

Поскольку запрос может потребоваться некоторое время для обработки и может выйти из строя, я хотел немедленно просмотреть запрос и увидеть ответ, если и когда он вернется.

RequestFields - это только данные, о которых я забочусь о запросе. По умолчанию много шума.

val logRequestResponse: Directive0 =
  extractRequestContext flatMap { ctx =>
    extractClientIP flatMap { ip =>
      val id = scala.math.abs(rand.nextLong).toString
      onSuccess(RequestFields.fromIdIpAndRequest(id, ip, ctx.request)) flatMap { req =>
        logger.info("request", req.asJson)
        val i = Instant.now()
        mapRouteResultWith { result => 
          Result.fromIdStartTimeAndRouteResult(id, i, result) map { res =>
            logger.info("response", res.asJson)
            result
        }
      }
    }
  }
}

Ответ 4

Мое полное решение, вдохновленное @seanmcl

trait TraceDirectives extends LazyLogging {

  private val counter: AtomicLong = new AtomicLong(0)

  private def log: Directive0 = count flatMap { requestId =>
    mapInnerRoute(addLoggingToRoute(requestId, _))
  }

  private def count: Directive1[Long] = Directive { innerRouteSupplier =>
    ctx =>
      innerRouteSupplier(Tuple1(counter.incrementAndGet()))(ctx)
  }

  private def addLoggingToRoute(requestId: Long, innerRoute: Route): Route = {
    ctx => {
      val requestStopwatch = Stopwatch.createStarted()
      extractClientIP { ip =>
        logger.info("Http request, id: {}, uri: {}, forwarded ip: {}", requestId, ctx.request.uri, ip)
        mapResponse(httpResponse => {
          logger.info("Http response, id: {}, code: {}, time: {}", requestId, httpResponse.status.intValue(), requestStopwatch.toString)
          httpResponse
        })(innerRoute)
      }(ctx)
    }
  }
}

object TraceDirectives extends TraceDirectives