Подтвердить что ты не робот

Java-потоковая передача объектов коллекции из одного потока в другой

У меня есть приложение Java, которое имеет рабочие потоки для обработки заданий. Рабочий создает объект результата, например:

class WorkerResult{
    private final Set<ResultItems> items;
    public Worker(Set<ResultItems> pItems){
         items = pItems;
    }
}

Когда работник заканчивает работу, он выполняет эту операцию:

 ...
 final Set<ResultItems> items = new SomeNonThreadSafeSetImplSet<ResultItems>();
 for(Item producedItem : ...){
      items.add(item);
 }
 passToGatherThread(items);

Набор items здесь является "единицей работы". Метод passToGatherThread передает набор items в поток сбора, из которого только один существует во время выполнения.

Синхронизация здесь не требуется, поскольку условия гонки не могут возникнуть, потому что только один поток (Gather-thread) читает набор items. AFAICS, Gather-thread может не видеть все элементы, потому что набор не является потокобезопасным, правильно?

Предположим, я не могу синхронизировать passToGatherThread, скажем, потому что это сторонняя библиотека. Я в основном опасаюсь, что поток сборок не видит всех элементов из-за кеширования, оптимизации виртуальной машины и т.д. Итак, возникает вопрос: как передать элементы, заданные поточно-безопасным способом, так что поток Gather "видит", правильный набор элементов?

4b9b3361

Ответ 1

Я много думал о (и обсуждал) этот вопрос, и я придумал еще один ответ, который, я надеюсь, будет лучшим решением.

Передача синхронизированной коллекции не является хорошей с точки зрения эффективности, поскольку каждая последующая операция в этой коллекции будет синхронизирована - если есть много операций, это может оказаться недостатком.

К моменту: допустим некоторые предположения (с которыми я не согласен):

  • указанный метод passToGatherThread действительно небезопасен, каким бы невероятным он ни казался
  • компилятор может изменить порядок событий в коде, чтобы вызвать passToGatherThread до заполнения коллекции.

Самый простой, самый чистый и, возможно, самый эффективный способ гарантировать, что коллекция, переданная методу сборщика, готова и завершена, заключается в том, чтобы поместить коллекцию в синхронизированный блок, например:

synchronized(items) {
  passToGatherThread(items);
}

Таким образом мы обеспечиваем синхронизацию памяти и действительную последовательность событий перед передачей коллекции, таким образом, чтобы все объекты были переданы правильно.

Ответ 2

Здесь, похоже, нет проблемы с синхронизацией. Вы создаете новый объект Set для каждого passToGatherThread и выполняете его после изменения набора. Объекты не будут потеряны.

Множество (и большинство коллекций Java) можно получить одновременно несколькими потоками при условии, что никакая модификация коллекции не будет выполнена. Для чего Collections.unmodifiableCollection.

Поскольку упомянутый метод passToGatherThread служит для связи с другим потоком, он должен использовать какую-то синхронизацию - и каждая синхронизация обеспечивает согласованность между потоками.

Также - обратите внимание, что все записи в объекты в переданной коллекции выполняются до того, как они передаются в другой поток. Даже если память скопирована в локальный кэш потока, она имеет такое же неизмененное значение, что и в другом потоке.

Ответ 3

Вы можете просто использовать одну из поточно-безопасных реализаций Set, которую Java предоставляет для вашего WorkerResult. См. Например:

Другой вариант - использовать Collections.synchronizedSet().

Ответ 4

Рабочий реализует вызываемые и возвращает WorkerResult:

class Worker implements Callable<WorkerResult> {
    private WorkerInput in;

    public Worker(WorkerInput in) {
        this.in = in;
    }

    public WorkerResult call() {
        // do work here
    }
}

Затем мы используем ExecutorService для управления пулом потоков и собираем результаты с помощью Future.

public class PooledWorkerController {

    private static final int MAX_THREAD_POOL = 3;
    private final ExecutorService pool = 
       Executors.newFixedThreadPool(MAX_THREAD_POOL);

    public Set<ResultItems> process(List<WorkerInput> inputs) 
           throws InterruptedException, ExecutionException{         
        List<Future<WorkerResult>> submitted = new ArrayList<>();
        for (WorkerInput in : inputs) {
            Future<WorkerResult> future = pool.submit(new Worker(in));
            submitted.add(future);
        }
        Set<ResultItems> results = new HashSet<>();
        for (Future<WorkerResult> future : submitted) {
            results.addAll(future.get().getItems());
        }
        return results;
    }
}