資料室へ

Dart のイテレータと非同期処理

2015-03-05 (鈴)

1. はじめに

ここでは Dart のイテレータと非同期処理について概説し,Dart がいわゆるコールバック地獄に陥ることなく非同期入力を扱えることを示す。 自明でないプログラム例として Lisp の read 関数の簡易な実装を与える。

Dart の実装として Dart Download Archive [dartlang.org] で配布されている Dart SDK 1.9.0-dev.9.1 を使う。

2. sync*/yield と Iterable とイテレータ

Dart は Python や C# と同じように yield 文を使ってイテレータを構成できる。 関数を sync* 修飾すると戻り値が Iterable<T> 型になるとともに関数内で yield 文が使えるようになる。 ここで T は yield 文に与える式の型から決まる。下記はこれを使って定義したフィボナッチ数列の無限列である。

Iterable<int> fib() sync* {
  int a = 0;
  int b = 1;
  for (;;) {
    yield a;
    int c = a;
    a = b;
    b += c;
  }
}

Dart は Iterable クラスに対して C# の LINQ to Objects と同じようなメソッドをひととおり用意しており,無限の列を破綻なく扱える。

for (int e in fib().take(9))
  print(e);

Iterable オブジェクトの iterator プロパティを使えば,Ruby の用語法でいう外部イテレータとして使うこともできる。

Iterator<int> iter = fib().take(9).iterator;
while (iter.moveNext())
  print(iter.current);
つまり,Dart は標準で LINQ to Objects を提供しているもう一つの言語です。 C# で解く「なぜ関数プログラミングは重要か」 の議論をそのまま Dart にも適用できますから,Dart もまた C# と同じく Haskell などの遅延評価的な関数型言語の特長である「問題を分解する新しい概念的道具」を備えていると主張できます。

OS X 上での実行例を示す。dart-sdkbin ディレクトリに PATH が通してあると仮定する。

01:~/tmp$ cat fib_test.dart
#!/usr/bin/env dart

Iterable<int> fib() sync* {
  int a = 0;
  int b = 1;
  for (;;) {
    yield a;
    int c = a;
    a = b;
    b += c;
  }
}

main() {
  for (int e in fib().take(9))
    print(e);

  print("--");

  Iterator<int> iter = fib().take(9).iterator;
  while (iter.moveNext())
    print(iter.current);
}
01:~/tmp$ dartanalyzer fib_test.dart
Analyzing [fib_test.dart]...
No issues found
01:~/tmp$ ./fib_test.dart
0
1
1
2
3
5
8
13
21
--
0
1
1
2
3
5
8
13
21
01:~/tmp$  
Iterable クラスと Iterator クラスはともに "dart:core" ライブラリにありますから import せずに使えます。

3. イベント・ループと Future と async/await

ここでいったん Dart プログラムの実行のされかたに立ち戻ろう。 The Event Loop and Dart [dartlang.org] で説明されているように Dart は main 関数を実行した後,イベント・キューが空になるまでイベント・ループを繰り返す。 Future クラスはイベント・ループで非同期に処理される個々のタスクを表現する。

Dart の標準ライブラリは一部を除きファイル入力などをこの Future クラス等による非同期処理として提供している。 例えば,ファイルの内容を文字列として返す readAsString メソッドは Future<String> を戻り値とし,イベント・ループの中で非同期的に結果を与える。 外部から入力を受け取る実用的な Dart プログラムの多くにとっては main 関数が終わってイベント・ループに入ってからが実質的な処理を遂行する「本番」である。

async 修飾した関数の中で await 式を使えば,コールバック関数に分断することなく,ひとつづきの処理としてこの非同期待ち合わせを記述できる。 これにより,ファイルから内容を読み取って表示する関数は次のように書ける。

Future printFile(String fileName) async {
   File file = new File(fileName);
   String text = await file.readAsString();
   print(text);
}
Future クラスは "dart:async" ライブラリから,File クラスは "dart:io" ライブラリからそれぞれ import できます。

async 修飾された関数はイベント・ループから実行されるタスクを表現する。 したがって戻り値の型は Future または Future<T> になる。 ここで T の型は (もしあれば) return 文に与える式の型から決まる。

await 式はオペランドとして Future 型の式をとる。 例示したような「await (Future<String>型の式)」の評価では,イベント・ループにいったん制御を渡して String 型の結果が得られるまで非同期に待ち合わせる。 結果が得られたら元の場所に制御を戻して String 型の結果を await 式の値として返す。

実行例を示す。

01:~/tmp$ dartanalyzer file_test.dart
Analyzing [file_test.dart]...
No issues found
01:~/tmp$ ./file_test.dart
#!/usr/bin/env dart

import "dart:async";
import "dart:io";

Future printFile(String fileName) async {
   File file = new File(fileName);
   String text = await file.readAsString();
   print(text);
}

main() {
  printFile("file_test.dart");
}

01:~/tmp$  
ここで main 関数から直接 printFile 関数の本体を実行しているわけではないことに注意してください。 main 関数は printFile 関数の呼び出しによって Future オブジェクトを構築した後,何もせずにすぐに終了します。

しかし Future オブジェクトは構築とともにイベント・キューに入れられますから,main 関数の終了後にその処理,つまり printFile 関数の本体がイベント・ループから実行されます。 この本体は await file.readAsString() の式の評価でいったんイベント・ループに制御を戻します。 String 型の結果が得られたらまた本体に戻って変数 text に結果を代入し,print(text) で表示して終わります。 イベント・キューにはもうタスクがなくなりますから,イベント・ループも終わり,プログラムが終了します。

4. Stream と StreamIterator

ここで前の二節をあわせて,もしも Iterable オブジェクトが表現するような列の要素がイベント・ループの中で非同期的に与えられたらどうなるか考えてみよう。 はたして Iterable クラスのような非同期列のクラスはあり得るだろうか?

実は Dart は Stream クラスでこのような非同期列を実現している。 例えば File オブジェクトを openRead した結果や標準入力 stdin は,バイト値の並びの (概念的には無限に続き得る) 非同期列の型 Stream<List<int>> として扱われる。

標準入力を UTF8 のバイト列と見なして Unicode 文字列に変換し,改行ごとに分割した (概念的には無限に続き得る) Stream<String> 型の非同期列は次のように書ける。

import "dart:async";
import "dart:convert";
import "dart:io";

Stream<String> lineStream
  = stdin.transform(UTF8.decoder).transform(const LineSplitter());

同様に,イベント・ループの中で一つずつ列の要素を与えるフィボナッチ数列は次のように書ける。

Stream<int> fibStream() {
  Iterable<int> fib() sync* {
    int a = 0;
    int b = 1;
    for (;;) {
      yield a;
      int c = a;
      a = b;
      b += c;
    }
  }
  return new Stream<int>.fromIterable(fib());
}
Dart Programming Language Specification, 2nd Edition の記述によれば関数の async* 修飾と yield 文によってじかに Stream<int> を戻り値にできます。 しかし,残念ながら Dart 1.9.0-dev.9.1 ではまだその仕様は実現されていませんから,sync* 修飾と yield 文でいったん Iterable<int> 値を作ってから Stream<int>.fromIterable コンストラクタを使って Stream<int> 型の戻り値にします。

Iterable<T> クラスと同じく Stream<T> クラスにも LINQ to Objects に相当するメソッドがひととおり用意されている。 Stream<T> 型の列の個々の T 型の要素が非同期的に与えられたとき,それを受ける手段として Stream 型自身は listen メソッドを用意している。

下記の式はフィボナッチ数列の最初の九つの要素 e についてイベント・ループの中で print(e) を行う。

fibStream().take(9).listen((int e) => print(e));

実行例を示す。

01:~/tmp$ cat fstream_test.dart
#!/usr/bin/env dart

import "dart:async";

Stream fibStream() {
  Iterable fib() sync* {
    int a = 0;
    int b = 1;
    for (;;) {
      yield a;
      int c = a;
      a = b;
      b += c;
    }
  }
  return new Stream.fromIterable(fib());
}

main() {
  fibStream().take(9).listen((int e) => print(e));
}
01:~/tmp$ ./fstream_test.dart
0
1
1
2
3
5
8
13
21
01:~/tmp$  

Stream<T> には Iteralble<T> クラスの iterator プロパティに該当するプロパティはないが,同様の処理を行うクラス StreamIterator<T> が標準で用意されている。 StreamIterator<T> は moveNext メソッドが Future<bool> を返す非同期メソッドであること以外は iterator プロパティの型である Iterator<T> と同様である。 ともにイテレータの現在値は T 型の current プロパティで得られる。

StreamIterator のようなクラスは Stream に対する listen メソッドの戻り値である StreamSubscription オブジェクトの pause メソッドと resume メソッドで処理の進行を制御することで利用者定義することもできます。 実ははじめ StreamIterator の存在に気付かず,似たようなクラスを自分で書こうとしていました。 とりあえず動くものを作るところまでは簡単でしたが,正しい実装はかなり複雑になりそうでした。 いろいろな場合に対して正しく動作させようと苦心しはじめたところで StreamIterator の存在に気付き,よろこんで最初の試みを放棄しました。
正解ともいえる StreamIterator の定義と実装は dart-sdklib/async ディレクトリの stream.dartstream_impl.dart にあります。 これを読んで,うっかりこんな込み入った車輪の再発明にはまらずに本当によかったと思いました……。

このクラスは次のように使うことができる。

main() async {
  StreamIterator<int> iter = new StreamIterator(fibStream().take(9));
  while (await iter.moveNext())
     print(iter.current);
}

実行結果は listen メソッドを使った例と変わらない。

main 関数を async 修飾した場合,(少なくとも概念的には) main 関数それ自体は main 関数本体から構成された Future オブジェクトを構築してすぐに終了します。 そして,イベント・ループの中で Future オブジェクトにくるまれた main 関数本体が実行されます。

5. Lisp Reader

Dart では StreamIterator を async/await と組み合わせることにより,非同期的なイベント・ループの中でファイルの1行入力のような処理を従来の同期的なプログラムの組み方そのままに実現できる。 自明でない例として Lisp の S 式を再帰下降法で標準入力から読み取るプログラムを示す。

必要なライブラリを import する。

import "dart:async";
import "dart:io";
import "dart:convert";

Lisp の cons セルを定義する。 簡単のため文字列表現にはドット対の表記を使い,循環リストなどの対策はしない。

class Cell {
  var car;
  var cdr;

  Cell(this.car, this.cdr);

  @override String toString() {
    return "($car . $cdr)";
  }
}

Lisp の read 関数を Reader クラスの read メソッドとして与える。 これは七年前に書いた L2 Lisp 7.2Python スクリプトの Reader クラス や L2 Lisp 7.3Ruby スクリプトの Reader クラスを簡略に Dart に翻訳したものである。 async/await による非同期メソッドであるにもかかわらず,元の Python/Ruby スクリプトとプログラムの字面上の構造が同じであることに注意されたい。

class Reader {
  StreamIterator<String> _rf;
  var _token;
  Iterator<String> _tokens = <String>[].iterator;

  static final _tokenPat = new RegExp('\\s+|;.*\$|(".*?"|,@?|[^()\'`~ ]+|.)');

  Reader(this._rf);

  read() async {
    await _readToken();
    return await _parseExpression();
  }

  _parseExpression() async {
    if (_token == "(") {
      await _readToken();
      return await _parseListBody();
    } else {
      return _token;
    }
  }

  _parseListBody() async {
    if (_token == ")") {
      return null;
    } else {
      var e1 = await _parseExpression();
      await _readToken();
      var e2 = await _parseListBody();
      return new Cell(e1, e2);
    }
  }

  _readToken() async {
    while (! _tokens.moveNext()) {
      bool ok = await _rf.moveNext();
      if (! ok) {
        _token = #EOF;
        return;
      }
      var line = _rf.current;
      var matches = _tokenPat.allMatches(line);
      _tokens = matches
        .map((m) => m.group(1))
        .where((s) => s != null)
        .iterator;
    }
    _token = _tokens.current;
  }
}

再帰呼び出しを含む呼び出し階層を積み上げた上で呼び出される _readToken メソッドでは,その呼び出し階層を維持しながら await _rf.moveNext() でイベント・ループにいったん戻る。 新しい1行が入力されると,await 式の値を true にして ok 変数に代入するところから実行を再開する。

実装の実態はともかく概念的には Scheme の call/cc に見られるような「継続」をイベント・ループごしに渡していると見ることができる。

_readToken メソッドはこうして得た新しい文字列である line を _tokenPat の正規表現オブジェクトでトークン単位に分解します。 このとき,この結果を文字列イテレータ _tokens へと仕上げるためにいわゆるメソッド構文の LINQ を使っている点にも注目してください。 Dart の標準ライブラリは Iteralble クラスや Stream クラスをふんだんに活用したモダンな設計になっていますから,(Dart の用語法では LINQ とは言いませんが) LINQ のような式を利用する機会,利用すべき機会が豊富にあります。

非同期メソッドとして定義した read 関数を await 式で呼び出すために main 関数も async 修飾する。

Future main() async {
  Stream<String> lineStream
    = stdin.transform(UTF8.decoder).transform(const LineSplitter());
  var lines = new StreamIterator(lineStream);
  var reader = new Reader(lines);
  for (;;) {
    stdout.write("> ");
    var s = await reader.read();
    if (s == #EOF) break;
    print(s);
  }
  print("Goodbye");
}

実行例を示す。 開きかっこに対応する閉じかっこが入力されるまで read メソッドから戻らずに次の行入力へと続く点に注目されたい。

01:~/tmp$ ./lisp_read.dart
> 99
99
> (+ a b c)
(+ . (a . (b . (c . null))))
> (a quick brown fox
   (jumps over) (the
     lazy dogs)
  )
(a . (quick . (brown . (fox . ((jumps . (over . null)) . ((the . (lazy . (dogs .
 null))) . null))))))
> Goodbye
01:~/tmp$  

Control-D を打鍵して標準入力を EOF にすると Goodbye と表示してプログラムを終了する。

6. おわりに

従来の非同期プログラミングでは,listen メソッドなどに与えたコールバック関数が終わるとき関数の呼び出し階層が無に帰するため, 再帰下降法のようなアルゴリズムをそのまま素直には実現できなかった。 従来どおりのアルゴリズムを使えず,処理が細切れのコールバックに寸断されることがプログラミングに困難をもたらしていた。 この惨状はしばしば「コールバック地獄」とも呼ばれている。

Dart は単一スレッドでの非同期プログラミングを言語仕様と標準ライブラリで全面的にサポートしながら,この問題を解決している点において優れている。 単一スレッドによるイベント・ループという近年要求されることが多い文脈で Dart が普及するか,または普及した他の有力な言語が Dart のような機能を備えることが望まれる。


Copyright (c) 2015 OKI Software Co., Ltd.