Подтвердить что ты не робот

Запуск параллельных загрузок URL-адресов в Haskell

Ниже приведен код Haskell, который (HTTP) загружает файлы, отсутствующие в данном каталоге:

module Main where

import Control.Monad ( filterM
                     , liftM
                     )
import Data.Maybe ( fromJust )
import Network.HTTP ( RequestMethod(GET)
                    , rspBody
                    , simpleHTTP
                    )
import Network.HTTP.Base ( Request(..) )
import Network.URI ( parseURI )
import System.Directory ( doesFileExist )
import System.Environment ( getArgs )
import System.IO ( hClose
                 , hPutStr
                 , hPutStrLn
                 , IOMode(WriteMode)
                 , openFile
                 , stderr
                 )
import Text.Printf ( printf )

indices :: [String]
indices =
  map format1 [0..9] ++ map format2 [0..14] ++ ["40001-41284" :: String]
  where
    format1 index =
      printf "%d-%d" ((index * 1000 + 1) :: Int)
                     (((index + 1) * 1000) :: Int)
    format2 index =
      printf "%d-%d" ((10000 + 2 * index * 1000 + 1) :: Int)
                     ((10000 + (2 * index + 2) * 1000) :: Int)

main :: IO ()
main = do
  [dir] <- getArgs
  updateDownloads dir

updateDownloads :: FilePath -> IO ()
updateDownloads path = do
  let
    fileNames = map (\index ->
      (index, path ++ "/tv_and_movie_freqlist" ++ index ++ ".html")) indices
  missing <-
    filterM (\(_, fileName) -> liftM not $ doesFileExist fileName) fileNames
  mapM_ (\(index, fileName) -> do
    let
      url =
        "http://en.wiktionary.org/wiki/Wiktionary:Frequency_lists/TV/2006/" ++
        index
      request =
        Request
          { rqURI = fromJust $ parseURI url
          , rqMethod = GET
          , rqHeaders = []
          , rqBody = ""
          }
    hPutStrLn stderr $ "Downloading " ++ show url
    resp <- simpleHTTP request
    case resp of
      Left _ -> hPutStrLn stderr $ "Error connecting to " ++ show url
      Right response -> do
        let
          html = rspBody response
        file <- openFile fileName WriteMode
        hPutStr file html
        hClose file
    return ()) missing

Я хотел бы запускать загрузки параллельно. Я знаю о par, но не уверен, что его можно использовать в монаде IO, и если да, то как?

ОБНОВЛЕНИЕ: Вот мой код, переопределенный с помощью Control.Concurrent.Async и mapConcurrently:

module Main where

import Control.Concurrent.Async ( mapConcurrently )
import Control.Monad ( filterM
                     , liftM
                     )
import Data.Maybe ( fromJust )
import Network.HTTP ( RequestMethod(GET)
                    , rspBody
                    , simpleHTTP
                    )
import Network.HTTP.Base ( Request(..) )
import Network.URI ( parseURI )
import System.Directory ( doesFileExist )
import System.Environment ( getArgs )
import System.IO ( hClose
                 , hPutStr
                 , hPutStrLn
                 , IOMode(WriteMode)
                 , openFile
                 , stderr
                 )
import Text.Printf ( printf )

indices :: [String]
indices =
  map format1 [0..9] ++ map format2 [0..14] ++ ["40001-41284" :: String]
  where
    format1 index =
      printf "%d-%d" ((index * 1000 + 1) :: Int)
                     (((index + 1) * 1000) :: Int)
    format2 index =
      printf "%d-%d" ((10000 + 2 * index * 1000 + 1) :: Int)
                     ((10000 + (2 * index + 2) * 1000) :: Int)

main :: IO ()
main = do
  [dir] <- getArgs
  updateDownloads dir

updateDownloads :: FilePath -> IO ()
updateDownloads path = do
  let
    fileNames = map (\index ->
      (index, path ++ "/tv_and_movie_freqlist" ++ index ++ ".html")) indices
  missing <-
    filterM (\(_, fileName) -> liftM not $ doesFileExist fileName) fileNames
  pages <-
    mapConcurrently (\(index, fileName) -> getUrl index fileName) missing
  mapM_ (\(fileName, html) -> do
    handle <- openFile fileName WriteMode
    hPutStr handle html
    hClose handle) pages
  where
    getUrl :: String -> FilePath -> IO (FilePath, String)
    getUrl index fileName = do
      let
        url =
          "http://en.wiktionary.org/wiki/Wiktionary:Frequency_lists/TV/2006/" ++
          index
        request =
          Request
          { rqURI = fromJust $ parseURI url
          , rqMethod = GET
          , rqHeaders = []
          , rqBody = ""
          }
      resp <- simpleHTTP request
      case resp of
        Left _ -> do
          hPutStrLn stderr $ "Error connecting to " ++ show url
          return ("", "")
        Right response ->
          return (fileName, rspBody response)
4b9b3361

Ответ 1

Посмотрите mapConcurrently из библиотеки "async" Саймона Марлоу.

Он сопоставляет действие IO параллельно и асинхронно с элементами контейнера Traversable и ждет всех действий.

Пример:

{-# LANGUAGE PackageImports #-}

import System.Environment (getArgs)

import "async" Control.Concurrent.Async (mapConcurrently)

import "HTTP" Network.HTTP
import "HTTP" Network.Stream (Result)
import "HTTP" Network.HTTP.Base (Response(..))
import System.IO
import "url" Network.URL (encString)

import Control.Monad


getURL :: String -> IO (String, Result (Response String))
getURL url = do
        res <- (simpleHTTP . getRequest) url
        return (url, res)

main = do
     args <- getArgs
     case args of
          [] -> putStrLn "usage: program url1 url2 ... urlN"
          args -> do
                results <- mapConcurrently getURL args
                forM_ results $ \(url, res) -> do
                        case res of
                                Left connError -> putStrLn $ url ++ "; " ++ show connError
                                Right response -> do
                                        putStrLn $ url ++ "; OK"
                                        let content = rspBody response

                                            -- make name from url
                                            fname = encString True (`notElem` ":/") url ++ ".html"
                                        writeFile fname content    

Ответ 3

Поскольку операции связаны с IO, для этого обычно используется /not/use par, поскольку он ничего не делает для операций ввода-вывода.

Вам понадобится явная модель concurrency, чтобы скрыть задержку загрузки.

Я бы порекомендовал MVars или TVars в сочетании с forkIO.

Аббревиатура рабочей очереди часто полезна для этого стиля проблемы: нажимайте все URL-адреса в очередь и фиксируйте набор рабочих потоков (например, N * k) для N ядер, выполняйте задания до завершения. Затем завершенные работы будут добавлены к каналу связи, переданному в основной поток.

Вот пример из параллельной проверки URL, используя каналы.

http://code.haskell.org/~dons/code/urlcheck/Check.hs

Ответ 4

Другая версия, которая использует async mapConcurrently и http-conduit keep- живой менеджер

{-# LANGUAGE PackageImports, FlexibleContexts #-}

import System.Environment (getArgs)

import "http-conduit" Network.HTTP.Conduit
import qualified "conduit" Data.Conduit as C
import "http-types" Network.HTTP.Types.Status (ok200)

import "async" Control.Concurrent.Async (mapConcurrently)
import qualified "bytestring" Data.ByteString.Lazy as LBS
import qualified "bytestring" Data.ByteString as BS
import "transformers" Control.Monad.Trans.Class (lift)
import "transformers" Control.Monad.IO.Class (liftIO)
import "url" Network.URL (encString)
import "failure" Control.Failure (Failure(..))

import Control.Monad
import System.IO

taggedRequest :: Failure HttpException m => String -> m (String, Request m')
taggedRequest url = do
        req <- parseUrl url
        return (url, req)

taggedResult :: (C.MonadBaseControl IO m, C.MonadResource m) => Manager -> (String, Request m) -> m (String, Response LBS.ByteString)
taggedResult manager (url, req) = do
        res <- httpLbs req manager
        return (url, res)

main = do
     args <- getArgs
     case args of
          [] -> putStrLn "usage: program url1 url2 ... urlN"
          args -> do
                requests <- mapM taggedRequest args
                withManager $ \manager -> liftIO $ do

                        results <- mapConcurrently (C.runResourceT . taggedResult manager) requests

                        forM_ results $ \(url, Response status _ _ bsBody) -> do
                             putStrLn $ url ++ " ; " ++ show status   
                             let fileName = encString True (`notElem` ":/") url ++ ".html"
                             when (status == ok200) $ LBS.writeFile fileName bsBody