dbt Python model on Snowflake に型をつける


こんにちは。 estie でソフトウェアエンジニアをしている @Ryosuke839 です。弊社ではデータウェアハウスとして Snowflake を採用しており、データ移送に dbt を使いだしています。dbt では SQL クエリからデータを生成する SQL model の他に Python コードでデータを生成する Python model も使うことができ、複雑なデータ変換や外部 API 呼び出しなどが絡む処理では Python model が役に立ちます。しかし、Python model には様々な制約や落とし穴があり、今回紹介する型指定もその 1 つになります。仕様を探る中で型指定のベストプラクティスを見つけたので共有します。

dbt Python model で型が一定しない

dbt Python model は Python コードの実行に対応したデータウェアハウス製品で動かすことができ、Snowflake もその1つです。

早速、dbt の公式ドキュメントのサンプルコード(を少し改変したもの)を動かしてみましょう。

import holidays
import pandas as pd

def is_holiday(date_col):
    if pd.isnull(date_col):
        return None
    # Chez Jaffle
    french_holidays = holidays.France()
    is_holiday = (date_col in french_holidays)
    return is_holiday

def model(dbt, session):
    dbt.config(
        materialized = "table",
        packages = ["holidays"]
    )

    orders_df = dbt.ref("stg_orders")

    df = orders_df.to_pandas()

    # apply our function
    # (columns need to be in uppercase on Snowpark)
    df["IS_HOLIDAY"] = df["ORDER_DATE"].apply(is_holiday)
    df["ORDER_DATE"].dt.tz_localize('UTC') # convert from Number/Long to tz-aware Datetime

    # return final dataset (Pandas DataFrame)
    return df

stg_orders モデルのデータを読み、Pandas DataFrame に変換したのちにカラム追加や型変換を行うコードになっています。

適当なデータを stg_orders に生成し、実際に処理を流してみます。


普通のデータを入れたところ、想定通りの型と値が出てきました。(当然ですね)


続いて空のデータを入れたところ、値は想定通りだったものの IS_HOLODAY の型が BOOLEAN ではなくなってしまいました。


さらに null を入力したところ、IS_HOLODAY の型が今度は INTEGER に変化してしまいました。

いずれの例でも値自体は正しかったものの、型は定まらない結果となりました。値を直接参照するだけであれば大きな問題はありませんが、下流のモデルで型に依存した処理が書かれていたりすると検知しにくいバグの出来上がりです。

型が一定しない原因を探る

このような挙動になった理由を探っていきます。

dbt Python model は Snowflake では stored procedures という機能を使って実装されています。この stored procedures は値やテーブルを返すことができ、以下のように戻り値の型を指定できます。

CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR)
LANGUAGE PYTHON ...

しかし、dbt Python model ではテーブルを生成はするものの、Python コードの中で create table を発行し stored procedure 自体は RETURNS STRING と定義され常に文字列を返すだけになっています。

dbt-snowflake/dbt/adapters/snowflake/impl.py

そのため、stored procedure の定義で dbt model の型を指定することはできません。

ここで、dbt Python model のソースコード dbt-snowflake/dbt/include/snowflake/macros/materializations/table.sql を読んでみます。

def materialize(session, df, target_relation):
    # make sure pandas exists
    import importlib.util
    package_name = 'pandas'
    if importlib.util.find_spec(package_name):
        import pandas
        if isinstance(df, pandas.core.frame.DataFrame):
          session.use_database(target_relation.database)
          session.use_schema(target_relation.schema)
          # session.write_pandas does not have overwrite function
          df = session.createDataFrame(df)
    df.write.mode("overwrite").save_as_table('{{ target_relation_name }}', table_type='{{table_type}}')

def main(session):
    dbt = dbtObj(session.table)
    df = model(dbt, session)
    materialize(session, df, dbt.this)
    return "OK"

なにやら、model 関数から pandas.DataFrame が返った場合にのみ特別な処理をしています。

最後に呼ばれている session.createDataFrame のドキュメントを読んでみると、引数として data のほかに任意で schema を取ることがわかります。さらに schema について以下のようなことが書かれています。

When schema is a list of column names or None, the schema of the DataFrame will be inferred from the data across all rows. To improve performance, provide a schema. This avoids the need to infer data types with large data sets.

また、下の方には以下のようなことも書かれています。

Note

When data is a pandas DataFrame,
snowflake.connector.pandas_tools.write_pandas is called.

write_pandas のドキュメントには型について明確なことは書かれていませんが、Python Connector のドキュメントには Snowflake の型と Pandas の型のマッピングが存在しており、Pandas の型から Snowflake の型も決まると思われます。

今回の DataFrame の生成方法では列の型を指定していないため、Snowflake のテーブルの型も一定していないのでした。

dbt Python model で型を指定する

それでは、dbt Python model ではどのようにして型を指定すればよいでしょうか。

アプローチとしては Pandas DataFrame の型を指定するものと、Snowpark DataFrame を自前で生成するものの 2 つが考えられそうです。

手段 1: Pandas DataFrame の列の型を指定する
-    df["IS_HOLIDAY"] = df["ORDER_DATE"].apply(is_holiday)
+    df.insert(1, "IS_HOLIDAY", df["ORDER_DATE"].apply(is_holiday).astype(bool))

列を追加する際に astype を呼ぶことで、Pandas での型を強制できます。これで空の場合にも IS_HOLIDAY の型が BOOLEAN になりますが、astype によって NoneFalse に、NaNTrue に変換されてしまい元の値を保てないという問題があります。もちろん全てのレコードが NoneNaN でないのであればこの手法でうまく行きます。

手段 2: Snowpark DataFrame を自前で生成する

先ほどのコードでは pandas.DataFrame に対してのみ session.createDataFrame を呼んでいたので、model 関数からpandas.DataFrame 以外を返せば型推論を回避できそうです。dbt のドキュメントには以下のように記されています。

The model() function must return a single DataFrame. On Snowpark (Snowflake), this can be a Snowpark or pandas DataFrame.

Snowpark DataFrame とは snowflake.snowpark.DataFrame のことで、先ほど登場した session.createDataFrame を呼ぶことで生成できます。つまり、自前で schame 付きの session.createDataFrame を呼んであげればよさそうですので、モデルのコードを書き換えてみます。ここで、内部的に write_pandas が呼んでしまわれないよう Pandas DataFrame を渡さない必要もあります。

from snowflake.snowpark.types import (BooleanType, StructField, StructType, TimestampType)

...
    df.replace({pd.NaT: None}, inplace=True)

    # return final dataset (Pandas DataFrame)
    return session.createDataFrame(df.to_dict('records'), StructType([
        StructField("ORDER_DATE", TimestampType()),
        StructField("IS_HOLIDAY", BooleanType())
    ]))

to_dict('records') を呼ぶことで Pandas DataFrame をlist[dict]に変換しています。

これで全てのケースで正しい型のテーブルを生成できるようになりました。

ただし、Pandas DataFrame のアップロードとそれ以外では内部処理で Stage を経由するかという差があり、パフォーマンスに影響がある可能性があることには注意が必要です。そのため、大規模で欠損値のないようなデータに対しては Pandas を使い、欠損値のあるデータに対してはこの手法を使うなどするのがよさそうです。

おまけ手法: Pydantic と併用する

ちなみに、弊社の実際のパイプラインでは Python コード内でも型安全性を高めるために Pydantic を使用しており、Pydantic モデルと組み合わせたコードも例示しておきます。

class WarehouseModel(BaseModel):
    id: StrictStr
    name: Optional[StrictStr] = None
    floor: Optional[StrictInt] = None
    area: Optional[StrictFloat] = None
...


type_map = {
    str: StringType(),
    int: IntegerType(),
    float: FloatType(),
    bool: BooleanType(),
    Optional[StrictStr]: StringType(),
    Optional[StrictInt]: IntegerType(),
    Optional[StrictFloat]: FloatType(),
    Optional[StrictBool]: BooleanType(),
...
}
schema = StructType([StructField(k, type_map.get(v.annotation, StringType()))
                    for k, v in WarehouseModel.model_fields.items()])

type_map で Pydantic の型から Snowflake の型へのマッピングを定義し、Pydantic モデルから schema を自動生成しています。これで Python コード上での型定義と dbt モデルの型定義を1か所に集約できます。

結論

問題点:

  • dbt Python model on Snowflake では生成されるモデルの型指定が一筋縄ではいかない。
  • 空データや全レコードが null の場合に型が異なることがある。

解決法:

  • Pandas DataFrame を返す場合は型を強制することで型を一定にできる。

    • ただし、型によっては null をうまく扱えない
  • snowflake.snowpark.DataFrameschema 指定付きで返すと細かく型を指定できる

    • schema は Pydantic model から生成することもできる

最後に

estie では、プロダクト開発から基盤データの整備まで幅広い領域で人材を募集しています。

dbt に関心がある方、データウェアハウスと仲がいい方、ライブラリのコードでも読み漁れる方がいらっしゃいましたらぜひバックエンドエンジニア(データ) にお申し込みください。そうでない方でもまずはカジュアル面談からお話しましょう。

hrmos.co

hrmos.co

© 2019- estie, inc.