dbt Python model on Snowflake でユーザ作成モジュールを使う


こんにちは。estie でソフトウェアエンジニアをしている @Ryosuke839 です。
「dbt Python model on Snowflake に型をつける」に続いて再び dbt Python model on Snowflake トピックをご紹介します。

www.estie.jp

dbt Python model でのコードの再利用

dbt model に限らず Python コードを書いていると、共通の処理をまとめたり、まとめたものをモジュールとして分割したりしたくなります。データウェアハウス製品の処理系にプリインストールされたパッケージであれば dbt.config.packages で指定することでインポートが可能です。しかし、ユーザ作成モジュールの場合は一筋縄ではいかず、たとえば dbt v1.8 の公式ドキュメントにはあるモデルで定義された関数は他のモデルで再利用はできないという記述があります。

Code reuse​

Currently, Python functions defined in one dbt model can't be imported and reused in other models.

ただし、これは dbt のモデルを再利用できないと述べているだけであり、統一的な仕組みは提供されていないものの各データウェアハウス製品に依存した形であればユーザ作成モジュールでも利用が可能です。

dbt Python model on Snowflake でのユーザ作成モジュールの利用

dbt Python model は Snowflake では stored procedures として動きます。そして、Snowflake の公式ドキュメントには以下のようなステップで Python モジュールを stored procedures から利用できると記されています。

次のステップに従って、依存関係を関数またはプロシージャで使用できるようにします。

  1. ハンドラーが使用できる ステージを選択または作成します
  2. ステージに 依存関係をアップロードします
  3. 関数またはプロシージャを作成するときに、 IMPORTS で 依存関係を参照します

このうち、1 と 2 は自前で行う必要があり、3 は dbt.config.imports を通じて設定できます。

また、1 と 2 については一時 stage を用いる方法と通常の stage を用いる方法があり、一長一短あります。

1. ステージの準備と 2. アップロード

一時 stage を用いたアップロード

一時 stage は、一時 stage を作成したセッションの間でだけ有効な stage です。dbt はモデルごとに異なるセッションを用いるため、モデルごとにアップロードを行う必要があります。

モデルのpre-hook で以下のような SQL コマンドを呼び出すことでアップロードができます。

- name: some_model
  config:
    pre-hook:
      - create temporary stage python_imports
      - put file://{{ env_var('PWD') }}/imports/*.py @python_imports auto_compress=false

ここで、put コマンドはアップロード対象を絶対パスで指定する必要があることに注意が必要です。ここで使っている PWD は一般的なシェルでは自動的に設定されていますが、Docker コンテナ内で直接起動するなどシェルを経由せずに dbt を呼び出す際には自前で設定しておく必要があります。

なお、Python ファイル内に dbt.config.pre_hook を記述することもできますが、テンプレートの展開は行われないため絶対パスを直接記述する必要があります。

一時 stage を用いる方法には以下の利点と欠点があります。

利点

  • CREATE STAGE 権限を持っていない role でも実行できる
  • モデルに必要なファイルだけを選択してアップロードすることもできる
  • 実行後に stage が残らない
  • 他のセッションと競合することがない

欠点

  • モデルごとに毎回アップロードする必要がある

特にこだわりがなければこちらの一時 stage を使うとよいでしょう。

通常の stage を用いたアップロード

通常の stage はセッション間で永続するため、モデルごとにアップロードを行う必要がありません。そのため、dbt_project.ymlon-run-start からのアップロードを一度実行するだけで済みます。

on-run-start:
  - create stage if not exists python_imports
  - put file://{{ env_var('PWD') }}/imports/*.py @python_imports auto_compress=false overwrite=true

ここでは上記と同様に PWD の扱いに注意が必要なほか、前回実行時の stage が残っていることがあるので create or replace stage を行うか putoverwrite=true を指定する必要があります。

通常の stage を用いる方法には一時 stage とは逆の以下の利点と欠点があります。

利点

  • アップロードは dbt の実行に対し 1 度で済む

欠点

  • CREATE STAGE 権限を持った role が必要になる
  • 実行対象に関わらず全てのモデルを実行するのに十分なファイルをアップロードすることになる
  • 明示的に drop しない限り、実行後にも stage が残ってしまう
  • 他のセッションから stage を上書きしてしまえる(そもそも dbt の同時実行は安全ではありませんが)

3. インポート

stage へのアップロードができれば、あとは 3 の stored procedures からインポートするだけです。

dbt の公式ドキュメントには記述が無いようですが、dbt.config.imports で stored procedures でインポートするファイルを指定できます。これは、create procedure コマンドの imports 句に変換されます。

インポートしたファイルはパスの通った場所にあるモジュールとして import することができます。

from models import Building
from normalization import normalize_name

dbt.config(
    python_version="3.11",
    packages=["pydantic==2.5.3", "requests==2.31.0"],
    imports=[
        "@python_imports/base.py",
        "@python_imports/models.py",
        "@python_imports/normalization.py",
    ],
)

これで dbt Python model からユーザ作成モジュールを利用できるようになりました。よい dbt ライフをお過ごしください!

ちなみに、同様の方法で Snowflake にプリインストールされていないパッケージであっても利用ができます。ただし、ネイティブコードは含められないなどの制限があるのでご注意ください。

最後に

estie では、プロダクト開発から基盤データの整備まで幅広い領域で人材を募集しています。

データ領域では今回紹介したように dbt でデータパイプラインを開発しており、興味のある方がいらっしゃいましたらぜひバックエンドエンジニア(データ)にお申し込みください。そうでない方でもまずはカジュアル面談からお話しましょう。

hrmos.co

hrmos.co

© 2019- estie, inc.