Snowflake で GIS データを取り込む

estie では多様な不動産関連データを扱っており、分析などに用いる補助データとしてオープンな GIS(地理情報システム)データもそのひとつです。最近では行政関連のオープンデータをプロダクト上に表示するといった取り組みも行っています。

今回は estie がデータ基盤として用いている Snowflake に GIS データを取り込む方法についてご紹介します。

GIS データ

GIS データは主に図形情報と属性情報からなり、以下の図のように地図に重畳されることもあります。

estie がオープンな GIS データのソースとしてお世話になっているのが国土交通省の国土数値情報です。国土数値情報ダウンロードサイトでは、GIS データが主に GML 形式、シェープファイル形式、GeoJSON 形式の 3 つの形式で配布されています。いずれの形式でも同じデータが含まれてはいますが、データベースへの取り込みにおいてはバイナリ形式で省サイズ、データスキーマの定義が含まれる、といった理由からシェープファイル形式が最も親和性が高いのではないでしょうか。

シェープファイルには以下の特徴があります。

  • .shp .dbf など複数のファイルから構成される
    • .shp ファイルはベクトル形式の図形情報を含む
    • .dbf ファイルはそれぞれの図形に付加される属性情報を含む
      • 表形式であり、データスキーマを持つ
  • バイナリ形式でありテキスト形式と比べ省サイズである

Snowflake と地理空間データ

Snowflake は 地理空間データ型 | Snowflake Documentation にあるように地理空間データの扱いに力を入れていて、多数の地理空間関数や地理空間型に対しての検索最適化などがサポートされています。

入出力については、図形情報の well-known text, well-known binary, GeoJSON での入出力がサポートされていますが、属性情報もあわせて入出力するには well-known text を .csv の列として与えてパースする、GeoJSON のフィールドを自前で分解するなどの操作が必要となります。また、GML 形式やシェープファイル形式の直接の入出力はサポートされていません。

Snowflake でシェープファイルを読み込む

シンプルな読み込み

さいわい Snowflake では stored procedurePython worksheets を使って任意の Python コードを実行することができ、また Python で GIS データを扱ういくつかのライブラリもプリインストールされています。

GIS データの読み込みには fiona というライブラリを使うことができ、これは geopandas の入出力部分にも使われています。fiona でのシェープファイルの読み込みはとてもシンプルで、以下のようにできます。

# 代表の .shp ファイルを指定するとシェープファイル全体が読み込まれる
with fiona.open("foo.shp", encoding="cp932") as data:
    schema = data.schema["properties"]
    assert isinstance(schema, dict)

    records = [(
        *(row["properties"][k] for k in schema.keys()),
        # geometry は WKB に変換しておくことで省メモリになる
        shapely.geometry.shape(row["geometry"]).wkb if row["geometry"] is not None else None,
    ) for row in data]

取得した schemarecords を用いて Snowpark DataFrame を生成するには以下のようにします。

type_map: dict[str, snowpark_types.DataType] = {
    "int32": snowpark_types.IntegerType(),
    "float": snowpark_types.FloatType(),
    "str": snowpark_types.StringType(),
    "date": snowpark_types.DateType(),
    "time": snowpark_types.TimeType(),
    "datetime": snowpark_types.TimestampType(),
    "bytes": snowpark_types.BinaryType(),
    "int64": snowpark_types.IntegerType(),
    "int": snowpark_types.IntegerType(),
    "List[str]": snowpark_types.ArrayType(snowpark_types.StringType()),
}

return session.create_dataframe(records, schema=snowpark_types.StructType([
    *(snowpark_types.StructField(k, type_map[fiona.schema.normalize_field_type(v)]) for k, v in schema.items()),
    # geometry は WKB なので BinaryType
    snowpark_types.StructField("GEOMETRY", snowpark_types.BinaryType()),
])).select_expr(
    "* EXCLUDE (GEOMETRY)",
    # WKB から geography に変換する
    "ST_GEOGRAPHYFROMWKB(GEOMETRY, TRUE) as GEOMETRY",
)

GeoPandas の DataFrame などから直接 Snowpark DataFrame を生成すると型は値から推論されてしまいますが、ここでは schema の型情報から Snowflake の型情報を生成することでシェープファイルでの型定義が Snowflake に引き継がれるようにしています。また、通常のキャストでは自己交差辺を持つなどした無効な図形は取り込むことができませんが、allow_invalid=true 指定した ST_GEOGRAPHYFROMWKB を噛ませることで無効な図形もひとまず取り込めるようにしています。

.zip ファイルからの読み込み

さらに、国土数値情報で提供されるシェープファイルは .zip に複数が格納されていることがありますが、嬉しいことに fiona は .zip ファイルのまま内部のシェープファイルを読み出すこともできます。

for name in fiona.listlayers("foo.zip"):
    with fiona.open("foo.zip", layer=name, encoding="cp932") as data:
        ...

Snowflake stage 上の .zip ファイルからの読み込み

さらにさらに、Snowflake の stage からは session.file.get_stream でローカルにファイルをコピーすることなくストリームを取得できますが、fiona はストリームから .zip ファイルを読み出すこともできます。

with zipfile.ZipFile(stream) as zip:
    filelist = zip.filelist
stream.seek(0)
with fiona.io.ZipMemoryFile(stream) as memoryfile:
    for zipinfo in filelist:
        if zipinfo.filename.endswith(suffix):
            with memoryfile.open(path=zipinfo.filename, encoding="cp932") as data:
                ...

実際に取り込む

国土数値情報の都市計画決定情報データなどは複数の .zip ファイルの中に複数のシェープファイルが含まれるという構造をしており、以下の図に示すように階層的にデータを読み込む必要があります。


これまでのコードを組み合わせることでこのようなデータ構造に対応でき、以下のような関数を定義できます。

import zipfile

import fiona
import fiona.io
import fiona.schema
import shapely.geometry
import snowflake.snowpark as snowpark
import snowflake.snowpark.types as snowpark_types


def load_shape_data(session: snowpark.Session, stage: str, suffix: str = ".shp", encoding: str = "cp932") -> snowpark.DataFrame:
    records: list[tuple] = []
    schema: dict | None = None

    for row in session.sql(f"list @{stage}").select("\"name\"").to_local_iterator():
        with session.file.get_stream(f"@{''.join(name + '.' for name in stage.split('/')[0].split('.')[:-1])}{row[0]}") as stream:

            with zipfile.ZipFile(stream) as zip:
                filelist = zip.filelist

            stream.seek(0)

            with fiona.io.ZipMemoryFile(stream) as memoryfile:
                for zipinfo in filelist:
                    if zipinfo.filename.endswith(suffix):
                        with memoryfile.open(path=zipinfo.filename, encoding=encoding) as data:
                            if schema is None:
                                schema = data.schema["properties"]
                                assert isinstance(schema, dict)
                            else:
                                if schema != data.schema["properties"]:
                                    raise ValueError(f"schema not equal: {schema} vs {data.schema['properties']}")

                            records.extend((
                                *(row["properties"][k] for k in schema.keys()),
                                # geometry は WKB に変換しておくことで省メモリになる
                                shapely.geometry.shape(row["geometry"]).wkb if row["geometry"] is not None else None,
                            ) for row in data)

    if schema is None:
        raise ValueError("No data found")

    type_map: dict[str, snowpark_types.DataType] = {
        "int32": snowpark_types.IntegerType(),
        "float": snowpark_types.FloatType(),
        "str": snowpark_types.StringType(),
        "date": snowpark_types.DateType(),
        "time": snowpark_types.TimeType(),
        "datetime": snowpark_types.TimestampType(),
        "bytes": snowpark_types.BinaryType(),
        "int64": snowpark_types.IntegerType(),
        "int": snowpark_types.IntegerType(),
        "List[str]": snowpark_types.ArrayType(snowpark_types.StringType()),
    }

    return session.create_dataframe(records, schema=snowpark_types.StructType([
        *(snowpark_types.StructField(k, type_map[fiona.schema.normalize_field_type(v)]) for k, v in schema.items()),
        # geometry は WKB なので BinaryType
        snowpark_types.StructField("GEOMETRY", snowpark_types.BinaryType()),
    ])).select_expr(
        "* EXCLUDE (GEOMETRY)",
        # WKB から geography に変換する
        "ST_GEOGRAPHYFROMWKB(GEOMETRY, TRUE) as GEOMETRY",
    )


def main(session: snowpark.Session):
    return load_shape_data(session, "DATABASE.SCHEMA.KSJ/A55-22_", "_youto.shp")

国土数値情報の都市計画決定情報データを stage に格納したうえで以上のコードを Python worksheet で実行すると、次のようなデータを得られます。

これでシェープファイルを Snowflake に取り込むことができました。

GIS データをクエリする

取り込んだデータは地理空間関数で加工したりフィルタリングすることができ、例えば estie の所在地の用途地域は今回のデータを使い次のようなクエリで求めることができます。

select "用途地域", "容積率", "建ぺい率"
from foo
where st_intersects(geometry, st_makepoint(139.731188, 35.6650594)); 


最後に

今回は Snowflake で GIS データのフォーマットの一つであるシェープファイルを取り込む方法を紹介しました。

estie では、プロダクト開発から基盤データの整備まで幅広い領域で人材を募集しています。今回紹介した GIS データの取り込みはデータチームでデータ基盤のひとつとして整備し、活用は全社で行っています。データ基盤の整備に興味のある方がいらっしゃいましたらぜひバックエンドエンジニア(データ)にお申し込みください。そうでない方でもまずはカジュアル面談からお話しましょう。

hrmos.co

hrmos.co

© 2019- estie, inc.