電通総研 テックブログ

電通総研が運営する技術ブログ

ユニットテストで学ぶDataflowの基本

みなさんこんにちは、電通国際情報サービス(ISID)コーポレート本部 システム推進部の佐藤太一です。

このエントリではGoogle Dataflowを使ったデータ分析パイプライン構築において中心的なAPIの使い方について説明します。

Google Dataflowとはなにか

Google DataflowはいわゆるExtract/Transform/Load(ETL)ツールの一種です。Apache Beamというバッチ処理基盤をGCPの分散処理環境で動かしてくれます。

Apache Beam自体は、Apache FlinkApache SparkHazelcast Jetといったオンプレミスで動作する実行環境を利用することもできます。

Apache Beamでは、JavaPython、Goといった言語で処理を記述できますが、今回はJavaを使って説明します。

Dataflowの開発環境構築

まずは、Dataflowの開発環境を作っていきましょう。

開発環境として使うマシンには、事前にJava17とGradle7.5以上をインストールしておいてください。

GradleによるDataflowプロジェクトの作り方

最初にプロジェクト全体を格納するためのディレクトリを作成しましょう。

ここからは、この記事内でシェルコマンドを実行するよう説明している部分では、必ずこのルートディレクトリで実行してください。

作るプロジェクトは、説明のために dataflow-example とします。作ったdataflow-example ディレクトリの中で、以下のコマンドを実行して最小限のプロジェクトを作成します。

gradle init --type basic --dsl kotlin --project-name dataflow-example --incubating

最小限とはいえ、Gradle Wrapperとなるシェルスクリプトやgit用の設定ファイルが生成されていますね。

この中から、 build.gradle.kts を以下のように編集します。

plugins {
    id("java")
}

group = "com.example.dataflow"
version = "0.1.0-SNAPSHOT"

java.toolchain {
    languageVersion.set(JavaLanguageVersion.of(17))                                          // 1.
}

repositories {
    mavenCentral()
    maven("https://packages.confluent.io/maven/")                                            // 2.
}

dependencies {
    var beamVersion = "2.41.0"                                                               // 3.
    var slf4jVersion = "1.7.36"

    implementation(platform("com.google.cloud:libraries-bom:25.4.0"))                        // 4.

    implementation("org.apache.beam:beam-sdks-java-core:${beamVersion}")                     // 5.
    implementation("org.apache.beam:beam-sdks-java-io-google-cloud-platform:${beamVersion}") // 5.
    implementation("org.apache.beam:beam-runners-google-cloud-dataflow-java:${beamVersion}") // 5.
    implementation("org.apache.commons:commons-csv:1.9.0")
    implementation("org.slf4j:slf4j-api:${slf4jVersion}")
    implementation("org.slf4j:slf4j-jdk14:${slf4jVersion}")

    testImplementation("junit:junit:4.13.2")                                                 // 6.
    testImplementation("org.apache.beam:beam-runners-direct-java:${beamVersion}")            // 7.
}

tasks.withType<JavaCompile>().configureEach {
    options.encoding = "UTF-8"
}
  1. このビルドスクリプトで利用するコンパイラやランタイムのバージョン番号を指定しています。
    • Gradleを実行しているJavaランタイムのバージョンが開発者ごとにズレていても、コンパイルやテストに使うJavaランタイムは統一できるということです。ビルドの再現性が高まりますので必ず設定しましょう。
    • この機能を使うと、必要に応じてGradleがビルド済みのJDKを自動的にダウンロードしてくれます。
    • デフォルトではAdoptiumを使います。
  2. 依存ライブラリをダウンロードする先を宣言しています。
  3. Apache Beamのバージョンを参照する依存性がいくつかあるので、ここでは変数として切り出しています。
  4. Dataflowを動かすために必要なGCPSDKに対する依存性を宣言しています。GradleのPlatform機能を使っていますね。
  5. Apache Beamに対する依存性を宣言しています。後半の二つはGCPでBeamを動かすために必要な依存性です。
  6. ユニットテスト用の依存性としてJUnit4を指定しています。JUnitの最新版はJUnit5系ですが、記事執筆時点においてApache BeamはJUnit5をサポートしていません。cf. JUnit5 support
  7. Apache Beam用のユニットテストライブラリに対する依存性を宣言しています。

これでDataflow用のローカルビルド環境の構築は完了です。

Apache Beamの基礎

Apache Beamを理解するなら、まずはPipelineとPCollectionをしっかり理解してください。

他のコンセプトについては、Dataflowのドキュメントを参照してください。 * Apache Beam のプログラミング モデル

Pipelineについて

Pipelineは複数のステップから構成される処理全体を表すオブジェクトで、データの読み取り処理から始まりフィルターや変換を経て、出力処理までを行います。一つのパイプラインが一つのジョブとなります。

Pipelineを構成する各ステップは、実行環境が必要に応じて分散処理してくれます。つまり、各ステップを効率よく動作させるには、それぞれのステップが全く違ったプロセスの上で非同期に実行されても問題がおきないようにしましょう。 具体的には、処理単位になるデータの独立性をできるかぎり高めるようにします。つまり、RDBにおける非正規化を積極的に行うようなデータの持ち方をします。

順序に強い整合性を求める書き方もできますが、そうすると分散処理環境がもつ性能を十分に引き出せません。

PCollectionについて

PCollectionはPipelineを流れるデータの集合を表すオブジェクトです。 逐次的に要素を扱えるのでJavaのCollection Frameworkと似ていますが、PCollectionのAPIだけではデータの開始と終了を明示的に調べられません。 また、PCollectionに格納されている要素は、分散処理環境内における実行環境の都合でシリアライズされたりコピーされる可能性があります。つまり、処理に必要な情報は全て要素内に内包する必要があります。

ParDoを使った逐次処理の書き方

基本的な概念が分かった所で本題に入っていきましょう。

最初に作るのは、PCollectionを流れる要素を1:1で変換していく処理です。

この図は四角い枠がPCollectionで、〇が各要素、矢印が処理です。つまり、全体がPipelineとなります。

今回の記事では、全てのコードをテストコードとして実装しますので、以下のようにディレクトリを作成します。

mkdir src/test/java/com/example/dataflow

出来たディレクトリに CsvFn.java というファイルを以下の内容で作成します。

package com.example.dataflow;

import org.apache.beam.sdk.transforms.DoFn;
import java.util.*;

public class CsvFn extends DoFn<String, List<String>> {               // 1.
    @ProcessElement                                                   // 2.
    public void processElement(ProcessContext c) throws Exception {   // 3.
        var element = c.element();                                    // 4.
        var list = Arrays.asList(element.split(","));                 // 5.
        c.output(list);                                               // 6.
    }
}

この処理では、単一の文字列を入力すると、それをカンマ区切りで分割したリストとして後続の処理に引き渡します。

  1. 逐次処理を実装する際に使うクラスは DoFn を継承します。
  2. 一つ目の型パラメータは、各入力要素を表す型を設定します。ここでは String を設定しています。
  3. 二つ目の型パラメータは、各出力要素を表す型を設定します。ここでは List<String> を設定しています。
  4. 逐次処理を行うメソッドは @ProcessElement アノテーションを付与します。
  5. 逐次処理を行うメソッドのアクセス修飾子は public 、戻り値は void です。送出される例外としては Exception を定義しておきます。
  6. なお、このメソッドの中から例外を送出するとジョブ全体が停止します。
  7. ProcessContextelementメソッドを呼ぶと、DoFnを継承する際に設定した一番目の型パラメータの変数が得られます。ここでは String型の変数が得られるわけです。
  8. Stringsplitメソッドを呼びだして得られた配列をListに格納しています。
  9. ProcessContextoutputメソッドを呼ぶ際には、DoFnを継承する際に設定した二番目の型パラメータの変数を渡します。ここでは既に作成済みのlistを渡していますね。

Dataflowによるユニットテストの書き方

次は、逐次処理をユニットテストしてみましょう。

CsvFn.java と同じディレクトリ内に CsvFnTest.java というファイルを以下の内容で作成します。

package com.example.dataflow;

import org.apache.beam.sdk.testing.*;
import org.apache.beam.sdk.transforms.*;
import org.junit.*;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.util.List;

@RunWith(JUnit4.class)
public class CsvFnTest {

    static final List<String> values = List.of(
            "foo,bar,baz", "fo1,ba2,ba3", "ba1,ba2,ba3");
    
    @Rule
    public TestPipeline pipeline = TestPipeline.create();   // 1.

    @Test
    public void testSimplePipeline() throws Exception {
        var output = pipeline
                .apply(Create.of(values))                   // 2.
                .apply(ParDo.of(new CsvFn()));              // 3.

        PAssert.that(output).containsInAnyOrder(            // 4.
                List.of("foo", "bar", "baz"),
                List.of("ba1", "ba2", "ba3"),               // 5.
                List.of("fo1", "ba2", "ba3")
        );

        pipeline.run().waitUntilFinish();                   // 6.
    }
}
  1. ユニットテスト用のパイプラインを生成しています。パイプラインを構成するための共通処理があるので@Ruleを付与しています。
  2. Createofメソッドを使って文字列のリストをパイプラインに流せる形に変換しています。ここでは、Listの各要素がパイプラインを流れていきます。
  3. ParDoofメソッドに先ほど実装したCsvFnインスタンス化して渡しています。これによって、パイプラインを流れる各要素ごとにCsvFnprocessElementメソッドが呼びだされます。
  4. パイプラインを流れる要素が正しく変換されているか確認するには、Apache Beamで用意されている専用のPAssertを使います。ここでは containsInAnyOrderメソッドを使ってそれぞれの要素が正しくカンマ区切りで分解されたか確認しています。
  5. values 変数として定義した要素の順序とは違った順序で要素を検証しています。これは、パイプラインを流れる要素の処理順序は保証されておらず、実行環境の都合で任意に入れ替わる可能性があることを意図しています。つまり、Createofメソッドで作った要素がそのままの順序でCsvFnprocessElementメソッドに入ってくるとは限りません。
  6. TestPipelinerunメソッドを呼びだした上で、さらにwaitUntilFinishメソッドを呼んでパイプラインの処理が終わるのを待っています。デバッガで実行する際に注意してほしいのは、この時点で初めてパイプラインの処理が動き始めることです。つまり、 4. の時点では、まだCsvFnprocessElementメソッドは呼びだされません。

フィルター

CsvFnでは単純な1:1の変換処理を実装しましたので、次はフィルター処理を実装してみましょう。

フィルター処理として作るのは、指定した長さよりも長い文字列だけを後続の処理に流すフィルターです。

CsvFn.java と同じディレクトリ内に FilterFn.java というファイルを以下の内容で作成します。

package com.example.dataflow;

import org.apache.beam.sdk.transforms.DoFn;

public class FilterFn extends DoFn<String, String> { // 1.

    final int size;                                  // 2.

    public FilterFn(int size) {
        this.size = size;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        var element = c.element();
        if (size < element.length()) {               // 3.
            c.output(element);
        }
    }
}
  1. ここで実装するのはフィルター処理なので、入力と出力の型は同じです。
  2. コンストラクタで受け取った長さをメンバ変数として格納しています。Apache BeamではPCollectionの要素だけでなく、各処理のステップを表すオブジェクトも実行環境の都合で直列化される可能性があります。つまり、メンバ変数としてはSerializableな型(もしくは、Externalizableな型)だけを定義できます。
  3. 条件分岐に基づいてProcessContextoutputメソッドを呼びだすかどうかを決めています。

フィルターのテスト

では、フィルター処理をユニットテストしてみましょう。

CsvFn.java と同じディレクトリ内に FilterFnTest.java というファイルを以下の内容で作成します。

package com.example.dataflow;

import org.apache.beam.sdk.testing.*;
import org.apache.beam.sdk.transforms.*;
import org.junit.*;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.util.List;

@RunWith(JUnit4.class)
public class FilterFnTest {
    static final List<String> values = List.of(
            "alpha", "beta", "gamma");
    @Rule
    public TestPipeline pipeline = TestPipeline.create();

    @Test
    public void testSimplePipeline() throws Exception {
        var output = pipeline
                .apply(Create.of(values))
                .apply(ParDo.of(new FilterFn(4)));

        PAssert.that(output).containsInAnyOrder("gamma", "alpha");

        pipeline.run().waitUntilFinish();
    }
}

長さが4文字より大きい単語をフィルターできていますね。

値の増幅処理

次は、一つの入力から複数回の出力を行う処理を実装してみましょう。

増幅処理として作るのは、文字列をカンマ区切りで分割した各要素をそのまま後続に渡す処理です。

CsvFn.java と同じディレクトリ内に FlatValuesFn.java というファイルを以下の内容で作成します。

package com.example.dataflow;

import org.apache.beam.sdk.transforms.DoFn;

import java.util.Arrays;

public class FlatValuesFn extends DoFn<String, String> {  // 1.
    @ProcessElement
    public void processElement(ProcessContext c) {
        var element = c.element();
        var list = Arrays.asList(element.split(","));
        list.forEach(c::output);                          // 2.
    }
}
  1. ここで実装するのは増幅処理なので、入力と出力の型は同じです。
  2. 文字列を分割して得られた要素全てについてProcessContextouputメソッドを呼びだしています。

増幅処理のテスト

では、増幅処理をユニットテストしてみましょう。

CsvFn.java と同じディレクトリ内に FlatValuesFnTest.java というファイルを以下の内容で作成します。

package com.example.dataflow;

import org.apache.beam.sdk.testing.*;
import org.apache.beam.sdk.transforms.*;
import org.junit.*;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.util.List;

@RunWith(JUnit4.class)
public class FlatValuesFnTest {

    static final List<String> values = List.of(
            "foo,bar,baz", "fo1,ba2,ba3", "ba1,ba2,ba3");
    @Rule
    public TestPipeline pipeline = TestPipeline.create();

    @Test
    public void testSimplePipeline() throws Exception {
        var output = pipeline
                .apply(Create.of(values))
                .apply(ParDo.of(new FlatValuesFn()));

        PAssert.that(output).containsInAnyOrder(
                "foo", "bar", "baz",
                "ba1", "ba2", "ba3",
                "fo1", "ba2", "ba3"
        );

        pipeline.run().waitUntilFinish();
    }
}

カンマ区切りで3つずつに分割できる要素を3回FlatValuesFnで処理したので9つの要素が出力されていますね。

PCollectionの分岐

ここまでの処理では、処理の流れであるPCollection自体は1つのまま要素が流れていきました。 しかし、データの1カラム目だけを見て後続の処理を切り替えるといった処理構造を実現したくなることはあります。

ここでは、入力された文字列の1文字目を使って後続の処理を切り替えるためにPCollectionを分岐してみましょう。

CsvFn.java と同じディレクトリ内に BranchFn.java というファイルを以下の内容で作成します。

package com.example.dataflow;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TupleTag;

import java.util.*;

public class BranchFn extends DoFn<String, List<String>> {

    static final TupleTag<List<String>> MAIN = new TupleTag<>() {   // 1.
    };

    static final TupleTag<List<String>> SUB = new TupleTag<>() {
    };

    @ProcessElement
    public void processElement(ProcessContext c) {
        var element = c.element();
        var list = Arrays.asList(element.split(","));
        if (list.get(0).equals("M")) {
            c.output(MAIN, list.subList(1, list.size()));       // 2.
        } else {
            c.output(SUB, list.subList(1, list.size()));
        }
    }
}
  1. TupleTagは実行環境全体で一意のIDを付与する必要があります。ここでは、ややトリッキーなテクニックを使ってそれを実現しています。コンストラクタ呼び出しの後ろについている中括弧{}によってインナークラスを作成していることがポイントです。実装の詳細が気になる方は是非コードを読んでみてください。
  2. ProcessContextouputメソッドを呼びだす際に、TupleTagを渡しています。これによって各要素にタグ付けをすることで、PCollectionの分岐を実現しているのです。

PCollectionの分岐をテストする

では、分岐したPCollectionをどのように扱うのかテストコードで確認してみましょう。

CsvFn.java と同じディレクトリ内に BranchFnTest.java というファイルを以下の内容で作成します。

package com.example.dataflow;

import org.apache.beam.sdk.testing.*;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.TupleTagList;
import org.junit.*;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.util.List;

@RunWith(JUnit4.class)
public class BranchFnTest {

    static final List<String> values = List.of(
            "M,foo,bar,baz", "S,fo1,ba2,ba3", "M,ba1,ba2,ba3");    // 1.
    @Rule
    public TestPipeline pipeline = TestPipeline.create();

    @Test
    public void testSimplePipeline() throws Exception {
        var output = pipeline
                .apply(Create.of(values))
                .apply(ParDo.of(new BranchFn())
                        .withOutputTags(BranchFn.MAIN, TupleTagList.of(List.of(BranchFn.SUB))) // 2.
                );

        PAssert.that(output.get(BranchFn.MAIN)).containsInAnyOrder( // 3.
                List.of("foo", "bar", "baz"),
                List.of("ba1", "ba2", "ba3")
        );
        PAssert.that(output.get(BranchFn.SUB)).containsInAnyOrder(  // 4.
                List.of("fo1", "ba2", "ba3")
        );

        pipeline.run().waitUntilFinish();
    }
}
  1. テストデータとして、各要素の先頭に分岐の条件となる MSを配置しています。
  2. ParDoofメソッドを呼びだして得られた変数に対して、withOutputTagsを呼びだすことでこのパイプラインが分岐することを宣言しています。
  3. ここでは二つに分岐していますが、三つや四つ、それよりも多くのPCollectionに分岐できます。
  4. 分岐されたパイプラインからMAINでタグ付けされたPCollectionを取り出しています。1. では文字列の先頭がMになっているものがこれにあたります。
  5. 分岐されたパイプラインからSUBでタグ付けされたPCollectionを取り出しています。1. では文字列の先頭がSになっているものがこれにあたります。

まとめ

Apache Beamを使ったバッチ処理を書く上で最も汎用性の高いParDoを使ったスタイルをいくつか紹介しました。

今日紹介したスタイルは、それぞれ専用のAPIが用意されていますが、必要に応じてAPIを覚えなおすのはやや面倒です。 例えば、型を1:1で変換するなら、MapElementsという専用のAPIがあります。フィルターしたいならFilterがあります。

ParDoには、この記事では紹介しきれなかった便利な機能が他にもありますので是非試してみてください。

Google Dataflowは非常に巨大なデータをバッチ処理するための実行基盤として非常に安価に利用できる上にハイパフォーマンスに動作する環境です。 例えば、筆者の業務ではGCSにおいたログファイルをBigQueryへ投入する手段としてDataflowを利用しています。テラバイトクラスのログファイルが分散処理によって数十分でDBに投入されていく様子は圧巻というほかありません。

この記事を読んだ皆様がDataflowを使って、筆者が受けた感銘を共有していただけたら非常に嬉しいです。


私たちは同じチームで働いてくれる仲間を探しています。今回のエントリで紹介したような仕事に興味のある方、ご応募をお待ちしています。

執筆:@sato.taichi、レビュー:@handa.kentaShodoで執筆されました