Подтвердить что ты не робот

Обновление Scala коллекций поточно-безопасно

Мне было интересно, есть ли "простой" способ безопасно обновлять неизменяемые коллекции scala. Рассмотрим следующий код:

class a {
   private var x = Map[Int,Int]()

   def update(p:(Int,Int)) { x = x + (p) }
}

Этот код не является потокобезопасным, правильно? Под этим я подразумеваю, что если у нас есть два потока, вызывающих метод обновления, и мы можем сказать, что x - это карта, содержащая {1 = > 2}, а поток A вызывает обновление ((3,4)) и ему только удается выполнить x + (p) часть кода. Затем происходит перенастройка и поток B вызывает обновление ((13,37)) и успешно обновляет переменную x. Нить А продолжается и заканчивается.

После того, как все это закончится, значение x будет равно карте, содержащей {1 = > 2, 3 = > 4}, правильно? Вместо желаемого {1 = > 2, 3 = > 4, 13 = > 37}. Есть ли простой способ исправить это? Я надеюсь, что это неудивительно, что я прошу:)

Кстати, я знаю, что есть такие решения, как Akka STM, но я бы предпочел не использовать их, если это необходимо.

Спасибо за любой ответ!

edit: Кроме того, я бы предпочел решение без блокировки. Eeeew:)

4b9b3361

Ответ 1

В вашем случае, как писал Маурисио, ваша коллекция уже потокобезопасна, потому что она неизменна. Единственная проблема заключается в переназначении var, который может быть не атомной операцией. Для этой конкретной проблемы самым простым вариантом является использование хороших классов в java.util.concurrent.atomic, а именно AtomicReference.

import java.util.concurrent.atomic.AtomicReference

class a {
  private val x = new AtomicReference(Map[Int,Int]())

  def update(p:(Int,Int)) {
    while (true) {
      val oldMap = x.get // get old value
      val newMap = oldMap + p // update
      if (x.compareAndSet(oldMap, newMap))
        return // exit if update was successful, else repeat
    }
  }
}

Ответ 3

Сама коллекция является потокобезопасной, поскольку она не имеет общего измененного состояния, но ваш код отсутствует, и нет возможности исправить это без блокировки, так как у вас есть общее изменяемое состояние. Ваш лучший вариант - заблокировать сам метод, обозначая его как синхронизированный.

Другим решением будет использование изменчивой параллельной карты, возможно java.util.concurrent.ConcurrentMap.

Ответ 4

Re. Ответ Jean-Philippe Pellet: вы можете сделать это немного более удобным для использования:

def compareAndSetSync[T](ref: AtomicReference[T])(logic: (T => T)) {
  while(true) {
    val snapshot = ref.get
    val update = logic(snapshot)
    if (ref.compareAndSet(snapshot, update)) return
  }
}

def compareSync[T,V](ref: AtomicReference[T])(logic: (T => V)): V = {
  var continue = true
  var snapshot = ref.get
  var result = logic(snapshot)
  while (snapshot != ref.get) {
    snapshot = ref.get
    result = logic(snapshot)
  }
  result
}