Подтвердить что ты не робот

ConcurrentHashMap: избежать создания дополнительного объекта с помощью "putIfAbsent"?

Я объединяю несколько значений для ключей в многопоточной среде. Эти ключи не известны заранее. Я думал, что сделаю что-то вроде этого:

class Aggregator {
    protected ConcurrentHashMap<String, List<String>> entries =
                            new ConcurrentHashMap<String, List<String>>();
    public Aggregator() {}

    public void record(String key, String value) {
        List<String> newList =
                    Collections.synchronizedList(new ArrayList<String>());
        List<String> existingList = entries.putIfAbsent(key, newList);
        List<String> values = existingList == null ? newList : existingList;
        values.add(value);
    }
}

Проблема, которую я вижу, заключается в том, что каждый раз, когда этот метод запускается, мне нужно создать новый экземпляр ArrayList, который затем выкидываю (в большинстве случаев). Это кажется необоснованным злоупотреблением сборщиком мусора. Есть ли лучший, потокобезопасный способ инициализации такого типа структуры без использования synchronize метода record? Я несколько удивлен решением о том, чтобы метод putIfAbsent не возвращал вновь созданный элемент, а также отсутствовал способ отложить экземпляр, если он не вызван (так сказать).

4b9b3361

Ответ 1

Java 8 представила API для удовлетворения этой точной проблемы, сделав однострочное решение:

public void record(String key, String value) {
    entries.computeIfAbsent(key, k -> Collections.synchronizedList(new ArrayList<String>())).add(value);
}

Для Java 7:

public void record(String key, String value) {
    List<String> values = entries.get(key);
    if (values == null) {
        entries.putIfAbsent(key, Collections.synchronizedList(new ArrayList<String>()));
        // At this point, there will definitely be a list for the key.
        // We don't know or care which thread new object is in there, so:
        values = entries.get(key);
    }
    values.add(value);
}

Это стандартный шаблон кода при заполнении ConcurrentHashMap.

Специальный метод putIfAbsent(K, V)) либо поместит ваш объект значения, либо если другой поток попал вам, то он проигнорирует ваше значение объект. В любом случае, после вызова putIfAbsent(K, V)), get(key) гарантированно будет соответствовать между потоками, и поэтому приведенный выше код является потокобезопасным.

Единственные потерянные издержки - если какой-то другой поток добавляет новую запись одновременно для одного и того же ключа: вы можете в конечном итоге выбросить вновь созданное значение, но это произойдет только в том случае, если еще нет записи, и есть что ваша нить теряет, что обычно бывает редко.

Ответ 2

С Java-8 вы можете создавать Multi Maps, используя следующий шаблон:

public void record(String key, String value) { entries.computeIfAbsent(key, k -> Collections.synchronizedList(new ArrayList<String>())) .add(value); }

Документация ConcurrentHashMap (а не общий контракт) указывает, что ArrayList будет создаваться только один раз для каждого ключа с небольшой начальной задержкой обновлений при создании ArrayList для нового ключа:

http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#computeIfAbsent-K-java.util.function.Function-

Ответ 3

В конце концов, я внедрил небольшую модификацию ответа @Bohemian. Его предлагаемое решение перезаписывает переменную values вызовом putIfAbsent, что создает ту же проблему, что и раньше. Код, который работает, выглядит следующим образом:

    public void record(String key, String value) {
        List<String> values = entries.get(key);
        if (values == null) {
            values = Collections.synchronizedList(new ArrayList<String>());
            List<String> values2 = entries.putIfAbsent(key, values);
            if (values2 != null)
                values = values2;
        }
        values.add(value);
    }

Это не так элегантно, как хотелось бы, но лучше, чем оригинал, который создает новый экземпляр ArrayList при каждом вызове.

Ответ 4

Созданы две версии, основанные на ответе Gene

public  static <K,V> void putIfAbsetMultiValue(ConcurrentHashMap<K,List<V>> entries, K key, V value) {
    List<V> values = entries.get(key);
    if (values == null) {
        values = Collections.synchronizedList(new ArrayList<V>());
        List<V> values2 = entries.putIfAbsent(key, values);
        if (values2 != null)
            values = values2;
    }
    values.add(value);
}

public  static <K,V> void putIfAbsetMultiValueSet(ConcurrentMap<K,Set<V>> entries, K key, V value) {
    Set<V> values = entries.get(key);
    if (values == null) {
        values = Collections.synchronizedSet(new HashSet<V>());
        Set<V> values2 = entries.putIfAbsent(key, values);
        if (values2 != null)
            values = values2;
    }
    values.add(value);
}

Хорошо работает

Ответ 5

Это проблема, я также искал ответ. Метод putIfAbsent фактически не решает проблему создания дополнительного объекта, он просто гарантирует, что один из этих объектов не заменит другого. Но условия гонки среди потоков могут привести к созданию нескольких объектов. Я мог бы найти 3 решения этой проблемы (и я буду следовать этому порядку предпочтения):

1- Если вы используете Java 8, лучший способ достичь этого - это, вероятно, новый computeIfAbsent метод ConcurrentMap. Вам просто нужно дать ему функцию вычисления, которая будет выполняться синхронно (по крайней мере для реализации ConcurrentHashMap). Пример:

private final ConcurrentMap<String, List<String>> entries =
        new ConcurrentHashMap<String, List<String>>();

public void method1(String key, String value) {
    entries.computeIfAbsent(key, s -> new ArrayList<String>())
            .add(value);
}

Это из javadoc ConcurrentHashMap.computeIfAbsent:

Если указанный ключ еще не связан со значением, попытки вычислить его значение с использованием данной функции отображения и ввести ее на эту карту, если не указано значение null. Выполняется весь вызов метода атомарно, поэтому функция применяется не более одного раза за ключ. Некоторые попытки выполнения обновлений на этой карте другими потоками могут быть блокируется, пока выполняется вычисление, поэтому вычисление должно быть краткие и простые, и не должны пытаться обновлять любые другие сопоставления эта карта.

2- Если вы не можете использовать Java 8, вы можете использовать Guava LoadingCache, который является потокобезопасным. Вы определяете для него функцию загрузки (точно так же, как и функция compute), и вы можете быть уверены, что она будет вызываться синхронно. Пример:

private final LoadingCache<String, List<String>> entries = CacheBuilder.newBuilder()
        .build(new CacheLoader<String, List<String>>() {
            @Override
            public List<String> load(String s) throws Exception {
                return new ArrayList<String>();
            }
        });

public void method2(String key, String value) {
    entries.getUnchecked(key).add(value);
}

3- Если вы не можете использовать Guava, вы можете всегда синхронизировать вручную и делать блокировку с двойным проверкой. Пример:

private final ConcurrentMap<String, List<String>> entries =
        new ConcurrentHashMap<String, List<String>>();

public void method3(String key, String value) {
    List<String> existing = entries.get(key);
    if (existing != null) {
        existing.add(value);
    } else {
        synchronized (entries) {
            List<String> existingSynchronized = entries.get(key);
            if (existingSynchronized != null) {
                existingSynchronized.add(value);
            } else {
                List<String> newList = new ArrayList<>();
                newList.add(value);
                entries.put(key, newList);
            }
        }
    }
}

Я сделал пример реализации всех этих трех методов и, кроме того, несинхронизированный метод, который вызывает создание дополнительных объектов: http://pastebin.com/qZ4DUjTr

Ответ 7

Подход с putIfAbsent имеет самое быстрое время выполнения, он от 2 до 50 раз быстрее, чем подход "лямбда" в средах с высоким уровнем конкуренции. Lambda не является причиной этого "powerloss", проблема заключается в обязательной синхронизации внутри computeIfAbsent до оптимизации Java-9.

:

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ConcurrentHashMapTest {
    private final static int numberOfRuns = 1000000;
    private final static int numberOfThreads = Runtime.getRuntime().availableProcessors();
    private final static int keysSize = 10;
    private final static String[] strings = new String[keysSize];
    static {
        for (int n = 0; n < keysSize; n++) {
            strings[n] = "" + (char) ('A' + n);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int n = 0; n < 20; n++) {
            testPutIfAbsent();
            testComputeIfAbsentLamda();
        }
    }

    private static void testPutIfAbsent() throws InterruptedException {
        final AtomicLong totalTime = new AtomicLong();
        final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<String, AtomicInteger>();
        final Random random = new Random();
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

        for (int i = 0; i < numberOfThreads; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    long start, end;
                    for (int n = 0; n < numberOfRuns; n++) {
                        String s = strings[random.nextInt(strings.length)];
                        start = System.nanoTime();

                        AtomicInteger count = map.get(s);
                        if (count == null) {
                            count = new AtomicInteger(0);
                            AtomicInteger prevCount = map.putIfAbsent(s, count);
                            if (prevCount != null) {
                                count = prevCount;
                            }
                        }
                        count.incrementAndGet();
                        end = System.nanoTime();
                        totalTime.addAndGet(end - start);
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
    }

    private static void testComputeIfAbsentLamda() throws InterruptedException {
        final AtomicLong totalTime = new AtomicLong();
        final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<String, AtomicInteger>();
        final Random random = new Random();
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
        for (int i = 0; i < numberOfThreads; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    long start, end;
                    for (int n = 0; n < numberOfRuns; n++) {
                        String s = strings[random.nextInt(strings.length)];
                        start = System.nanoTime();

                        AtomicInteger count = map.computeIfAbsent(s, (k) -> new AtomicInteger(0));
                        count.incrementAndGet();

                        end = System.nanoTime();
                        totalTime.addAndGet(end - start);
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
    }

}

Результаты:

Test testPutIfAbsent average time per run: 115.756501 ns
Test testComputeIfAbsentLamda average time per run: 276.9667055 ns
Test testPutIfAbsent average time per run: 134.2332435 ns
Test testComputeIfAbsentLamda average time per run: 223.222063625 ns
Test testPutIfAbsent average time per run: 119.968893625 ns
Test testComputeIfAbsentLamda average time per run: 216.707419875 ns
Test testPutIfAbsent average time per run: 116.173902375 ns
Test testComputeIfAbsentLamda average time per run: 215.632467375 ns
Test testPutIfAbsent average time per run: 112.21422775 ns
Test testComputeIfAbsentLamda average time per run: 210.29563725 ns
Test testPutIfAbsent average time per run: 120.50643475 ns
Test testComputeIfAbsentLamda average time per run: 200.79536475 ns