DataflowのCloud Storage Text to BigQueryテンプレート(バッチ)を触ってみる

2020/10/31追記 DataflowのUDF(function.js)を行削除と現在日付をカラム追加する内容に更新


はじめに

GCSからBigQueryにCSVを送ってテーブルを作成するに当たり、DataflowのCloud Storage Text to BigQueryテンプレート(バッチ)が使えそうだったので触ってみた。
結論から言うと、ちょっと触った限りではDataflowのCloud Storage Text to BigQueryテンプレート(バッチ)の良さがわからなかったが、やったことを記録しておく。
なお私はこの記事を書いている時点ではDataflowのドキュメントをほぼ読んでおらず、Dataflowのことを全然分かっていない。
とりあえず動かしてみたという程度で、DataflowやGoogle提供のテンプレートについて誤解があるかもしれないので、その点に留意してほしい。
使ったファイルはGitHubリポジトリにまとめてある。

流れ

  • インプットとなるデータの準備
  • BigQueryセットアップ
  • GCSセットアップ
  • Dataflow実行

インプットとなるデータの準備

今回はMicrosoftが公開している財務サンプルデータを利用する。様々な型のデータがそれなりの量用意されているので、サンプルとしていい感じなので。Power BI 用の Excel の財務サンプル ブックのダウンロードからFinancial Sample.xlsxをダウンロードしてCSVに変換する。この時、フォーマットによっては価格欄に「$」が入ったり、3桁区切りの「,」が入って後々扱いづらいので注意する。Macのnumbersで編集する場合は、セルを全選択した上で、画面右のフォーマットのセルでデータフォーマットを数字にして、3桁区切りのチェックボックスをオフにすると良い。また1行目のカラム名の行も使わないので削除する。ファイル名はFinancialSample.csvとする。

BigQueryセットアップ

データセット作成

適当にデータセットを作成する。ここでデータセットはdataflow_datasetとしている。
入力

bq --location=asia-northeast1 mk \
--dataset \
--default_table_expiration 0 \
--default_partition_expiration 93600 \
--description "" \
<PROJECT_ID>:dataflow_dataset

出力

Dataset '<PROJECT_ID>:dataflow_dataset' successfully created.

スキーマ作成

スキーマ準備は面倒。とにかく手間かけず進めたいので、BigQueryのスキーマ自動検出機能を使ってスキーマを作成する。コマンド打つ時点でテーブルはなくても大丈夫。
入力

bq --location=asia-northeast1 load --autodetect \
--source_format=CSV \
<PROJECT_ID>:dataflow_dataset.FinancialSample \
~/Downloads/FinancialSample.csv

出力

Upload complete.
Waiting on bqjob_r1520ec4b3b8ef942_000001755d6471f2_1 ... (1s) Current status: DONE 

なおここで欲しいのはスキーマだけなので、スキーマ作成後にTRUNCATE TABLE dataflow_dataset.FinancialSample;でデータは削除しておく。 Cloud Storage Text to BigQueryテンプレートを利用するためには、BigQueryのスキーマを後述の2箇所で利用する必要があるため、自動検出されたスキーマJSON形式で出力する。
入力

bq show --schema --format=prettyjson dataflow_dataset.FinancialSample

出力

[
  {
    "mode": "NULLABLE",
    "name": "Segment",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "Country",
    "type": "STRING"
  },
〜中略〜
  {
    "mode": "NULLABLE",
    "name": "string_field_16",
    "type": "STRING"
  }
]

こうして出力したスキーマは以下2箇所で利用する。スキーマをどこで必要になるのかわかりづらくて混乱したので、ここで記しておく。詳細は後述。

  • GCSにスキーマ情報を格納して、Cloud Storage Text to BigQueryテンプレート利用時に指定する

  • Cloud Storage Text to BigQueryテンプレート利用時のUDF内でJSONのキーとして利用する

とりあえずこれでBigQUeryセットアップは終わり。

GCSセットアップ

バケット作成

dataflow-template-sandboxという名前でバケットを作成する。

コマンド

gsutil mb -p <PROJECT_ID> -c STANDARD -l ASIA-NORTHEAST1 -b on gs://dataflow-template-template

出力

Creating gs://dataflow-template-sandbox/...

ファイル格納

作成したバケットには以下を格納する。

  • BigQueryに突っ込むデータ。ここではFinancialSample.csvとする。これは作成済み
  • BigQueryのスキーマ。ここではschema.jsonとして、この後作成する
  • UDF用のJavaScript。ここではfunction.jsとして、この後作成する
  • DataflowのCloud Storage Text to BigQueryテンプレート(バッチ)実行時に吐き出される中間ファイル。Dataflow実行時にフォルダを指定するだけでよい。(そのディレクトリがなくても作ってくれる。)

BigQueryのスキーマ作成

先ほどBigQueryから出力したJSON形式のスキーマをBigQuery SchemaというJSONオブジェクトにする。(なぜ出力したスキーマがそのまま使えないのか。。。管理しづらい。。。という気はするが。)

{
  'BigQuery Schema': [
    {
      "mode": "NULLABLE",
      "name": "Segment",
      "type": "STRING"
    },
    {
      "mode": "NULLABLE",
      "name": "Country",
      "type": "STRING"
    },
  〜中略〜
    {
      "mode": "NULLABLE",
      "name": "string_field_16",
      "type": "STRING"
    }
  ]
}

UDF用のJavaScript作成

UDFでは、BigQueryに突っ込むCSVを、スキーマ名をキーとしたJSONに変換して返却する必要があるらしい。BigQueryにはCSVをそのままインポートする機能があるのに、なぜ途中でJSONに変換する必要があるのか。。。別にUDF内でちょっとデータ加工する分にはCSVのままでもいい気がするが。。。よくわからない。
CSVだとDateは1/1/14のフォーマットになっていてBigQueryに格納できないので、UDFの中で2014-1-1のフォーマットに変換しつつ、CSVJSONに変換していく。
また試しに、ContryがGermanyだったら行を削除する処理、string_field_16カラムに現在日付を設定する処理も追加する。(2020/10/31追記)

function transform(line) {
  var values = line.split(',');
  var obj = new Object();
  obj.Segment = values[0];
  obj.Country = values[1];
  obj.Product = values[2];
  obj.Discount_Band = values[3];
  obj.Units_Sold = values[4];
  obj.Manufacturing_Price = values[5];
  obj.Sale_Price = values[6];
  obj.Gross_Sales = values[7];
  obj.Discounts = values[8];
  obj._Sales = values[9];
  obj.COGS = values[10];
  obj.Profit = values[11];
  var date = values[12].split('/');
  obj.Date = '20' + date[2] + '-' + date[1] + '-' + date[0];
  obj.Month_Number = values[13];
  obj.Month_Name = values[14];
  obj.Year = values[15];
  var dt = new Date();
  var insert_day = dt.getFullYear() + '-' + (dt.getMonth()+1) + '-' + dt.getDate();
  obj.string_field_16 = insert_day;
  var jsonString = JSON.stringify(obj);
  if (obj.Country == "Germany") {
    return null
  }
  return jsonString;
}

GCSに格納するファイルの準備ができたので、FinancialSample.csvとschema.jsonとfunction.jsを格納する

gsutil cp ./FinancialSample.csv gs://dataflow-template-trial
gsutil cp ./schema.json gs://dataflow-template-trial
gsutil cp ./function.js gs://dataflow-template-trial

Dataflow実行

準備ができたので、Dataflowを実行する。
入力

gcloud dataflow jobs run trialJob \
    --gcs-location gs://dataflow-templates/latest/GCS_Text_to_BigQuery \
    --parameters \
javascriptTextTransformFunctionName=transform,\
JSONPath=gs://dataflow-template-trial/schema.json,\
javascriptTextTransformGcsPath=gs://dataflow-template-trial/function.js,\
inputFilePattern=gs://dataflow-template-trial/FinancialSample.csv,\
outputTable=<PROJECT_ID>:dataflow_dataset.FinancialSample,\
bigQueryLoadingTemporaryDirectory=gs://dataflow-template-trial/tmp --region=asia-northeast1

出力

createTime: '2020-10-25T14:54:52.699960Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: 2020-10-25_07_54_50-9162729669340431769
location: asia-northeast1
name: trialJob
projectId: <PROJECT_ID>
startTime: '2020-10-25T14:54:52.699960Z'
type: JOB_TYPE_BATCH

CLIの戻り値だと成功/失敗はわからない?コンソールで成功していること、BigQueryに確かにデータが格納されていることを確認する。(BigQueryに挿入される順番はCSV通りの順番ではないっぽい)

f:id:sonomirai:20201026002751p:plain
dataflowのjob成功画面
f:id:sonomirai:20201031014327p:plain
BigQueryのプレビュー画面

おわりに

わかったこと

  • DataflowのCloud Storage Text to BigQueryテンプレート(バッチ)の動かし方
  • Dataflowのデバッグの仕方
  • Dataflowはコンソールだとできない操作方法が多いっぽいということ
  • DataflowのCloud Storage Text to BigQueryテンプレート(バッチ)で指定するJavaScriptのUDFは、BigQueryのUDFとは役割が違いそうなこと
  • GCS上のtmpバケットは処理がコケると残って、成功すると削除される
  • そもそもDataflowのCloud Storage Text to BigQueryテンプレート(バッチ)はβ版

わからなかったこと

  • Dataflowのジョブの操作方法(停止や更新など)
  • Dataflowのジョブ(バッチ)は、バッチということだけど時刻の設定箇所がわからなかった。そういう類のものではない?
  • GCSのデータが更新されたらどのタイミングでBigQueryのデータが更新されるのか?そもそも更新されるのか?更新時は追記されるのか?上書きされるのか?
  • Dataflow全般(DataflowのCloud Storage Text to BigQueryテンプレート(バッチ)を動かしたくらいではDataflowのことはわからないことがわかった笑)

参考サイト