Programming Phoenix勉強その15

Programming Phoenix勉強その15

その15です。ここからChapter10の Channel です。Phoenixの目玉機能の一つな気もするので楽しみです。

Channelについて

  • ステートを持つ双方向通信である

    • ステートフルなので Cookie などを意識しなくて良い

  • トピックと呼ばれる単位で各会話は管理される

  • 各々の会話はプロセスで管理され、一つがバグっても他に影響を与えないし、並列性も持つ

  • クライアント側はES6(ES2015)で記述する

  • 実装するにあたりクライアントとサーバーで以下3つを意識する

    • 接続と切断

    • メッセージの送信

    • メッセージの受信

クライアントサイドの実装

というわけで、ES6でクライアントサイドから実装していきます。まず video.js を作成します。

import Player from "./player"

let Video = {
    init(socket, element) {
        if (!element) { return; }
        let playerId = element.getAttribute("data-player-id");
        let videoId = element.getAttribute("data-id");
        socket.connect()
        Player.init(element.id, playerId, () => {
            this.onReady(videoId, socket);
        });
    },

    onReady(videoId, socket) {
        let msgContainer = document.getElementById("msg-container");
        let msgInput = document.getElementById("msg-input");
        let postButton = document.getElementById("msg-submit");
        // トピックの識別
        let voidChannel = socket.channel("videos:" + videoId);
        // TODO: join the vidChannel
    }
}
export default Video;

playerimport をこっちに移設しています。また、 init メソッドと onReady メソッドを定義しています。 onReady はコールバックとして使っているようです。 コメントにあるようにトピックの識別子は videoId としています。

app.js を上の実装に合わせて変えておきます。 Player を作成していた部分に変わって Video の利用にします。

import socket from "./socket";
import Video from "./video";
Video.init(socket, document.getElementById("video"));

デフォルトで用意されている socket.js のインポートも行っています。 このファイルについては後で触るようです。

通常のリクエストと socket のデータの流れの違いについても触れられています。 前の章で見たように通常のアクセスではデータは conn という形で各パイプラインを流れて、 その中で変換されていきます。 conn は新しい接続ごとに新しいものが作られて使われます。

一方 socket の方ではステートフルなためソケットの寿命まで一つの接続が変換され続けます。

socket.jsの変更

最初のソケットを作成します。 socket.js の中身を変更して実装していきます。

import { Socket } from "phoenix"

let socket = new Socket("/socket", {
    params: { token: window.userToken },
    // バッククオートで囲んだものがテンプレートリテラルとして値を文字に埋め込める
    logger: (kind, msg, data) => { console.log(`${kind}: ${msg}`, data); }
});

export default socket

余計な部分を消してしまって問題ないです。ログをコンソールに出すように変更しただけです。

Phoenix 側でのソケットのエンドポイントは endpoint.ex に記述されています。

socket "/socket", Rumbl.UserSocket

サーバーサイドの実装

Rumbl.UserSocket がエントリポイントになっていることがわかったので中身を見てみます。 channel/user_socket.ex です。

defmodule Rumbl.UserSocket do
  use Phoenix.Socket

  transport :websocket, Phoenix.Transports.WebSocket
  # transport :longpoll, Phoenix.Transports.LongPoll

  def connect(_params, socket) do
    {:ok, socket}
  end

  def id(_socket), do: nil
end

余計なコメントは消してます。

  • transport のところをコメントと合わせて見るとわかるように、二種類サポートされているようです。

    通常の websocketlongpoll のロングポーリングです。 これは接続方法が抽象化され、他の部分の処理は同じで良いということです。

  • connect/2 関数はユーザの接続制御に用いられる。現在は全てのユーザが接続可能。認証は後で追加するらしい。

  • id/1 関数はソケットの識別を行っています。 nil なので全ユーザが匿名です。

実際に実装していきます。まず user_socket.ex に以下を追加します。

## Channels
channel "videos:*", Rumbl.VideoChannel

Phoenix ではトピックはリソース名( :videos とか)でサブトピックは付随するIDになるようです。

上記に書いた通り、 VideoChannel にディスパッチしているのでこれを実装していきます。 channels/video_channel.ex を実装します。

defmodule Rumbl.VideoChannel do
  use Rumbl.Web, :channel

  def join("videos:" <> video_id, _params, socket) do
    {:ok, assign(socket, :video_id, String.to_integer(video_id))}
  end
end

join/3 コールバック関数を作りました。(コールバックという呼び方はOTPに習っているようです。)

引数に与えられている socket は接続されている間状態を保持します。 なので、 assign などでデータを追加するとそれもずっと保持されて参照可能です。

クライアント側でも join 出来るようにします。 video.js を変更します。

onReady(videoId, socket) {
      let msgContainer = document.getElementById("msg-container");
      let msgInput = document.getElementById("msg-input");
      let postButton = document.getElementById("msg-submit");
      // トピックの識別
      let vidChannel = socket.channel("videos:" + videoId);
      // チャンネルへのjoin receiveで帰ってきたものを受け取る(OTPっぽい)
      vidChannel.join()
          .receive("ok", resp => console.log("joined the video channel", resp))
          .receive("error", reason => console.log("join failed", reason));
  }

抜粋しました。クライアントサイドでサーバーサイドの関数呼んでるような見た目です。 また、 receive はOTPでよく出てくるメッセージを受信するやつと同じっぽい感じで使っているみたいです。

次に、試しに5秒毎にクライアントに通知を投げる処理を追加してみます。 video_channel.ex を以下のように実装します。

defmodule Rumbl.VideoChannel do
  use Rumbl.Web, :channel

  def join("videos:" <> video_id, _params, socket) do
    # 5秒ごとにクライアントにメッセージを送る
    # send_interval/2関数は最終的にはsend_interval(Time, self(), Message)という形で呼び出される
    :timer.send_interval(5_000, :ping)
    # socket.assignsにvideo_idを保存
    {:ok, assign(socket, :video_id, String.to_integer(video_id))}
  end

  # OTPのコールバックhandle_castやhandle_callの仲間
  # castやcallで処理される以外のメッセージを処理するらしい
  def handle_info(:ping, socket) do
    count = socket.assigns[:count] || 1
    push socket, "ping", %{count: count}

    {:noreply, assign(socket, :count, count + 1)}
  end
end

コメントに書いてあるように、 join されると5秒ごとに自分自身にメッセージを投げて handle_info コールバックで受け取っています。 handle_info では socket に追加された count をインクリメントしていっているだけです。 push されるとクライアント側に通知が行くようです。

リアルタイムアノテーションの実装

基本的なところはわかったので動画にリアルタイムコメントを付けられるようにします。 ちなみに ここ によるとYouTubeの動画へのコメントとかをアノテーションって呼ぶときもあるらしいですよ。

video.js を変更します。

onReady(videoId, socket) {
        let msgContainer = document.getElementById("msg-container");
        let msgInput = document.getElementById("msg-input");
        let postButton = document.getElementById("msg-submit");
        // トピックの識別
        let vidChannel = socket.channel("videos:" + videoId);

        postButton.addEventListener("click", e => {
            let payload = { body: msgInput.value, at: Player.getCurrentTime() };
            vidChannel.push("new_annotation", payload)
                .receive("error", e => console.log(e));
            msgInput.value = "";
        });

        // サーバーからのプッシュイベントを受け取るイベントハンドラを設定
        vidChannel.on("new_annotation", (resp) => {
            this.renderAnnotation(msgContainer, resp);
        });

        // チャンネルへのjoin receiveで帰ってきたものを受け取る(OTPっぽい)
        vidChannel.join()
            .receive("ok", resp => console.log("joined the video channel", resp))
            .receive("error", reason => console.log("join failed", reason));
    },

    esc(str) {
        let div = document.createElement("div");
        div.appendChild(document.createTextNode(str));
        return div.innerHTML;
    },

    renderAnnotation(msgContainer, { user, body, at }) {
        let template = document.createElement("div");

        template.innerHTML = `
        <a href="#" data-seek="${this.esc(at)}">
            <b>${this.esc(user.username)}</b>: ${this.esc(body)}
        </a>
        `;

        msgContainer.appendChild(template);
        msgContainer.scrollTop = msgContainer.scrollHeight;
    }
}

サーバーからのプッシュイベントを受け取る用に設定したのと、受け取った物をレンダリングする関数を作成しました。 また、 esc 関数でXSS対策を行っています。

count のやり取りからコメントのやり取りに変更したのでサーバー側も合わせて変更します。

defmodule Rumbl.VideoChannel do
  use Rumbl.Web, :channel

  def join("videos:" <> video_id, _params, socket) do
    {:ok, socket}
  end

  # クライアントから直接送信された時に受け取るコールバック
  def handle_in("new_annotation", params, socket) do
    # 接続しているクライアント全てにブロードキャストする
    # ユーザが任意のメッセージを送れないようにparamsを分解する
    broadcast! socket, "new_annotation", %{
      user: %{username: "anon"},
      body: params["body"],
      at: params["at"]
    }

    {:reply, :ok, socket}
  end
end

join 関数をもとに戻したのと handle_in/3 関数を新たに追加しました。 handle_in では Map.put とかでメッセージを作っていないのはセキュリティ対策のようです。 メッセージはユーザから任意で入力されるので params をバラして好き勝手入れられない様にしています。

認証の追加

誰が送ったメッセージか知りたいので認証を行います。 普通のアプリケーションはセッションでの認証が主ですが、 websocket では接続が長く続くため、 トークン認証で行います。まずテンプレートにトークンを埋め込みます。

...
</div> <!-- /container -->
<!-- websocket用ユーザトークンの埋め込み Rumbl.Authでの認証が通っていることが条件 -->
<script>window.userToken = "<%= assigns[:user_token] %>"</script>
<script src="<%= static_path(@conn, "/js/app.js") %>"></script>
...

user_tokenassign するように auth.ex を変更します。

defmodule Rumbl.Auth do
  ...
  def call(conn, repo) do
    user_id = get_session(conn, :user_id)
    cond do
      user = conn.assigns[:current_user] ->
        put_current_user(conn, user) # 変更
      user = user_id && repo.get(Rumbl.User, user_id) ->
        put_current_user(conn, user) # 変更
      true ->
        assign(conn, :current_user, nil)
    end
  end

  def login(conn, user) do
    conn
    |> put_current_user(user) # 変更
    |> put_session(:user_id, user.id)
    |> configure_session(renew: true) 
  end
  ...
  # 追加
  defp put_current_user(conn, user) do
    # 第二引数はsalt
    token = Phoenix.Token.sign(conn, "user socket", user.id)

    conn
    |> assign(:current_user, user)
    |> assign(:user_token, token) # トークンを突っ込んでapp.html.eexより使う
  end
end

特に不思議なところはなくて、 Phoenix.Token.sign を使ってトークンを作っているだけです。

user_socket.ex を変更してセッションに割り当てられた :user_token から user_id を判別し、 socket に割り当てるようにします。

...
  # 2週間有効
  @max_age 2 * 7 * 24 * 60 * 60

  def connect(%{"token" => token}, socket) do
    # 第二引数はsalt
    case Phoenix.Token.verify(socket, "user socket", token, max_age: @max_age) do
      {:ok, user_id} ->
        {:ok, assign(socket, :user_id, user_id)}
      {:error, _reason} ->
        :error
    end
  end

  def connect(_params, _socket), do: :error

  def id(socket), do: "user_socket:#{socket.assigns.user_id}"
end

これも余り不思議なところはなくて、 Phoenix.Token.verify を使ってトークンから user_id を取っているだけです。 これでログインしていなければコメントが投稿できなくなりました。

まとめ

  • Channel はサーバーとクライアントの双方向リアルタイム通信を行う。

  • Channel はOTPの上に成り立っていて、コールバック関数などもそれに従っている。

  • Phoenix には最初からクライアント側の weboscket 用ライブラリも用意されている。

  • 接続が長期間続くため、認証はトークンを利用して行う。

websocket その1でした。今まで余りやったことがないことをしている感があって面白いです。 次は投稿されたコメントの永続化からです。