Подтвердить что ты не робот

Play 2.x: загрузка реактивного файла с помощью Iteratees

Я начну с вопроса: как использовать Scala API Iteratee для загрузки файла в хранилище облаков (в моем случае это хранилище Azure Blob, но я не думаю, что это самое главное сейчас)

Фон:

Мне нужно записать в блок размером около 1 МБ для хранения больших медиафайлов (300 МБ +) в качестве Azure BlockBlobs. К сожалению, мои знания Scala все еще плохие (мой проект основан на Java, и единственное использование для Scala в нем будет контроллером загрузки).

Я попытался с помощью этого кода: Почему вы вызываете ошибку или выполняете в Iteratee BodyParser запрос зависает в Play Framework 2.0? (как Input Iteratee) - он работает достаточно хорошо, но каждый Element, который я мог использовать, имеет размер 8192 байта, поэтому он слишком мал для отправки в облако файлов сотен мегабайт.

Я должен сказать, что совершенно новый подход ко мне, и, скорее всего, я что-то неправильно понял (не хочу сказать, что я все неправильно понял) >

Я буду благодарен за любой намек или ссылку, которая поможет мне в этой теме. Если есть какой-либо образец аналогичного использования, это был бы лучший вариант для меня, чтобы получить эту идею.

4b9b3361

Ответ 1

В основном то, что вам нужно, это повторный ввод в виде более крупных блоков, 1024 * 1024 байта.

Сначала давайте иметь Iteratee, который будет потреблять до 1 м байт (ok, чтобы последний кусок меньше)

val consumeAMB = 
  Traversable.takeUpTo[Array[Byte]](1024*1024) &>> Iteratee.consume()

Используя это, мы можем построить Enumeratee (адаптер), который будет перегруппировать куски, используя API под названием grouped:

val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] =
  Enumeratee.grouped(consumeAMB)

Здесь grouped использует Iteratee, чтобы определить, сколько нужно положить в каждый кусок. Для этого он использует наш consumeAMB. Это означает, что результатом является Enumeratee, который переписывает ввод в Array[Byte] 1 МБ.

Теперь нам нужно написать BodyParser, который будет использовать метод Iteratee.foldM для отправки каждого фрагмента байтов:

val writeToStore: Iteratee[Array[Byte],_] =
  Iteratee.foldM[Array[Byte],_](connectionHandle){ (c,bytes) => 
    // write bytes and return next handle, probable in a Future
  }

foldM передает состояние и использует его в своей переданной функции (S,Input[Array[Byte]]) => Future[S], чтобы вернуть новое состояние будущего. foldM не будет вызывать функцию снова до тех пор, пока Future не будет завершен, и есть доступный фрагмент ввода.

И анализатор тела будет повторять ввод и вставлять его в хранилище:

BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))

Возврат вправо указывает, что вы возвращаете тело к концу разбора тела (который здесь является обработчиком).

Ответ 2

Если ваша цель состоит в потоке на S3, вот помощник, который я реализовал и протестировал:

def uploadStream(bucket: String, key: String, enum: Enumerator[Array[Byte]])
                (implicit ec: ExecutionContext): Future[CompleteMultipartUploadResult] = {
  import scala.collection.JavaConversions._

  val initRequest = new InitiateMultipartUploadRequest(bucket, key)
  val initResponse = s3.initiateMultipartUpload(initRequest)
  val uploadId = initResponse.getUploadId

  val rechunker: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped {
    Traversable.takeUpTo[Array[Byte]](5 * 1024 * 1024) &>> Iteratee.consume()
  }

  val uploader = Iteratee.foldM[Array[Byte], Seq[PartETag]](Seq.empty) { case (etags, bytes) =>
    val uploadRequest = new UploadPartRequest()
      .withBucketName(bucket)
      .withKey(key)
      .withPartNumber(etags.length + 1)
      .withUploadId(uploadId)
      .withInputStream(new ByteArrayInputStream(bytes))
      .withPartSize(bytes.length)

    val etag = Future { s3.uploadPart(uploadRequest).getPartETag }
    etag.map(etags :+ _)
  }

  val futETags = enum &> rechunker |>>> uploader

  futETags.map { etags =>
    val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toBuffer[PartETag])
    s3.completeMultipartUpload(compRequest)
  }.recoverWith { case e: Exception =>
    s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId))
    Future.failed(e)
  }

}

Ответ 3

Для тех, кто также пытается найти решение этой проблемы с потоками, вместо того, чтобы писать совершенно новый BodyParser, вы также можете использовать то, что уже было реализовано в parse.multipartFormData. Вы можете реализовать что-то вроде ниже, чтобы перезаписать обработчик по умолчанию handleFilePartAsTemporaryFile.

def handleFilePartAsS3FileUpload: PartHandler[FilePart[String]] = {
  handleFilePart {
    case FileInfo(partName, filename, contentType) =>

      (rechunkAdapter &>> writeToS3).map {
        _ =>
          val compRequest = new CompleteMultipartUploadRequest(...)
          amazonS3Client.completeMultipartUpload(compRequest)
          ...
      }
  }
}

def multipartFormDataS3: BodyParser[MultipartFormData[String]] = multipartFormData(handleFilePartAsS3FileUpload)

Я могу выполнить эту работу, но я по-прежнему не уверен, что весь процесс загрузки потоковым. Я попробовал несколько больших файлов, кажется, что загрузка S3 начинается только тогда, когда весь файл был отправлен с клиентской стороны.

Я посмотрел на реализацию вышеописанного анализатора, и я думаю, что все связано с Iteratee, поэтому файл должен быть потоковым. Если у кого-то есть некоторое представление об этом, это будет очень полезно.

Ответ 4

добавить в конфигурационный файл следующее

play.http.parser.maxMemoryBuffer = 256K