Подтвердить что ты не робот

Синхронизация Java на основе параметра (с именем mutex/lock)

Я ищу способ синхронизации метода, основанного на полученном параметре, примерно так:

public synchronized void doSomething(name){
//some code
}

Я хочу, чтобы метод doSomething был синхронизирован на основе параметра name следующим образом:

Тема 1: doSomething ( "a" );

Тема 2: doSomething ( "b" );

Тема 3: doSomething ( "c" );

Тема 4: doSomething ( "a" );

Thread 1, Thread 2 и Thread 3 будут выполнять код без синхронизации, но Thread 4 будет ждать завершения Thread 1, потому что он имеет то же самое значение "a" .

Спасибо

UPDATE

Основываясь на объяснении Tudor, я думаю, что у меня другая проблема: здесь приведен пример нового кода:

private HashMap locks=new HashMap();
public void doSomething(String name){
    locks.put(name,new Object());
    synchronized(locks.get(name)) {
        // ...
    }
    locks.remove(name);
}

Причина, по которой я не заполняю карту блокировок, состоит в том, что имя может иметь любое значение.

Исходя из вышеприведенного примера, проблема может возникнуть при добавлении/удалении значений из хэш-карты несколькими потоками в одно и то же время, поскольку HashMap не является потокобезопасным.

Итак, мой вопрос: если я создаю HashMap a ConcurrentHashMap, который является потокобезопасным, синхронизированный блок остановит другие потоки от доступа к locks.get(name)??

4b9b3361

Ответ 1

Используйте карту для связывания строк с объектами блокировки:

Map<String, Object> locks = new HashMap<String, Object>();
locks.put("a", new Object());
locks.put("b", new Object());
// etc.

то

public void doSomething(String name){
    synchronized(locks.get(name)) {
        // ...
    }
}

Ответ 2

Ответ Tudor в порядке, но он статический и не масштабируемый. Мое решение является динамичным и масштабируемым, но оно связано с повышенной сложностью в реализации. Внешний мир может использовать этот класс так же, как с помощью Lock, поскольку этот класс реализует интерфейс. Вы получаете экземпляр параметризованной блокировки с помощью метода factory getCanonicalParameterLock.

package lock;

import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public final class ParameterLock implements Lock {

    /** Holds a WeakKeyLockPair for each parameter. The mapping may be deleted upon garbage collection
     * if the canonical key is not strongly referenced anymore (by the threads using the Lock). */
    private static final Map<Object, WeakKeyLockPair> locks = new WeakHashMap<>();

    private final Object key;
    private final Lock lock;

    private ParameterLock (Object key, Lock lock) {
        this.key = key;
        this.lock = lock;
    }

    private static final class WeakKeyLockPair {
        /** The weakly-referenced parameter. If it were strongly referenced, the entries of
         * the lock Map would never be garbage collected, causing a memory leak. */
        private final Reference<Object> param;
        /** The actual lock object on which threads will synchronize. */
        private final Lock lock;

        private WeakKeyLockPair (Object param, Lock lock) {
            this.param = new WeakReference<>(param);
            this.lock = lock;
        }
    }

    public static Lock getCanonicalParameterLock (Object param) {
        Object canonical = null;
        Lock lock = null;

        synchronized (locks) {
            WeakKeyLockPair pair = locks.get(param);            
            if (pair != null) {                
                canonical = pair.param.get(); // could return null!
            }
            if (canonical == null) { // no such entry or the reference was cleared in the meantime                
                canonical = param; // the first thread (the current thread) delivers the new canonical key
                pair = new WeakKeyLockPair(canonical, new ReentrantLock());
                locks.put(canonical, pair);
            }
        }

        // the canonical key is strongly referenced now...
        lock = locks.get(canonical).lock; // ...so this is guaranteed not to return null
        // ... but the key must be kept strongly referenced after this method returns,
        // so wrap it in the Lock implementation, which a thread of course needs
        // to be able to synchronize. This enforces a thread to have a strong reference
        // to the key, while it isn't aware of it (as this method declares to return a 
        // Lock rather than a ParameterLock).
        return new ParameterLock(canonical, lock);               
    }

    @Override
    public void lock() {
        lock.lock();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        lock.lockInterruptibly();
    }

    @Override
    public boolean tryLock() {
        return lock.tryLock();
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return lock.tryLock(time, unit);
    }

    @Override
    public void unlock() {
        lock.unlock();
    }

    @Override
    public Condition newCondition() {
        return lock.newCondition();
    }
}

Конечно, вам нужен канонический ключ для данного параметра, иначе потоки не будут синхронизированы, так как они будут использовать другой Lock. Канонизация - это эквивалент интернализации строк в решении Тудора. Где String.intern() сам по себе потокобезопасен, мой "канонический пул" - нет, поэтому мне нужна дополнительная синхронизация на WeakHashMap.

Это решение работает для любого типа объекта. Однако убедитесь, что правильно выполняете equals и hashCode в пользовательских классах, потому что если нет, проблемы с потоками будут возникать, поскольку несколько потоков могут использовать разные объекты Lock для синхронизации!

Выбор для WeakHashMap объясняется простотой управления памятью, которую он приносит. Как еще можно было бы знать, что нить не использует конкретный Lock больше? И если это можно было бы узнать, как вы могли бы безопасно удалить запись из Карты? Вам нужно будет синхронизировать при удалении, потому что у вас есть условие гонки между приходящим потоком, который хочет использовать блокировку, и действие по удалению блокировки с карты. Все эти вещи просто решаются с использованием слабых ссылок, поэтому VM делает для вас работу, и это значительно упрощает реализацию. Если вы проверили API WeakReference, вы обнаружите, что использование слабых ссылок является потокобезопасным.

Теперь проверьте эту тестовую программу (вам нужно запустить ее из класса ParameterLock из-за личной видимости некоторых полей):

public static void main(String[] args) {
    Runnable run1 = new Runnable() {

        @Override
        public void run() {
            sync(new Integer(5));
            System.gc();
        }
    };
    Runnable run2 = new Runnable() {

        @Override
        public void run() {
            sync(new Integer(5));
            System.gc();
        }
    };
    Thread t1 = new Thread(run1);
    Thread t2 = new Thread(run2);

    t1.start();
    t2.start();

    try {
        t1.join();
        t2.join();
        while (locks.size() != 0) {
            System.gc();
            System.out.println(locks);
        }
        System.out.println("FINISHED!");
    } catch (InterruptedException ex) {
        // those threads won't be interrupted
    }
}

private static void sync (Object param) {
    Lock lock = ParameterLock.getCanonicalParameterLock(param);
    lock.lock();
    try {
        System.out.println("Thread="+Thread.currentThread().getName()+", lock=" + ((ParameterLock) lock).lock);
        // do some work while having the lock
    } finally {
        lock.unlock();
    }        
}

Шансы очень высоки, что вы увидите, что оба потока используют один и тот же объект блокировки, и поэтому они синхронизированы. Пример вывода:

Thread=Thread-0, [email protected][Locked by thread Thread-0]
Thread=Thread-1, [email protected][Locked by thread Thread-1]
FINISHED!

Однако, с некоторым шансом, возможно, что 2 потока не перекрываются при выполнении, поэтому не требуется, чтобы они использовали одну и ту же блокировку. Вы можете легко применить это поведение в режиме отладки, установив точки останова в правильных местах, заставляя первый или второй поток останавливаться там, где это необходимо. Вы также заметите, что после коллекции Garbage в основном потоке WeakHashMap будет очищен, что, конечно, правильно, так как основной поток ждал, пока оба рабочих потока закончат свою работу, позвонив Thread.join(), прежде чем вызывать сборщик мусора. Это действительно означает, что сильная ссылка на блокировку параметра не может существовать в рабочем потоке, поэтому ссылка может быть удалена из слабой хэш-карты. Если другой поток теперь хочет синхронизировать один и тот же параметр, в синхронизированной части в getCanonicalParameterLock будет создан новый замок.

Теперь повторите тест с любой парой, которая имеет такое же каноническое представление (= они равны, поэтому a.equals(b)), и увидите, что он все еще работает:

sync("a");
sync(new String("a"))

sync(new Boolean(true));
sync(new Boolean(true));

и др.

В принципе, этот класс предлагает вам следующие функции:

  • Параметрированная синхронизация
  • Управление инкапсулированной памятью
  • Возможность работать с любым типом объекта (при условии правильной реализации equals и hashCode)
  • Реализует интерфейс блокировки

Эта реализация блокировки была протестирована путем изменения ArrayList одновременно с 10 потоками, итерирующими 1000 раз, делая это: добавление 2-х элементов, а затем удаление последней найденной записи списка путем повторения полного списка. За каждую итерацию запрашивается блокировка, поэтому будет запрошено 10 * 1000 блокировок. Не было выбрано исключение ConcurrentModificationException, и после того, как все рабочие потоки закончили, общее количество элементов составило 10 * 1000. При каждой модификации блокировка запрашивалась при вызове ParameterLock.getCanonicalParameterLock(new String("a")), поэтому для проверки правильности канонизации используется новый объект параметра.

Обратите внимание, что вам не следует использовать строковые литералы и примитивные типы для параметров. Поскольку строковые литералы автоматически интернированы, они всегда имеют сильную ссылку, поэтому, если первый поток приходит с литералом String для его параметра, тогда пул блокировки никогда не будет освобожден от записи, что является утечкой памяти. То же самое относится к автобоксирующим примитивам: например. Integer имеет механизм кэширования, который будет повторно использовать существующие объекты Integer во время процесса autoboxing, также вызывая сильную ссылку на существование. Однако, обращаясь к этому, это совсем другая история.

Ответ 3

Проверьте эту структуру. Кажется, вы ищете что-то вроде этого.

public class WeatherServiceProxy {
...
private final KeyLockManager lockManager = KeyLockManagers.newManager();

public void updateWeatherData(String cityName, Date samplingTime, float temperature) {
        lockManager.executeLocked(cityName, new LockCallback() {
                public void doInLock() {
                        delegate.updateWeatherData(cityName, samplingTime, temperature);
                }
        });
}

https://code.google.com/p/jkeylockmanager/

Ответ 4

Я создал tokenProvider на основе IdMutexProvider от McDowell. Менеджер использует WeakHashMap, который выполняет очистку неиспользуемых блокировок.

Вы можете найти мою реализацию здесь.

Ответ 5

Я использовал кеш для хранения объектов блокировки. Мой кеш будет истекать через период, который действительно должен быть только дольше, чем время выполнения синхронизированного процесса

`

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

...

private final Cache<String, Object> mediapackageLockCache = CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_EXPIRE, TimeUnit.SECONDS).build();

...

public void doSomething(foo) {
    Object lock = mediapackageLockCache.getIfPresent(foo.toSting());
    if (lock == null) {
        lock = new Object();
        mediapackageLockCache.put(foo.toString(), lock);
    }

    synchronized(lock) {
        // execute code on foo
        ...
    }
}

`

Ответ 6

Я нашел правильный ответ через другой вопрос stackoverflow: Как получить блокировку с помощью ключа

Я скопировал ответ здесь:

У Guava есть что-то подобное, выпущенное в 13.0; вы можете получить его из HEAD, если хотите.

Полосатый более или менее выделяет определенное количество блокировок, а затем назначает строки блокировкам на основе их хэш-кода. API выглядит примерно так:

Striped<Lock> locks = Striped.lock(stripes);
Lock l = locks.get(string);
l.lock();
try {
  // do stuff 
} finally {
  l.unlock();
}

Более или менее контролируемое количество полос позволяет вам торговать concurrency с использованием памяти, поскольку распределение полной блокировки для каждого строкового ключа может стать дорогостоящим; по сути, вы получаете только блокировку, когда получаете хеш-коллизии, которые (как предсказуемо) редки.

Ответ 7

У меня гораздо более простая масштабируемая реализация, аналогичная @timmons post, использующая guavas LoadingCache с weakValues. Вы захотите прочитать файлы справки о "равенстве", чтобы понять предложение, которое я сделал.

Определите следующий слабый ценный кеш.

private final LoadingCache<String,String> syncStrings = CacheBuilder.newBuilder().weakValues().build(new CacheLoader<String, String>() {
    public String load(String x) throws ExecutionException {
        return new String(x);
    }
});

public void doSomething(String x) {
      x = syncStrings.get(x);
      synchronized(x) {
          ..... // whatever it is you want to do
      }
}

Теперь! В результате JVM нам не нужно беспокоиться о том, что кеш становится слишком большим, он сохраняет только кешированные строки, насколько это необходимо, и менеджер мусора /guava делает тяжелый подъем.

Ответ 8

TL; DR:

Я использую ConcurrentReferenceHashMap из Spring Framework. Пожалуйста, проверьте код ниже.


Хотя эта тема старая, она все еще интересная. Поэтому я хотел бы поделиться своим подходом с Spring Framework.

То, что мы пытаемся реализовать, называется mutex/lock. Как следует из ответа Тудора, идея состоит в том, чтобы иметь Map для хранения имени замка и объекта замка. Код будет выглядеть так (я скопировал его из ответа):

Map<String, Object> locks = new HashMap<String, Object>();
locks.put("a", new Object());
locks.put("b", new Object());

Однако у этого подхода есть 2 недостатка:

  1. ОП уже указал первый: как синхронизировать доступ к хэш-карте locks?
  2. Как снять некоторые блокировки, которые больше не нужны? В противном случае карта хешей locks будет продолжать расти.

Первая проблема может быть решена с помощью ConcurrentHashMap. Для второй проблемы у нас есть 2 варианта: вручную проверить и удалить блокировки с карты или каким-то образом сообщить сборщику мусора, какие блокировки больше не используются, и GC удалит их. Я пойду со вторым путем.

Когда мы используем HashMap или ConcurrentHashMap, он создает сильные ссылки. Чтобы реализовать решение, рассмотренное выше, вместо этого следует использовать слабые ссылки (чтобы понять, что такое сильная/слабая ссылка, обратитесь к этой статье или этой публикации).


Итак, я использую ConcurrentReferenceHashMap из Spring Framework. Как описано в документации:

ConcurrentHashMap который использует мягкие или слабые ссылки для ключей и значений.

Этот класс может использоваться в качестве альтернативы Collections.synchronizedMap(new WeakHashMap<K, Reference<V>>()) для поддержки лучшей производительности при одновременном доступе. Эта реализация следует тем же ограничениям дизайна, что и ConcurrentHashMap за исключением того, что поддерживаются нулевые значения и нулевые ключи.

Вот мой код MutexFactory управляет всеми замками с <K> типом ключа.

@Component
public class MutexFactory<K> {

    private ConcurrentReferenceHashMap<K, Object> map;

    public MutexFactory() {
        this.map = new ConcurrentReferenceHashMap<>();
    }

    public Object getMutex(K key) {
        return this.map.compute(key, (k, v) -> v == null ? new Object() : v);
    }
}

Использование:

@Autowired
private MutexFactory<String> mutexFactory;

public void doSomething(String name){
    synchronized(mutexFactory.getMutex(name)) {
        // ...
    }
}

Модульный тест (этот тест использует библиотеку awaitility для некоторых методов, например, await(), atMost(), atMost() until()):

public class MutexFactoryTests {
    private final int THREAD_COUNT = 16;

    @Test
    public void singleKeyTest() {
        MutexFactory<String> mutexFactory = new MutexFactory<>();
        String id = UUID.randomUUID().toString();
        final int[] count = {0};

        IntStream.range(0, THREAD_COUNT)
                .parallel()
                .forEach(i -> {
                    synchronized (mutexFactory.getMutex(id)) {
                        count[0]++;
                    }
                });
        await().atMost(5, TimeUnit.SECONDS)
                .until(() -> count[0] == THREAD_COUNT);
        Assert.assertEquals(count[0], THREAD_COUNT);
    }
}