Cloud Dataflow Fundamentals

29 May 2017

Profile

Cloud Dataflow Overview

A rough overview

IO

Process

Pipeline

The class that represents the entire pipeline.
Define the processing steps, then call run() at the end.

public static void main(String[] args) {
        DatastoreToBigQueryOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(DatastoreToBigQueryOptions.class);
        Pipeline p = Pipeline.create(options);
        ...
        // Input from Datastore
        PCollection<Entity> entities = p
                .apply(DatastoreIO.v1().read().withProjectId(options.getProject()).withQuery(query));
        PCollection<TableRow> rows = entities.apply(new EntityToTableRow());
        ...
        // Output to BigQuery
        rows.apply(BigQueryIO.Write.named("Write").to("training-topgate-dev:output.output_table").withSchema(schema)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

        p.run();
    }

PCollection

PCollection<T>

// For example:
PCollection<String>
PCollection<Entity>
PCollection<TableRow>
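
A PCollection is produced by applying a transform to the Pipeline (or to another PCollection). A minimal sketch, using Create.of with made-up sample values, of building an in-memory PCollection<String>:

// Minimal sketch: building an in-memory PCollection<String> with Create.of
// (the sample values are made up for illustration)
Pipeline p = Pipeline.create(options);
PCollection<String> lines = p.apply(Create.of("hello", "dataflow"));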

PTransform

A class that defines the data input and output (transforming an input PCollection into an output PCollection)

// Example: takes a collection of Entity and returns a collection of TableRow
public static class EntityToTableRow 
    extends PTransform<PCollection<Entity>,PCollection<TableRow>> {

    @Override
    public PCollection<TableRow> apply(PCollection<Entity> entities) {

        PCollection<TableRow> rows = entities.apply(ParDo.of(new EntityToTableRowFn()));

        return rows;
    }
}

DoFn

A class that processes a single value

// Example: takes an Entity and returns a TableRow
public static class EntityToTableRowFn extends DoFn<Entity, TableRow> {
    @Override
    public void processElement(ProcessContext c) {

        TableRow row = new TableRow();
        // c.element() returns the input value
        row.set("__key__", c.element().getKey().toString());
        ...
        // pass the output value to c.output()
        c.output(row);
    }
}

Overall flow

Group By

To perform a Group By, generate a map keyed by the value you want to group on

public class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {

      PCollection<String> words = lines.apply(
          ParDo.of(new ExtractWordsFn()));

      PCollection<KV<String, Long>> wordCounts =
          words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }
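
The CountWords example above counts occurrences per element; for general grouping, key/value pairs can be fed to GroupByKey. A minimal sketch, assuming a hypothetical PCollection<KV<String, Integer>> named wordLengths produced by an earlier step:

// Hedged sketch: grouping KV pairs by key
// (wordLengths is a hypothetical PCollection<KV<String, Integer>> from an earlier step)
PCollection<KV<String, Iterable<Integer>>> grouped =
        wordLengths.apply(GroupByKey.<String, Integer>create());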

Side IO

Used when bundling multiple inputs together, or when outputting data to multiple destinations

ParDo.withOutputTags(packageObjects, TupleTagList.of(invalidRecord)).of(new DoFn<String, KV<String, String>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(ProcessContext c) {
        Tokenizer tokenizer = new Tokenizer();
        List<Token> tokens = tokenizer.tokenize(c.element());
        for (Token token : tokens) {
            if (token.getAllFeaturesArray().length < 1 || token.getAllFeaturesArray()[0].equals("記号")) {
                // divert invalid data to the side output
                c.sideOutput(invalidRecord, token.getSurface());
            } else {
                c.output(KV.of(token.getAllFeaturesArray()[0], token.getSurface()));
            }
        }
    }
}).named("Tokenizer"));
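
The snippet above shows the side output case; for a side input (handing an extra, usually small, input to a ParDo), a rough sketch along these lines, assuming a hypothetical PCollection<Long> named wordCountTotal computed earlier:

// Rough sketch: passing a singleton side input into a ParDo
// (wordCountTotal is a hypothetical PCollection<Long> from an earlier step)
final PCollectionView<Long> totalView = wordCountTotal.apply(View.<Long>asSingleton());

words.apply(ParDo.withSideInputs(totalView).of(new DoFn<String, String>() {
    @Override
    public void processElement(ProcessContext c) {
        // c.sideInput() reads the side input value inside the DoFn
        Long total = c.sideInput(totalView);
        c.output(c.element() + "/" + total);
    }
}));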

Example

Batch vs Streaming

Batch

Batch Overview

Launching

Launching (Dataflow Template) 1/2

A feature that lets you deploy a pipeline to Cloud Storage in advance and launch it at any time

mvn compile exec:java -Dexec.mainClass=org.sinmetal.flow.BigQueryToDatastore \
              -Dexec.args="--project=cpb101demo1 \
              --stagingLocation=gs://cpb101demo1/staging \
              --inputTable=cpb101demo1:samples.table \
              --dataflowJobFile=gs://cpb101demo1-df-template/BigQueryToDatastore201702021500 \
              --runner=TemplatingDataflowPipelineRunner"
gcloud beta dataflow jobs run {job_name} \
        --gcs-location gs://{your Dataflow Template Path}

Launching (Dataflow Template) 2/2

To pass runtime parameters at launch time, define them when creating the template

public interface BigQueryToDatastoreOptions extends PipelineOptions {

    @Description("Path of the bigquery table to read from")
    @Default.String("cpb101demo1:samples.table")
    ValueProvider<String> getInputTable();

    void setInputTable(ValueProvider<String> value);
}
gcloud beta dataflow jobs run {job_name} \
        --gcs-location gs://{your Dataflow Template Path} --parameters inputTable=cpb101demo1:samples.table
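
A rough sketch of how such a ValueProvider option might be consumed when the template is built, assuming an SDK version whose BigQueryIO.Read.from accepts a ValueProvider<String>:

// Hedged sketch: reading the table name from a ValueProvider-typed option
// (assumes BigQueryIO.Read.from accepts ValueProvider<String> in this SDK version)
BigQueryToDatastoreOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(BigQueryToDatastoreOptions.class);
Pipeline p = Pipeline.create(options);
// The value is not resolved here; it is supplied when the template job is launched
PCollection<TableRow> rows = p.apply(BigQueryIO.Read.from(options.getInputTable()));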

Streaming

Streaming Overview

Window
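
In streaming mode the unbounded input is divided into windows before aggregation. A minimal sketch of fixed one-minute windows, assuming a hypothetical unbounded PCollection<String> named streamingWords (e.g. read from Cloud Pub/Sub):

// Minimal sketch: fixed 1-minute windows over an unbounded PCollection
// (streamingWords is a hypothetical PCollection<String> read from Pub/Sub, etc.)
PCollection<String> windowedWords = streamingWords.apply(
        Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

// Downstream aggregations (e.g. Count.perElement()) are then computed per window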

UnitTest

UnitTest Class

Example for DoFn UnitTest

@Test
public void testEntityToTableRowFn() {
    final String KIND = "SampleKind";
    final String ID = "hogeId";
    final String COLUMN_CONTENT = "content";
    final String COLUMN_CONTENT_VALUE = "hogeContent";

    DoFnTester<Entity, TableRow> extractEntityToTableRowFn = DoFnTester.of(new EntityToTableRowFn());

    ...

    List<TableRow> rows = extractEntityToTableRowFn.processBatch(entity);
    Assert.assertThat(rows.size(), CoreMatchers.is(1));
    Assert.assertTrue(rows.get(0).containsKey(COLUMN_CONTENT));
    Assert.assertThat(rows.get(0).get(COLUMN_CONTENT).toString(), CoreMatchers.is(COLUMN_CONTENT_VALUE));
}

Price

Dataflow vs Compute Engine

In practice, running batch jobs on Dataflow works out cheaper

We are hiring

Resource

Advertise

Thank you

TOPGATE GAE Meister