Подтвердить что ты не робот

Создайте цепочку состояния для разных событий и назначьте глобальный идентификатор в искровом режиме

Мы работаем с spark 1.6, и мы пытаемся сохранить глобальную идентичность для подобных событий. Могут быть несколько "групп" событий с идентичным ID (в примере как число. Буквы добавляются только для уникальности). И мы знаем, что некоторые из этих событий похожи, поэтому мы можем их подключить. Мы хотим сохранить что-то вроде:

Z -> 1, 2, 3
X -> 4

поэтому в будущем, если появятся некоторые события с id 4, мы можем назначить X как глобальный идентификатор.

Пожалуйста, проверьте пример для лучшей иллюстрации:

Скажем, у нас есть некоторые потоковые данные, поступающие в искровую работу.

1a
1b
2c
2d
2e
3f
3g
3h
4i

Поскольку событие 1 является нашим первым появлением, мы хотим назначить 1 to Z. Далее мы знаем, что 1b и 2c аналогичны. поэтому мы хотим сохранить где-нибудь 2->1 отображение. То же самое для 2e и 3f, поэтому нам нужно отобразить 3-2. Итак, на данный момент мы имеем 3 пары 1->Z, 2->1, 3->2.

И мы хотим создать "исторический" путь: Z <- 1 <- 2 <- 3 В конце мы будем иметь все события с ID = Z.

1a -> Z
1b -> Z
2c -> Z
2d -> Z
2e -> Z
3f -> Z
3g -> Z
3h -> Z
4i -> X

Мы пытались использовать mapwithstate, но единственное, что мы могли сделать, это 2->1 и 3->2. С mapwithstate нам не удалось получить состояние для "родителя" в состоянии для текущего события - например. текущее событие 3 с родительским 2 и не может получить 2 -> 1 и ни 1 -> Z.

Возможно ли для этого иметь какое-то глобальное отображение? Мы уже пробовали аккумуляторы и трансляцию, но выглядели не очень подходящими. И мы не смогли заменить события 1 для первого отображения и события 2 для второго отображения с помощью Z.

Если появится новое событие 5 и похоже на 3h, нам нужно снова присвоить отображение 5->Z.

4b9b3361

Ответ 1

Далее следует решение для данной проблемы с использованием изменяемой ссылки на "состояние" RDD, которое мы каждый раз обновляем новыми результатами.

Мы используем transform для того, чтобы пометить поток входящих событий уникальным глобальным идентификатором, выполнив объединение сходства. Это объединение "вручную", где мы используем произведение двух наборов данных и сравниваем каждую парную пару.

Обратите внимание, что это дорогостоящий процесс. Есть много частей, которые можно было бы изменить, в зависимости от конкретных характеристик ожидаемого потока. Например, мы могли бы заменить глобальное состояние RDD на локальный map и применять к ним map-side для более быстрого объединения подобия, но это очень зависит от ожидаемой мощности множества уникальных идентификаторов.

Это было сложнее, чем я ожидал. Возьмите его только как отправную точку в направлении более надежного решения. Например, операция union в состоянии RDD требует регулярной контрольной проверки, чтобы избежать увеличения DAG вне контроля. (Там есть много возможностей для улучшения, но это выходит за рамки разумных усилий, чтобы дать ответ.)

Здесь я набросаю суть решения, для полного тестового ноутбука см. UniqueGlobalStateChains.snb

// this mutable reference points to the `states` that we keep across interations    
@transient var states: RDD[(String, (Int, Long))] = sparkContext.emptyRDD

// we assume an incoming Event stream. Here we prepare it for the global id-process 
@transient val eventsById = eventStream.map(event => (event.id, event))
@transient val groupedEvents = eventsById.groupByKey()

// this is the core of the solution. 
// We transform the incoming events into tagged events. 
// As a by-product, the mutable `states` reference will get updated with the latest state mapping. 
// the "chain" of events can be reconstructed ordering the states by timestamp

@transient val taggedEvents = groupedEvents.transform{ (events, currentTime) => 
    val currentTransitions = states.reduceByKey{case (event1, event2) => Seq(event1, event2).maxBy{case (id, ts) => ts}}                        
    val currentMappings = currentTransitions.map{case (globalId, (currentId, maxTx)) => (currentId, globalId)}

    val newEventIds = events.keys // let extract the ids of the incoming (grouped) events
    val similarityJoinMap = newEventIds.cartesian(currentMappings)
        .collect{case (eventId, (currentId, globalId)) if (isSimilar(currentId)(eventId)) => (eventId, globalId)}
        .collectAsMap
    //val similarityBC = sparkContext.broadcast(similarityJoinMap)                   
    val newGlobalKeys = newEventIds.map(id => (id, similarityJoinMap.getOrElse(id, genGlobalId())))
    newGlobalKeys.cache() //avoid lazy evaluation to generate multiple global ids

    val newTaggedEvents = events.join(newGlobalKeys).flatMap{case (eventId, (events, globalKey)) => 
                                      events.map(event => (event.id,event.payload, globalKey))
                                     }
    val newStates = newGlobalKeys.map{case (eventId, globalKey) => (globalKey, (eventId, currentTime.milliseconds))}
    currentState = newStates              
    states.unpersist(false)                              
    states = newStates.union(states)
    states.cache()                              
    newTaggedEvents
    }

Учитывая эту последовательность ввода:

"1|a,1|b,3|c",  "2|d,2|e,2|f", "3|g,3|h,3|i,4|j", "5|k", "4|f,1|g", "6|h"

Получаем:

Отмеченные события с глобальным идентификатором:

---
1|a: gen-4180,1|b: gen-4180,3|c: gen-5819
---
2|d: gen-4180,2|e: gen-4180,2|f: gen-4180
---
3|g: gen-4180,3|h: gen-4180,3|i: gen-4180,4|j: gen-5819
---
5|k: gen-5819
---
1|g: gen-2635,4|f: gen-4180
---
6|h: gen-5819

И мы можем восстановить цепочку событий, которые получены из глобального id:

gen-4180: 1<-2<-3<-4
gen-2635: 1
gen-5819: 3<-4<-5<-6

-o -