こんにちは。estie でソフトウェアエンジニアをしている @Ryosuke839 です。
「dbt Python model on Snowflake に型をつける」に続いて再び dbt Python model on Snowflake トピックをご紹介します。
dbt Python model でのコードの再利用
dbt model に限らず Python コードを書いていると、共通の処理をまとめたり、まとめたものをモジュールとして分割したりしたくなります。データウェアハウス製品の処理系にプリインストールされたパッケージであれば dbt.config.packages
で指定することでインポートが可能です。しかし、ユーザ作成モジュールの場合は一筋縄ではいかず、たとえば dbt v1.8 の公式ドキュメントにはあるモデルで定義された関数は他のモデルで再利用はできないという記述があります。
Code reuse
Currently, Python functions defined in one dbt model can't be imported and reused in other models.
ただし、これは dbt のモデルを再利用できないと述べているだけであり、統一的な仕組みは提供されていないものの各データウェアハウス製品に依存した形であればユーザ作成モジュールでも利用が可能です。
dbt Python model on Snowflake でのユーザ作成モジュールの利用
dbt Python model は Snowflake では stored procedures として動きます。そして、Snowflake の公式ドキュメントには以下のようなステップで Python モジュールを stored procedures から利用できると記されています。
次のステップに従って、依存関係を関数またはプロシージャで使用できるようにします。
- ハンドラーが使用できる ステージを選択または作成します。
- ステージに 依存関係をアップロードします。
- 関数またはプロシージャを作成するときに、 IMPORTS で 依存関係を参照します。
このうち、1 と 2 は自前で行う必要があり、3 は dbt.config.imports
を通じて設定できます。
また、1 と 2 については一時 stage を用いる方法と通常の stage を用いる方法があり、一長一短あります。
1. ステージの準備と 2. アップロード
一時 stage を用いたアップロード
一時 stage は、一時 stage を作成したセッションの間でだけ有効な stage です。dbt はモデルごとに異なるセッションを用いるため、モデルごとにアップロードを行う必要があります。
モデルのpre-hook
で以下のような SQL コマンドを呼び出すことでアップロードができます。
- name: some_model config: pre-hook: - create temporary stage python_imports - put file://{{ env_var('PWD') }}/imports/*.py @python_imports auto_compress=false
ここで、put コマンドはアップロード対象を絶対パスで指定する必要があることに注意が必要です。ここで使っている PWD
は一般的なシェルでは自動的に設定されていますが、Docker コンテナ内で直接起動するなどシェルを経由せずに dbt を呼び出す際には自前で設定しておく必要があります。
なお、Python ファイル内に dbt.config.pre_hook
を記述することもできますが、テンプレートの展開は行われないため絶対パスを直接記述する必要があります。
一時 stage を用いる方法には以下の利点と欠点があります。
利点
CREATE STAGE
権限を持っていない role でも実行できる- モデルに必要なファイルだけを選択してアップロードすることもできる
- 実行後に stage が残らない
- 他のセッションと競合することがない
欠点
- モデルごとに毎回アップロードする必要がある
特にこだわりがなければこちらの一時 stage を使うとよいでしょう。
通常の stage を用いたアップロード
通常の stage はセッション間で永続するため、モデルごとにアップロードを行う必要がありません。そのため、dbt_project.yml
の on-run-start
からのアップロードを一度実行するだけで済みます。
on-run-start: - create stage if not exists python_imports - put file://{{ env_var('PWD') }}/imports/*.py @python_imports auto_compress=false overwrite=true
ここでは上記と同様に PWD
の扱いに注意が必要なほか、前回実行時の stage が残っていることがあるので create or replace stage
を行うか put
で overwrite=true
を指定する必要があります。
通常の stage を用いる方法には一時 stage とは逆の以下の利点と欠点があります。
利点
- アップロードは dbt の実行に対し 1 度で済む
欠点
CREATE STAGE
権限を持った role が必要になる- 実行対象に関わらず全てのモデルを実行するのに十分なファイルをアップロードすることになる
- 明示的に drop しない限り、実行後にも stage が残ってしまう
- 他のセッションから stage を上書きしてしまえる(そもそも dbt の同時実行は安全ではありませんが)
3. インポート
stage へのアップロードができれば、あとは 3 の stored procedures からインポートするだけです。
dbt の公式ドキュメントには記述が無いようですが、dbt.config.imports
で stored procedures でインポートするファイルを指定できます。これは、create procedure コマンドの imports 句に変換されます。
インポートしたファイルはパスの通った場所にあるモジュールとして import することができます。
from models import Building from normalization import normalize_name dbt.config( python_version="3.11", packages=["pydantic==2.5.3", "requests==2.31.0"], imports=[ "@python_imports/base.py", "@python_imports/models.py", "@python_imports/normalization.py", ], )
これで dbt Python model からユーザ作成モジュールを利用できるようになりました。よい dbt ライフをお過ごしください!
ちなみに、同様の方法で Snowflake にプリインストールされていないパッケージであっても利用ができます。ただし、ネイティブコードは含められないなどの制限があるのでご注意ください。
最後に
estie では、プロダクト開発から基盤データの整備まで幅広い領域で人材を募集しています。
データ領域では今回紹介したように dbt でデータパイプラインを開発しており、興味のある方がいらっしゃいましたらぜひバックエンドエンジニア(データ)にお申し込みください。そうでない方でもまずはカジュアル面談からお話しましょう。