Подтвердить что ты не робот

Трубы и обратные вызовы в Haskell

Я обрабатываю некоторый звук, используя portaudio. Связывание FK haskell вызывает определенный пользователем обратный вызов всякий раз, когда обрабатываются аудиоданные. Этот обратный вызов следует обрабатывать очень быстро и идеально, без ввода-вывода. Я хотел сохранить аудиовход и быстро вернуться, так как моему приложению не нужно реагировать на звук в реальном времени (прямо сейчас я просто сохраняю аудиоданные в файл, позже я построю простую систему распознавания речи).

Мне нравится идея pipes и я думал, что могу использовать эту библиотеку. Проблема в том, что я не знаю, как создать Producer, который возвращает данные, которые вошли через обратный вызов.

Как мне обрабатывать мой прецедент?


Вот то, с чем я работаю прямо сейчас, в случае, если это помогает (база данных mvar не работает прямо сейчас, но мне не нравится хранить все данные в seq... Я бы скорее обработал ее как он пришел, а не только в конце):

{-# LANGUAGE FlexibleInstances, MultiParamTypeClasses #-}

module Main where

import Codec.Wav

import Sound.PortAudio
import Sound.PortAudio.Base
import Sound.PortAudio.Buffer

import Foreign.Ptr
import Foreign.ForeignPtr
import Foreign.C.Types
import Foreign.Storable

import qualified Data.StorableVector as SV
import qualified Data.StorableVector.Base as SVB

import Control.Exception.Base (evaluate)

import Data.Int
import Data.Sequence as Seq

import Control.Concurrent

instance Buffer SV.Vector a where
  fromForeignPtr fp = return . SVB.fromForeignPtr fp
  toForeignPtr = return . (\(a, b, c) -> (a, c)) . SVB.toForeignPtr

-- | Wrap a buffer callback into the generic stream callback type.
buffCBtoRawCB' :: (StreamFormat input, StreamFormat output, Buffer a input, Buffer b output) =>
    BuffStreamCallback input output a b -> StreamCallback input output    
buffCBtoRawCB' func = \a b c d e -> do
    fpA <- newForeignPtr_ d -- We will not free, as callback system will do that for us   
    fpB <- newForeignPtr_ e -- We will not free, as callback system will do that for us
    storeInp <- fromForeignPtr fpA (fromIntegral $ 1 * c)
    storeOut <- fromForeignPtr fpB (fromIntegral $ 0 * c)
    func a b c storeInp storeOut

callback :: MVar (Seq.Seq [Int32]) -> PaStreamCallbackTimeInfo -> [StreamCallbackFlag] -> CULong 
            -> SV.Vector Int32 -> SV.Vector Int32 -> IO StreamResult
callback seqmvar = \timeinfo flags numsamples input output -> do
  putStrLn $ "timeinfo: " ++ show timeinfo ++ "; flags are " ++ show flags ++ " in callback with " ++ show numsamples ++ " samples."  
  print input
  -- write data to output
  --mapM_ (uncurry $ pokeElemOff output) $ zip (map fromIntegral [0..(numsamples-1)]) datum
  --print "wrote data"

  input' <- evaluate $ SV.unpack input  
  modifyMVar_ seqmvar (\s -> return $ s Seq.|> input')

  case flags of
    [] -> return $ if unPaTime (outputBufferDacTime timeinfo) > 0.2 then Complete else Continue
    _ -> return Complete

done doneMVar = do
  putStrLn "total done dood!"
  putMVar doneMVar True
  return ()

main = do

  let samplerate = 16000

  Nothing <- initialize

  print "initialized"

  m <- newEmptyMVar
  datum <- newMVar Seq.empty

  Right s <- openDefaultStream 1 0 samplerate Nothing (Just $ buffCBtoRawCB' (callback datum)) (Just $ done m)
  startStream s

  _ <- takeMVar m -- wait until our callbacks decide they are done!
  Nothing <- terminate

  print "let see what we've recorded..."

  stuff <- takeMVar datum
  print stuff

  -- write out wav file

  -- let datum = 
  --       audio = Audio { sampleRate = samplerate
  --                   , channelNumber = 1
  --                   , sampleData = datum
  --                   }
  -- exportFile "foo.wav" audio

  print "main done"
4b9b3361

Ответ 1

Самое простое решение - использовать MVar для связи между обратным вызовом и Producer. Вот как:

import Control.Proxy
import Control.Concurrent.MVar

fromMVar :: (Proxy p) => MVar (Maybe a) -> () -> Producer p a IO ()
fromMVar mvar () = runIdentityP loop where
    loop = do
        ma <- lift $ takeMVar mvar
        case ma of
            Nothing -> return ()
            Just a  -> do
                respond a
                loop

Обратный вызов вашего потока будет записывать Just input в MVar, и ваш обратный вызов завершения будет писать Nothing для завершения Producer.

Вот пример ghci, демонстрирующий, как он работает:

>>> mvar <- newEmptyMVar :: IO (MVar (Maybe Int))
>>> forkIO $ runProxy $ fromMVar mvar >-> printD
>>> putMVar mvar (Just 1)
1
>>> putMVar mvar (Just 2)
2
>>> putMVar mvar Nothing
>>> putMVar mvar (Just 3)
>>>

Изменить: Теперь библиотека pipes-concurrency теперь предоставляет эту функцию, и она даже имеет в учебнике, в котором объясняется, как использовать его для получения данных из обратных вызовов.