Подтвердить что ты не робот

Как вычислить процентили в Apache Spark

У меня есть rdd целых чисел (т.е. RDD[Int]), и мне хотелось бы вычислить следующие десять процентилей: [0th, 10th, 20th, ..., 90th, 100th]. Каков наиболее эффективный способ сделать это?

4b9b3361

Ответ 1

Вы можете:

  • Сортировка набора данных через rdd.sortBy()
  • Вычислить размер набора данных через rdd.count()
  • Почтовый индекс с индексом для облегчения поиска процентилей
  • Получить желаемый процентиль через rdd.lookup(), например. для 10-го процентиля rdd.lookup(размер 0,1 *)

Чтобы вычислить медианную и 99-ю процентиль: getPercentiles (rdd, new double [] {0.5, 0.99}, size, numPartitions);

В Java 8:

public static double[] getPercentiles(JavaRDD<Double> rdd, double[] percentiles, long rddSize, int numPartitions) {
    double[] values = new double[percentiles.length];

    JavaRDD<Double> sorted = rdd.sortBy((Double d) -> d, true, numPartitions);
    JavaPairRDD<Long, Double> indexed = sorted.zipWithIndex().mapToPair((Tuple2<Double, Long> t) -> t.swap());

    for (int i = 0; i < percentiles.length; i++) {
        double percentile = percentiles[i];
        long id = (long) (rddSize * percentile);
        values[i] = indexed.lookup(id).get(0);
    }

    return values;
}

Обратите внимание, что это требует сортировки набора данных O (n.log(n)) и может быть дорогостоящим на больших наборах данных.

Другой ответ, предлагающий просто вычислить гистограмму, не будет правильно вычислять процентиль: вот пример встречного примера: набор данных, состоящий из 100 чисел, 99 чисел, равный 0, и одно число, равное 1. В итоге вы получаете все 99 0 в первом бункере и 1 в последнем ящике с 8 пустыми ячейками в середине.

Ответ 2

Как насчет t-digest?

https://github.com/tdunning/t-digest

Новая структура данных для точного он-лайн накопления статистических данных рангов, таких как квантили и обрезанные средства. Алгоритм t-digest также очень дружелюбен к параллели, что делает его полезным в приложениях для уменьшения карты и параллельной потоковой передачи.

Алгоритм построения t-дайджеста использует вариант одномерной кластеризации k-средних для создания структуры данных, связанной с Q-дайджестом. Эта структура данных t-digest может использоваться для оценки квантилей или вычисления другой статистики рангов. Преимущество t-digest над Q-дайджестом заключается в том, что t-дайджест может обрабатывать значения с плавающей запятой, а Q-дайджест ограничен целыми числами. С небольшими изменениями t-дайджест может обрабатывать любые значения из любого упорядоченного набора, который имеет нечто похожее на среднее значение. Точность оценок квантилей, создаваемых t-дайджестами, может быть на порядок более точным, чем точность, полученная Q-дайджестами, несмотря на то, что t-дайджесты более компактны при хранении на диске.

Таким образом, особенно интересные характеристики t-дайджеста заключаются в том, что он

  • имеет меньшие суммы, чем Q-дайджест
  • работает как с парными, так и с целыми числами.
  • обеспечивает точность в части на миллион для экстремальных квантилей и, как правило, точность 1000 м.д. для средних квантилей
  • быстро
  • очень просто
  • имеет ссылочную реализацию, которая имеет > 90% охвата тестирования
  • может использоваться с уменьшением карты очень легко, потому что дайджесты могут быть объединены.

Достаточно просто использовать эталонную реализацию Java из Spark.

Ответ 3

Я обнаружил этот gist

https://gist.github.com/felixcheung/92ae74bc349ea83a9e29

который содержит следующую функцию:

  /**
   * compute percentile from an unsorted Spark RDD
   * @param data: input data set of Long integers
   * @param tile: percentile to compute (eg. 85 percentile)
   * @return value of input data at the specified percentile
   */
  def computePercentile(data: RDD[Long], tile: Double): Double = {
    // NIST method; data to be sorted in ascending order
    val r = data.sortBy(x => x)
    val c = r.count()
    if (c == 1) r.first()
    else {
      val n = (tile / 100d) * (c + 1d)
      val k = math.floor(n).toLong
      val d = n - k
      if (k <= 0) r.first()
      else {
        val index = r.zipWithIndex().map(_.swap)
        val last = c
        if (k >= c) {
          index.lookup(last - 1).head
        } else {
          index.lookup(k - 1).head + d * (index.lookup(k).head - index.lookup(k - 1).head)
        }
      }
    }
  }

Ответ 4

Вот моя реализация Python на Spark для вычисления процентиля для RDD, содержащего интересующие значения.

def percentile_threshold(ardd, percentile):
    assert percentile > 0 and percentile <= 100, "percentile should be larger then 0 and smaller or equal to 100"

    return ardd.sortBy(lambda x: x).zipWithIndex().map(lambda x: (x[1], x[0])) \
            .lookup(np.ceil(ardd.count() / 100 * percentile - 1))[0]

# Now test it out
import numpy as np
randlist = range(1,10001)
np.random.shuffle(randlist)
ardd = sc.parallelize(randlist)

print percentile_threshold(ardd,0.001)
print percentile_threshold(ardd,1)
print percentile_threshold(ardd,60.11)
print percentile_threshold(ardd,99)
print percentile_threshold(ardd,99.999)
print percentile_threshold(ardd,100)

# output:
# 1
# 100
# 6011
# 9900
# 10000
# 10000

В отдельности я определил следующую функцию, чтобы получить от 10-го до 100-го процентиля.

def get_percentiles(rdd, stepsize=10):
    percentiles = []
    rddcount100 = rdd.count() / 100 
    sortedrdd = ardd.sortBy(lambda x: x).zipWithIndex().map(lambda x: (x[1], x[0]))


    for p in range(0, 101, stepsize):
        if p == 0:
            pass
            # I am not aware of a formal definition of 0 percentile, 
            # you can put a place holder like this if you want
            # percentiles.append(sortedrdd.lookup(0)[0] - 1) 
        elif p == 100:
            percentiles.append(sortedrdd.lookup(np.ceil(rddcount100 * 100 - 1))[0])
        else:
            pv = sortedrdd.lookup(np.ceil(rddcount100 * p) - 1)[0]
            percentiles.append(pv)

    return percentiles

randlist = range(1,10001)
np.random.shuffle(randlist)
ardd = sc.parallelize(randlist)
get_percentiles(ardd, 10)

# [1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000]

Ответ 5

Если вы не возражаете преобразовывать свой RDD в DataFrame и используете UDAF для Hive, вы можете использовать percentile. Предполагая, что вы загрузили hiveContext HiveContext в область видимости:

hiveContext.sql("SELECT percentile(x, array(0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9)) FROM yourDataFrame")

Я узнал об этом Hive UDAF в этом ответе.

Ответ 6

Преобразуйте RDD в RDD Double, а затем используйте действие .histogram(10). См. DoubleRDD ScalaDoc

Ответ 7

Если N процентов мало, например 10, 20%, я сделаю следующее:

  1. Вычислите размер набора данных, rdd.count(), пропустите его, может быть, вы уже знаете его и примите в качестве аргумента.

  2. Вместо того, чтобы сортировать весь набор данных, я узнаю top (N) из каждого раздела. Для этого мне нужно выяснить N = что такое N% от rdd.count, затем отсортировать разделы и взять top (N) из каждого раздела. Теперь у вас есть намного меньший набор данных для сортировки.

3.rdd.sortBy

4.zipWithIndex

5.фильтр (индекс & lt; topN)

Ответ 8

Другим альтернативным способом может быть использование верхнего и последнего RDD двойного. Например, val percentile_99th_value = scores.top((count/100).toInt).last

Этот метод больше подходит для отдельных процентилей.

Ответ 9

Основываясь на ответе, приведенном здесь Медианный UDAF в Spark/Scala, я использовал UDAF для вычисления процентилей по искровым окнам (искра 2.1):

Сначала абстрактный обобщенный UDAF, используемый для других агрегатов

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer


abstract class GenericUDAF extends UserDefinedAggregateFunction {

  def inputSchema: StructType =
    StructType(StructField("value", DoubleType) :: Nil)

  def bufferSchema: StructType = StructType(
    StructField("window_list", ArrayType(DoubleType, false)) :: Nil
  )

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = new ArrayBuffer[Double]()
  }

  def update(buffer: MutableAggregationBuffer,input: org.apache.spark.sql.Row): Unit = {
    var bufferVal = buffer.getAs[mutable.WrappedArray[Double]](0).toBuffer
    bufferVal+=input.getAs[Double](0)
    buffer(0) = bufferVal
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: org.apache.spark.sql.Row): Unit = {
    buffer1(0) = buffer1.getAs[ArrayBuffer[Double]](0) ++ buffer2.getAs[ArrayBuffer[Double]](0)
  }

  def dataType: DataType
  def evaluate(buffer: Row): Any

}

Затем Percentile UDAF настроен для децилей:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer


class DecilesUDAF extends GenericUDAF {

  override def dataType: DataType = ArrayType(DoubleType, false)

  override def evaluate(buffer: Row): Any = {
    val sortedWindow = buffer.getAs[mutable.WrappedArray[Double]](0).sorted.toBuffer
    val windowSize = sortedWindow.size
    if (windowSize == 0) return null
    if (windowSize == 1) return (0 to 10).map(_ => sortedWindow.head).toArray

    (0 to 10).map(i => sortedWindow(Math.min(windowSize-1, i*windowSize/10))).toArray

  }
}

UDAF затем создается и вызывается через секционированное и упорядоченное окно:

val deciles = new DecilesUDAF()
df.withColumn("mt_deciles", deciles(col("mt")).over(myWindow))

Затем вы можете разбить полученный массив на несколько столбцов с помощью getItem:

def splitToColumns(size: Int, splitCol:String)(df: DataFrame) = {
  (0 to size).foldLeft(df) {
    case (df_arg, i) => df_arg.withColumn("mt_decile_"+i, col(splitCol).getItem(i))
  }
}

df.transform(splitToColumns(10, "mt_deciles" ))

UDAF медленнее, чем встроенные функции искры, но если каждый сгруппированный пакет или каждое окно относительно малы и подходят одному исполнителю, все должно быть хорошо. Основным преимуществом является использование искрового параллелизма. Без особых усилий этот код можно распространить на n-квантили.

Я проверил код с помощью этой функции:

def testDecilesUDAF = {
    val window = W.partitionBy("user")
    val deciles = new DecilesUDAF()

    val schema = StructType(StructField("mt", DoubleType) :: StructField("user", StringType) :: Nil)

    val rows1 = (1 to 20).map(i => Row(i.toDouble, "a"))
    val rows2 = (21 to 40).map(i => Row(i.toDouble, "b"))

    val df = spark.createDataFrame(spark.sparkContext.makeRDD[Row](rows1++rows2), schema)

    df.withColumn("deciles", deciles(col("mt")).over(window))
      .transform(splitToColumns(10, "deciles" ))
      .drop("deciles")
      .show(100, truncate=false)
  }

Первые 3 строки вывода:

+----+----+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+
|mt  |user|mt_decile_0|mt_decile_1|mt_decile_2|mt_decile_3|mt_decile_4|mt_decile_5|mt_decile_6|mt_decile_7|mt_decile_8|mt_decile_9|mt_decile_10|
+----+----+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+
|21.0|b   |21.0       |23.0       |25.0       |27.0       |29.0       |31.0       |33.0       |35.0       |37.0       |39.0       |40.0        |
|22.0|b   |21.0       |23.0       |25.0       |27.0       |29.0       |31.0       |33.0       |35.0       |37.0       |39.0       |40.0        |
|23.0|b   |21.0       |23.0       |25.0       |27.0       |29.0       |31.0       |33.0       |35.0       |37.0       |39.0       |40.0        |

Ответ 10

Вот мой легкий подход:

val percentiles = Array(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1)
val accuracy = 1000000
df.stat.approxQuantile("score", percentiles, 1.0/accuracy)

выход:

scala> df.stat.approxQuantile("score", percentiles, 1.0/accuracy)
res88: Array[Double] = Array(0.011044141836464405, 0.02022990956902504, 0.0317261666059494, 0.04638145491480827, 0.06498630344867706, 0.0892181545495987, 0.12161539494991302, 0.16825592517852783, 0.24740923941135406, 0.9188197255134583)

точность: параметр точности (по умолчанию: 10000) является положительным числовым литералом, который управляет точностью аппроксимации за счет памяти. Более высокое значение точности дает лучшую точность, 1,0/точность является относительной погрешностью аппроксимации.