[Elixir]Ecto2.0-beta2のownership周りのコードを追う

Ectoが2.0(beta)になって、concurrent acceptance testingができるようになったのでいくつか調べてみた。

参考にした元記事は以下。

Concurrent Acceptance Testing in Elixir

この中で、Ectoのownershipのところが気になったのでいくつかコードを追ってみました。

Ownweshipを取得する時、 Ecto.Adapters.SQL.Sandbox.checkout/2 が呼ばれます。
これはEcto2.0で必要になったところですね。

def checkout(repo, opts \\ []) do
  {name, opts} =
    if Keyword.get(opts, :sandbox, true) do
      proxy_pool(repo)
    else
      repo.__pool__
    end

  DBConnection.Ownership.ownership_checkout(name, opts)
end

ここの中で、もしsandboxであれば

  defp proxy_pool(repo) do
    {name, opts} = repo.__pool__
    {pool, opts} = Keyword.pop(opts, :ownership_pool, DBConnection.Poolboy)
    {name, [repo: repo, sandbox_pool: pool, ownership_pool: Pool] ++ opts}
  end

にあるように、ownership_poolのキーを持つDBConnectionのプロセスをとってきます。

これは、以下のdb_connectionのコードを呼びます。

@spec ownership_checkout(GenServer.server, Keyword.t) ::
  :ok | {:already, :owner | :allowed} | :error | no_return
def ownership_checkout(manager, opts) do
  case Manager.checkout(manager, opts) do
    {:init, owner} -> Owner.init(owner, opts)
    {:already, _} = already -> already
  end
end

この中で case されるのは以下。

@spec checkout(GenServer.server, Keyword.t) ::
  {:init, pid} | {:already, :owner | :allowed}
def checkout(manager, opts) do
  timeout = Keyword.get(opts, :pool_timeout, @timeout)
  GenServer.call(manager, {:checkout, opts}, timeout)
end

この DBConnection.Ownership.Manager はGenServerになってて、その自身に対して call します。
:checkoutcall されるところを探すと、以下がみつかります。

def handle_call({:checkout, opts}, {caller, _}, %{checkouts: checkouts} = state) do
  if kind = already_checked_out(checkouts, caller) do
    {:reply, {:already, kind}, state}
  else
    {owner, state} = checkout(state, caller, opts)
    {:reply, {:init, owner}, state}
  end
end

ここで、DBに対して処理を行うプロセスがcheckoutされていない場合、以下のprivateメソッドが呼ばれます。

defp checkout(state, caller, opts) do
  %{pool: pool, owner_sup: owner_sup, checkouts: checkouts, owners: owners,
    ets: ets} = state
  {:ok, owner} = OwnerSupervisor.start_owner(owner_sup, caller, pool, opts)
  ref = Process.monitor(owner)
  checkouts = Map.put(checkouts, caller, {:owner, ref, owner})
  owners = Map.put(owners, ref, {owner, caller, []})
  ets && :ets.insert(ets, {caller, owner})
  {owner, %{state | checkouts: checkouts, owners: owners}}
end

こう見ると、この段階でetsに対してcheckoutした、ということを保存するのですね。なるほど。

もう1つ。以下の通り allow されることがconcurrentlyにテストを実行する上では必要です。
これは、以下の通りドキュメントに書かれています。

https://hexdocs.pm/ecto/2.0.0-beta.1/Ecto.Adapters.SQL.Sandbox.html

Summing up
– Using allowances – requires explicit allowances via allow/3. Tests may run concurrently.
– Using shared mode – does not require explicit allowances. Tests cannot run concurrently.

このページにおいて、 allowances のところがさらには詳細を言及しています。
ということで、ここを追ってみます。

def allow(repo, owner, allow, _opts \\ []) do
  {name, opts} = repo.__pool__
  DBConnection.Ownership.ownership_allow(name, owner, allow, opts)
end

DBConnection.Ownership.ownership_allow/4 を見ると…

defdelegate ownership_allow(manager, owner, allow, opts), to: Manager, as: :allow

とデリゲートされて、以下に行き着きます。

@spec allow(GenServer.server, parent :: pid, allow :: pid, Keyword.t) ::
  :ok | {:already, :owner | :allowed} | :not_found
def allow(manager, parent, allow, opts) do
  timeout = Keyword.get(opts, :pool_timeout, @timeout)
  GenServer.call(manager, {:allow, parent, allow}, timeout)
end

この中は、GenServerに :allow のtupleを渡します。

def handle_call({:allow, caller, allow}, _from, %{checkouts: checkouts} = state) do
  if kind = already_checked_out(checkouts, allow) do
    {:reply, {:already, kind}, state}
  else
    case Map.get(checkouts, caller, :not_found) do
      {:owner, ref, owner} ->
        {:reply, :ok, owner_allow(state, allow, ref, owner)}
      {:allowed, ref, owner} ->
        {:reply, :ok, owner_allow(state, allow, ref, owner)}
      :not_found ->
        {:reply, :not_found, state}
    end
  end
end

ここを見ると、まだcheckoutされていないDBコネクションのプロセスであれば、以下

defp owner_allow(%{ets: ets} = state, allow, ref, owner) do
  state = put_in(state.checkouts[allow], {:allowed, ref, owner})
  state = update_in(state.owners[ref], fn {owner, caller, allowed} ->
    {owner, caller, [allow|List.delete(allowed, allow)]}
  end)
  ets && :ets.insert(ets, {allow, owner})
  state
end

ここを見ると、最終的にetsにペアとなるprocess idが保存されます。

このように、ownerとなるプロセスと、実際にDBに処理を渡すプロセスが管理され、各々のDBコネクションをうまいこと回してconcurrentなテストを実施するようにしているのですね。

学び。

広告

コメントを残す

以下に詳細を記入するか、アイコンをクリックしてログインしてください。

WordPress.com ロゴ

WordPress.com アカウントを使ってコメントしています。 ログアウト / 変更 )

Twitter 画像

Twitter アカウントを使ってコメントしています。 ログアウト / 変更 )

Facebook の写真

Facebook アカウントを使ってコメントしています。 ログアウト / 変更 )

Google+ フォト

Google+ アカウントを使ってコメントしています。 ログアウト / 変更 )

%s と連携中