GCSへのファイル格納を契機にCloud FunctionsでDataflowをキックする

はじめに

一つの前の記事で、GCSからBigQueryにファイルを送るDataflowを作った。
前の記事の状態では、Dataflowは手動で起動する必要があり、自動化ができていない。
そこで本記事では、GCSへのファイル格納を契機にDataflowをキックするCloud Functionsを作成する。
GCSからBigQueryへファイルを送るところにCloud Functionsを使う(≒Dataflowを使わない)方式もあるが、この方式だと大きいサイズのファイルを送る場合にCloud Functionsの実行時間の上限に引っかかるリスクがある。
そこで今回はCloud FunctionsからDataflowをキックする。
Dataflowの起動を自動化する方法は、Cloud Functionsの他にApp Engine, Cloud Composer, Compute Engineなど様々な方法があるが、サンプルが多い&学んだことが他にも応用しやすいということで、Cloud Functionsを使う。
ランタイムにはPython 3.8を使う。

Cloud Functionsの設定

f:id:sonomirai:20201031163254p:plain
Cloud Functions設定画面
f:id:sonomirai:20201031163316p:plain
ファンクション設定

from googleapiclient.discovery import build

def execute_dataflow_template(event, context):
    project = <PROJECT_ID>
    job = "trialJob"
    template = "gs://dataflow-templates/latest/GCS_Text_to_BigQuery"
    parameters = {
        "javascriptTextTransformFunctionName": "transform",
        "JSONPath": "gs://dataflow-template-trial/schema.json",
        "javascriptTextTransformGcsPath": "gs://dataflow-template-trial/function.js",
        "inputFilePattern": "gs://dataflow-template-trial/FinancialSample.csv",
        "outputTable": project + ":dataflow_dataset.FinancialSample",
        "bigQueryLoadingTemporaryDirectory": "gs://dataflow-template-trial/tmp",
    }

    dataflow = build("dataflow", "v1b3")
    request = (
        dataflow.projects()
        .templates()
        .launch(
            projectId=project,
            gcsPath=template,
            body={
                "jobName": job,
                "parameters": parameters,
            },
        )
    )

    response = request.execute()

動作確認

  • BigQueryのテーブルのデータを空にするとともに、GCSのFinancialSample.csvを削除する。
    この状態でFinancialSample.csvのアップロードを契機に、 BigQueryにデータが転送されることを確認する。
% gsutil cp ./FinancialSample.csv gs://dataflow-template-trial
Copying file://./FinancialSample.csv [Content-Type=text/csv]...
- [1 files][ 71.4 KiB/ 71.4 KiB]                                                
Operation completed over 1 objects/71.4 KiB.                                     

Dataflowを見るとジョブが走っているのが見えて、BigQueryにもデータが入る。Cloud Functionsのログは以下の通り。

f:id:sonomirai:20201031163843p:plain
Cloud Functionsのログ

おわりに

わかったこと

  • Cloud Functionsのファンクションの引数は、デフォルトで設定されている数(今回だと2つ)にしておかないといけない。引数を使っていなくても
  • Cloud FunctionsのGCSトリガーは、バケットまでしか設定できないので、そのままだと特定のファイルが作成されたタイミングで処理をキックする設定はできなそう

わからないこと

  • 接続を「内部トラフィックのみを許可」に設定する際は、認証の設定をしないといけない気がするが、そこはやっていない
  • Goでのファンクションの書き方(Pythonのサンプルが多かったからそっちでやった)

参考URL