Подтвердить что ты не робот

Неправильный код встречи при сортировке параллельного потока

У меня есть класс Record:

public class Record implements Comparable<Record>
{
   private String myCategory1;
   private int    myCategory2;
   private String myCategory3;
   private String myCategory4;
   private int    myValue1;
   private double myValue2;

   public Record(String category1, int category2, String category3, String category4,
      int value1, double value2)
   {
      myCategory1 = category1;
      myCategory2 = category2;
      myCategory3 = category3;
      myCategory4 = category4;
      myValue1 = value1;
      myValue2 = value2;
   }

   // Getters here
}

Я создаю большой список множества записей. Только второе и пятое значения, i / 10000 и i, используются позже, геттерами getCategory2() и getValue1() соответственно.

List<Record> list = new ArrayList<>();
for (int i = 0; i < 115000; i++)
{
    list.add(new Record("A", i / 10000, "B", "C", i, (double) i / 100 + 1));
}

Обратите внимание, что первые 10 000 записей имеют category2 из 0, затем следующие 10000 имеют 1 и т.д., тогда как значения value1 равны 0-114999 последовательно.

Я создаю Stream, который является как parallel, так и sorted.

Stream<Record> stream = list.stream()
   .parallel()
   .sorted(
       //(r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
   )
   //.parallel()
;

У меня есть ForkJoinPool, который поддерживает 8 потоки, которые являются количеством ядер, которые у меня есть на моем ПК.

ForkJoinPool pool = new ForkJoinPool(8);

Я использую трюк , описанный здесь, чтобы отправить задачу обработки потока на мой собственный ForkJoinPool вместо обычного ForkJoinPool.

List<Record> output = pool.submit(() ->
    stream.collect(Collectors.toList()
)).get();

Я ожидал, что параллельная операция sorted будет уважать порядок вызова потока и что он будет устойчивым, потому что Spliterator, возвращаемый ArrayList, равен ORDERED.

Однако простой код, который выводит элементы результирующего List output в порядке, показывает, что это не совсем так.

for (Record record : output)
{
     System.out.println(record.getValue1());
}

Выход, сгущенный:

0
1
2
3
...
69996
69997
69998
69999
71875  // discontinuity!
71876
71877
71878
...
79058
79059
79060
79061
70000  // discontinuity!
70001
70002
70003
...
71871
71872
71873
71874
79062  // discontinuity!
79063
79064
79065
79066
...
114996
114997
114998
114999

size() of output - это 115000, и все элементы, как представляется, находятся там, в немного другом порядке.

Итак, я написал некоторый код проверки, чтобы убедиться, что sort был стабильным. Если он стабильный, то все значения value1 должны оставаться в порядке. Этот код проверяет порядок, печатает любые расхождения.

int prev = -1;
boolean verified = true;
for (Record record : output)
{
    int curr = record.getValue1();
    if (prev != -1)
    {
        if (prev + 1 != curr)
        {
            System.out.println("Warning: " + prev + " followed by " + curr + "!");
            verified = false;
        }
    }
    prev = curr;
}
System.out.println("Verified: " + verified);

Вывод:

Warning: 69999 followed by 71875!
Warning: 79061 followed by 70000!
Warning: 71874 followed by 79062!
Warning: 99999 followed by 100625!
Warning: 107811 followed by 100000!
Warning: 100624 followed by 107812!
Verified: false

Это условие сохраняется, если я выполняю одно из следующих действий:

  • Замените ForkJoinPool на ThreadPoolExecutor.

    ThreadPoolExecutor pool = new ThreadPoolExecutor(8, 8, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
  • Используйте общий ForkJoinPool, обработав Stream напрямую.

    List<Record> output = stream.collect(Collectors.toList());
    
  • Вызов parallel() после вызова sorted.

    Stream<Record> stream = list.stream().sorted().parallel();
    
  • Вызовите parallelStream() вместо stream().parallel().

    Stream<Record> stream = list.parallelStream().sorted();
    
  • Сортировать по Comparator. Обратите внимание, что этот критерий сортировки отличается от того, что "естественный" порядок, который я определил для интерфейса Comparable, хотя начиная с результатов уже по порядку с самого начала, результат должен быть тем же самым.

    Stream<Record> stream = list.stream().parallel().sorted(
        (r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
    );
    

Я могу получить это только для сохранения порядка встреч, если я не сделаю одно из следующих действий на Stream:

  • Не вызывайте parallel().
  • Не перегружайте sorted.

Интересно, что parallel() без сортировки сохранил порядок.

В обоих случаях выше выход:

Verified: true

Моя версия Java - 1.8.0_05. Эта аномалия также возникает в Ideone, которая, как представляется, работает на Java 8u25.

Обновление

Я обновил свой JDK до последней версии с этой записью, 1.8.0_45, и проблема не изменилась.

Вопрос

Является ли порядок записи в результирующем List (output) неуправляемым, потому что сортировка как-то нестабильна, потому что порядок встречи не сохраняется или какая-то другая причина?

Как я могу гарантировать, что порядок встречи сохраняется, когда я создаю параллельный поток и сортирую его?

4b9b3361

Ответ 1

Похоже, что Arrays.parallelSort нестабилен в некоторых обстоятельствах. Хорошо подмечено. Параллельная сортировка потока реализована в терминах Arrays.parallelSort, поэтому она также влияет на потоки. Здесь упрощенный пример:

public class StableSortBug {
    static final int SIZE = 50_000;

    static class Record implements Comparable<Record> {
        final int sortVal;
        final int seqNum;

        Record(int i1, int i2) { sortVal = i1; seqNum = i2; }

        @Override
        public int compareTo(Record other) {
            return Integer.compare(this.sortVal, other.sortVal);
        }
    }

    static Record[] genArray() {
        Record[] array = new Record[SIZE];
        Arrays.setAll(array, i -> new Record(i / 10_000, i));
        return array;
    }

    static boolean verify(Record[] array) {
        return IntStream.range(1, array.length)
                        .allMatch(i -> array[i-1].seqNum + 1 == array[i].seqNum);
    }

    public static void main(String[] args) {
        Record[] array = genArray();
        System.out.println(verify(array));
        Arrays.sort(array);
        System.out.println(verify(array));
        Arrays.parallelSort(array);
        System.out.println(verify(array));
    }
}

На моей машине (2 ядра x 2 потока) это печатает следующее:

true
true
false

Конечно, он должен печатать true три раза. Это относится к текущей версии JDK 9 dev. Я не удивлюсь, если это произойдет во всех выпусках JDK 8 до сих пор, учитывая то, что вы пробовали. Любопытно, что уменьшение размера или делителя изменит поведение. Размер 20 000 и делитель 10 000 стабилен, а размер 50 000 и делитель 1000 также стабильны. Похоже, что проблема связана с достаточно большим количеством значений, сравнивающихся по сравнению с параллельным размером разделения.

Проблема с OpenJDK JDK-8076446 охватывает эту ошибку.