dbt-snowflakeにおけるPython modelの保守性を上げる実装パターン

こんにちは。Data Management Group(以下、DMG)の宮崎です。主にデータパイプライン開発やWebサーバーサイド開発に携わりつつ、CIやデータ基盤の改善にも取り組んでいます。

DMGではデータパイプラインにdbtとSnowflakeを採用しています。また、Pythonの経験を持つメンバーが多いこともあり、dbt Python model(以下、Python model)を積極的に活用しています。このようなdbtデータパイプラインを開発する中で、Python modelの保守性・信頼性の課題も見えてきました。そこで、Python modelを含むdbtデータパイプラインを、継続的に改善できるソフトウェアとして扱うための設計・運用上の工夫をまとめたものが本稿です。特に、レビュー可能性・テスト容易性・再現性といった観点で、Python modelを実用的に運用することに焦点を当てます。

前提

以下の記述はすべて「バックエンドにSnowflakeを使用したdbt」を前提としています。

Python model運用で見えた課題

dbtにおいてPython modelを導入する動機としては、多くの場合「SQLで表現しづらい複雑さを扱う」という点かと思います。ところが、Python modelはその複雑さの上昇に対し、入出力の検証やテスト機構など信頼性の担保手段がSQL modelより限定されます。

課題1: dbt modelの可読性低下

dbt modelは原則として1モデルにつき1ファイルです。一方で、必然性がないまま1ファイルに肥大化したコードは認知負荷を高めます。

課題2: テスト機構の制約

dbtにはUnit testsという仕組みがあります。

docs.getdbt.com

これによりモデル単位での静的な入出力の検証が可能ですが、dbt-core v1.11時点ではPython modelで使用できません。少なくともdbtの機能としては、Python modelのテスト機構はSQL modelのそれよりも制約が大きいです。

課題3: dbt外の機構での検証困難性

dbtの機能でテストできなくても、pytestなどPython側の仕組みでテストすれば良いのでは、と考えたくなります。しかし、ここで障壁になるのがPython modelがパッケージ化されていない単体ファイルである点で、テスト側のimportが通るようにsys.pathの操作や実行ディレクトリの固定が必要になります。

結果として、dbt modelのディレクトリ構成とテストコードのimportが強く結合し、テストのための保守コストが増加しやすいです。加えて、フラットなディレクトリ構成に寄せざるを得ず、構造化の自由度も下がります。

解決方針

これらの課題は単に書き方の問題に留まらず、「変更が心理的に重くなる」「レビューが重くなる」といった形で開発速度と運用品質の両方に悪影響を及ぼします。そこで、Pythonロジックをパッケージとして扱えるようにしテストと構造化を標準化することで、変更容易性と信頼性を同時に引き上げる方針を取ります。具体的にはuv workspacesを導入することで解消を図ります。なお、uv workspacesでなければならないというわけではなく、Python部分を「普通のPython」にして、dbtはそれを参照・実行するのみに寄せるというのがコアとなる考えです。

ソリューションの全体像は以下の通りです。

  1. uv workspace memberとしてPythonロジックをパッケージ化。
  2. パッケージを利用したPython modelを実装。
  3. uv workspace memberをzip化するスクリプトを用意。
  4. 「zip化 + SnowflakeステージへのPUT + dbt実行」を一つのジョブとして実行する。

それぞれ詳しく見ていきます。

1. uv workspace memberとしてPythonロジックをパッケージ化

uv workspacesはuvで複数パッケージを管理する機能です。

docs.astral.sh

この機能の詳細は本稿の範囲外とし、要点のみを述べると、DMGのプロジェクトで新規にworkspace memberを追加するフローは以下の通りです。

  1. パッケージ作成
uv init --lib {package-path}
  1. ruffのisort設定を追加(pyproject.tomlの例)
[tool.ruff.lint.isort]
known-first-party = [
  "{package-name}",
  ...
]
  1. パッケージの中身を実装しpytestでテスト実装

パッケージの分割単位については、ビジネスドメインごとに区切りつつ、ビジネスドメインに関わらず使用する横断的に使う共通パッケージを設ける形にしています。

2. パッケージを利用したPython modelを実装

uv workspace memberを利用するPython modelの最小構成イメージは以下の通りです。

from snowflake import snowpark as sp

from my_package import my_module


def model(dbt, session: sp.Session):
    dbt.config(
        ...,
        imports=[
            # zipをSnowflakeステージに配置するのでそのパスを指定
            "@my_stage/my_package.zip",
        ],
        ...,
    )

    # package内の関数を利用
    result = my_module.my_function(session)

    ...
3. uv workspace memberをzip化するスクリプトを用意

uv init --libした際にpyproject.tomlにworkspace memberのパスが追加されているので、それを参照してzip化するスクリプトを用意します。標準ライブラリのtomllibでpyproject.tomlをパースし、同様に標準ライブラリのzipfileを使ってmemberのパス以下をzip化するのみなので実装コストは小さいです。

dbtで使用しないworkspace memberを実装する可能性があるので、zip化対象から除外できる設定を設けておくと扱いやすくなります。我々のプロジェクトではカスタム項目をpyproject.tomlに追加しています。

[tool.dbt.zip]
exclude = [
  "{package-name}",
  ...
]

DMGのプロジェクトで実際に除外しているパッケージとしてはflake8 pluginなどが該当します。(余談ですが、Python modelに特化したlintルールをプロジェクト独自のflake8 pluginとして実装しています)

4. 「zip化 + SnowflakeステージへのPUT + dbt実行」を一つのジョブとして実行する

DMGが管理するdbtパイプラインはAmazon ECSで実行しており、実際の流れは以下の通りです。

  • Docker image build時にworkspace memberをzip化
  • ECS Taskでdbt model実行
    • dbtのpre-hookで一時ステージを作成しzipをPUT

なお、開発マシンでdbtを実行する場合は、zip化からdbt実行までをラップしたコマンドをメンバー各位が使用するタスクランナーで用意する運用にしています。

pre-hookで一時ステージを作成しzipをPUTと書きましたが、zipの管理方法は他にも選択肢があり、大きく3つの方法が考えられます。選定の主な観点は、ライフサイクル管理の容易さ、実行時オーバーヘッド、同時実行時の競合耐性です。

方法 Pros Cons
1. pre-hookで一時ステージ作成 + PUT ライフサイクル管理が最も単純
(セッションスコープで完結)
modelごとにCREATE STAGE/PUTが走るためオーバーヘッド増
reuse_connections=True だと同一セッション再利用により競合の可能性
2. 事前作成ステージへ実行ごとに一括PUT パイプラインジョブでは
CREATE STAGE不要でオーバーヘッドが最小
同時実行で競合しやすい
削除しないと未使用zipが蓄積するリスク有り
削除処理を入れても失敗で削除漏れが起こり得る
3. 実行ごとにユニークステージ作成 + 一括PUT 実行単位で分離でき、同時実行に強い ステージ削除失敗時の残骸増加

我々が1を採用しているのは、ライフサイクル管理が単純なうえ、reuse_connections=Falseを前提にすれば競合リスクを十分に抑えられるためです。一時ステージにアクセスできるのは作成したセッションに限定されるため、1はライフサイクル管理の観点では最も単純です。ただし、dbt-snowflakeの設定でreuse_connections=Trueとなっているとdbt modelをまたいで同一セッションが使われるため、同一ステージにアクセスして競合する可能性があります。これを避けるため、我々はreuse_connections=Falseに設定しています。実行時オーバーヘッドは「dbt modelごとにCREATE STAGEが実行される」、「同一パッケージがそれを利用するmodel数だけPUTされる」、「reuse_connections=Falseによりdbt modelごとに接続が作成される」の3つが重なり高くなります。我々のパイプラインでは、このオーバーヘッドがdbt model自体の実行時間と比較し小さかったため許容できました。

2はステージが共有である以上、同時実行時の競合可能性が構造的に残ります。また、削除処理を実装しない場合は未使用zipが蓄積する可能性があり、削除処理を実装する場合もジョブ中断・失敗等で削除漏れが起こり得るため、残骸を許容するか回収するかの設計が必要です。

3は競合耐性が最も高い一方、実行ごとに作成されるステージの削除を担保できるか、あるいは削除漏れを許容できるか、がポイントになります。


ここまで記載した方法により、Python modelの可読性とテスト機構の制約を緩和し、変更に対するレビュー可能性・テスト容易性を底上げできます。結果として、品質を維持したまま複雑なビジネス要件に追従するための実装・運用コストを抑えやすくなります。

一方で、Pythonを自由に書ける状態は、dbtのメンタルモデルからの逸脱や、Snowflakeの性能特性を十分に活かせない実装につながるリスクもあります。そこでDMGでは、uv workspacesによって「普通のPython」としてテスト・構造化を標準化しつつ、Python側の実装は可能な限りUDF/UDTFとして閉じ込め、dbtは参照・実行に寄せる方針をスタンダードとしています。

パッケージ内でUDF/UDTFを実装する例は以下の通りです。

# my_package/user_functions.py

import pandas as pd

def my_udf(arg1: int, arg2: str) -> str:
    ...

class MyVectorizedUDTF:
    def __init__(self) -> None:
        ...

    def process(self, df: pd.DataFrame) -> pd.DataFrame:
        ...

    process._sf_vectorized_input = pd.DataFrame

Python modelで上記UDF/UDTFを使用するにはsnowpark.Sessionのregisterメソッドを使います。

from snowflake import snowpark as sp

from my_package.user_functions import my_udf, MyVectorizedUDTF

def model(dbt, session: sp.Session):
    udf = session.udf.register(
        my_udf,
        ...,
        imports=["@my_stage/my_package.zip"],
    )
    udtf = session.udtf.register(
        MyVectorizedUDTF,
        ...,
        imports=["@my_stage/my_package.zip"],
    )

    _ = dbt.ref("my_table").select(udf(...))
    _ = dbt.ref("my_table").select(udtf(...))

    ...

解消しない課題

本稿の要旨からは外れますが、上記ソリューションを導入した上でdbtとPythonの組み合わせに残る課題についても触れておきます。

サードパーティパッケージの依存関係管理

サードパーティパッケージにはここまで触れていませんでしたが、model configのpackagesに依存関係を指定することで利用可能です。

models:
  - name: my_model
    config:
      packages:
        - some_package==1.2.3

Python modelファイルで指定する例

def model(dbt, session):
    dbt.config(
        packages=["some_package==1.2.3"],
    )
    ...

しかし、これをuv.lockなどのlockファイルと同期させる仕組みは自前で用意する必要があります。

DMGにおいては現時点で未解決の課題です。解消方法の案としては、model yaml内ではvarかenv_varでサードパーティパッケージのバージョンを指定し、dbt model実行前にuv.lockからバージョンを抽出してvarもしくはenv_varにセットする、などでしょうか。

ただし、バージョン指定が同期できたとしても、uv.lockに記録されているsdistやwheelと一致しているのかは検証されません。つまり、ファーストパーティコードはGit commitから再現できますが、サードパーティコードは厳密な再現性を担保できていません。これを解消するためには、ファーストパーティコードだけでなくサードパーティコードもzipで、としたいところですが、Pure Pythonでないパッケージのビルドを考慮するとかなりの労力を要します。

JVMエコシステムであればfat JARをビルドするという一般的なアプローチがあり、比較すると論点が増えると考えられます。(fat JARはfat JARで運用しづらいという声もあると思いますが)

SQL modelとの連携

Python modelではviewやdynamic tableなどのmaterializationはサポートされません。また、同じUDFを複数のdbt modelで使用する場合、すべてをPython modelにするよりPython UDFを一度だけ生成しdbt modelはSQL modelで実装する方が効率的です。要するに、Python UDFをdbt resourceとして管理する機能があると嬉しいです。

これはdbtコミュニティでは長らく求められていた機能で、少なくとも2016年にはissueが立っていました。

github.com

そんなUDFサポートですが、dbt-core v1.11でリリースされ、dbt-snowflakeもスカラー関数および集約関数のみと部分的ながらPython UDFのmaterializationをサポートしました。これにより基本的なユースケースはサポートされたと言えますが、様々な機能をサポートするにはもうしばらく時間がかかりそうです。また、UDTFなど本稿執筆時点では計画に見られない機能もあります。

dbt-core UDFサポートのEPIC issue

github.com

dbt-adaptersのUDFsラベルがついたissue一覧

github.com

一方、DMGではcustom materializationを用いてPython UDF/UDTFをdbt resourceとして管理することを可能にしています。このcustom materializationではUDF/UDTFのコードがSnowflakeステージに配置されることを前提としており、entrypointはzip化されたPythonモジュールのパスを指定するようにしています。公式の機能とは異なり、zip化を前提にすることでPythonファイルのパースを不要とし、Snowflake UDF/UDTFのより多くの機能をサポートしています。

ただし、UDF/UDTFのライフサイクル上pre-hookが使えないので、「zip化 + SnowflakeステージへのPUT + dbt実行」を一つのジョブとして実行するで述べた、pre-hookを用いることで回避していたzip不整合のリスクが再発生しています。このリスクに対しては、ステージパスをパッケージごとに固定ではなく任意に指定可能とすることで、意図しないzip上書きのリスクを低減させています。これは根本解決ではありませんが、そのコストが高いことを踏まえ運用上の許容範囲に収めるための対応です。

最後に

DMGではパイプラインによってデータを生み出すだけに留まらず、パイプラインのテスト戦略、CI、運用設計、性能特性を踏まえた実装標準化まで含めて扱います。堅牢なデータ基盤を育てるにあたって改善の余地と裁量が大きい環境です。

このようなチームで、データ基盤の信頼性や保守性、拡張性を高めることに関心のある方を募集しています。気になった方はぜひカジュアルにお話ししましょう。

hrmos.co

© 2019- estie, inc.