มาใช้ Apache Beam กันเถอะ – ตอนที่ 6 IO สำเร็จรูป
in this series
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 1 สร้าง batch process แบบง่ายๆ
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 2 วาด Beam ให้เป็น flow
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 3 สร้าง function ไว้ใช้เอง
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 4 ได้เวลา Google Dataflow
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 5 Beam functions จัดให้แล้ว
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 6 IO สำเร็จรูป
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 7 IO ที่สร้างได้
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 8 โพย side inputs และการติด tag
Apache Beam มี package หลายตัวให้เราเอาไว้จัดการ input/output ซึ่งเราก็แค่ import แล้วเรียกใช้งานมันให้ถูกต้องก็พอแล้ว
ใน blog นี้ ผมขอเล่าถึง 3 IO (input/output) module ที่ได้ใช้บ่อยๆ กันฮะ
1. Text (Google Cloud Storage)
basic สุดละ กับพวก text file เนี่ย
Beam มี beam.io
library ที่เราน่าจะคุ้นกันอยู่แล้ว และข้างในก็มี function ReadFromText()
และ WriteToText()
เพื่ออ่านและเขียน text file
สามารถใช้อ่านเขียน file ใน Google Cloud Storage ได้เหมือนกันฮะ เพราะมันก็คือ text file เหมือนกันนั่นแหละ
- บรรทัด 14:
ReadFromText()
เพื่ออ่านinput_file
ที่เรารับมาจากargparse
- บรรทัด 16:
WriteToText()
เพื่อเขียนไปที่output_file
pipeline นี้เอามาวาด diagram ได้ประมาณนี้ฮะ
2. Database (Google BigQuery)
ถัดจาก Google Cloud Storage ก็เป็น Google BigQuery ที่ผมใช้บ่อยไม่แพ้กันฮะ
- บรรทัด 14:
ReadFromBigQuery()
แล้วส่งค่าที่query=
เพื่อให้มัน query ตามที่กำหนด - บรรทัด 18: เติม
temp_dataset=
ให้ Beam เขียน temporary data ที่มันจะ auto-generate ไปไว้ใน dataset นี้นะ
temp_dataset=
, Beam จะสร้าง dataset ใหม่ขึ้นมาเรื่อยๆ ทุกครั้งที่เราสั่ง run เลยฮะจาก pipeline ได้ diagram ประมาณนี้ฮะ
3. Messaging (Google Cloud Pub/Sub)
ทีนี้ มาที่ real-time กันบ้างฮะ
Google Cloud Pub/Sub ก็สามารถ integrate กับ Beam ได้ง่ายๆ เหมือนกันฮะ ซึ่งเรากำหนดได้ว่าจะไปต่อกับ publisher หรือ subscriber แล้วแต่การออกแบบ โดยตัวอย่างนี้ ผมจะให้ Beam ไปอ่านข้อมูลจาก subscriber ก่อนทำ transform แล้วส่งไปที่ topic
- บรรทัด 10: set option
streaming=True
ให้ Beam run แบบ streaming pipeline ฮะ - บรรทัด 15:
ReadFromPubSub()
ให้ไปอ่านค่าจาก subscriber ที่กำหนด - บรรทัด 26: หลัง transform ก็ส่งค่าไปที่ topic ด้วย
WriteToPubSub()
WriteToPubSub()
ให้แปลง PCollection เป็น byte string ด้วยนะฮะ เช่น การใช้.encode()
เรามาลอง publish ข้อความลง topic แล้ว pull จาก subscription ปลายทางดูกันฮะ
สังเกตว่า originMessageId
มีค่าตรงกัน เนื่องจากเรา parse มาจาก topic ในตอนที่ทำ transform ฮะ
Beam pipeline ตัวอย่างนี้ ทำงานประมาณนี้ฮะ
Repository
References
- Beam IO: Text https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html
- Beam IO: Google BigQuery https://beam.apache.org/releases/pydoc/2.36.0/apache_beam.io.gcp.bigquery.html
- Beam IO: Google BigQuery - autodetect schema https://stackoverflow.com/a/67643669
- Beam IO: Google Cloud Pub/Sub https://beam.apache.org/releases/pydoc/2.29.0/apache_beam.io.gcp.pubsub.html