こんにちは。 estie でソフトウェアエンジニアをしている @Ryosuke839 です。弊社ではデータウェアハウスとして Snowflake を採用しており、データ移送に dbt を使いだしています。dbt では SQL クエリからデータを生成する SQL model の他に Python コードでデータを生成する Python model も使うことができ、複雑なデータ変換や外部 API 呼び出しなどが絡む処理では Python model が役に立ちます。しかし、Python model には様々な制約や落とし穴があり、今回紹介する型指定もその 1 つになります。仕様を探る中で型指定のベストプラクティスを見つけたので共有します。
dbt Python model で型が一定しない
dbt Python model は Python コードの実行に対応したデータウェアハウス製品で動かすことができ、Snowflake もその1つです。
早速、dbt の公式ドキュメントのサンプルコード(を少し改変したもの)を動かしてみましょう。
import holidays import pandas as pd def is_holiday(date_col): if pd.isnull(date_col): return None # Chez Jaffle french_holidays = holidays.France() is_holiday = (date_col in french_holidays) return is_holiday def model(dbt, session): dbt.config( materialized = "table", packages = ["holidays"] ) orders_df = dbt.ref("stg_orders") df = orders_df.to_pandas() # apply our function # (columns need to be in uppercase on Snowpark) df["IS_HOLIDAY"] = df["ORDER_DATE"].apply(is_holiday) df["ORDER_DATE"].dt.tz_localize('UTC') # convert from Number/Long to tz-aware Datetime # return final dataset (Pandas DataFrame) return df
stg_orders
モデルのデータを読み、Pandas DataFrame に変換したのちにカラム追加や型変換を行うコードになっています。
適当なデータを stg_orders
に生成し、実際に処理を流してみます。
普通のデータを入れたところ、想定通りの型と値が出てきました。(当然ですね)
続いて空のデータを入れたところ、値は想定通りだったものの IS_HOLODAY
の型が BOOLEAN
ではなくなってしまいました。
さらに null を入力したところ、IS_HOLODAY
の型が今度は INTEGER
に変化してしまいました。
いずれの例でも値自体は正しかったものの、型は定まらない結果となりました。値を直接参照するだけであれば大きな問題はありませんが、下流のモデルで型に依存した処理が書かれていたりすると検知しにくいバグの出来上がりです。
型が一定しない原因を探る
このような挙動になった理由を探っていきます。
dbt Python model は Snowflake では stored procedures という機能を使って実装されています。この stored procedures は値やテーブルを返すことができ、以下のように戻り値の型を指定できます。
CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR)
LANGUAGE PYTHON ...
しかし、dbt Python model ではテーブルを生成はするものの、Python コードの中で create table を発行し stored procedure 自体は RETURNS STRING
と定義され常に文字列を返すだけになっています。
dbt-snowflake/dbt/adapters/snowflake/impl.py
そのため、stored procedure の定義で dbt model の型を指定することはできません。
ここで、dbt Python model のソースコード dbt-snowflake/dbt/include/snowflake/macros/materializations/table.sql を読んでみます。
def materialize(session, df, target_relation): # make sure pandas exists import importlib.util package_name = 'pandas' if importlib.util.find_spec(package_name): import pandas if isinstance(df, pandas.core.frame.DataFrame): session.use_database(target_relation.database) session.use_schema(target_relation.schema) # session.write_pandas does not have overwrite function df = session.createDataFrame(df) df.write.mode("overwrite").save_as_table('{{ target_relation_name }}', table_type='{{table_type}}') def main(session): dbt = dbtObj(session.table) df = model(dbt, session) materialize(session, df, dbt.this) return "OK"
なにやら、model
関数から pandas.DataFrame
が返った場合にのみ特別な処理をしています。
最後に呼ばれている session.createDataFrame のドキュメントを読んでみると、引数として data
のほかに任意で schema
を取ることがわかります。さらに schema
について以下のようなことが書かれています。
When
schema
is a list of column names orNone
, the schema of the DataFrame will be inferred from the data across all rows. To improve performance, provide a schema. This avoids the need to infer data types with large data sets.
また、下の方には以下のようなことも書かれています。
Note
When data is a pandas DataFrame,
snowflake.connector.pandas_tools.write_pandas is called.
write_pandas のドキュメントには型について明確なことは書かれていませんが、Python Connector のドキュメントには Snowflake の型と Pandas の型のマッピングが存在しており、Pandas の型から Snowflake の型も決まると思われます。
今回の DataFrame の生成方法では列の型を指定していないため、Snowflake のテーブルの型も一定していないのでした。
dbt Python model で型を指定する
それでは、dbt Python model ではどのようにして型を指定すればよいでしょうか。
アプローチとしては Pandas DataFrame の型を指定するものと、Snowpark DataFrame を自前で生成するものの 2 つが考えられそうです。
手段 1: Pandas DataFrame の列の型を指定する
- df["IS_HOLIDAY"] = df["ORDER_DATE"].apply(is_holiday) + df.insert(1, "IS_HOLIDAY", df["ORDER_DATE"].apply(is_holiday).astype(bool))
列を追加する際に astype
を呼ぶことで、Pandas での型を強制できます。これで空の場合にも IS_HOLIDAY
の型が BOOLEAN
になりますが、astype
によって None
が False
に、NaN
が True
に変換されてしまい元の値を保てないという問題があります。もちろん全てのレコードが None
や NaN
でないのであればこの手法でうまく行きます。
手段 2: Snowpark DataFrame を自前で生成する
先ほどのコードでは pandas.DataFrame
に対してのみ session.createDataFrame
を呼んでいたので、model
関数からpandas.DataFrame
以外を返せば型推論を回避できそうです。dbt のドキュメントには以下のように記されています。
The
model()
function must return a single DataFrame. On Snowpark (Snowflake), this can be a Snowpark or pandas DataFrame.
Snowpark DataFrame とは snowflake.snowpark.DataFrame
のことで、先ほど登場した session.createDataFrame
を呼ぶことで生成できます。つまり、自前で schame
付きの session.createDataFrame
を呼んであげればよさそうですので、モデルのコードを書き換えてみます。ここで、内部的に write_pandas
が呼んでしまわれないよう Pandas DataFrame を渡さない必要もあります。
from snowflake.snowpark.types import (BooleanType, StructField, StructType, TimestampType) ... df.replace({pd.NaT: None}, inplace=True) # return final dataset (Pandas DataFrame) return session.createDataFrame(df.to_dict('records'), StructType([ StructField("ORDER_DATE", TimestampType()), StructField("IS_HOLIDAY", BooleanType()) ]))
to_dict('records')
を呼ぶことで Pandas DataFrame をlist[dict]
に変換しています。
これで全てのケースで正しい型のテーブルを生成できるようになりました。
ただし、Pandas DataFrame のアップロードとそれ以外では内部処理で Stage を経由するかという差があり、パフォーマンスに影響がある可能性があることには注意が必要です。そのため、大規模で欠損値のないようなデータに対しては Pandas を使い、欠損値のあるデータに対してはこの手法を使うなどするのがよさそうです。
おまけ手法: Pydantic と併用する
ちなみに、弊社の実際のパイプラインでは Python コード内でも型安全性を高めるために Pydantic を使用しており、Pydantic モデルと組み合わせたコードも例示しておきます。
class WarehouseModel(BaseModel): id: StrictStr name: Optional[StrictStr] = None floor: Optional[StrictInt] = None area: Optional[StrictFloat] = None ... type_map = { str: StringType(), int: IntegerType(), float: FloatType(), bool: BooleanType(), Optional[StrictStr]: StringType(), Optional[StrictInt]: IntegerType(), Optional[StrictFloat]: FloatType(), Optional[StrictBool]: BooleanType(), ... } schema = StructType([StructField(k, type_map.get(v.annotation, StringType())) for k, v in WarehouseModel.model_fields.items()])
type_map
で Pydantic の型から Snowflake の型へのマッピングを定義し、Pydantic モデルから schema
を自動生成しています。これで Python コード上での型定義と dbt モデルの型定義を1か所に集約できます。
結論
問題点:
- dbt Python model on Snowflake では生成されるモデルの型指定が一筋縄ではいかない。
- 空データや全レコードが null の場合に型が異なることがある。
解決法:
Pandas DataFrame を返す場合は型を強制することで型を一定にできる。
- ただし、型によっては null をうまく扱えない
snowflake.snowpark.DataFrame
をschema
指定付きで返すと細かく型を指定できるschema
は Pydantic model から生成することもできる
最後に
estie では、プロダクト開発から基盤データの整備まで幅広い領域で人材を募集しています。
dbt に関心がある方、データウェアハウスと仲がいい方、ライブラリのコードでも読み漁れる方がいらっしゃいましたらぜひバックエンドエンジニア(データ) にお申し込みください。そうでない方でもまずはカジュアル面談からお話しましょう。