Подтвердить что ты не робот

Spark: как присоединиться к RDD по временному диапазону

У меня есть деликатная проблема Spark, где я просто не могу обернуть голову.

У нас есть два RDD (из Cassandra). RDD1 содержит Actions, а RDD2 содержит данные Historic. Оба имеют идентификатор, по которому их можно сопоставить/объединить. Но проблема состоит в том, что две таблицы имеют отношение N: N. Actions содержит несколько строк с одним и тем же идентификатором, а также Historic. Ниже приведен пример даты из обеих таблиц.

Actions время - это временная метка

id  |  time  | valueX
1   |  12:05 | 500
1   |  12:30 | 500
2   |  12:30 | 125

Historic set_at на самом деле является меткой времени

id  |  set_at| valueY
1   |  11:00 | 400
1   |  12:15 | 450
2   |  12:20 | 50
2   |  12:25 | 75

Как мы можем присоединиться к этим двум таблицам таким образом, чтобы получить такой результат, как этот

1   |  100  # 500 - 400 for Actions#1 with time 12:05 because Historic was in that time at 400
1   |  50   # 500 - 450 for Actions#2 with time 12:30 because H. was in that time at 450
2   |  50   # 125 - 75  for Actions#3 with time 12:30 because H. was in that time at 75

Я не могу придумать хорошее решение, которое кажется правильным, не делая много итераций над огромными наборами данных. Я всегда должен думать о создании диапазона из набора Historic, а затем как-то проверить, подходит ли Actions в диапазоне, например (11:00 - 12:15), для расчета. Но это кажется мне довольно медленным. Есть ли более эффективный способ сделать это? Мне кажется, что такая проблема может быть популярной, но пока я не могу найти никаких намеков на это. Как бы вы решили эту проблему в иске?

Мои текущие попытки до сих пор (на половине кода)

case class Historic(id: String, set_at: Long, valueY: Int)
val historicRDD = sc.cassandraTable[Historic](...)

historicRDD
.map( row => ( row.id, row ) )
.reduceByKey(...) 
// transforming to another case which results in something like this; code not finished yet
// (List((Range(0, 12:25), 400), (Range(12:25, NOW), 450)))

// From here we could join with Actions
// And then some .filter maybe to select the right Lists tuple
4b9b3361

Ответ 1

Это интересная проблема. Я также некоторое время выяснял подход. Вот что я придумал:

Для классов case для Action(id, time, x) и Historic(id, time, y)

  • Присоединитесь к действиям с историей (это может быть тяжело)
  • фильтровать все исторические данные, не относящиеся к данному действию.
  • отображает результаты по (id, time) - различает один и тот же ключ в разное время
  • уменьшить историю с помощью действия до максимального значения, оставив нам соответствующую историческую запись для данного действия

In Spark:

val actionById = actions.keyBy(_.id)
val historyById = historic.keyBy(_.id)
val actionByHistory = actionById.join(historyById)
val filteredActionByidTime = actionByHistory.collect{ case (k,(action,historic)) if (action.time>historic.t) => ((action.id, action.time),(action,historic))}
val topHistoricByAction = filteredActionByidTime.reduceByKey{ case ((a1:Action,h1:Historic),(a2:Action, h2:Historic)) =>  (a1, if (h1.t>h2.t) h1 else h2)}

// we are done, let produce a report now
val report = topHistoricByAction.map{case ((id,time),(action,historic)) => (id,time,action.X -historic.y)}

Используя приведенные выше данные, отчет выглядит следующим образом:

report.collect
Array[(Int, Long, Int)] = Array((1,43500,100), (1,45000,50), (2,45000,50))

(я преобразовал время в секундах, чтобы иметь упрощенную метку времени)

Ответ 2

После нескольких часов размышлений, попыток и неудач я придумал это решение. Я не уверен, что это хорошо, но из-за отсутствия других вариантов это мое решение.

Сначала мы расширим наш case class Historic

case class Historic(id: String, set_at: Long, valueY: Int) {
  val set_at_map = new java.util.TreeMap[Long, Int]() // as it seems Scala doesn't provides something like this with similar operations we'll need a few lines later
  set_at_map.put(0, valueY) // Means from the beginning of Epoch ...
  set_at_map.put(set_at, valueY) // .. to the set_at date

  // This is the fun part. With .getHistoricValue we can pass any timestamp and we will get the a value of the key back that contains the passed date. For more information look at this answer: http://stackoverflow.com/a/13400317/1209327
  def getHistoricValue(date: Long) : Option[Int] = {
    var e = set_at_map.floorEntry(date)                                   
    if (e != null && e.getValue == null) {                                  
      e = set_at_map.lowerEntry(date)                                     
    }                                                                         
    if ( e == null ) None else e.getValue()
  }
}

Класс case готов, и теперь мы приводим его в действие

val historicRDD = sc.cassandraTable[Historic](...)
  .map( row => ( row.id, row ) )
  .reduceByKey( (row1, row2) =>  {
    row1.set_at_map.put(row2.set_at, row2.valueY) // we add the historic Events up to each id
    row1
  })

// Now we load the Actions and map it by id as we did with Historic
val actionsRDD = sc.cassandraTable[Actions](...)
  .map( row => ( row.id, row ) )

// Now both RDDs have the same key and we can join them
val fin = actionsRDD.join(historicRDD)
  .map( row => {
    ( row._1.id, 
      (
        row._2._1.id, 
        row._2._1.valueX - row._2._2.getHistoricValue(row._2._1.time).get // returns valueY for that timestamp
      )
    )
  })

Я совершенно не знаком с Scala, поэтому, пожалуйста, дайте мне знать, можем ли мы улучшить этот код в каком-то месте.

Ответ 3

Я знаю, что на этот вопрос был дан ответ, но я хочу добавить другое решение, которое сработало для меня -

ваши данные -

Actions 
id  |  time  | valueX
1   |  12:05 | 500
1   |  12:30 | 500
2   |  12:30 | 125

Historic 
id  |  set_at| valueY
1   |  11:00 | 400
1   |  12:15 | 450
2   |  12:20 | 50
2   |  12:25 | 75
  • Союз Actions и Historic
    Combined
    id  |  time  | valueX | record-type
    1   |  12:05 | 500    | Action
    1   |  12:30 | 500    | Action
    2   |  12:30 | 125    | Action
    1   |  11:00 | 400    | Historic
    1   |  12:15 | 450    | Historic
    2   |  12:20 | 50     | Historic
    2   |  12:25 | 75     | Historic
  1. Напишите пользовательский разделитель и используйте repartitionAndSortWithinPartitions для разделения на id, но отсортируйте его по time.

    Partition-1
    1   |  11:00 | 400    | Historic
    1   |  12:05 | 500    | Action
    1   |  12:15 | 450    | Historic
    1   |  12:30 | 500    | Action
    Partition-2
    2   |  12:20 | 50     | Historic
    2   |  12:25 | 75     | Historic
    2   |  12:30 | 125    | Action
    
  2. Пройдите через записи на раздел.

Если это запись Historical, добавьте ее на карту или обновите карту, если она уже имеет этот идентификатор - отслеживайте последнюю valueY за id, используя карту для каждого раздела.

Если это запись Action, получите значение valueY из карты и вычтите ее из valueX

Отображение M

Partition-1 traversal in order M={ 1 -> 400} // A new entry in map M 1 | 100 // M(1) = 400; 500-400 M={1 -> 450} // update M, because key already exists 1 | 50 // M(1) Partition-2 traversal in order M={ 2 -> 50} // A new entry in M M={ 2 -> 75} // update M, because key already exists 2 | 50 // M(2) = 75; 125-75

Вы можете попробовать разбивать и сортировать по time, но вам нужно объединить разделы позже. И это может добавить к некоторой сложности.

Это я нашел предпочтительным для соединения "многие-ко-многим", которое мы обычно получаем при использовании диапазонов времени для объединения.