Подтвердить что ты не робот

Изменение коллекции внутри Spark RDD foreach

Я пытаюсь добавить элементы к карте, итерации элементов RDD. Я не получаю никаких ошибок, но изменений не происходит.

Все работает отлично, добавляя напрямую или итерируя другие коллекции:

scala> val myMap = new collection.mutable.HashMap[String,String]
myMap: scala.collection.mutable.HashMap[String,String] = Map()

scala> myMap("test1")="test1"

scala> myMap
res44: scala.collection.mutable.HashMap[String,String] = Map(test1 -> test1)

scala> List("test2", "test3").foreach(w => myMap(w) = w)

scala> myMap
res46: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

Но когда я пытаюсь сделать то же самое из RDD:

scala> val fromFile = sc.textFile("tests.txt")
...
scala> fromFile.take(3)
...
res48: Array[String] = Array(test4, test5, test6)

scala> fromFile.foreach(w => myMap(w) = w)
scala> myMap
res50: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

Я пробовал распечатать содержимое карты так же, как и перед foreach, чтобы убедиться, что переменная такая же, и она печатает правильно:

fromFile.foreach(w => println(myMap("test1")))
...
test1
test1
test1
...

Я также напечатал измененный элемент карты внутри кода foreach, и он печатает как измененный, но когда операция завершена, карта кажется немодифицированной.

scala> fromFile.foreach({w => myMap(w) = w; println(myMap(w))})
...
test4
test5
test6
...
scala> myMap
res55: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

Преобразование RDD в массив (сбор) также отлично работает:

fromFile.collect.foreach(w => myMap(w) = w)
scala> myMap
res89: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test5 -> test5, test1 -> test1, test4 -> test4, test6 -> test6, test3 -> test3)

Является ли это проблемой контекста? Получаю ли я доступ к копии данных, которые меняются где-то еще?

4b9b3361

Ответ 1

Он становится яснее при работе на кластере Spark (не на одной машине). RDD теперь распространяется по нескольким машинам. Когда вы вызываете foreach, вы сообщаете каждой машине, что делать с частью RDD, которую она имеет. Если вы ссылаетесь на любые локальные переменные (например, myMap), они становятся сериализованными и отправляются на машины, поэтому они могут его использовать. Но ничего не возвращается. Таким образом, ваша оригинальная копия myMap не изменяется.

Я думаю, что это отвечает на ваш вопрос, но, очевидно, вы пытаетесь что-то сделать, и вы не сможете туда добраться. Не стесняйтесь объяснять здесь или в отдельном вопросе, что вы пытаетесь сделать, и я постараюсь помочь.