Как мы знаем, итерация по параллельной коллекции по умолчанию не является потокобезопасной, поэтому нельзя использовать:
Set<E> set = Collections.synchronizedSet(new HashSet<>());
//fill with data
for (E e : set) {
process(e);
}
Это происходит, поскольку данные могут быть добавлены во время итерации, потому что нет исключительной блокировки на set
.
Это описано в javadoc Collections.synchronizedSet
:
public static Set synchronizedSet (Set s)
Возвращает синхронизированный (потокобезопасный) набор, поддерживаемый указанным набором. Чтобы гарантировать последовательный доступ, очень важно, чтобы весь доступ к набору резервных копий выполнялся с помощью возвращаемого набора.
Обязательно, чтобы пользователь вручную синхронизировал возвращаемый набор при его повторении:
Установить s = Collections.synchronizedSet(новый HashSet())
...
synchronized (s) { Iterator i = s.iterator(); // Must be in the synchronized block while (i.hasNext()) foo(i.next()); }
Несоблюдение этого совета может привести к детерминированному поведению.
Однако, это не относится к Set.forEach
, который наследует метод по умолчанию forEach
от Iterable.forEach.
Теперь я посмотрел исходный код, и здесь мы видим, что мы имеем следующую структуру:
- Мы запрашиваем
Collections.synchronizedSet()
. -
Мы получаем один:
public static <T> Set<T> synchronizedSet(Set<T> s) { return new SynchronizedSet<>(s); } ... static class SynchronizedSet<E> extends SynchronizedCollection<E> implements Set<E> { private static final long serialVersionUID = 487447009682186044L; SynchronizedSet(Set<E> s) { super(s); } SynchronizedSet(Set<E> s, Object mutex) { super(s, mutex); } public boolean equals(Object o) { if (this == o) return true; synchronized (mutex) {return c.equals(o);} } public int hashCode() { synchronized (mutex) {return c.hashCode();} } }
-
Он расширяет
SynchronizedCollection
, который имеет следующие интересные методы рядом с очевидными:// Override default methods in Collection @Override public void forEach(Consumer<? super E> consumer) { synchronized (mutex) {c.forEach(consumer);} } @Override public boolean removeIf(Predicate<? super E> filter) { synchronized (mutex) {return c.removeIf(filter);} } @Override public Spliterator<E> spliterator() { return c.spliterator(); // Must be manually synched by user! } @Override public Stream<E> stream() { return c.stream(); // Must be manually synched by user! } @Override public Stream<E> parallelStream() { return c.parallelStream(); // Must be manually synched by user! }
Используемый здесь mutex
- это тот же объект, с которым блокируются все операции Collections.synchronizedSet
.
Теперь мы можем, судя по реализации, сказать, что поточно-безопасным использовать Collections.synchronizedSet(...).forEach(...)
, но также ли он безопасен по потоку по спецификации?
(достаточно смутно, Collections.synchronizedSet(...).stream().forEach(...)
не является потокобезопасным по реализации, и вердикт спецификации также кажется неизвестным.)