dbt-snowflake のテストフレームワークを作った話

はじめに

この記事は、dbt Advent Calendar 2023 22日目の記事です。

こんにちは、不動産テック企業である株式会社 estie でソフトウェアエンジニアをしている Ryosuke839 です。estie ではデータパイプラインを作る仕事をしているのですが、最近はデータパイプラインの MySQL + Python + sqlalchemy から Snowflake + dbt-core への移行を行っています。

さて、Python で書かれたデータパイプラインを dbt へ移行するに当たって、ひとつつまづきポイントがありました。

dbt でのテスト

それは、データ加工のテストをどう書くかという問題です。estie のデータパイプラインでは中間層でデータの正規化、匿名化、名寄せなど様々な加工を行っているのですが、dbt で複雑なデータ加工を行ってしまうユースケースはメジャーではないらしくレコード単位での加工を検証するようなテストはサポートされていません。

dbt でレコード単位でのテストを行うには、次のような方法が考えられます。

  • EqualExperts/dbt-unit-testing を使う
    • 👍 dbt のテスト機能をそのまま使える
    • 👍 dbt のテスト機能をそのまま使うので並列実行できる
    • 👎 テストケースの記述が冗長になる
    • 👎 CTE を用いた singular test なのでテストできるのは model だけで、pre/post_hook や incremental model における merge の挙動はテストできない
  • dbt test + PyTest dbt-core/core/dbt/tests を使う
    • 👍 pre/post_hook や merge を含め何でもテストできる
    • 👎テスト用の project や profile を作る必要がある
    • 👎 テストケースごとに dbt 全体を走らせるため時間がかかる
    • 👎 source の値は csv で設定できるが、model のモックは大変
    • 👎 Snowflake 上のオブジェクトを読み書きするため並列実行できず、テスト後にテストデータも残る
  • テストフレームワークを作ってしまう
    • 入出力を簡潔に記述したい
    • pre/post_hook や merge の挙動もテストしたい
    • Snowflake 上の本物のデータは読み書きしたくない

今回の移行ではなるべく従来の処理を踏襲したため、post_hook で delete 文を発行したり merge を使ったりと、dbt-unit-testing ではカバーされない部分のテストも必要でした。また、PyTest ではテストの実行に大変な時間がかかり、生産性の低下や Snowflake のコスト爆発は避けられないと予想されました。テストを実行するだけで実環境のデータを読み書きするのも好ましくありません。

そこで、以上の問題点を解決したテストフレームワークを作ってしまうことにしました!

実装方針

まず何をテストで実行するかですが、pre/post_hook や merge の挙動もテストしたいため PyTest と同様 dbt を本物の Snowflake に対して動かします。

本物の Snowflake に対しての読み書きは temporary table / temporary view 仮テーブルと一時テーブルの使用 | Snowflake Documentation を使うことで防ぎます。temporary table はセッション内でしか永続しないテーブルで既存の temporary でない table と同じ識別子を持つことができるため、テストのセッション中に限りテーブルをモックする用途に使うことができます。

ここまでの実装とテストの実行は dbt-snowflake のマクロ実装(dbt-snowflake/dbt/include/snowflake/macros/materializations/table.sql のあたり)をオーバーライドしテストケースを --vars で渡し dbt run することでも可能ですが、セッションが dbt run の内部で終わってしまうため結果を取り出すことができません。

そこで、dbt 自体の実装に手を入れてしまいます。幸い dbt-core は Python で実装されているため、何でも unittest.mock.patch で処理をオーバーライドできることを利用します。

実装詳細

dbt run の実装は dbt.task.run.ModelRunner.execute (dbt-core/core/dbt/task/run.py:262-300) に記述されており、pre/post_hook もここで実行されます。その直前で temporary table の作成とデータの挿入を、直後で実行結果の取得を行うことでセッション内でテストを完結させることができます。

セッション内でテストを完結させることで Snowflake 上での状態が独立になり、並列実行できるようにもなります。

+ test_results: Optional[agate.Table] = None
  
+ # dbt run の処理をオーバーライドする
+ # https://github.com/dbt-labs/dbt-core/blob/v1.6.6/core/dbt/task/run.py#L262-L300
  def execute(self: ModelRunner, model: ManifestNode, manifest: Manifest):
      ...
      hook_ctx = self.adapter.pre_model_hook(context_config)
      try:
+         # mock データを用意する
+         is_incremental = self.adapter.execute_macro("is_incremental", context_override=context)
+         for node_id in set(model.depends_on.nodes):
+             node = manifest.nodes[node_id] if node_id in manifest.nodes else manifest.sources[node_id]
+             table_name = f"{node.database}.{node.schema}.{getattr(node.config, 'alias', None) or node.name}"
+             _, columns = self.adapter.execute(f"describe table {table_name}", fetch=True)
+             self.adapter.execute(f"create temporary view {table_name} as select " +
+                                  ", ".join(f'${i + 1}::{c["type"]} as {c["name"]}' for i, c in enumerate(columns)) +
+                                  " from (values " + (values_to_sql_str(mocks[node.unique_id]) +
+                                  ")" if len(mocks[node.unique_id]) > 0 else "(" + ", ".join("null" for _ in columns) + ")) where False"))
+         if is_incremental:
+             # incremental の場合は自分自身にもデータを流し込む
+             ...
          result = MacroGenerator(
              materialization_macro, context, stack=context["context_macro_stack"]
          )()
+         # 実行結果を取得する
+         nonlocal test_result
+         _, test_result = self.adapter.execute(
+             f"select * from {model.database}.{model.schema}.{model.config.alias or model.name}", fetch=True)
      finally:
          self.adapter.post_model_hook(context_config, hook_ctx)
      for relation in self._materialization_relations(result, model):
          self.adapter.cache_added(relation.incorporate(dbt_created=True))
      return self._build_run_model_result(model, context)

作成した関数は unittest.mock.patch を使い既存の関数の代わりに呼び出されるようにします。

def run_dbt_e2e_tests(model: str, mocks: dict[str, list[dict[str, Any]]]) -> Optional[agate.Table]:
    test_result: Optional[agate.Table] = None

    # dbt run の処理をオーバーライドする
    # https://github.com/dbt-labs/dbt-core/blob/v1.6.6/core/dbt/task/run.py#L262-L300
    def execute(self: ModelRunner, model: ManifestNode, manifest: Manifest):
        ...

    # dbt run する
    with patch("dbt.task.run.ModelRunner.execute", execute):
        res = dbtRunner().invoke(["run", "--profiles-dir", "./dpl_dbt/", "--select", model, "--vars", "{'e2e_testing': 1}", "--threads", "8"])

    if res.exception is not None:
        return None
    if res.result.results[0].status == "error" or res.result.results[0].status == "fail":
        return None
    if res.result.results[0].status == "warn":
        return None
    return test_result

あとは run_dbt_e2e_tests を好きな方法で呼び出し、結果をチェックすることでテストが完了します!

実行イメージ

従来は unittest を使ってテストを書いていたので、run_dbt_e2e_tests(をラップしたもの)を呼び出すように変更するだけでプルリクの diff も見やすいスムーズな移行ができました。

unittest をそのまま使える

セッション内で create temporary table から merge まで実行されている

最後に

今回作成したテストフレームワークはゆくゆくは OSS として公開したいと考えています。現時点ではパッケージが分離されていなかったりフレームワーク自体のテストが未整備だったりとまだまだですが、ご期待ください…!

estie では一緒に dbt を書いてくださる方、今回のテストフレームワークを OSS 化してくださる方を探しています(現在のメンバーでも可能ですが手が足りていません)。DWH や dbt についてカジュアルにお話するのでも大歓迎です!

hrmos.co

hrmos.co

hrmos.co

© 2019- estie, inc.