มาใช้ Apache Beam กันเถอะ – ตอนที่ 4 ได้เวลา Google Dataflow
in this series
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 1 สร้าง batch process แบบง่ายๆ
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 2 วาด Beam ให้เป็น flow
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 3 สร้าง function ไว้ใช้เอง
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 4 ได้เวลา Google Dataflow
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 5 Beam functions จัดให้แล้ว
3 part ก่อนหน้านี้ เรา run Apache Beam pipeline บนเครื่องของเราเอง ทีนี้ เรามาลอง run บน Cloud กันบ้างฮะ แน่นอน ก็ต้องเป็น Google Cloud Platform นี่แหละ ซึ่งเราจะได้ใช้ Google Dataflow กัน
Google Dataflow คืออะไร
Google Dataflow เป็น service นึงของ GCP ที่สามารถทำงานกับ Apache Beam ได้ โดยมันจะสร้าง VM instance มาเป็น worker node เพื่อ run pipeline ที่เขียนเอาไว้ แล้วเราก็สามารถกำหนดรายละเอียดได้อีกว่า จะเอา RAM เท่าไหร่ CPU กี่ตัว เชื่อมต่อ network ยังไง และอีกเพียบฮะ
ตอนที่เราจะ deploy Beam pipeline ไปที่ Dataflow เนี่ย ก็มีหลายวิธีฮะ รวมถึงจะ integrate กับ GCP service อื่นๆ เช่น process files ใน Google Cloud Storage หรือรับ message จาก Google Pub/Sub หรือ ให้เขียนข้อมูลลง Google Cloud Firestore ก็ทำได้จบในตัวฮะ
สำหรับ part นี้ เราทำแบบง่ายๆ ให้พอเข้าใจกันก่อน โดยเราจะยก part ที่แล้ว ก็คือ อ่านและเขียน file นั่นแหละ แต่ให้ Dataflow เป็นคนทำแทน
Runner
Core concept อยู่ตรงนี้ ถ้าเราจะใช้ Google Dataflow เราต้องกำหนด runner ให้เป็น DataflowRunner
ซึ่งกำหนดใน beam.Pipeline()
หรือ python -m
command ก็ได้ฮะ
แต่ก่อนอื่นเราต้องเปิดใช้ Dataflow API ก่อนนะ

เอาล่ะ เปิดแล้ว ก็ไปดูตัวอย่างกัน
ตัวอย่าง 1: สั่งตรงจาก local
อันนี้เบสิคสุด
เตรียม file
เราให้ pipeline ของเราหยิบมาแต่คนที่เป็นเพศ "F"
จะเห็นว่าใน function run_beam()
เรากำหนด GCS file ที่จะอ่านและเขียนเอาไว้ที่บรรทัด 30-31 และก็กำหนด runner DataflowRunner
ไว้ที่บรรทัด 37
สั่ง run
พอจะ run ที่ Dataflow ให้เราสั่งผ่าน Terminal
- flag
--region
คือ region ของ Dataflow job - flag
--runner
ก็คือDataflowRunner
แต่ไม่ต้องใส่ก็ได้ เพราะเรากำหนดใน code แล้ว - flag
--project
คือ project ของ job - flag
--temp_location
คือ GCS folder เอาไว้เก็บ temporary file ที่ถูกสร้างโดย Dataflow job
ดูผลลัพท์
รอแปบนึง ก็จะเห็นสีเขียวๆ บอกว่า job เราทำงานเสร็จสิ้นฮะ

สังเกตมั้ย ว่าใน code มี import csv
อยู่ข้างใน function mapToDict()
เคยให้ import ไว้บนสุด และเจอ error แล้วผมเข้าใจว่ามันเกิดจาก library ที่ import ไปนั้นมันไม่มีอยู่ใน worker node ทำให้ต้อง import ใน function ฮะ

file มาครบแล้ว

ข้อมูลถูกต้อง ผ่านฮะ

ตัวอย่าง 2: Container image
วิธีแรกมันง่าย แต่ใช้ได้กับพวก code เล็กๆ จบใน file เดียว ถ้ามีหลาย file เป็น code ขนาดใหญ่ขึ้นมาล่ะ
เราใช้ container image เพื่อรวม source code ไว้เป็นก้อนเดียว และนี่ก็เป็นข้อดีของ image ด้วยฮะ
เตรียม file
folder structure แบบนี้ฮะ
file CSVToDict
file main.py
คราวนี้ เราเอาแค่คนที่เป็นเพศ "M" เท่านั้น
และ Dockerfile.
รอบนี้เรามี requirements.txt
เป็น file เปล่าๆ โล่งๆ เพราะไม่ต้องการ library ใดๆ ฮะ
เตรียม container image
ทีนี้ เราก็มาเตรียม image ของเรา แล้วเอาไปไว้ที่ Google Artifact Registry.
ก่อนอื่นก็ต้องสร้าง repo ใน Google Artifact Registry
gcloud artifacts repositories create sample-beam \
--repository-format=docker \
--location=europe-west1 \
--async
สั่ง run
มี repo พร้อมแล้วง เราก็สามารถ build และ push image ก่อนจะสร้าง Beam pipeline จาก image
- flag
--region
คือ region ของ Dataflow job - flag
--runner
ก็คือDataflowRunner
แต่ไม่ต้องใส่ก็ได้ เพราะเรากำหนดใน code แล้ว - flag
--project
คือ project ของ job - flag
--temp_location
คือ GCS folder เอาไว้เก็บ temporary file ที่ถูกสร้างโดย Dataflow job - flag
experiments=use_runner_v2
หมายถึง เราเปิดใช้ Dataflow Runner V2 - flag
sdk_container_image
คือ ที่อยู่ของ image ที่เราใช้ทำ this Beam job
ดูผลลัพท์
หน้าตาเขียวๆ แบบนี้แหละดีแล้ว

ที่มุมล่างขวา จะเห็น image ตรง "sdk_container_image" ข้างใต้ "Pipeline options" นะฮะ เอาไว้ตรวจสอบทีหลังได้แหละ
ผลลัพท์ที่ต้องการคือ คนเพศ "M" เท่านั้น ผ่านล่ะ

ถ้าอยากอ่านเกี่ยวกับ Google Artifact Registry เพิ่มเติม ดูได้ตามลิงก์นี้ฮะ
ตัวอย่าง 3: Container image พร้อมรับ parameter
บางที เราก็อยากให้รับ parameter แทนที่จะ hard-code มัน ก็สามารถทำได้ โดยใช้ Argparse
ฮะ ซึ่งเป็น library สำหรับจัดการ input จาก command line
เตรียม file
ที่ main.py
ของเดิม เราแก้ให้รับ file path argument (บรรทัด 12-17) และขอปรับนิดนึงให้เลือกเฉพาะคนที่มี "id" เป็นเลขคี่ (บรรทัด 37)
สั่ง run
ทำเหมือนเดิมเลย แต่มี parameter เพิ่มเข้าไปอีกสองตัวนะฮะ
- flag
--region
คือ region ของ Dataflow job - flag
--runner
ก็คือDataflowRunner
แต่ไม่ต้องใส่ก็ได้ เพราะเรากำหนดใน code แล้ว - flag
--project
คือ project ของ job - flag
input_file
และoutput_file
ก็เป็น parameter ของ image ตัวนี้ - flag
--temp_location
คือ GCS folder เอาไว้เก็บ temporary file ที่ถูกสร้างโดย Dataflow job - flag
experiments=use_runner_v2
หมายถึง เราเปิดใช้ Dataflow Runner V2 - flag
sdk_container_image
คือ ที่อยู่ของ image ที่เราใช้ทำ this Beam job
ดูผลลัพท์
ผ่านเรียบร้อย เขียวอื๋อ สังเกตว่า ที่ "Pipeline options" มีค่าของ "input_file" กับ "output_file" แสดงอยู่นะ

file ผลลัพท์ก็แสดงคนที่มี id เลขคี่ออกมาถูกต้องด้วยแหละ

สำหรับ Argparse
สามารถไปตามลิงก์นี้ อ่านเพิ่มเติมได้ฮะ
Git Repo
แปะลิงก์ github ของ part นี้เอาไว้ข้างล่างเลยฮะ
References
- Google Dataflow https://cloud.google.com/dataflow
- Run pipeline on the Dataflow service https://cloud.google.com/dataflow/docs/quickstarts/create-pipeline-python#run-the-pipeline-on-the-dataflow-service
- Name error https://cloud.google.com/dataflow/docs/guides/common-errors#name-error
- Pipeline options https://cloud.google.com/dataflow/docs/reference/pipeline-options
- Dataflow Runner V2 https://cloud.google.com/dataflow/docs/runner-v2