JRuby で使う Tomcat 6 の 'Comet'

2008.2/8 (鈴)



1. はじめに

前回「JRuby 1.1 と ERB サーブレット」の続編として, Tomcat 6 が用意する Comet 技術の JRuby 1.1 による利用を試みる。 サンプルとして,実時間で更新される上図のような chat アプリケーションを作成する。 前回の改訂版である下記の構成要素を,前回と同じく GNU LGPL の下におく。

ファイルの基本的な構成,JRuby, Tomcat, Java の各バージョン,使用マシン等は前回と同じである。 2008 年 2 月 7 日にリリースされた Tomcat 6.0.16 での動作も確認した。

2. Comet と Tomcat 6

Comet とは,HTTP サーバで発生したイベントを即座に Web ブラウザに送信するための技術である。 サーバは HTTP レスポンスを即座に閉じずに保留する。 実装技術としては以前から知られていたが,A. Russel が 2006 年 3 月 3 日に著した Comet: Low Latency Data for the Browser により,技術として固有の名前が与えられた。

Apache Tomcat 6.0 のドキュメント Advanced IO and Tomcat に記されているように,現在の Tomcat は Servlet API の範囲外の独自の仕様として Comet 技術をサポートしている。

上記 Tomcat 6.0 のドキュメントに部分的な実装が示されているように,Comet の典型的な用途は,いわゆる チャット (chat) アプリケーションである。 他の参加者からの書込みという不定期なイベントを, HTTP サーバが各参加者のブラウザに実時間的に伝播するために Comet が利用される。

この部分的な実装をもとに Tomcat 6.0 と Ajax/JavaScript でチャット・アプリケーション を作成した例が,昨年末の @IT の記事 Tomcat 6 で実現! Ajax を超える通信技術 Comet にある。 Comet を利用して実用化された代表的なチャット・アプリケーションとしては Lingr がある。 一昨年の CNET の記事 Lingr and Comet - 技術解説編 によれば,その実装は Java 上のもう一つの代表的なサーブレット・コンテナ Jetty によっている。

3. server.xml

実用的に Comet を実装するには,少数のスレッドで多数のソケットを維持する必要がある。 conf/server.xml を下記のように変更して,HTTP コネクタに New I/O を使用する実装クラスを指定する。

$ diff server.xml~orig server.xml
50c50
<     <Connector port="8080" protocol="HTTP/1.1" 
---
>     <Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol" 
$ 

なお, Mac OS X 10.4.11 の場合,Java に -Djava.net.preferIPv4Stack=true オプションを渡す必要がある。例えば,あらかじめ下記のように環境変数をセットすればよい。

$ export JAVA_OPTS=-Djava.net.preferIPv4Stack=true

オプションを渡さないと下記のような例外が発生し,ポート 8080 での通信が全くできない。 この例外はログ・ファイル logs/catalina.out に記録される。

java.net.SocketException: Invalid argument
        at sun.nio.ch.Net.setIntOption0(Native Method)
        at sun.nio.ch.Net.setIntOption(Net.java:152)
        at sun.nio.ch.SocketChannelImpl$1.setInt(SocketChannelImpl.java:372)
        at sun.nio.ch.SocketOptsImpl.setInt(SocketOptsImpl.java:46)
        at sun.nio.ch.SocketOptsImpl$IP.typeOfService(SocketOptsImpl.java:249)
        at sun.nio.ch.OptionAdaptor.setTrafficClass(OptionAdaptor.java:158)
        at sun.nio.ch.SocketAdaptor.setTrafficClass(SocketAdaptor.java:330)
        at org.apache.tomcat.util.net.SocketProperties.setProperties(SocketProperties.java:171)
        at org.apache.tomcat.util.net.NioEndpoint.setSocketOptions(NioEndpoint.java:956)
        at org.apache.tomcat.util.net.NioEndpoint$Acceptor.run(NioEndpoint.java:1169)
        at java.lang.Thread.run(Thread.java:613)

4. web.xml

webapps/test/WEB-INF/web.xml下記のようにする。

<?xml version="1.0" encoding="ISO-8859-1"?>

<web-app xmlns="http://java.sun.com/xml/ns/javaee"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
   version="2.5"> 

   <servlet>
     <servlet-name>Rb</servlet-name>
     <servlet-class>JRubyServlet</servlet-class>
   </servlet>

   <servlet>
     <servlet-name>RbComet</servlet-name>
     <servlet-class>JRubyCometServlet</servlet-class>
   </servlet>

   <servlet-mapping>
     <servlet-name>Rb</servlet-name>
     <url-pattern>*.rb</url-pattern>
     <url-pattern>*.erb</url-pattern>
   </servlet-mapping>

   <servlet-mapping>
     <servlet-name>RbComet</servlet-name>
     <url-pattern>/comet/*</url-pattern>
   </servlet-mapping>
</web-app>

この設定により,前回の対応付けに加えて, http://localhost:8080/test/comet/ 以下をアクセスしたとき,新設の JRubyCometServlet が起動される。

5. JRubySingleton.java

今までと同じようにサーブレット間で JRuby のグローバルな状態を共有するには, 新しく設ける JRubyCometServlet のインスタンスと, 今までの JRubyServlet のインスタンスで,JRuby プロセッサを共有する必要がある。 下記のような JRubySingleton は,そのためのクラスである。

import java.util.*;

import org.jruby.Ruby;
import org.jruby.javasupport.JavaEmbedUtils;

/** 普通の JRuby サーブレットと Comet 用 JRuby サーブレットで
 *  共用される1個の JRuby プロセッサ
 */
public class JRubySingleton
{
    private static Ruby ruby = null; // JRuby 1.1 プロセッサ
    private static int count = 0;    // 参照カウンタ

    /** 初期化する。最初の呼び出しだけが実際に初期化を行う。
     * 初期化では require '_init' を行う。
     * @param path WEB-INF へのパス
     * @return JRuby プロセッサ
     */
    public static synchronized Ruby init(String path)
    {
        if (count == 0) {
            System.setProperty("jruby.home", path);
            ruby = JavaEmbedUtils.initialize(new ArrayList<String> ());
            ruby.setCurrentDirectory(path);
            ruby.evalScriptlet("require '_init'");
        }
        count++;
        return ruby;
    }

    /** 破棄する。最後の呼び出しだけが実際に破棄を行う。
     */
    public static synchronized void destroy()
    {
        count--;
        if (count == 0) {
            JavaEmbedUtils.terminate(ruby);
            ruby = null;
        }
    }
}

6. JRubyServlet.java

前々回以来,下記のように JRubyServlet の処理自体に変更はない。ただし, JRuby プロセッサの構築と破棄を JRubySingleton にくくり出したから,より簡素になっている。

import java.io.*;
import javax.servlet.*;
import javax.servlet.http.*;

import org.jruby.Ruby;
import org.jruby.javasupport.JavaEmbedUtils;
import org.jruby.runtime.builtin.IRubyObject;

public class JRubyServlet extends HttpServlet
{
    protected Servlet inner; // an inner servlet on the JRuby processor

    @Override public void init(ServletConfig config) 
        throws ServletException
    {
        super.init(config);
        ServletContext context = config.getServletContext();
        String path = context.getRealPath("/WEB-INF");
        Ruby ruby = JRubySingleton.init(path);
        IRubyObject ro = ruby.evalScriptlet("InnerServlet.new");
        inner = (Servlet) JavaEmbedUtils.rubyToJava(ruby, ro, Servlet.class);
        inner.init(config);
    }

    @Override public void destroy()
    {
        inner.destroy();
        JRubySingleton.destroy();
        super.destroy();
    }

    @Override protected void doGet(HttpServletRequest req,
                                   HttpServletResponse res)
        throws ServletException, IOException
    {
        inner.service(req, res);
    }

    @Override protected void doPost(HttpServletRequest req,
                                    HttpServletResponse res)
        throws ServletException, IOException
    {
        inner.service(req, res);
    }
}

7. JRubyCometServlet.java

Tomcat 6 で Comet を実現するには,サーブレット・クラスで org.apache.catalina.CometProcessor インタフェースを実装すればよい。 Web ブラウザからの GETPOST のリクエストに対し, event(CometEvent) メソッドが起動される。

そこで CometProcessor インタフェースを実装した JRubyServlet の派生クラスとして 下記のような JRubyCometServlet を定義する。 event(CometEvent) メソッドでは, JRuby による内部サーブレット inner を CometProcessor にキャストして,その event メソッドに処理を丸投げする。

import java.io.*;
import javax.servlet.*;

import org.apache.catalina.CometEvent;
import org.apache.catalina.CometProcessor;

public class JRubyCometServlet extends JRubyServlet implements CometProcessor
{
    public void event(CometEvent ce) throws ServletException, IOException {
        CometProcessor cp = (CometProcessor) inner;
        cp.event(ce);
    }
}

ここで内部サーブレット inner が, 通常のリクエストを処理する JRubyServlet インスタンスの inner とは,(Java オブジェクトとしても Ruby オブジェクトとしても) 別個のオブジェクトであることに注意しよう。 それぞれ同じ Ruby 式 InnserServlet.new を別々に評価した結果である。 別個の ServletConfig 引数で init され, 別個のタイミングで destroy されるから, 内部サーブレットをシングルトンとして共有することは適切ではない。

一方,inner が動作する Ruby のグローバルな環境は, JRubySingleton インスタンスが管理するシングルトンとして共有される。

JRubySingleton.java, JRubyServlet.java, JRubyCometServlet.java の3ファイルをコンパイルするには,(例えば) それらを webapps/test/WEB-INF/classes に置いて下記を行う。

$ cd webapps/test/WEB-INF/classes
$ javac -cp ../../../../lib/catalina.jar:../../../../lib/servlet-api.jar:../lib/jruby.jar JRuby*.java

Windows の Cygwin 上でコンパイルするには,上記クラスパスのコロンをセミコロンに変え, クラスパス全体をクォートで囲めばよい。

8. _init.rb

JRuby スクリプト webapps/test/WEB-INF/_init.rb が定義する内部サーブレット InnerServlet では,JRubyCometServlet でのキャスト演算を成功させるために CometProcessor を include する。

COMMON_LOCK = java.lang.Object.new

class InnerServlet < javax.servlet.GenericServlet
  include org.apache.catalina.CometProcessor

  attr :servlets

  def init(config)
    super
    @servlets = {}
  end

  def destroy
    log("destroy: %p" % @servlets)
    @servlets.each_value {|s| s.destroy}
    super
  end

  def servlet_for(spath, dir="")
    COMMON_LOCK.synchronized {
      dpath = dir + spath
      servlet = @servlets[dpath]
      if servlet.nil?
        log("load: %p" % dpath)
        case spath[-3..-1]
        when ".rb"
          path = "." + dpath    # "/Poi.rb" => "./Poi.rb"
          script = open(path) {|rf| rf.read}
          mod = Module.new
          mod.module_eval(script, path)
          name = spath[1..-4]   # "/Poi.rb" => "Poi"
          servlet = mod.const_get(name).new
        when "erb"
          path = ".." + dpath   # "/Poi.erb" => "../Poi.erb"
          script = open(path) {|rf| rf.read}
          erb = ERB.new(script, nil, "%")
          servlet = erb.def_class(ErbServlet, "expand(req, res)").new
          servlet.inner_servlet = self
        else
          raise("unexpected servlet path %p" % dpath)
        end
        servlet.init(self)
        @servlets[dpath] = servlet
      end
      return servlet
    }
  end

  def service(req, res)
    begin
      log("request %p: %p %p from: %p" %
          [req.method, req.requestURI, req.query_string, req.remote_host])
      servlet = servlet_for(req.servlet_path)
      case req.method
      when "GET" then servlet.doGet(req, res)
      when "POST" then servlet.doPost(req, res)
      else raise("unexpected method %p" % req.method)
      end
    rescue => ex
      log(ex.inspect + " at " + ex.backtrace.join("\n\tfrom "))
      raise
    end
  end

  def event(ev)
    begin
      req = ev.http_servlet_request
      ev_type = ev.event_type
      log("event %s: %p %p from: %p" %
          [ev_type, req.requestURI, req.query_string, req.remote_host])
      servlet = servlet_for(req.path_info, req.servlet_path)
      servlet.event(ev)
    rescue => ex
      log(ex.inspect + " at " + ex.backtrace.join("\n\tfrom "))
      raise
    end
  end
end

HTTP リクエストの受け口として service(req, res)event(ev) の二つのメソッドをもつ。前者は JRubyServlet インスタンスから呼び出され,後者は JRubyCometServlet インスタンスから呼び出される。処理の重複を避けるため, 両者の Ruby サーブレット取得処理を servlet_for(spath, dir="") メソッドに共通化する。

servlet_for メソッド内で Ruby スクリプト (普通,class 定義からなる) を評価するとき,スクリプト内の各 Ruby 式,とりわけ require メソッドの呼出しは,マルチスレッドに対して必ずしも安全ではない。 JRubyServlet インスタンスと JRubyCometServlet インスタンスがそれぞれ inner フィールドに保持する InnerServlet インスタンスどうしで排他するために, 共通のオブジェクト COMMON_LOCKsynchronized を行う。

9. ERB サーブレットの互換性

_init.rb ではさらに, 前回 導入した ERB サーブレットの共通基底クラス ErbServlet に下記のように event(ev) メソッドと require(feature) メソッドを追加する。

import javax.servlet.http.HttpServlet # for convenience
EventType = org.apache.catalina.CometEvent::EventType # for convenience
class ErbServlet < HttpServlet
  include ERB::Util
  attr_accessor :inner_servlet

  def doGet(req, res)
    s = expand(req, res); res.writer.print(s)
  end

  def doPost(req, res)
    s = expand(req, res); res.writer.print(s)
  end

  def event(ev)                 # simulate doGet
    req = ev.http_servlet_request
    res = ev.http_servlet_response
    if ev.event_type == EventType::BEGIN
      s = expand(req, res); res.writer.print(s)
      ev.close
    end
  end

  def require(feature)          # overriding the built-in method
    COMMON_LOCK.synchronized {
      super(feature)
    }
  end
end

web.xml の設定により,webapps/test/comet/ の下に *.erb ファイル を置いたときは, JRubyCometServlet 経由で event(ev) が起動される。 そして上記メソッド定義により,この場合でも doGet(req, res) が呼び出されたのと同じように振舞う。 つまり,ERB サーブレットについては,どの場合でも,動作は前回のシステムと互換である。

取得済みサーブレットの表 @servlets が,InnerServlet インスタンスごとに維持されていることを思い出そう。 初期化引数の相違などにより,これをシングルトンにすることは妥当ではない。 しかし,このままでは,前回導入したサーブレット管理サーブレットmanage.erbJRubyServlet 配下のサーブレットだけ管理でき, JRubyCometServlet 配下のサーブレットを管理できないことになる。 ここで上記の互換性が役に立つ。 同じ manage.erb を comet 下にコピーして webapps/test/comet/manage.erb として置き, http://localhost:8080/test/comet/manage.erb としてアクセスすれば,それらのサーブレットを管理できる。

10. comet/Sample.rb

下記のような Sample.rb を webapps/test/WEB-INF/comet/ に置いて, http://localhost:8080/test/comet/Sample.rb をアクセスすると, 右図のように1行ずつ表示される。

class Sample < HttpServlet
  def event(ev)
    req = ev.http_servlet_request
    res = ev.http_servlet_response
    case ev.event_type
    when EventType::BEGIN
      res.content_type = "text/html; charset=UTF-8"
      wf = res.writer
      wf.println("<title>comet sample</title>")
      wf.println("<p>1 から順に数え上げます。")
      wf.flush
      @th = Thread.new {
        i = 0
        loop {
          i += 1
          wf.println("<p>#{i}: #{"*" * i}</p>")
          wf.flush
          sleep(1)
        }
      }
    when EventType::ERROR, EventType::END
      @th.kill
      ev.close
    when EventType::READ
    end
  end
end

これはリクエストに対し,それぞれスレッドを設けているという点で著しく 非模範的 である。 Comet 実装の主要な論点は,多数のリクエストをいかにして少ないスレッドで効率良く処理するかである。 この例はそれを全く考慮していない。 模範的には,周期的に処理を行うスレッドをたった1個 (または少数個) だけ作成し, その (またはそれらの) スレッドがすべてのリクエストの周期的な書き込み処理を受け持つべきである。

11. chat アプリケーション

もう少し大きな例として,簡単な chat アプリケーションを構築する。 この chat アプリケーションは Web ブラウザに対してプラグインも JavaScript もクッキーも要求しない。 ユーザがセキュリティを考慮して Web ブラウザでこれらの機能を無効にしている場合でも正常に動作する。 Mac 上の Camino 1.5.4 と Firefox 2.0.0.11, Windows 上の Firefox 2.0.0.11 と IE5.5, IE6SP2 で動作を確認した。

webapps/test/chat.html として下記を置く。 冒頭の図に示すようにブラウザ画面を上下二つのフレームに分ける。

<title>chat</title>
<frameset rows="*, 150">
  <frame name="display" src="comet/ChatDisplay.rb"> 
  <frame name="input" src="chat-input.erb">
</frameset>

下フレームにあたる webapps/test/chat-input.erb を下記に示す。

<%
require 'chat_module'

if req.method == "POST"
  req.character_encoding = "UTF-8"
  nickname = req.get_parameter("nn") || ""
  text = req.get_parameter("text")
  if text
    log("chat-input.erb: name=%p text=%p" % [nickname, text])
    Chat::write(nickname, text, self)
  end
end

res.content_type = "text/html; charset=UTF-8"
%>
<title>chat - input</title>
<form action="chat-input.erb" method="POST">
  名前:
  <input type="TEXT" name="nn" value="<%=h nickname %>">
  <br>
  <textarea name="text" rows="4" cols="64"></textarea>
  <input type="SUBMIT" value="送信">
</form>

関数 Chat::write(name, text, source_servlet) は 下記の webapps/test/WEB-INF/chat_module.rb で定義されている。 モジュール Chatwrite(name, text, source_servlet) 関数は, register(res) 関数で登録された各 HTTP レスポンス・オブジェクトに書込み内容を出力する。

require 'thread'
require 'cgi'

module Chat
  module_function

  Mutex = Mutex.new
  Observers = {}                # a set of http servlet responses
  @@sequence_No = 0

  def register(res)
    Mutex.synchronize {
      Observers[res] = true
      return Observers.length
    }
  end

  def unregister(res)
    Mutex.synchronize {
      Observers.delete(res) ? Observers.length : nil
    }
  end

  def write(name, text, source_servlet)
    time_stamp = Time.new.strftime("%m/%d %H:%M")
    keys = nil
    current_No = 0
    Mutex.synchronize {
      keys = Observers.keys
      @@sequence_No += 1
      current_No = @@sequence_No
    }
    h_name = CGI.escapeHTML(name)
    h_text = CGI.escapeHTML(text)
    h_text.gsub!("\s", "&nbsp;")
    h_text.gsub!("\n", "<br />")
    keys.each {|res|
      begin
        res.synchronized {
          wf = res.writer
          wf.println("<dt>%4d: %s <small>%s</small></dt>" %
                     [current_No, name, time_stamp])
          wf.println("<dd>")
          wf.println(h_text)
          wf.println("</dd>")
          wf.flush
        }
      rescue => ex
        source_servlet.log("%p at Chat::write for %p" % [ex, res])
        Chat::unregister(res)
      end
    }
  end
end

上フレームにあたる webapps/test/WEB-INF/comet/ChatDisplay.rb を示す。 基本的な構造は前述のサンプル comet/Sample.rb と同じである。 参加者の増減通知は,一般の書込みと同じく Chat::write を使って Tomcat 自身が (天の声として) 書き込む。

require 'chat_module'

class ChatDisplay < HttpServlet
  def event(ev)
    req = ev.http_servlet_request
    res = ev.http_servlet_response
    case ev.event_type
    when EventType::BEGIN
      ev.timeout = 60 * 60 * 1000 # 1 hour
      res.content_type = "text/html; charset=UTF-8"
      wf = res.writer
      wf.println("<title>chat - display</title>")
      wf.println("<dl>")
      wf.println("<dt>説明</dt>")
      wf.println("<dd>現在以降の書込みが表示されます。")
      wf.println("参加者増減時には Tomcat 自身が人数を書き込みます。</dd>")
      n = Chat::register(res)
      voice_of_overlord(n, :INC)
    when EventType::ERROR, EventType::END
      n = Chat::unregister(res) or raise "failed to unregister"
      ev.close
      voice_of_overlord(n, :DEC)
    end
  end

  def voice_of_overlord(n, op)
    if n == 1
      s = "今,参加者はあなた一人です。"
    else
      s = "参加者が #{n} 人に" + 
        case op
        when :INC then "増えました"
        when :DEC then "減りました"
        else "なりました。"
        end
    end
    Chat::write("Tomcat", s, self)
  end
end

このサンプル・アプリケーションも GNU LGPL の下におく。

12. おわりに

表現の簡潔な JRuby を使えば,しばしば煩雑なきらいがある Java ライブラリをより手軽に試すことができる。 ここでは Tomcat 6 に用意された HTTP サーバからのプッシュ技術である 'Comet' の JRuby による試用例を報告した。 ただし,ここでの試用の範囲では JavaScript を使用していない。 (ユーザにとってセキュリティ上の不安があることが難点だが) Web ブラウザのもつ JavaScript を活用すれば,JavaScript によるクライアントサイドの Ajax と Java/JRuby によるサーバサイドの Comet が両輪となって,さらに多彩な非同期通信が可能になる。


次回へ


Copyright (c) 2008 OKI Software Co., Ltd.