Subscribed unsubscribe Subscribe Subscribe

Haskellでなぜストリーム処理ライブラリが必要なのか

関数型ストリーム処理勉強会で発表したので、ブログにも書いておきます。

元になった資料はこちら

Haskellでストリームデータを扱う方法としては、Handleを使う方法、Lazy IOがありました。しかし、それぞれに問題があり、再利用性が高く堅牢なコードを書くことは困難でした。

この状況を解決すべく、ConduitPipesio-streamsなどのストリーム処理ライブラリが現れたのですが、そもそもそれらが必要とされた経緯は具体的にはどのようなものだったのでしょうか?

元ネタ: http://okmij.org/ftp/Haskell/Iteratee/talk-FLOPS.pdf

ソースコード: https://github.com/fujimura/functional-stream-processing-meetup-sample

そもそもストリームとは

必要に応じて処理されるデータの連なりのこと。具体的には標準入力、ファイル、ソケットなど。

上記の定義はScheme実装提案書より。 この定義では入力側をストリームと読んでいますが、出力側(標準出力への書き込みなど)もストリームとして扱われてる場合が多い気がします。

ストリームは多くの場合、IOです。

具体例

200MBのテキストファイルに含まれる空白文字の数を調べる、という例で比較します。200MBのファイルを一気にメモリ上に読み込むのは効率的とは言えません。ストリーム処理の出番です。

Lazy IOの場合

run :: FilePath -> IO ()
run fname = do
  file <- Text.Lazy.readFile fname
  print (countWhiteSpace file)

countWhiteSpace :: Text -> Int
countWhiteSpace = fromIntegral . Text.Lazy.length . Text.Lazy.filter isSpace
  • 良いところ
    • 単純でわかりやすい
  • 悪いところ
    • 純粋な関数で例外が起こる
      • readFileで読んだファイルに不正なバイト列が含まれていた場合は例外が起こります。そして、Haskellは遅延評価なので、値は必要になった時に評価され、例外は評価時に発生します。つまり、評価されるまで例外は発生しないのです!この性質により、なんと純粋な関数で例外が起こることがあります。これのデバッグは非常に難しいです。
      • 具体的には上記のコードの場合、Text.Lazy.filterで不正なバイト列が評価される時に発生します。
    • いつ、どれだけファイルが開かれ、開放されるか予測しにくい
      • 上記と同じ理由でファイルが開放されるタイミングも単純には予測できません*1。また大量のファイルが同時に開かれてしまう場合もあります。

Handleの場合

run :: FilePath -> IO ()
run fname = do
  handle <- openFile fname ReadMode
  i <- countWhiteSpace handle
  print i

countWhiteSpace :: Handle -> IO Int
countWhiteSpace h = loop 0
  where
    loop i = do
      eof <- hIsEOF h
      if eof
        then return i
        else do
          c <- hGetChar h
          if isSpace c then loop (i+1) else loop i
  • 良い所
    • 素朴で理解しやすい
  • 悪いところ
    • ローレベル
      • 素朴さの裏返しですが、Olegさん言う所の「Haskellで書かれた、典型的なCのコード」*2で、実にローレベルです。関数プログラミングとは何だったのか!
      • ロジックを書いている部分より、Handleの世話をしている部分が多い
        • 実際、ロジックが複雑になると大変なことになります。コード量が増えると、それだけバグの入る可能性も増えます。

io-streamsの場合

run :: FilePath -> IO ()
run fname = do
  i <- Streams.withFileAsInput fname countWhiteSpace
  print i

countWhiteSpace :: InputStream ByteString -> IO Int
countWhiteSpace is = Streams.decodeUtf8 is >>=
                     Streams.map (Text.length . Text.filter isSpace) >>=
                     Streams.fold (+) 0
  • 良い所
    • ハイレベルでコンポーザブル
      • ストリーム自体の扱いはio-streamsが隠蔽しています。
      • ストリームを受け取る関数を、Unixのパイプのように >>=で処理を繋ぎます。繋がれたそれぞれ処理は抜き差しがしやすいので、部品化も簡単です。io-streamsで入力を処理する場合、 InputStream a -> IO (InputStream a) を繋いでいくことになります。
    • 安全な例外
      • それぞれの部品はIO (InputStream -> a)を返すので、bracketで例外を捕捉できます。
    • 少ないメモリ使用量
      • io-streamsが面倒見てくれます。
  • 悪いところ
    • \(^o^)/ない

まとめ

既存の解決方法には問題がある。

  • Lazy IO: 純粋な関数で例外が起こる、リソース管理が難しい

  • Handle: 関数プログラミングっぽくないローレベルなコードになる

ストリーム処理ライブラリで、それら問題を解決できる。

  • io-streams: サプライズの少ない例外*3、ハイレベルでコンポーザブル
    • ただ、リソース管理は自分でやらないといけない。

ということで、ストリーム処理ライブラリを活用すると、効率がよく、見通しがよく、サプライズが少ないプログラムが書けるようになります。

参考資料&リンク集

*1:seqやBangPatternsを使ってサンクを潰して回る羽目になります

*2:http://okmij.org/ftp/Haskell/Iteratee/talk-FLOPS.pdfの7ページ参照

*3:実はこれHandleでもwithFileを使えば実現できます。