Есть ли способ вырваться из цикла foreach? - программирование
Подтвердить что ты не робот

Есть ли способ вырваться из цикла foreach?

Я использую пакет R foreach() с %dopar% для выполнения длинных (~ дней) вычислений параллельно. Мне хотелось бы остановить весь набор вычислений в случае, если одна из них вызывает ошибку. Однако я не нашел способ добиться этого, и из документации и различных форумов я не нашел никаких указаний на то, что это возможно. В частности, break() не работает, а stop() останавливает текущий расчет, а не весь цикл foreach.

Обратите внимание, что я не могу использовать простой цикл for, потому что в конечном итоге я хочу распараллелить это с помощью пакета doRNG.

Вот упрощенная, воспроизводимая версия того, что я пытаюсь (показано здесь в серийном номере с %do%, но у меня такая же проблема при использовании doRNG и %dopar%). Обратите внимание, что на самом деле я хочу параллельно использовать все элементы этого цикла (здесь 10).

library(foreach)
myfunc <- function() {
  x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %do% {
    cat("Element ", k, "\n")
    Sys.sleep(0.5) # just to show that stop does not cause exit from foreach
    if(is.element(k, 2:6)) {
      cat("Should stop\n")
      stop("Has stopped")
    }
    k
  }
  return(x)
}
x <- myfunc()
# stop() halts the processing of k=2:6, but it does not stop the foreach loop itself.
# x is not returned. The execution produces the error message
# Error in { : task 2 failed - "Has stopped"

То, что я хотел бы достичь, состоит в том, что весь цикл foreach может быть немедленно удален при некотором условии (здесь, когда встречается stop()).

Я не нашел способа добиться этого с помощью foreach. Кажется, мне нужен способ отправить сообщение всем остальным процессам, чтобы они тоже перестали.

Если это невозможно с foreach, знает ли кто-нибудь об альтернативах? Я также пытался достичь этого с помощью parallel::mclapply, но это тоже не работает.

> sessionInfo()
R version 3.0.0 (2013-04-03)
Platform: x86_64-apple-darwin10.8.0 (64-bit)

locale:
[1] C/UTF-8/C/C/C/C

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods base

other attached packages:
[1] foreach_1.4.0

loaded via a namespace (and not attached):
[1] codetools_0.2-8 compiler_3.0.0  iterators_1.0.6
4b9b3361

Ответ 1

Похоже, вам нужна нетерпеливая версия обработки ошибок "stop". Это можно реализовать, написав пользовательскую функцию комбинирования и упорядочив foreach, чтобы вызвать ее, как только будет возвращен каждый результат. Для этого вам необходимо:

  • Используйте бэкэнд, который поддерживает вызов combine на лету, например doMPI или doRedis
  • Не включать .multicombine
  • Установите .inorder в FALSE
  • Задайте .init чему-то (например, NULL)

Вот пример, который делает это:

library(foreach)
parfun <- function(errval, n) {
  abortable <- function(errfun) {
    comb <- function(x, y) {
      if (inherits(y, 'error')) {
        warning('This will leave your parallel backend in an inconsistent state')
        errfun(y)
      }
      c(x, y)
    }
    foreach(i=seq_len(n), .errorhandling='pass', .export='errval',
            .combine='comb', .inorder=FALSE, .init=NULL) %dopar% {
      if (i == errval)
        stop('testing abort')
      Sys.sleep(10)
      i
    }
  }
  callCC(abortable)
}

Обратите внимание, что я также установил обработку ошибок "pass", поэтому foreach вызовет функцию объединения с объектом ошибки. Функция callCC используется для возврата из цикла foreach независимо от обработки ошибок, используемой в foreach и бэкэнд. В этом случае callCC вызовет функцию abortable, передав ей объект функции, который используется force callCC, чтобы немедленно вернуться. Вызывая эту функцию из функции объединения, мы можем выйти из цикла foreach при обнаружении объекта ошибки и вернуть callCC этот объект. Подробнее см. ?callCC.

Фактически вы можете использовать parfun без регистрации параллельного бэкэнд и убедиться, что цикл foreach "разрывается", как только он выполняет задачу, которая выдает ошибку, но это может занять некоторое время, поскольку задачи выполняются последовательно. Например, это занимает 20 секунд для выполнения, если бэкэнд не зарегистрирован:

print(system.time(parfun(3, 4)))

При параллельном выполнении parfun нам нужно сделать больше, чем просто вырваться из цикла foreach: нам также необходимо остановить рабочих, иначе они будут продолжать вычислять свои назначенные задачи. С помощью doMPI рабочих можно остановить с помощью mpi.abort:

library(doMPI)
cl <- startMPIcluster()
registerDoMPI(cl)
r <- parfun(getDoParWorkers(), getDoParWorkers())
if (inherits(r, 'error')) {
  cat(sprintf('Caught error: %s\n', conditionMessage(r)))
  mpi.abort(cl$comm)
}

Обратите внимание, что объект кластера не может использоваться после того, как цикл прерывается, потому что вещи не были должным образом очищены, поэтому нормальная обработка ошибок "stop" не работает таким образом.

Ответ 2

Это не прямой ответ на ваш вопрос, но с помощью when() вы можете избежать ввода цикла, если условие выполнено:

x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %:%
  when( !is.element(k, 2:6) ) %do%
  {
    cat("Element ", k, "\n")
    Sys.sleep(0.5)
    k
  }

EDIT:

Я что-то забыл: я думаю, что по дизайну вы не можете просто остановить цикл foreach. Если вы запускаете цикл параллельно, каждый поворот обрабатывается независимо, что означает, что когда вы останавливаете весь цикл для k=2, он не предсказуем, если процесс для k=1 завершен или все еще запущен. Следовательно, использование условия when() дает вам детерминированный результат.

РЕДАКТИРОВАТЬ 2: Другое решение с учетом вашего комментария.

shouldStop <- FALSE
x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %do%
  {
    if( !shouldStop ){
      # put your time consuming code here
      cat("Element ", k, "\n")
      Sys.sleep(0.5)
      shouldStop <- shouldStop ||  is.element(k, 2:6)
      k
    }
  }

Используя это решение, процессы, которые работают в то время, когда условие останова становится истинным, все еще вычисляются до конца, но вы избегаете потребления времени для всех предстоящих процессов.

Ответ 3

Ответ, который я получил от технической поддержки REVolution: "no-foreach в настоящее время не имеет возможности остановить все параллельные вычисления при ошибке для любого".

Ответ 4

Мне не очень нравится получать foreach, чтобы делать то, что я хочу, так что вот решение, использующее пакет parallel, который, кажется, делает то, что я хочу. Я использую параметр intermediate в mcparallel() для передачи результатов из моей функции, do.task(), непосредственно в функцию check.res(). Если do.task() выдает ошибку, то это используется в check.res(), чтобы вызвать вызов tools::pskill, чтобы явно убить всех работников. Это может быть не очень элегантно, но оно работает в том смысле, что оно мгновенно останавливает работу. Кроме того, я могу просто наследовать все переменные, которые мне нужны для обработки в do.task() из текущей среды. (В действительности do.task() - гораздо более сложная функция, требующая передачи многих переменных.)

library(parallel)

# do.task() and check.res() inherit some variables from enclosing environment

do.task <- function(x) {
  cat("Starting task", x, "\n")
  Sys.sleep(5*x)
  if(x==stopat) { 
    stop("Error in job", x) # thrown to mccollect() which sends it to check.res()
  }
  cat("  Completed task", x, "\n")
  return(10*x)
}

check.res <- function(r) { # r is list of results so far
  cat("Called check.res\n")
  sendKill <- FALSE
  for(j in 1:Njob) { # check whether need to kill
    if(inherits(r[[j]], 'try-error')) {
      sendKill <- TRUE
    }
  }
  if(sendKill) { # then kill all
    for(j in 1:Njob) {
      cat("Killing job", job[[j]]$pid, "\n") 
      tools::pskill(job[[j]]$pid) # mckill not accessible
    }
  }
}

Tstart <- Sys.time()
stopat <- 3
Njob <- 4
job <- vector("list", length=Njob)
for(j in 1:Njob) {
  job[[j]]<- mcparallel(do.task(j))
}
res <- mccollect(job, intermediate=check.res) # res is in order 1:Njob, regardless of how long jobs took
cat("Collected\n")
Tstop <- Sys.time()
print(difftime(Tstop,Tstart))
for(j in 1:Njob) {
  if(inherits(res[[j]], 'try-error')) {
    stop("Parallel part encountered an error")
  }
}

Это дает следующий экранный дамп и результаты для переменной res

> source("exp5.R")
Starting task 1 
Starting task 2 
Starting task 3 
Starting task 4 
  Completed task 1 
Called check.res
Called check.res
  Completed task 2 
Called check.res
Called check.res
Called check.res
Killing job 21423 
Killing job 21424 
Killing job 21425 
Killing job 21426 
Called check.res
Killing job 21423 
Killing job 21424 
Killing job 21425 
Killing job 21426 
Called check.res
Killing job 21423 
Killing job 21424 
Killing job 21425 
Killing job 21426 
Collected
Time difference of 15.03558 secs
Error in eval(expr, envir, enclos) : Parallel part encountered an error
> res
$`21423`
[1] 10

$`21424`
[1] 20

$`21425`
[1] "Error in do.task(j) : Error in job3\n"
attr(,"class")
[1] "try-error"
attr(,"condition")
<simpleError in do.task(j): Error in job3>

$`21426`
NULL

Ответ 5

Стив Уэстон оригинальный ответ, по сути, ответил на это. Но вот немного измененная версия его ответа, которая также сохраняет две дополнительные функции в том, как они мне нужны: (1) генерация случайных чисел; (2) диагностика времени выполнения печати. ​​

suppressMessages(library(doMPI))

comb <- function(x, y) {
  if(inherits(y, 'error')) {
    stop(y)
  }
  rbind(x, y) # forces the row names to be 'y'
}

myfunc <- function() {
  writeLines(text="foreach log", con="log.txt")
  foreach(i=1:12, .errorhandling='pass', .combine='comb', .inorder=FALSE, .init=NULL) %dopar% {
    set.seed(100)
    sink("log.txt", append=TRUE)
    if(i==6) {
      stop('testing abort')
    }
    Sys.sleep(10)
    cat("Completed task", i, "\n")
    sink(NULL)
    rnorm(5,mean=i)
  }
}

myerr <- function(e) {
  cat(sprintf('Caught error: %s\n', conditionMessage(e)))
  mpi.abort(cl$comm)
}

cl <- startMPIcluster(4)
registerDoMPI(cl)
r <- tryCatch(myfunc(), error=myerr)
closeCluster(cl)

Когда этот файл будет создан, он завершает работу с сообщением об ошибке

> source("exp2.R")
    4 slaves are spawned successfully. 0 failed.
Caught error: testing abort
[ganges.local:16325] MPI_ABORT invoked on rank 0 in communicator  with errorcode 0

Файлы 'log.txt' обеспечивают правильную диагностику до точки ошибки, а затем предоставляют дополнительную информацию об ошибке. Реально, выполнение всех задач останавливается, как только stop() в цикле foreach встречается: он не дожидается завершения цикла foreach. Таким образом, я вижу сообщение "Completed task" до я = 4. (Обратите внимание, что если Sys.sleep() короче, более поздние задачи могут быть запущены до обработки mpi.abort().

Если я изменил условие останова на "i == 100", то остановка и, следовательно, ошибка не срабатывает. Код успешно существует без сообщения об ошибке, а r представляет собой 2D-массив с размерами 12 * 5.

Кстати, кажется, что мне действительно не нужно .inorder = FALSE (я думаю, что это просто дает мне небольшое увеличение скорости в случае обнаружения ошибки).