Подтвердить что ты не робот

Как скопировать итератор в Scala?

О дубликате

Это не дубликат Как клонировать итератор?

Пожалуйста, не слепо закрывайте этот вопрос, все ответы, заданные в так называемой дублированной DO NOT работе. ОП отвечает за другую проблему, и, очевидно, ответы соответствуют ЕГО проблеме, но не моей.

Не каждый подобный вопрос является дубликатом, есть такая функция, как "вопрос расширения" на SE, единственный способ - попросить снова на одном и том же тему получить разные, рабочие, ответы.

Проблема

У меня есть итератор. Я хотел бы получить копию (дубликат), поэтому я мог бы продолжить оригинал и полностью копировать самостоятельно.

Внимание!

Копирование с помощью отражения или сериализации не работает (штраф за производительность).

Пример

var list = List(1,2,3,4,5)
var it1 = list.iterator
it1.next()

var it2 = it1   // (*)
it2.next()

println(it1.next())

Это будет просто ссылаться на it1, поэтому при изменении it1 изменяется it2 и наоборот.

В приведенном выше примере используется Список, в настоящее время я борюсь с HashMap, но вопрос общий, просто итератор.

Подход № 1

Если вы отредактируете строку (*) и напишите:

var it2 = it1.toList.iterator

(это было предложено как решение в связанном вопросе) исключение бросается при выполнении программы.

Подход № 2

"Вы берете список и...". Нет, не знаю. У меня нет списка, у меня есть итератор. В общем, я ничего не знаю о коллекции, которая лежит в основе итератора, единственное, что у меня есть, это итератор. Я должен "разветкить" его.

4b9b3361

Ответ 1

Вы не можете дублировать итератор, не уничтожая его. Контракт для iterator заключается в том, что он может быть пройден только один раз.

Вопрос, на который вы ссылаетесь, показывает, как получить две копии в обмен на тот, который вы уничтожили. Вы не можете продолжать использовать оригинал, но теперь вы можете запускать две новые копии вперед независимо.

Ответ 2

Очень легко создать итератор List, который вы можете дублировать, не уничтожая его: это в основном определение метода iterator, скопированного из источника List с добавленным методом fork:

class ForkableIterator[A] (list: List[A]) extends Iterator[A] {
    var these = list
    def hasNext: Boolean = !these.isEmpty
    def next: A = 
      if (hasNext) {
        val result = these.head; these = these.tail; result
      } else Iterator.empty.next
    def fork = new ForkableIterator(these)
}

Использование:

scala> val it = new ForkableIterator(List(1,2,3,4,5,6))
it: ForkableIterator[Int] = non-empty iterator

scala> it.next
res72: Int = 1

scala> val it2 = it.fork
it2: ForkableIterator[Int] = non-empty iterator

scala> it2.next
res73: Int = 2

scala> it2.next
res74: Int = 3

scala> it.next
res75: Int = 2

Я посмотрел на это для HashMap, но он кажется более сложным (отчасти потому, что в зависимости от размера коллекции существуют разные реализации карт). Поэтому, вероятно, лучше всего использовать приведенную выше реализацию на yourMap.toList.

Ответ 3

Как сказал Рекс, невозможно сделать копию Итератора, не разрушая ее. Тем не менее, в чем проблема с duplicate?

var list = List(1,2,3,4,5)
var it1 = list.iterator
it1.next()

val (it1a, it1b) = it1.duplicate
it1 = it1a
var it2 = it1b
it2.next()

println(it1.next())

Ответ 4

Я думаю, что это очень хороший вопрос, жаль, что многие не поняли значение проблемы. В эпоху больших данных существует множество ситуаций, когда у нас есть поток, а не выделенный список данных, которые невозможно собрать или поместить в память. И повторение этого с самого начала обходится слишком дорого. Что мы можем сделать, если нам нужно два (или более) отдельных расчета с данными? Например, нам может понадобиться вычислить min, max, sum, md5 и т.д., Используя уже написанные функции, с одним проходом в разных потоках.

Общее решение - использовать Akka-Stream. Это сделает это. Но возможно ли с помощью Iterator, что является самым простым способом в Java/Scala представлять такой потоковый источник данных? Ответ - да, хотя мы "НЕ МОЖЕМ приступить к оригиналу и копировать полностью независимо", что означает, что мы должны синхронизировать скорости потребления каждого потока потребителя. (Akka-Stream делает это, используя противодавление и некоторые промежуточные буферы).

Итак, вот мое простое решение: использовать Phaser. С его помощью мы можем сделать обертку Iterator по однопроходному источнику. Этот объект должен использоваться в каждом потоке потребителей как простой итератор. Используя его, вы должны знать количество потребляющих потоков заранее. Также каждый потребительский поток ДОЛЖЕН истощать источник до конца, чтобы избежать зависания всех оверов (например, с помощью метода flush()).

import java.util.concurrent.Phaser
import java.util.concurrent.atomic.AtomicBoolean

// it0 - input source iterator
// num - exact number of consuming threads. We have to know it in advance.
case class ForkableIterator[+A]( it0: Iterator[A], num: Int ) extends Phaser(num) with Iterator[A] {

  val it = it0.flatMap( Stream.fill(num)(_) )  // serial replicator

  private var hasNext0 = new AtomicBoolean( it0.hasNext )
  override def hasNext: Boolean = hasNext0.get()

  override def next(): A = {
    arriveAndAwaitAdvance()
    val next = it.synchronized {
      val next = it.next()
      if (hasNext0.get) hasNext0.set(it.hasNext)
      next
    }
    arriveAndAwaitAdvance() // otherwise the tasks locks at the end the last data element
    next
  }

  // In case that a consumer gives up to read before the end of its source data stream occurs
  // it HAVE to drain the last to avoid block others. (Note: Phaser has no "unregister" method?).
  // Calling it may be avoided if all consumers read exactly the same amount of data,
  // e.g. until the very end of it.
  def flush(): Unit = while (hasNext) next()
}

PS Этот "ForkableIterator" был успешно использован мной вместе со Spark для выполнения нескольких независимых агрегаций по длинному потоку исходных данных. В таком случае я не беспокоюсь о создании потоков вручную. Вы также можете использовать Scala Futures/Monix Tasks и т.д.

PSPS Теперь я перепроверил спецификацию JDK Phaser и обнаружил, что в действительности у него есть "незарегистрированный" метод с именем ArriAndDeregister(). Так что используйте его вместо flush(), если потребитель завершит работу.