Подтвердить что ты не робот

Комбинатор потока Java 8 никогда не назывался

Я пишу пользовательский сборщик java 8, который должен вычислять среднее значение POJO, которое имеет метод getValue(). Здесь код:

public static Collector<BoltAggregationData, BigDecimal[], BigDecimal> avgCollector = new Collector<BoltAggregationData, BigDecimal[], BigDecimal>() {

        @Override
        public Supplier<BigDecimal[]> supplier() {
            return () -> {
                BigDecimal[] start = new BigDecimal[2];
                start[0] = BigDecimal.ZERO;
                start[1] = BigDecimal.ZERO;
                return start;
            };
        }

        @Override
        public BiConsumer<BigDecimal[], BoltAggregationData> accumulator() {
            return (a,b) ->  {
                a[0] = a[0].add(b.getValue());
                a[1] = a[1].add(BigDecimal.ONE);
            };
        }

        @Override
        public BinaryOperator<BigDecimal[]> combiner() {
            return (a,b) -> {
                a[0] = a[0].add(b[0]);
                a[1] = a[1].add(b[1]);
                return a;
            };
        }

        @Override
        public Function<BigDecimal[], BigDecimal> finisher() {
            return (a) -> {
                return a[0].divide(a[1], 6 , RoundingMode.HALF_UP);
            };
        }

        private final Set<Characteristics> CHARACTERISTICS = new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));

        @Override
        public Set<Characteristics> characteristics() {
            return CHARACTERISTICS;
        }

    };

Все работает хорошо в непараллельном случае. Однако, когда я использую parallelStream(), он иногда не работает. Например, учитывая значения от 1 до 10, он вычисляет (53/9 вместо 55/10). При отладке отладчик никогда не ударяет точку останова в функции combiner(). Есть ли какой-то флаг, который мне нужно установить?

4b9b3361

Ответ 1

Похоже, проблема заключается в характеристике CONCURRENT, которая делает что-то еще, чем вы могли бы подумать:

Указывает, что этот коллектор является параллельным, что означает, что   контейнер результата может поддерживать функцию аккумулятора, являющуюся   вызываемый одновременно с одним и тем же контейнером результата из нескольких   потоки.

Вместо вызова объединителя аккумулятор вызывается одновременно, используя тот же BigDecimal[] a для всех потоков. Доступ к a не является атомарным, поэтому он идет не так:

Thread1 -> retrieves value of a[0]: 3
Thread2 -> retrieves value of a[0]: 3
Thread1 -> adds own value: 3 + 3 = 6
Thread2 -> adds own value: 3 + 4 = 7
Thread1 -> writes 6 to a[0]
Thread2 -> writes 7 to a[0]

Создание значения a[0] 7, когда оно должно быть 10. То же самое может произойти с a[1], поэтому результаты могут быть непоследовательными.


Если вы удалите атрибут CONCURRENT, вместо этого будет использоваться комбайнер.

Ответ 2

Ну, это именно то, что вы запрашиваете при указании Characteristics.CONCURRENT:

Указывает, что этот коллектор является параллельным, что означает, что контейнер результатов может поддерживать функцию аккумулятора, вызываемую одновременно с одним и тем же контейнером результатов из нескольких потоков.

Если это не так, как с вашим Collector, вы не должны указывать этот флаг.


В качестве побочного примечания, new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED)); является довольно неэффективным для задания характеристик. Вы можете просто использовать EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED). Когда вы удаляете неправильный параллельный признак, вы можете использовать либо EnumSet.of(Characteristics.UNORDERED), либо Collections.singleton(Characteristics.UNORDERED), но HashSet определенно перебор.