Подтвердить что ты не робот

Быстрая параллельная очередь Haskell

Проблема

Здравствуйте! Я пишу библиотеку протоколирования, и мне очень хотелось бы создать регистратор, который будет работать в отдельном потоке, тогда как все потоки приложений просто отправят ему сообщения. Я хочу найти наиболее эффективное решение этой проблемы. Мне нужна простая очередь без очереди.

Подходы

Я создал несколько тестов, чтобы увидеть, как работают доступные решения, и здесь я получаю очень странные результаты. Я протестировал 4 реализации (исходный код приведен ниже) на основе:

Испытания

Вот исходный код, используемый для тестирования:

{-# LANGUAGE NoMonomorphismRestriction #-}

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Pipes
import qualified Pipes.Concurrent as Pipes
import Control.Applicative
import Control.Monad (replicateM_)
import System.Environment (getArgs)

import Control.Concurrent.Chan
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.Chan.Unagi as U
import Control.Concurrent.MVar
import Criterion.Main

data Event = Msg String | Status | Quit deriving (Show)

----------------------------------------------------------------------
-- Pipes
----------------------------------------------------------------------

pipesLogMsg = yield (Msg "hello")
pipesManyLogs num = replicateM_ num pipesLogMsg

pipesAddProducer num o = Pipes.forkIO $ do runEffect $ (pipesManyLogs num) >-> Pipes.toOutput o
                                           Pipes.performGC

pipesHandler max = loop 0
  where
    loop mnum = do
        if mnum == max
            then lift $ pure ()
            else do event <- await
                    case event of
                        Msg _  -> loop (mnum + 1)
                        Status -> (lift $ putStrLn (show mnum)) *> loop mnum
                        Quit   -> return ()

----------------------------------------------------------------------
-- Chan
----------------------------------------------------------------------

chanAddProducer num ch = forkIO $ chanManyLogs num ch
chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello"))
chanHandler ch max = handlerIO (readChan ch) max

----------------------------------------------------------------------
-- Unagi-Chan
----------------------------------------------------------------------

uchanAddProducer num ch = forkIO $ uchanManyLogs num ch
uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello"))
uchanHandler ch max = handlerIO (U.readChan ch) max

----------------------------------------------------------------------
-- MVars
----------------------------------------------------------------------

mvarAddProducer num m = forkIO $ mvarManyLogs num m
mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello"))
mvarHandler m max = handlerIO (takeMVar m) max

----------------------------------------------------------------------
-- Utils
----------------------------------------------------------------------

handlerIO f max = loop 0 where
    loop mnum = do
        if mnum == max 
            then pure ()
            else do event <- f
                    case event of
                         Msg _  -> loop (mnum + 1)
                         Status -> putStrLn (show mnum) *> loop mnum
                         Quit   -> return ()

----------------------------------------------------------------------
-- Main
----------------------------------------------------------------------

main = defaultMain [
      bench "pipes" $ nfIO $ do
        (output, input) <- Pipes.spawn Pipes.Unbounded
        replicateM_ prodNum (pipesAddProducer msgNum output)
        runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg
    , bench "Chan" $ nfIO $ do
        ch <- newChan
        replicateM_ prodNum (chanAddProducer msgNum ch)
        chanHandler ch totalMsg
    , bench "Unagi-Chan" $ nfIO $ do
        (inCh, outCh) <- U.newChan
        replicateM_ prodNum (uchanAddProducer msgNum inCh)
        uchanHandler outCh totalMsg
    , bench "MVar" $ nfIO $ do
        m <- newEmptyMVar
        replicateM_ prodNum (mvarAddProducer msgNum m)
        mvarHandler m totalMsg
    ]
  where
    prodNum  = 20
    msgNum   = 1000
    totalMsg = msgNum * prodNum

Вы можете скомпилировать его с помощью ghc -O2 Main.hs и просто запустить его. Тесты создают 20 производителей сообщений, каждый из которых производит 1000000 сообщений.

Результаты

benchmarking pipes
time                 46.68 ms   (46.19 ms .. 47.31 ms)
                     0.999 R²   (0.999 R² .. 1.000 R²)
mean                 47.59 ms   (47.20 ms .. 47.95 ms)
std dev              708.3 μs   (558.4 μs .. 906.1 μs)

benchmarking Chan
time                 4.252 ms   (4.171 ms .. 4.351 ms)
                     0.995 R²   (0.991 R² .. 0.998 R²)
mean                 4.233 ms   (4.154 ms .. 4.314 ms)
std dev              244.8 μs   (186.3 μs .. 333.5 μs)
variance introduced by outliers: 35% (moderately inflated)

benchmarking Unagi-Chan
time                 1.209 ms   (1.198 ms .. 1.224 ms)
                     0.996 R²   (0.993 R² .. 0.999 R²)
mean                 1.267 ms   (1.244 ms .. 1.308 ms)
std dev              102.4 μs   (61.70 μs .. 169.3 μs)
variance introduced by outliers: 62% (severely inflated)

benchmarking MVar
time                 1.746 ms   (1.714 ms .. 1.774 ms)
                     0.997 R²   (0.995 R² .. 0.998 R²)
mean                 1.716 ms   (1.694 ms .. 1.739 ms)
std dev              73.99 μs   (65.32 μs .. 85.48 μs)
variance introduced by outliers: 29% (moderately inflated)

Вопрос

Я хотел бы спросить вас, почему параллельная версия для труб работает так медленно и почему она намного медленнее, чем даже chan. Я очень удивлен, что MVar один из самых быстрых из всех версий - кто-нибудь может рассказать больше, почему мы получаем эти результаты, и можем ли мы сделать лучше в любом случае?

4b9b3361

Ответ 1

Поэтому я могу немного рассказать об одном из анализов Chan и TQueue (который pipes-concurrency используется внутри здесь), что мотивировало некоторые дизайнерские решения, которые вошли в unagi-chan. Я не уверен, ответит ли он на ваш вопрос. Я рекомендую разворачивать разные очереди и играть с вариациями во время бенчмаркинга, чтобы получить реальное представление о том, что происходит.

Chan

Chan выглядит следующим образом:

data Chan a
 = Chan (MVar (Stream a)) -- pointer to "head", where we read from
        (MVar (Stream a)) -- pointer to "tail", where values written to

type Stream a = MVar (ChItem a)
data ChItem a = ChItem a (Stream a)

Это связанный список MVar s. Два MVar в типе Chan действуют как указатели на текущую головку и хвост списка соответственно. Вот как выглядит запись:

writeChan :: Chan a -> a -> IO () 
writeChan (Chan _ writeVar) val = do 
    new_hole <- newEmptyMVar   mask_ $ do
    old_hole <- takeMVar writeVar           -- [1]
    putMVar old_hole (ChItem val new_hole)  -- [2]
    putMVar writeVar new_hole               -- [3]

В 1 автор берет блокировку на конец записи, по 2 наш элемент a предоставляется читателю, а в 3 конец записи разблокируется для других авторов.

Это действительно хорошо работает в сценарии с одним потребителем/с одним продюсером (см. график здесь), потому что чтения и записи не бороться. Но как только у вас будет несколько одновременных авторов, вы можете начать с проблем:

  • писатель, который нажимает 1, когда другой писатель находится в 2, блокирует и будет назначаться (самый быстрый, который я смог измерить контекстным переключателем, составляет ~ 150 нс (довольно быстро), возможно, ситуации, когда это намного медленнее). Поэтому, когда вы получаете много писателей, утверждающих вы в основном делаете большой раунд через планировщик, в очередь ожидания для MVar, а затем, наконец, запись может завершиться.

  • Когда писатель получает запланированный (по истечении времени ожидания), а в 2, он удерживается на блокировке, и никакие записи не могут быть завершены до тех пор, пока они не будут повторно перенесены; это становится больше проблемой, когда мы переубеждены, то есть когда соотношение потоков/ядер высокое.

Наконец, использование MVar -per-item требует некоторых накладных расходов с точки зрения распределения, и что более важно, когда мы накапливаем много изменяемых объектов, мы можем вызвать много давления GC.

TQUEUE

TQueue отлично, потому что STM делает его очень простым, чтобы рассуждать о его правильности. Это функциональная очередь в стиле очереди, а write состоит в простом чтении стека писателей, в том, что касается нашего элемента и записи его:

data TQueue a = TQueue (TVar [a])
                       (TVar [a])

writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _ write) a = do  
  listend <- readTVar write   -- a transaction with a consistent 
  writeTVar write (a:listend) -- view of memory

Если после того, как writeTQueue напишет свой новый стек обратно, другая чередующаяся запись сделает то же самое, одна из записей будет повторена. Чем больше writeTQueue чередуется, тем усиливается эффект раздора. Однако производительность ухудшается гораздо медленнее, чем в Chan, потому что существует только одна операция writeTVar, которая может лишить конкурирующих writeTQueue s, а транзакция очень мала (просто чтение и a (:)).

Чтение работает путем "декутации" стека со стороны записи, его реверсирования и сохранения перевернутого стека в своей переменной для легкого "выскакивания" (в целом это дает нам амортизацию O (1) push и pop)

readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
  xs <- readTVar read
  case xs of
    (x:xs') -> do writeTVar read xs'
                  return x
    [] -> do ys <- readTVar write
             case ys of
               [] -> retry
               _  -> case reverse ys of
                       [] -> error "readTQueue"
                       (z:zs) -> do writeTVar write []
                                    writeTVar read zs
                                    return z

У читателей есть симметричная умеренная проблема, связанная с писателями. В общем случае читатели и писатели не спорят, однако, когда стек читателя исчерпан, читатели борются с другими читателями и писателями. Я подозреваю, что если вы предварительно загрузили TQueue с достаточными значениями, а затем запустили 4 читателя и 4 автора, вы могли бы вызвать оживление, поскольку реверс должен был завершиться до следующей записи. Интересно также отметить, что в отличие от MVar запись в TVar, на которую ожидают многие читатели, разбуждает их все одновременно (это может быть более или менее эффективно, в зависимости от сценария).

Я подозреваю, что вы не видите значительных недостатков TQueue в своем тесте; прежде всего, вы видите умеренные эффекты конкуренции за запись и накладные расходы на много выделения и GC'ing много изменяемых объектов.

унаги-чан

unagi-chan был разработан, во-первых, для корректной обработки конфликта. Это концептуально очень просто, но реализация имеет некоторые сложности

data ChanEnd a = ChanEnd AtomicCounter (IORef (Int , Stream a))

data Stream a = Stream (Array (Cell a)) (IORef (Maybe (Stream a)))

data Cell a = Empty | Written a | Blocking (MVar a)

Для чтения и записи сторон очереди Stream, на которой они координируют передаваемые значения (от писателя к считывателю) и указания на блокировку (от читателя к записи), а стороны чтения и записи имеют независимый атомный счетчик. Запись работает как:

  • писатель называет атомный incrCounter на счетчике записи, чтобы получить свой уникальный индекс, на котором координировать его (одно) считыватель

  • писатель находит свою ячейку и выполняет CAS Written a

  • если он успешно завершен, иначе он видит, что читатель побил его и блокирует (или переходит к блокировке), поэтому он выполняет (\Blocking v)-> putMVar v a) и выходит.

Чтение работает аналогичным и очевидным способом.

Первое нововведение состоит в том, чтобы превратить точку соприкосновения в атомарную операцию, которая не деградирует под конфликтом (как цикл CAS/retry или Chan-like lock). На основе простого бенчмаркинга и экспериментов лучше всего подходит примап-выборка и добавление, представленный библиотекой atomic-primops.

Затем в 2 и читателю, и писателю необходимо выполнить только один сравнительный обмен и обмен (быстрый путь для чтения - простое неатомное чтение) для завершения координации.

Чтобы попытаться сделать unagi-chan хорошим, мы

  • использовать fetch-and-add для обработки точки раздора

  • используйте методы безблокировки, так что, когда мы перенаправляем поток, который выполняется в неподходящее время, не блокирует прогресс для других потоков (заблокированный писатель может заблокировать не более, чем читатель, "назначенный" ему счетчиком; прочитайте предупреждения async исключений в unagi-chan docs и обратите внимание, что Chan имеет более приятную семантику здесь)

  • используйте массив для хранения наших элементов, который имеет лучшую локальность (но ниже) ниже накладных расходов на элемент и оказывает меньшее давление на GC

Последняя заметка re. использование массива: одновременная запись в массив, как правило, является плохой идеей для масштабирования, потому что вы вызываете много трафика кеш-когерентности, поскольку кэш-линии недействительны в ваших потоках-писателях. Общий термин - "ложное разделение". Но есть и проблемы, связанные с кешем и минусами, к альтернативным проектам, которые я могу думать о том, что это будет полосатые записи или что-то еще; Я экспериментировал с этим немного, но на данный момент у меня нет ничего убедительного.

Одно место, где мы имеем законное отношение к ложному совместному использованию, находится в нашем счетчике, который мы выравниваем и помещаем в 64 байта; это действительно проявилось в тестах, и единственным недостатком является увеличение использования памяти.

Ответ 2

Если бы мне пришлось угадать, почему pipes-concurrency работает хуже, потому что каждое чтение и запись завершается транзакцией STM, тогда как в других библиотеках используются более эффективные примитивы низкого уровня concurrency.