DataflowのCloud Storage Text to BigQueryテンプレート(バッチ)を触ってみる
2020/10/31追記 DataflowのUDF(function.js)を行削除と現在日付をカラム追加する内容に更新
はじめに
GCSからBigQueryにCSVを送ってテーブルを作成するに当たり、DataflowのCloud Storage Text to BigQueryテンプレート(バッチ)が使えそうだったので触ってみた。
結論から言うと、ちょっと触った限りではDataflowのCloud Storage Text to BigQueryテンプレート(バッチ)の良さがわからなかったが、やったことを記録しておく。
なお私はこの記事を書いている時点ではDataflowのドキュメントをほぼ読んでおらず、Dataflowのことを全然分かっていない。
とりあえず動かしてみたという程度で、DataflowやGoogle提供のテンプレートについて誤解があるかもしれないので、その点に留意してほしい。
使ったファイルはGitHubリポジトリにまとめてある。
流れ
インプットとなるデータの準備
今回はMicrosoftが公開している財務サンプルデータを利用する。様々な型のデータがそれなりの量用意されているので、サンプルとしていい感じなので。Power BI 用の Excel の財務サンプル ブックのダウンロードからFinancial Sample.xlsxをダウンロードしてCSVに変換する。この時、フォーマットによっては価格欄に「$」が入ったり、3桁区切りの「,」が入って後々扱いづらいので注意する。Macのnumbersで編集する場合は、セルを全選択した上で、画面右のフォーマットのセルでデータフォーマットを数字にして、3桁区切りのチェックボックスをオフにすると良い。また1行目のカラム名の行も使わないので削除する。ファイル名はFinancialSample.csvとする。
BigQueryセットアップ
データセット作成
適当にデータセットを作成する。ここでデータセットはdataflow_datasetとしている。
入力
bq --location=asia-northeast1 mk \ --dataset \ --default_table_expiration 0 \ --default_partition_expiration 93600 \ --description "" \ <PROJECT_ID>:dataflow_dataset
出力
Dataset '<PROJECT_ID>:dataflow_dataset' successfully created.
スキーマ作成
スキーマ準備は面倒。とにかく手間かけず進めたいので、BigQueryのスキーマ自動検出機能を使ってスキーマを作成する。コマンド打つ時点でテーブルはなくても大丈夫。
入力
bq --location=asia-northeast1 load --autodetect \ --source_format=CSV \ <PROJECT_ID>:dataflow_dataset.FinancialSample \ ~/Downloads/FinancialSample.csv
出力
Upload complete. Waiting on bqjob_r1520ec4b3b8ef942_000001755d6471f2_1 ... (1s) Current status: DONE
なおここで欲しいのはスキーマだけなので、スキーマ作成後にTRUNCATE TABLE dataflow_dataset.FinancialSample;
でデータは削除しておく。
Cloud Storage Text to BigQueryテンプレートを利用するためには、BigQueryのスキーマを後述の2箇所で利用する必要があるため、自動検出されたスキーマをJSON形式で出力する。
入力
bq show --schema --format=prettyjson dataflow_dataset.FinancialSample
出力
[ { "mode": "NULLABLE", "name": "Segment", "type": "STRING" }, { "mode": "NULLABLE", "name": "Country", "type": "STRING" }, 〜中略〜 { "mode": "NULLABLE", "name": "string_field_16", "type": "STRING" } ]
こうして出力したスキーマは以下2箇所で利用する。スキーマをどこで必要になるのかわかりづらくて混乱したので、ここで記しておく。詳細は後述。
GCSにスキーマ情報を格納して、Cloud Storage Text to BigQueryテンプレート利用時に指定する
Cloud Storage Text to BigQueryテンプレート利用時のUDF内でJSONのキーとして利用する
とりあえずこれでBigQUeryセットアップは終わり。
GCSセットアップ
バケット作成
dataflow-template-sandboxという名前でバケットを作成する。
コマンド
gsutil mb -p <PROJECT_ID> -c STANDARD -l ASIA-NORTHEAST1 -b on gs://dataflow-template-template
出力
Creating gs://dataflow-template-sandbox/...
ファイル格納
作成したバケットには以下を格納する。
- BigQueryに突っ込むデータ。ここではFinancialSample.csvとする。これは作成済み
- BigQueryのスキーマ。ここではschema.jsonとして、この後作成する
- UDF用のJavaScript。ここではfunction.jsとして、この後作成する
- DataflowのCloud Storage Text to BigQueryテンプレート(バッチ)実行時に吐き出される中間ファイル。Dataflow実行時にフォルダを指定するだけでよい。(そのディレクトリがなくても作ってくれる。)
BigQueryのスキーマ作成
先ほどBigQueryから出力したJSON形式のスキーマをBigQuery SchemaというJSONオブジェクトにする。(なぜ出力したスキーマがそのまま使えないのか。。。管理しづらい。。。という気はするが。)
{ 'BigQuery Schema': [ { "mode": "NULLABLE", "name": "Segment", "type": "STRING" }, { "mode": "NULLABLE", "name": "Country", "type": "STRING" }, 〜中略〜 { "mode": "NULLABLE", "name": "string_field_16", "type": "STRING" } ] }
UDF用のJavaScript作成
UDFでは、BigQueryに突っ込むCSVを、スキーマ名をキーとしたJSONに変換して返却する必要があるらしい。BigQueryにはCSVをそのままインポートする機能があるのに、なぜ途中でJSONに変換する必要があるのか。。。別にUDF内でちょっとデータ加工する分にはCSVのままでもいい気がするが。。。よくわからない。
CSVだとDateは1/1/14
のフォーマットになっていてBigQueryに格納できないので、UDFの中で2014-1-1
のフォーマットに変換しつつ、CSVをJSONに変換していく。
また試しに、ContryがGermanyだったら行を削除する処理、string_field_16カラムに現在日付を設定する処理も追加する。(2020/10/31追記)
function transform(line) { var values = line.split(','); var obj = new Object(); obj.Segment = values[0]; obj.Country = values[1]; obj.Product = values[2]; obj.Discount_Band = values[3]; obj.Units_Sold = values[4]; obj.Manufacturing_Price = values[5]; obj.Sale_Price = values[6]; obj.Gross_Sales = values[7]; obj.Discounts = values[8]; obj._Sales = values[9]; obj.COGS = values[10]; obj.Profit = values[11]; var date = values[12].split('/'); obj.Date = '20' + date[2] + '-' + date[1] + '-' + date[0]; obj.Month_Number = values[13]; obj.Month_Name = values[14]; obj.Year = values[15]; var dt = new Date(); var insert_day = dt.getFullYear() + '-' + (dt.getMonth()+1) + '-' + dt.getDate(); obj.string_field_16 = insert_day; var jsonString = JSON.stringify(obj); if (obj.Country == "Germany") { return null } return jsonString; }
GCSに格納するファイルの準備ができたので、FinancialSample.csvとschema.jsonとfunction.jsを格納する
gsutil cp ./FinancialSample.csv gs://dataflow-template-trial gsutil cp ./schema.json gs://dataflow-template-trial gsutil cp ./function.js gs://dataflow-template-trial
Dataflow実行
準備ができたので、Dataflowを実行する。
入力
gcloud dataflow jobs run trialJob \ --gcs-location gs://dataflow-templates/latest/GCS_Text_to_BigQuery \ --parameters \ javascriptTextTransformFunctionName=transform,\ JSONPath=gs://dataflow-template-trial/schema.json,\ javascriptTextTransformGcsPath=gs://dataflow-template-trial/function.js,\ inputFilePattern=gs://dataflow-template-trial/FinancialSample.csv,\ outputTable=<PROJECT_ID>:dataflow_dataset.FinancialSample,\ bigQueryLoadingTemporaryDirectory=gs://dataflow-template-trial/tmp --region=asia-northeast1
出力
createTime: '2020-10-25T14:54:52.699960Z' currentStateTime: '1970-01-01T00:00:00Z' id: 2020-10-25_07_54_50-9162729669340431769 location: asia-northeast1 name: trialJob projectId: <PROJECT_ID> startTime: '2020-10-25T14:54:52.699960Z' type: JOB_TYPE_BATCH
CLIの戻り値だと成功/失敗はわからない?コンソールで成功していること、BigQueryに確かにデータが格納されていることを確認する。(BigQueryに挿入される順番はCSV通りの順番ではないっぽい)
おわりに
わかったこと
- DataflowのCloud Storage Text to BigQueryテンプレート(バッチ)の動かし方
- Dataflowのデバッグの仕方
- Dataflowはコンソールだとできない操作方法が多いっぽいということ
- DataflowのCloud Storage Text to BigQueryテンプレート(バッチ)で指定するJavaScriptのUDFは、BigQueryのUDFとは役割が違いそうなこと
- GCS上のtmpバケットは処理がコケると残って、成功すると削除される
- そもそもDataflowのCloud Storage Text to BigQueryテンプレート(バッチ)はβ版
わからなかったこと
- Dataflowのジョブの操作方法(停止や更新など)
- Dataflowのジョブ(バッチ)は、バッチということだけど時刻の設定箇所がわからなかった。そういう類のものではない?
- GCSのデータが更新されたらどのタイミングでBigQueryのデータが更新されるのか?そもそも更新されるのか?更新時は追記されるのか?上書きされるのか?
- Dataflow全般(DataflowのCloud Storage Text to BigQueryテンプレート(バッチ)を動かしたくらいではDataflowのことはわからないことがわかった笑)