Подтвердить что ты не робот

Организация и наилучшая практика искрового кода

Итак, проведя много лет в объектно-ориентированном мире с повторным использованием кода, шаблоны проектирования и лучшие практики всегда учитываются, я нахожу, что немного борюсь с организацией кода и повторным использованием кода в мире Spark.

Если я пытаюсь написать код в многоразовом режиме, он почти всегда приходит с ценой производительности, и я в конечном итоге переписываю его в то, что оптимально для моего конкретного варианта использования. Эта константа "написать то, что оптимально для этого конкретного случая использования" также влияет на организацию кода, потому что разделение кода на разные объекты или модули затруднено, когда "все это действительно принадлежит вместе", и поэтому я получаю очень мало объектов "Бог", содержащих длинные цепей сложных преобразований. На самом деле, я часто думаю, что если бы я взглянул на большую часть кода искры, который я пишу сейчас, когда работаю в объектно-ориентированном мире, я бы вздрогнул и отклонил его как "код спагетти".

Я занимаюсь серфингом в интернете, пытаясь найти какой-то эквивалент лучшим практикам объектно-ориентированного мира, но без большой удачи. Я могу найти некоторые "лучшие практики" для функционального программирования, но Spark просто добавляет дополнительный слой, потому что производительность здесь является таким важным фактором.

Итак, мой вопрос к вам: есть ли у вас кто-нибудь из гуру Spark, который нашел некоторые рекомендации по написанию кода Spark, который вы можете порекомендовать?

ИЗМЕНИТЬ

Как написано в комментарии, я действительно не ожидал, что кто-нибудь опубликует ответ о том, как решить эту проблему, но я надеялся, что кто-то из этого сообщества столкнулся с типом Мартина Фаулера, написавшим som articles или где-то в блогах, как решить проблемы с организацией кода в мире Spark.

@DanielDarabos предложил мне привести пример ситуации, когда организация и производительность кода противоречивы. Хотя я обнаружил, что у меня часто возникают проблемы с этим в моей повседневной работе, мне немного сложно сварить его до хорошего минимального примера;) но я постараюсь.

В объектно-ориентированном мире я большой поклонник принципа единой ответственности, поэтому я буду следить за тем, чтобы мои методы отвечали только за одну вещь. Это делает их многоразовыми и легко проверяемыми. Так что, если бы мне пришлось, скажем, рассчитать сумму некоторых чисел в списке (соответствующие некоторым критериям), и мне пришлось вычислять среднее значение того же числа, я бы определенно создал два метода - один, который рассчитал сумму и тот, который вычислил среднее значение. Вот так:

def main(implicit args: Array[String]): Unit = {
  val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5))

  println("Summed weights for DK = " + summedWeights(list, "DK")
  println("Averaged weights for DK = " + averagedWeights(list, "DK")
}

def summedWeights(list: List, country: String): Double = {
  list.filter(_._1 == country).map(_._2).sum
}

def averagedWeights(list: List, country: String): Double = {
  val filteredByCountry = list.filter(_._1 == country) 
  filteredByCountry.map(_._2).sum/ filteredByCountry.length
}

Я могу, конечно, продолжать уважать SRP в Spark:

def main(implicit args: Array[String]): Unit = {
  val df = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)).toDF("country", "weight")

  println("Summed weights for DK = " + summedWeights(df, "DK")
  println("Averaged weights for DK = " + averagedWeights(df, "DK")
}


def avgWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country)
  val summedWeight = countrySpecific.agg(avg('weight))

  summedWeight.first().getDouble(0)
}

def summedWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country)
  val summedWeight = countrySpecific.agg(sum('weight))

  summedWeight.first().getDouble(0)
}

Но поскольку мой df может содержать миллиарды строк, мне бы не пришлось дважды выполнять filter. Фактически, производительность напрямую связана с затратами на ЭМИ, поэтому я ДЕЙСТВИТЕЛЬНО не хочу этого. Чтобы преодолеть это, я, таким образом, решил нарушить SRP и просто поместить две функции в одну и убедиться, что я вызываю упор на фильтрацию по стране DataFrame, например:

def summedAndAveragedWeights(df: DataFrame, country: String, sqlContext: SQLContext): (Double, Double) = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country).persist(StorageLevel.MEMORY_AND_DISK_SER)
  val summedWeights = countrySpecific.agg(sum('weight)).first().getDouble(0)
  val averagedWeights = summedWeights / countrySpecific.count()

  (summedWeights, averagedWeights)
}

Теперь, этот пример, если, конечно, огромное упрощение того, что встречалось в реальной жизни. Здесь я мог бы просто решить эту проблему, отфильтровывая и сохраняя df перед тем, как передать ее в функции sum и avg (что также будет больше SRP), но в реальной жизни может быть проведен ряд промежуточных вычислений, которые необходимы снова и еще раз. Другими словами, функция filter здесь представляет собой просто попытку сделать простой пример того, что будет полезно для сохранения. На самом деле, я думаю, что призывы к persist являются ключевыми словами здесь. Вызов persist значительно ускорит мою работу, но стоимость в том, что мне приходится тесно связывать весь код, который зависит от сохраненного DataFrame - даже если они логически разделены.

4b9b3361

Ответ 1

Я думаю, что вы можете подписаться на Apache Spark, databricks канал на YouTube, слушать больше и узнать больше, особенно для опытов и уроков других.

вот несколько рекомендуемых видео:

и я опубликовал и все еще обновляю его в своем github и блоге:

надеюсь, что это поможет вам ~