У меня есть rdd целых чисел (т.е. RDD[Int]
), и мне хотелось бы вычислить следующие десять процентилей: [0th, 10th, 20th, ..., 90th, 100th]
. Каков наиболее эффективный способ сделать это?
Как вычислить процентили в Apache Spark
Ответ 1
Вы можете:
- Сортировка набора данных через rdd.sortBy()
- Вычислить размер набора данных через rdd.count()
- Почтовый индекс с индексом для облегчения поиска процентилей
- Получить желаемый процентиль через rdd.lookup(), например. для 10-го процентиля rdd.lookup(размер 0,1 *)
Чтобы вычислить медианную и 99-ю процентиль: getPercentiles (rdd, new double [] {0.5, 0.99}, size, numPartitions);
В Java 8:
public static double[] getPercentiles(JavaRDD<Double> rdd, double[] percentiles, long rddSize, int numPartitions) {
double[] values = new double[percentiles.length];
JavaRDD<Double> sorted = rdd.sortBy((Double d) -> d, true, numPartitions);
JavaPairRDD<Long, Double> indexed = sorted.zipWithIndex().mapToPair((Tuple2<Double, Long> t) -> t.swap());
for (int i = 0; i < percentiles.length; i++) {
double percentile = percentiles[i];
long id = (long) (rddSize * percentile);
values[i] = indexed.lookup(id).get(0);
}
return values;
}
Обратите внимание, что это требует сортировки набора данных O (n.log(n)) и может быть дорогостоящим на больших наборах данных.
Другой ответ, предлагающий просто вычислить гистограмму, не будет правильно вычислять процентиль: вот пример встречного примера: набор данных, состоящий из 100 чисел, 99 чисел, равный 0, и одно число, равное 1. В итоге вы получаете все 99 0 в первом бункере и 1 в последнем ящике с 8 пустыми ячейками в середине.
Ответ 2
Как насчет t-digest?
https://github.com/tdunning/t-digest
Новая структура данных для точного он-лайн накопления статистических данных рангов, таких как квантили и обрезанные средства. Алгоритм t-digest также очень дружелюбен к параллели, что делает его полезным в приложениях для уменьшения карты и параллельной потоковой передачи.
Алгоритм построения t-дайджеста использует вариант одномерной кластеризации k-средних для создания структуры данных, связанной с Q-дайджестом. Эта структура данных t-digest может использоваться для оценки квантилей или вычисления другой статистики рангов. Преимущество t-digest над Q-дайджестом заключается в том, что t-дайджест может обрабатывать значения с плавающей запятой, а Q-дайджест ограничен целыми числами. С небольшими изменениями t-дайджест может обрабатывать любые значения из любого упорядоченного набора, который имеет нечто похожее на среднее значение. Точность оценок квантилей, создаваемых t-дайджестами, может быть на порядок более точным, чем точность, полученная Q-дайджестами, несмотря на то, что t-дайджесты более компактны при хранении на диске.
Таким образом, особенно интересные характеристики t-дайджеста заключаются в том, что он
- имеет меньшие суммы, чем Q-дайджест
- работает как с парными, так и с целыми числами.
- обеспечивает точность в части на миллион для экстремальных квантилей и, как правило, точность 1000 м.д. для средних квантилей
- быстро
- очень просто
- имеет ссылочную реализацию, которая имеет > 90% охвата тестирования
- может использоваться с уменьшением карты очень легко, потому что дайджесты могут быть объединены.
Достаточно просто использовать эталонную реализацию Java из Spark.
Ответ 3
Я обнаружил этот gist
https://gist.github.com/felixcheung/92ae74bc349ea83a9e29
который содержит следующую функцию:
/**
* compute percentile from an unsorted Spark RDD
* @param data: input data set of Long integers
* @param tile: percentile to compute (eg. 85 percentile)
* @return value of input data at the specified percentile
*/
def computePercentile(data: RDD[Long], tile: Double): Double = {
// NIST method; data to be sorted in ascending order
val r = data.sortBy(x => x)
val c = r.count()
if (c == 1) r.first()
else {
val n = (tile / 100d) * (c + 1d)
val k = math.floor(n).toLong
val d = n - k
if (k <= 0) r.first()
else {
val index = r.zipWithIndex().map(_.swap)
val last = c
if (k >= c) {
index.lookup(last - 1).head
} else {
index.lookup(k - 1).head + d * (index.lookup(k).head - index.lookup(k - 1).head)
}
}
}
}
Ответ 4
Вот моя реализация Python на Spark для вычисления процентиля для RDD, содержащего интересующие значения.
def percentile_threshold(ardd, percentile):
assert percentile > 0 and percentile <= 100, "percentile should be larger then 0 and smaller or equal to 100"
return ardd.sortBy(lambda x: x).zipWithIndex().map(lambda x: (x[1], x[0])) \
.lookup(np.ceil(ardd.count() / 100 * percentile - 1))[0]
# Now test it out
import numpy as np
randlist = range(1,10001)
np.random.shuffle(randlist)
ardd = sc.parallelize(randlist)
print percentile_threshold(ardd,0.001)
print percentile_threshold(ardd,1)
print percentile_threshold(ardd,60.11)
print percentile_threshold(ardd,99)
print percentile_threshold(ardd,99.999)
print percentile_threshold(ardd,100)
# output:
# 1
# 100
# 6011
# 9900
# 10000
# 10000
В отдельности я определил следующую функцию, чтобы получить от 10-го до 100-го процентиля.
def get_percentiles(rdd, stepsize=10):
percentiles = []
rddcount100 = rdd.count() / 100
sortedrdd = ardd.sortBy(lambda x: x).zipWithIndex().map(lambda x: (x[1], x[0]))
for p in range(0, 101, stepsize):
if p == 0:
pass
# I am not aware of a formal definition of 0 percentile,
# you can put a place holder like this if you want
# percentiles.append(sortedrdd.lookup(0)[0] - 1)
elif p == 100:
percentiles.append(sortedrdd.lookup(np.ceil(rddcount100 * 100 - 1))[0])
else:
pv = sortedrdd.lookup(np.ceil(rddcount100 * p) - 1)[0]
percentiles.append(pv)
return percentiles
randlist = range(1,10001)
np.random.shuffle(randlist)
ardd = sc.parallelize(randlist)
get_percentiles(ardd, 10)
# [1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000]
Ответ 5
Если вы не возражаете преобразовывать свой RDD в DataFrame и используете UDAF для Hive, вы можете использовать percentile. Предполагая, что вы загрузили hiveContext HiveContext в область видимости:
hiveContext.sql("SELECT percentile(x, array(0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9)) FROM yourDataFrame")
Я узнал об этом Hive UDAF в этом ответе.
Ответ 6
Преобразуйте RDD в RDD Double, а затем используйте действие .histogram(10)
. См. DoubleRDD ScalaDoc
Ответ 7
Если N процентов мало, например 10, 20%, я сделаю следующее:
Вычислите размер набора данных, rdd.count(), пропустите его, может быть, вы уже знаете его и примите в качестве аргумента.
Вместо того, чтобы сортировать весь набор данных, я узнаю top (N) из каждого раздела. Для этого мне нужно выяснить N = что такое N% от rdd.count, затем отсортировать разделы и взять top (N) из каждого раздела. Теперь у вас есть намного меньший набор данных для сортировки.
3.rdd.sortBy
4.zipWithIndex
5.фильтр (индекс & lt; topN)
Ответ 8
Другим альтернативным способом может быть использование верхнего и последнего RDD двойного. Например, val percentile_99th_value = scores.top((count/100).toInt).last
Этот метод больше подходит для отдельных процентилей.
Ответ 9
Основываясь на ответе, приведенном здесь Медианный UDAF в Spark/Scala, я использовал UDAF для вычисления процентилей по искровым окнам (искра 2.1):
Сначала абстрактный обобщенный UDAF, используемый для других агрегатов
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
abstract class GenericUDAF extends UserDefinedAggregateFunction {
def inputSchema: StructType =
StructType(StructField("value", DoubleType) :: Nil)
def bufferSchema: StructType = StructType(
StructField("window_list", ArrayType(DoubleType, false)) :: Nil
)
def deterministic: Boolean = true
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = new ArrayBuffer[Double]()
}
def update(buffer: MutableAggregationBuffer,input: org.apache.spark.sql.Row): Unit = {
var bufferVal = buffer.getAs[mutable.WrappedArray[Double]](0).toBuffer
bufferVal+=input.getAs[Double](0)
buffer(0) = bufferVal
}
def merge(buffer1: MutableAggregationBuffer, buffer2: org.apache.spark.sql.Row): Unit = {
buffer1(0) = buffer1.getAs[ArrayBuffer[Double]](0) ++ buffer2.getAs[ArrayBuffer[Double]](0)
}
def dataType: DataType
def evaluate(buffer: Row): Any
}
Затем Percentile UDAF настроен для децилей:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
class DecilesUDAF extends GenericUDAF {
override def dataType: DataType = ArrayType(DoubleType, false)
override def evaluate(buffer: Row): Any = {
val sortedWindow = buffer.getAs[mutable.WrappedArray[Double]](0).sorted.toBuffer
val windowSize = sortedWindow.size
if (windowSize == 0) return null
if (windowSize == 1) return (0 to 10).map(_ => sortedWindow.head).toArray
(0 to 10).map(i => sortedWindow(Math.min(windowSize-1, i*windowSize/10))).toArray
}
}
UDAF затем создается и вызывается через секционированное и упорядоченное окно:
val deciles = new DecilesUDAF()
df.withColumn("mt_deciles", deciles(col("mt")).over(myWindow))
Затем вы можете разбить полученный массив на несколько столбцов с помощью getItem:
def splitToColumns(size: Int, splitCol:String)(df: DataFrame) = {
(0 to size).foldLeft(df) {
case (df_arg, i) => df_arg.withColumn("mt_decile_"+i, col(splitCol).getItem(i))
}
}
df.transform(splitToColumns(10, "mt_deciles" ))
UDAF медленнее, чем встроенные функции искры, но если каждый сгруппированный пакет или каждое окно относительно малы и подходят одному исполнителю, все должно быть хорошо. Основным преимуществом является использование искрового параллелизма. Без особых усилий этот код можно распространить на n-квантили.
Я проверил код с помощью этой функции:
def testDecilesUDAF = {
val window = W.partitionBy("user")
val deciles = new DecilesUDAF()
val schema = StructType(StructField("mt", DoubleType) :: StructField("user", StringType) :: Nil)
val rows1 = (1 to 20).map(i => Row(i.toDouble, "a"))
val rows2 = (21 to 40).map(i => Row(i.toDouble, "b"))
val df = spark.createDataFrame(spark.sparkContext.makeRDD[Row](rows1++rows2), schema)
df.withColumn("deciles", deciles(col("mt")).over(window))
.transform(splitToColumns(10, "deciles" ))
.drop("deciles")
.show(100, truncate=false)
}
Первые 3 строки вывода:
+----+----+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+
|mt |user|mt_decile_0|mt_decile_1|mt_decile_2|mt_decile_3|mt_decile_4|mt_decile_5|mt_decile_6|mt_decile_7|mt_decile_8|mt_decile_9|mt_decile_10|
+----+----+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+
|21.0|b |21.0 |23.0 |25.0 |27.0 |29.0 |31.0 |33.0 |35.0 |37.0 |39.0 |40.0 |
|22.0|b |21.0 |23.0 |25.0 |27.0 |29.0 |31.0 |33.0 |35.0 |37.0 |39.0 |40.0 |
|23.0|b |21.0 |23.0 |25.0 |27.0 |29.0 |31.0 |33.0 |35.0 |37.0 |39.0 |40.0 |
Ответ 10
Вот мой легкий подход:
val percentiles = Array(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1)
val accuracy = 1000000
df.stat.approxQuantile("score", percentiles, 1.0/accuracy)
выход:
scala> df.stat.approxQuantile("score", percentiles, 1.0/accuracy)
res88: Array[Double] = Array(0.011044141836464405, 0.02022990956902504, 0.0317261666059494, 0.04638145491480827, 0.06498630344867706, 0.0892181545495987, 0.12161539494991302, 0.16825592517852783, 0.24740923941135406, 0.9188197255134583)
точность: параметр точности (по умолчанию: 10000) является положительным числовым литералом, который управляет точностью аппроксимации за счет памяти. Более высокое значение точности дает лучшую точность, 1,0/точность является относительной погрешностью аппроксимации.