[Elixir]Elixirのライブラリ眺めてたらErlangのhttpcにたどり着いた

今はやりのElectronに触発されて、というわけでもないですが、ElixirでTwitterのStreamを取得できたりするライブラリである、ExTwitterの処理の一部を追ってみました。

その中で、

ここの、以下のような process_stream の関数に含まれる receive のパターンマッチがイマイチ理解できませんでした。というのも、このライブラリの中に send self, {:http, ...} というような応答を生成するところがなく、どこからこの receive にメッセージが来ているのだろう、とよくわからなかったからです。

  def process_stream(processor, request_id, configs, acc \ []) do
    receive do
      {:http, {request_id, :stream_start, headers}} ->
        send processor, {:header, headers}
        process_stream(processor, request_id, configs)

      {:http, {request_id, :stream, part}} ->
        cond do
        ...

結果的に、Erlangの httpc モジュールから来ているのだとわかったのですが、その中で最終的にErlangモジュールまで手を出すようになったので、私の後学の意味も含めてメモを残しておきます。

process_stream の場所

まずは入り口からです。Streamを取得する stream_filter の処理を追うと、以下メソッドに行き着きます。

  def stream_filter(options, timeout \ @default_stream_timeout) do
    {options, configs} = seperate_configs_from_options(options)
    params = ExTwitter.Parser.parse_request_params(options)
    pid = async_request(self, :post, "1.1/statuses/filter.json", params, configs)
    create_stream(pid, timeout)
  end

この分岐を持つ関数 process_stream は以下箇所で初めて呼ばれます。

  defp async_request(processor, method, path, params, configs) do
    oauth = ExTwitter.Config.get_tuples |> ExTwitter.API.Base.verify_params
    consumer = {oauth[:consumer_key], oauth[:consumer_secret], :hmac_sha1}

    spawn(fn ->
      response = ExTwitter.OAuth.request_async(
        method, request_url(path), params, consumer, oauth[:access_token], oauth[:access_token_secret])

      case response do
        {:ok, request_id} ->
          process_stream(processor, request_id, configs)
        {:error, reason} ->
          send processor, {:error, reason}
      end
    end)
  end

spawnとして別プロセスにメッセージを送っている箇所を知る

ここをみると、spawn としてプロセスにメッセージを送っている処理の中で、 ExTwitter.OAuth.request_async をよんでます。

これを少し追うと、以下に行き着きます。

  def request_async(:get, url, params, consumer, access_token, access_token_secret) do
    :oauth.get(url, params, consumer, access_token, access_token_secret, [{:sync, false}, {:stream, :self}])
  end

erlang-oauth に足を踏み入れる

このモジュール :oauth はErlangの erlang-oauth というモジュールだそうです。そこで、少しそのモジュールの get を追ってみます。引数6個なので、以下だとわかります。

get(URL, ExtraParams, Consumer, Token, TokenSecret, HttpcOptions) ->
  SignedParams = sign("GET", URL, ExtraParams, Consumer, Token, TokenSecret),
  http_request(get, {uri(URL, SignedParams), []}, HttpcOptions).

ここでhttp_requestをみると、 httpc:request/4 を呼んでいることがわかります。

http_request(Method, Request, Options) ->
  httpc:request(Method, Request, [{autoredirect, false}], Options).

Erlangのhttpcドキュメントを追う

ここで、Erlangのドキュメントをみてみました。

request(Method, Request, HTTPOptions, Options) -> 

また、 Options に関してはその内容が以下のように書かれていました。

option() = {sync, boolean()} | {stream, stream_to()} | {body_format, body_format()} | {full_result, boolean()} | {headers_as_is, boolean() | {socket_opts, socket_opts()} | {receiver, receiver()}, {ipv6_host_with_brackets, boolean()}}
stream_to() = none | self | {self, once} | filename()

ここで、同じErlangのドキュメントを見てみると、このオプションの stream に関して以下の通り書かれていて、応答が {:http, 内容} という状態であることが確認できます。

Streams the body of a 200 or 206 response to the calling process or to a file. When streaming to the calling process using the option self the following stream messages will be sent to that process: {http, {RequestId, stream_start, Headers}}, {http, {RequestId, stream, BinBodyPart}}, {http, {RequestId, stream_end, Headers}}. When streaming to the calling processes using the option {self, once} the first message will have an additional element e.i. {http, {RequestId, stream_start, Headers, Pid}}, this is the process id that should be used as an argument to http:stream_next/1 to trigger the next message to be sent to the calling process.

Note that it is possible that chunked encoding will add headers so that there are more headers in the stream_end message than in the stream_start. When streaming to a file and the request is asynchronous the message {http, {RequestId, saved_to_file}} will be sent.

Defaults to none.

なるほど。

はじめから順に追ってみる

はじめの頃に疑問だった、

ExTwitter.OAuth.request_async(
        method, request_url(path), params, consumer, oauth[:access_token], oauth[:access_token_secret])

においては、以下が呼ばれ( ExTwitter.OAuth.request_async の中で引数の代入も行った状態)、これは {:stream, :self} を与えている。

:oauth.get(request_url(path), params, consumer, oauth[:access_token], oauth[:access_token_secret], [{:sync, false}, {:stream, :self}])

そのため、返ってくるメッセージはErlangのhttpcのドキュメントより

{:http, 内容}

という感じになる。そのメッセージを process_stream では受け取り、よしなにパターンマッチして処理を継続すると。

Erlangのモジュールまで含んだ、関数の追い方がだいぶ把握できてきた気がします。

なるほど。

コメントを残す

以下に詳細を記入するか、アイコンをクリックしてログインしてください。

WordPress.com ロゴ

WordPress.com アカウントを使ってコメントしています。 ログアウト / 変更 )

Twitter 画像

Twitter アカウントを使ってコメントしています。 ログアウト / 変更 )

Facebook の写真

Facebook アカウントを使ってコメントしています。 ログアウト / 変更 )

Google+ フォト

Google+ アカウントを使ってコメントしています。 ログアウト / 変更 )

%s と連携中