Подтвердить что ты не робот

Стратегии параллельных конвейеров в Java

Рассмотрим следующую оболочку script:

gzip -dc in.gz | sed -e 's/@/_at_/g' | gzip -c > out.gz 

Это три процесса, которые работают параллельно, чтобы распаковать поток, изменить его и повторно сжать. Запуск time Я вижу, что мое пользовательское время примерно в два раза больше моего реального времени, что указывает на то, что программа эффективно работает параллельно.

Я попытался создать ту же самую программу на Java, поставив каждую задачу в ее собственный поток. К сожалению, многопоточная Java-программа только на 30% быстрее, чем однопоточная для вышеуказанного образца. Я пробовал использовать Exchanger и ConcurrentLinkedQueue. Связанная с ConcurrentLinkedQueue очередь вызывает много конфликтов, хотя все три потока, как правило, остаются занятыми. Обменник имеет более низкую конкуренцию, но более сложный, и, похоже, он не удерживает самого медленного рабочего в 100% случаев.

Я пытаюсь разобраться в чистом решении Java для этой проблемы, не глядя на одну из фреймворков с байтовым кодом или на MPI на основе JNI.

Большинство исследований и API concurrency относятся к алгоритмам divide-and-conquer, предоставляя каждой работе node, которая является ортогональной и не зависит от предыдущих вычислений. Другим подходом к concurrency является подход к трубопроводу, где каждый работник выполняет некоторую работу и передает данные следующему работнику.

Я не пытаюсь найти наиболее эффективный способ sed gzip'd файла, но я смотрю, как эффективно разрушать задачи в конвейере, чтобы сократить время выполнения до самого медленного задача.

Текущие тайминги для файла линии 10 м следующие:

Testing via shell

real    0m31.848s
user    0m58.946s
sys     0m1.694s

Testing SerialTest

real    0m59.997s
user    0m59.263s
sys     0m1.121s

Testing ParallelExchangerTest

real    0m41.573s
user    1m3.436s
sys     0m1.830s

Testing ConcurrentQueueTest

real    0m44.626s
user    1m24.231s
sys     0m10.856s

Я предлагаю щедрость для 10% -ного улучшения Java, измеряемого в реальном времени в четырехъядерной системе с 10-метровыми рядами тестовых данных. Текущие источники доступны на Bitbucket.

4b9b3361

Ответ 1

Я индивидуально проверял время, которое, по-видимому, занимает менее 10% времени, а чтение плюс обработка занимает менее 30% времени. Поэтому я взял ParallelExchangerTest (лучший исполнитель в вашем коде) и изменил его на просто есть 2 потока, первый поток выполняет чтение и замену, а второй поток пишет.

Вот цифры, которые можно сравнить (на моей машине Intel двухъядерный (не core2) работает ubuntu с 1 ГБ оперативной памяти)

Тестирование через оболочку

real 0m41.601s

user 0m58.604s

sys 0m1.032s

Тестирование ParallelExchangerTest

real 1m55.424s

пользователь 2m14.160s

sys 0m4.768s

ParallelExchangerTestMod (2 потока)

real 1m35.524s

пользователь 1m55.319s

sys 0m3.580s

Я знал, что обработка строк занимает больше времени, поэтому я заменю line.repalce с matcher.replaceAll, я получил эти цифры

ParallelExchangerTestMod_Regex (2 потока)

real 1m12.781s

пользователь 1m33.382s

sys 0m2.916s

Теперь я сделал шаг вперед, вместо того, чтобы читать по одной строке за раз, я читал char [] буфера различных размеров и приурочен к нему (с поиском/заменой регулярного выражения) Я получил эти цифры

Тестирование ParallelExchangerTestMod_Regex_Buff (обработка по 100 байт за раз)

real 1m13.804s

пользователь 1m32.494s

sys 0m2.676s

Тестирование ParallelExchangerTestMod_Regex_Buff (обработка 500 байт в момент времени)

real 1m6.286s

пользователь 1m29.334s

sys 0m2.324s

Тестирование ParallelExchangerTestMod_Regex_Buff (обработка 800 байт в момент времени)

real 1m12.309s

пользователь 1m33.910s

sys 0m2.476s

Похоже, что 500 байтов является оптимальным для размера данных.

Я разветкил и получил копию моих изменений здесь

https://bitbucket.org/chinmaya/java-concurrent_response/

Ответ 2

Во-первых, процесс будет только быстрее, чем самая медленная часть. Если временной интервал:

  • gunzip: 1 секунда
  • sed: 5 секунд
  • gzip: 1 секунда

выбрав многопоточность, вы сделаете это в лучшем случае за 5 секунд вместо 7.

Во-вторых, вместо использования очередей, которые вы используете, вместо этого попробуйте воспроизвести функциональность того, что вы копируете, и используйте PipedInputStream и PipedOutputStream для объединения процессов.

Изменить: существует несколько способов обработки связанных задач с помощью Java concurrency utils. Разделите его на потоки. Сначала создайте общий базовый класс:

public interface Worker {
  public run(InputStream in, OutputStream out);
}

То, что делает этот интерфейс, представляет собой произвольное задание, которое обрабатывает ввод и генерирует вывод. Цепляйте их вместе, и у вас есть трубопровод. Вы также можете абстрагироваться от шаблона. Для этого нам нужен класс:

public class UnitOfWork implements Runnable {
  private final InputStream in;
  private final OutputStream out;
  private final Worker worker;

  public UnitOfWork(InputStream in, OutputStream out, Worker worker) {
    if (in == null) {
      throw new NullPointerException("in is null");
    }
    if (out == null) {
      throw new NullPointerException("out is null");
    }
    if (worker == null) {
      throw new NullPointerException("worker is null");
    }
    this.in = in;
    this.out = out;
    this.worker = worker;
  }

  public final void run() {
    worker.run(in, out);
  }
}

Итак, например, Unzip ЧАСТЬ:

public class Unzip implements Worker {
  protected void run(InputStream in, OutputStream out) {
    ...
  }
}

и т.д. для Sed и Zip. То, что затем связывает его, следующее:

public static void pipe(InputStream in, OutputStream out, Worker... workers) {
  if (workers.length == 0) {
    throw new IllegalArgumentException("no workers");
  }
  OutputStream last = null;
  List<UnitOfWork> work = new ArrayList<UnitOfWork>(workers.length);
  PipedOutputStream last = null;
  for (int i=0; i<workers.length-2; i++) {
    PipedOutputStream out = new PipedOutputStream();
    work.add(new UnitOfWork(
      last == null ? in, new PipedInputStream(last), out, workers[i]);
    last = out;
  }
  work.add(new UnitOfWork(new PipedInputStream(last),
    out, workers[workers.length-1);
  ExecutorService exec = Executors.newFixedThreadPool(work.size());
  for (UnitOfWork w : work) {
    exec.submit(w);
  }
  exec.shutdown();
  try {
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
  } catch (InterruptedExxception e) {
    // do whatever
  }
}

Я не уверен, что вы можете сделать намного лучше, чем это, и для каждого задания есть минимальный код для записи. Затем ваш код будет выглядеть следующим образом:

public static processFile(String inputName, String outputName) {
  pipe(new FileInputStream(inputFile), new FileOutputStream(outputFile),
    new Zip(), new Sed(), new Unzip());
}

Ответ 3

Вы также можете использовать каналы на Java. Они реализованы как потоки, см. PipedInputStream и PipedOutputStream для более подробной информации.

Чтобы предотвратить блокировку, я бы рекомендовал установить размер трубы для пробки.

Ответ 4

Учитывая, что вы не говорите, как вы измеряете прошедшее время, я предполагаю, что вы используете что-то вроде:

time java org.egge.concurrent.SerialTest < in.gz > out.gz
time java org.egge.concurrent.ConcurrentQueueTest < in.gz > out.gz

Проблема в том, что вы здесь измеряете две вещи:

  • Как долго запускается JVM, и
  • Как долго программа запускается.

Вы можете изменить только второй код с изменениями кода. Используя цифры, которые вы указали:

Testing SerialTest
real    0m6.736s
user    0m6.924s
sys     0m0.245s

Testing ParallelExchangerTest
real    0m4.967s
user    0m7.491s
sys     0m0.850s

Если мы предположим, что запуск JVM занимает три секунды, тогда "время выполнения программы" составляет 3,7 и 1,9 секунды соответственно, это почти 100% ускорение. Я настоятельно рекомендую вам использовать более крупный набор данных для тестирования, чтобы можно было минимизировать влияние запуска JVM на результаты синхронизации.

Изменить: основываясь на ваших ответах на этот вопрос, возможно, вы страдаете от блокировки. Лучший способ разрешить это в java - это, вероятно, использовать читателей и писателей, читаемых из труб, байтов за раз и заменять любые символы '@' во входном потоке с помощью "_at_" в выходном потоке. Возможно, вы страдаете от того, что каждая строка сканируется три раза, и для любой замены требуется создание нового объекта, и строка заканчивается тем, что она снова копируется. Надеюсь, это поможет...

Ответ 5

Уменьшение количества чтений и объектов дает более 10% лучшую производительность.

Но производительность java.util.concurrent все еще немного разочаровывает.

ConcurrentQueueTest:

private static class Reader implements Runnable {

@Override
  public void run() {
   final char buf[] = new char[8192];
   try {

    int len;
    while ((len = reader.read(buf)) != -1) {
     pipe.put(new String(buf,0,len));
    }
    pipe.put(POISON);

   } catch (IOException e) {
    throw new RuntimeException(e);
   } catch (InterruptedException e) {
    throw new RuntimeException(e);
   }
  }