Ruby による Streem の実現

2014-12-19 (鈴)

1. はじめに

Ruby の原作者 Yukihiro "Matz" Matsumoto 氏が設計しているストリーム・ベースのコンカレント・スクリプティング言語 Streem [githum.com] は Ruby に似た構文を備えており,LINQ to Objects に似たパイプライン的な動作を基本としている。

設計者の意図はおそらくパイプラインの各構成要素を独立したスレッドとしてコンカレントに動作させることだろう。 しかし,例示されているコードについていえば,パイプラインの左辺に律速されることから,結局 LINQ to Objects と同じ動作になると考えてよい。

したがって,その範囲では前回の "LINQ ライク・メソッドの高速化と Ruby 1.9" (2012年) で示した方法を使って Streem の動作を効率よく実現できる。 ここでは Ruby に元からあるクラス名その他と衝突せず,その振舞を変更することがない Ruby 2.0 の行儀のよい 1ライブラリとして Streem を実現した例を提示する。

このライブラリ (streem.rb) を使ったとき,Streem のコードは次のような Ruby プログラムとして表現される。

STD_IN | STD_OUT
seq(100) .| {|x|
  if x % 15 == 0
    "FizzBuzz"
  elsif x % 3 == 0
    "Fizz"
  elsif x % 5 == 0
    "Buzz"
  else
    x
  end
} | STD_OUT

ここで seq(100){|x| の間にある .| に注意されたい。 Ruby はブロックを二項演算子の右辺として書くことができないため,パイプライン演算子「|」をメソッドとして呼び出している。

以下,streem.rb が定義する各クラス,関数,定数について説明する。

2. Streem クラスと Iter クラス

パイプラインの左辺の値を表すクラスとして Streem を定義する。 このクラスは each メソッドの存在を仮定して Enumerable を mix-in する。

つまり,このクラスは each メソッドの実装を派生クラスに託したいわゆる抽象クラスです。

Streem はパイプライン演算子として演算子「|」を定義する。

この演算子はもともとは数に対する Ruby のビット OR 演算子です。 しかし,新規に作ったクラスには定義されませんから,この演算子のメソッドがあるかどうかで数のクラスかどうかを判定するような作為的なプログラムを除き, ここでの定義が従来の Ruby プログラムの動作に影響を与えることはありません。

ブロックが与えられているときは 前回§3_collect メソッド (これは LINQ to Objects の Select メソッドに相当する) と同じようにして,左辺 (self) の各要素 (e) をそれぞれブロックに与えて得られた値 (yield e) を次段 (y) へと与えるような新しいパイプラインの左辺の値を構築する。

ブロック以外の引数が右辺 (rhs, right-hand side) に与えられているときは単に左辺 (self) を引数として適用する。

class Streem
  include Enumerable

  def |(rhs=nil)
    if rhs.nil?
      return Iter.new {|y|
        each {|e|
          y[yield e]
        }
      }
    else
      return rhs[self]
    end
  end
end

左辺とブロックをパイプライン演算子でつなげた結果を表す Iter クラスは,パイプライン演算子を Streem の派生クラスとして継承している点を除けば,前回§3 の同名のクラスと同じである。 構築時に与えられたブロックを自分自身の each メソッドとして使う。 このクラスがパイプライン演算子を継承しているから,そのインスタンス自身がパイプラインの左辺となって,さらに別の右辺を次段としてつなげることができる。

class Iter < Streem
  def initialize(&block)
    @block = block
  end

  def each(&y)
    @block[y]
    return
  end
end

3. Seq クラスと seq 関数

Seq は Enumerable であるような任意の列 (sequence) をパイプラインの左辺に仕立てるクラスである。 構築時に与えられたもとの列 (source) の each メソッドを自分自身の each メソッドとして使う。 もし構築時に整数 (Integer) が与えられたときは,その整数までの範囲 (Range) をもとの列とする。

class Seq < Streem
  def initialize(source)
    case source
    when Enumerable
      @source = source
    when Integer
      @source = 1..source
    else
      raise ArgumentError.new(source)
    end
  end

  def each(&y)
    @source.each &y
    return
  end
end

Seq.new(100) と書くかわりに単に seq(100) と書けるように seq 関数を定義する。

def seq(x)
  return Seq.new(x)
end

4. 定数 STD_IN, STD_OUT, TO_ARRAY

STD_IN は,コマンド行引数として与えられたファイルまたは標準入力 (ARGF) から,1行ずつ gets で入力して得られる (仮想的には無限の) 列を,パイプラインの左辺値に仕立てた定数である。 Iter インスタンスに与えたブロック ({|y| }) が Iter インスタンス自身の each メソッドとして使われることを思い出されたい。

STD_IN = Iter.new {|y|
  while line = ARGF.gets
    y[line]
  end
}

STD_OUT は典型的にはパイプラインの右辺として使われる定数である。 入力として与えられた左辺の列 (seq) の各要素 (e) を順に printf で印字する。

STD_OUT = ->seq {
  seq.each {|e|
    printf("%s\n", e)
  }
}

TO_ARRAY も典型的にはパイプラインの右辺として使われる定数である。 入力として与えられた左辺の列 (seq) を配列にして返す。

TO_ARRAY = ->seq {              # => [a1, a2, .., an]
  return seq.to_a
}

実行例を示す。

01:~/tmp$ echo -e '額田郡\n宝飯郡' | ruby -r ./streem.rb -e 'STD_IN | STD_OUT'
額田郡

宝飯郡

01:~/tmp$ ruby -r ./streem.rb -e 'p seq(5) .| {|x| x + 10} | TO_ARRAY'
[11, 12, 13, 14, 15]
01:~/tmp$  

5. zip 関数と cat 関数

Streem の開発サイトでの Stream control structure・Issue #16 [githum.com] で言語の設計者が提案している zip 関数と cat 関数は次のように実現できる。

ただし,簡単のため2引数に限りました。

zip 関数は二つのパイプライン左辺値 (実際には,より一般に,二つの Enumerable) を引数にとり,それぞれの要素を組にした列であるような新しいパイプライン左辺値を返す。

def zip(a, b)                   # => [a1, b1], [a2, b2], ..., [an, bn]
  return Iter.new {|y|
    gen = b.to_enum
    a.each {|ai|
      begin
        bi = gen.next
      rescue StopIteration
        break
      end
      y[[ai, bi]]
    }
  }
end

cat 関数は二つのパイプライン左辺値 (実際には,より一般に,二つの Enumerable) を引数にとり,第1のパイプラインの各要素が尽きたら第2のパイプラインの各要素を与えるような新しいパイプライン左辺値を返す。

def cat(a, b)                   # => a1, a2, ..., an, b1, b2, ... bn
  return Iter.new {|y|
    a.each {|e| y[e]}
    b.each {|e| y[e]}
  }
end

実行例を示す。 zip の例では,STD_IN の入力が1行ずつ順にパイプラインを流れていること,zip の第1引数の要素が先に尽きたため STD_IN の入力が途中で打ち切られていることに注意されたい。

01:~/tmp$ irb --simple-prompt -r ./streem.rb
>> zip(seq(1..5) .| {|i| i*10}, STD_IN) | STD_OUT
A
[10, "A\n"]
quick brown   
[20, "quick brown\n"]
fox
[30, "fox\n"]
jumps over
[40, "jumps over\n"]
the
[50, "the\n"]
=> nil
>> cat(seq(1..5), seq(101..105)) | TO_ARRAY
=> [1, 2, 3, 4, 5, 101, 102, 103, 104, 105]
>> 

6. おわりに

ここでは Ruby による Streem の非コンカレントな一つの実現例を示した。 Streem プログラムの多くの部分は結局のところパイプラインの左辺に律速されて,ここで示した実現例のような動作に帰着すると考えられる。 本格的な Streem を少ない個数の実行時スレッドで実装するための基礎としてこの実現例が役立つと期待する。


Copyright (c) 2014 OKI Software Co., Ltd.