GCSへのファイル格納を契機にCloud FunctionsでDataflowをキックする
はじめに
一つの前の記事で、GCSからBigQueryにファイルを送るDataflowを作った。
前の記事の状態では、Dataflowは手動で起動する必要があり、自動化ができていない。
そこで本記事では、GCSへのファイル格納を契機にDataflowをキックするCloud Functionsを作成する。
GCSからBigQueryへファイルを送るところにCloud Functionsを使う(≒Dataflowを使わない)方式もあるが、この方式だと大きいサイズのファイルを送る場合にCloud Functionsの実行時間の上限に引っかかるリスクがある。
そこで今回はCloud FunctionsからDataflowをキックする。
Dataflowの起動を自動化する方法は、Cloud Functionsの他にApp Engine, Cloud Composer, Compute Engineなど様々な方法があるが、サンプルが多い&学んだことが他にも応用しやすいということで、Cloud Functionsを使う。
ランタイムにはPython 3.8を使う。
Cloud Functionsの設定
from googleapiclient.discovery import build def execute_dataflow_template(event, context): project = <PROJECT_ID> job = "trialJob" template = "gs://dataflow-templates/latest/GCS_Text_to_BigQuery" parameters = { "javascriptTextTransformFunctionName": "transform", "JSONPath": "gs://dataflow-template-trial/schema.json", "javascriptTextTransformGcsPath": "gs://dataflow-template-trial/function.js", "inputFilePattern": "gs://dataflow-template-trial/FinancialSample.csv", "outputTable": project + ":dataflow_dataset.FinancialSample", "bigQueryLoadingTemporaryDirectory": "gs://dataflow-template-trial/tmp", } dataflow = build("dataflow", "v1b3") request = ( dataflow.projects() .templates() .launch( projectId=project, gcsPath=template, body={ "jobName": job, "parameters": parameters, }, ) ) response = request.execute()
動作確認
- BigQueryのテーブルのデータを空にするとともに、GCSのFinancialSample.csvを削除する。
この状態でFinancialSample.csvのアップロードを契機に、 BigQueryにデータが転送されることを確認する。
% gsutil cp ./FinancialSample.csv gs://dataflow-template-trial Copying file://./FinancialSample.csv [Content-Type=text/csv]... - [1 files][ 71.4 KiB/ 71.4 KiB] Operation completed over 1 objects/71.4 KiB.
Dataflowを見るとジョブが走っているのが見えて、BigQueryにもデータが入る。Cloud Functionsのログは以下の通り。
おわりに
わかったこと
- Cloud Functionsのファンクションの引数は、デフォルトで設定されている数(今回だと2つ)にしておかないといけない。引数を使っていなくても
- Cloud FunctionsのGCSトリガーは、バケットまでしか設定できないので、そのままだと特定のファイルが作成されたタイミングで処理をキックする設定はできなそう