Проблема
Здравствуйте! Я пишу библиотеку протоколирования, и мне очень хотелось бы создать регистратор, который будет работать в отдельном потоке, тогда как все потоки приложений просто отправят ему сообщения. Я хочу найти наиболее эффективное решение этой проблемы. Мне нужна простая очередь без очереди.
Подходы
Я создал несколько тестов, чтобы увидеть, как работают доступные решения, и здесь я получаю очень странные результаты. Я протестировал 4 реализации (исходный код приведен ниже) на основе:
- pipes-concurrency
- Control.Concurrent.Chan
- Control.Concurrent.Chan.Unagi
- MVar, как описано в книге "Параллельное и параллельное программирование в Haskell" . Обратите внимание, что этот метод дает нам ограниченные очереди с пропускной способностью 1 - он используется только для тестов
Испытания
Вот исходный код, используемый для тестирования:
{-# LANGUAGE NoMonomorphismRestriction #-}
import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Pipes
import qualified Pipes.Concurrent as Pipes
import Control.Applicative
import Control.Monad (replicateM_)
import System.Environment (getArgs)
import Control.Concurrent.Chan
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.Chan.Unagi as U
import Control.Concurrent.MVar
import Criterion.Main
data Event = Msg String | Status | Quit deriving (Show)
----------------------------------------------------------------------
-- Pipes
----------------------------------------------------------------------
pipesLogMsg = yield (Msg "hello")
pipesManyLogs num = replicateM_ num pipesLogMsg
pipesAddProducer num o = Pipes.forkIO $ do runEffect $ (pipesManyLogs num) >-> Pipes.toOutput o
Pipes.performGC
pipesHandler max = loop 0
where
loop mnum = do
if mnum == max
then lift $ pure ()
else do event <- await
case event of
Msg _ -> loop (mnum + 1)
Status -> (lift $ putStrLn (show mnum)) *> loop mnum
Quit -> return ()
----------------------------------------------------------------------
-- Chan
----------------------------------------------------------------------
chanAddProducer num ch = forkIO $ chanManyLogs num ch
chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello"))
chanHandler ch max = handlerIO (readChan ch) max
----------------------------------------------------------------------
-- Unagi-Chan
----------------------------------------------------------------------
uchanAddProducer num ch = forkIO $ uchanManyLogs num ch
uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello"))
uchanHandler ch max = handlerIO (U.readChan ch) max
----------------------------------------------------------------------
-- MVars
----------------------------------------------------------------------
mvarAddProducer num m = forkIO $ mvarManyLogs num m
mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello"))
mvarHandler m max = handlerIO (takeMVar m) max
----------------------------------------------------------------------
-- Utils
----------------------------------------------------------------------
handlerIO f max = loop 0 where
loop mnum = do
if mnum == max
then pure ()
else do event <- f
case event of
Msg _ -> loop (mnum + 1)
Status -> putStrLn (show mnum) *> loop mnum
Quit -> return ()
----------------------------------------------------------------------
-- Main
----------------------------------------------------------------------
main = defaultMain [
bench "pipes" $ nfIO $ do
(output, input) <- Pipes.spawn Pipes.Unbounded
replicateM_ prodNum (pipesAddProducer msgNum output)
runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg
, bench "Chan" $ nfIO $ do
ch <- newChan
replicateM_ prodNum (chanAddProducer msgNum ch)
chanHandler ch totalMsg
, bench "Unagi-Chan" $ nfIO $ do
(inCh, outCh) <- U.newChan
replicateM_ prodNum (uchanAddProducer msgNum inCh)
uchanHandler outCh totalMsg
, bench "MVar" $ nfIO $ do
m <- newEmptyMVar
replicateM_ prodNum (mvarAddProducer msgNum m)
mvarHandler m totalMsg
]
where
prodNum = 20
msgNum = 1000
totalMsg = msgNum * prodNum
Вы можете скомпилировать его с помощью ghc -O2 Main.hs
и просто запустить его.
Тесты создают 20 производителей сообщений, каждый из которых производит 1000000 сообщений.
Результаты
benchmarking pipes
time 46.68 ms (46.19 ms .. 47.31 ms)
0.999 R² (0.999 R² .. 1.000 R²)
mean 47.59 ms (47.20 ms .. 47.95 ms)
std dev 708.3 μs (558.4 μs .. 906.1 μs)
benchmarking Chan
time 4.252 ms (4.171 ms .. 4.351 ms)
0.995 R² (0.991 R² .. 0.998 R²)
mean 4.233 ms (4.154 ms .. 4.314 ms)
std dev 244.8 μs (186.3 μs .. 333.5 μs)
variance introduced by outliers: 35% (moderately inflated)
benchmarking Unagi-Chan
time 1.209 ms (1.198 ms .. 1.224 ms)
0.996 R² (0.993 R² .. 0.999 R²)
mean 1.267 ms (1.244 ms .. 1.308 ms)
std dev 102.4 μs (61.70 μs .. 169.3 μs)
variance introduced by outliers: 62% (severely inflated)
benchmarking MVar
time 1.746 ms (1.714 ms .. 1.774 ms)
0.997 R² (0.995 R² .. 0.998 R²)
mean 1.716 ms (1.694 ms .. 1.739 ms)
std dev 73.99 μs (65.32 μs .. 85.48 μs)
variance introduced by outliers: 29% (moderately inflated)
Вопрос
Я хотел бы спросить вас, почему параллельная версия для труб работает так медленно и почему она намного медленнее, чем даже chan. Я очень удивлен, что MVar один из самых быстрых из всех версий - кто-нибудь может рассказать больше, почему мы получаем эти результаты, и можем ли мы сделать лучше в любом случае?