У меня есть деликатная проблема Spark, где я просто не могу обернуть голову.
У нас есть два RDD (из Cassandra). RDD1 содержит Actions
, а RDD2 содержит данные Historic
. Оба имеют идентификатор, по которому их можно сопоставить/объединить. Но проблема состоит в том, что две таблицы имеют отношение N: N. Actions
содержит несколько строк с одним и тем же идентификатором, а также Historic
. Ниже приведен пример даты из обеих таблиц.
Actions
время - это временная метка
id | time | valueX
1 | 12:05 | 500
1 | 12:30 | 500
2 | 12:30 | 125
Historic
set_at на самом деле является меткой времени
id | set_at| valueY
1 | 11:00 | 400
1 | 12:15 | 450
2 | 12:20 | 50
2 | 12:25 | 75
Как мы можем присоединиться к этим двум таблицам таким образом, чтобы получить такой результат, как этот
1 | 100 # 500 - 400 for Actions#1 with time 12:05 because Historic was in that time at 400
1 | 50 # 500 - 450 for Actions#2 with time 12:30 because H. was in that time at 450
2 | 50 # 125 - 75 for Actions#3 with time 12:30 because H. was in that time at 75
Я не могу придумать хорошее решение, которое кажется правильным, не делая много итераций над огромными наборами данных. Я всегда должен думать о создании диапазона из набора Historic
, а затем как-то проверить, подходит ли Actions
в диапазоне, например (11:00 - 12:15), для расчета. Но это кажется мне довольно медленным. Есть ли более эффективный способ сделать это? Мне кажется, что такая проблема может быть популярной, но пока я не могу найти никаких намеков на это. Как бы вы решили эту проблему в иске?
Мои текущие попытки до сих пор (на половине кода)
case class Historic(id: String, set_at: Long, valueY: Int)
val historicRDD = sc.cassandraTable[Historic](...)
historicRDD
.map( row => ( row.id, row ) )
.reduceByKey(...)
// transforming to another case which results in something like this; code not finished yet
// (List((Range(0, 12:25), 400), (Range(12:25, NOW), 450)))
// From here we could join with Actions
// And then some .filter maybe to select the right Lists tuple