มาใช้ Apache Beam กันเถอะ – ตอนที่ 3 สร้าง function ไว้ใช้เอง
in this series
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 1 สร้าง batch process แบบง่ายๆ
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 2 วาด Beam ให้เป็น flow
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 3 สร้าง function ไว้ใช้เอง
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 4 ได้เวลา Google Dataflow
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 5 Beam functions จัดให้แล้ว
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 6 IO สำเร็จรูป
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 7 IO ที่สร้างได้
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 8 โพย side inputs และการติด tag
ในงานจริง เรามักจะต้องทำ transformation ซับซ้อน แบบที่เขียน function ง่ายๆ ไม่กี่บรรทัดคงไม่ตอบโจทย์ฮะ และให้ code ของเรามันอ่านง่าย เป็นระบบระเบียบเรียบร้อยด้วย ดังนั้น blog นี้เราเลยมาคุยกันถึงการเขียน function และ classe กัน
แล้วใน Apache Beam เค้าเขียนกันยังไงน่ะ?
ParDo
ParDo
เป็น function ใน Apache Beam ที่เอาไว้สั่ง execute transformation class ฮะ ด้วย syntax เริ่มต้นง่ายๆ กันก่อนเลย แบบนี้
Custom class
ที่ transformation class ของเรา ให้มัน inherits beam.DoFn
ก่อนนะฮะ จากนั้นก็ต้องมี function process()
อันนี้คือต้องมีเลยนะ เพราะ Beam จะไปเรียกใช้มันแน่นอน แบบอัตโนมัติด้วย
มันยังมี function อื่นอีก เดี๋ยวเราค่อยๆ ทำความรู้จักไปฮะ
Main script
ทีนี้ในส่วนของ main เราก็เรียกใช้ด้วย beam.ParDo(<class name>())
ตัวอย่างแรก
เรามา reuse ของเก่ากันฮะ
เราอยาก run Beam เพื่อ transform CSV เหมือนครั้งที่แล้วแหละ แต่รอบนี้ เราจะแยกส่วนออกมาเป็น transformation class
ตอนนี้เรามี class CSVToDictFn
ที่ inherit beam.DoFn
มาไว้เรียบร้อย ตาม code ข้างล่างนี้เลยฮะ
กลับไปที่ main ก็จะเรียกใช้ class แบบนี้
ซึ่งเวลา run ก็คือ มันจะไปเรียก function process
เองเลยฮะ ทำให้ได้ผลลัพท์แบบรูปข้างล่างนี้
อย่างนึงที่ควรรู้ คือ beam.ParDo
จะ accumulate ผลลัพท์ ดังนั้น เราควรใช้ yield
เพราะ yield
จะคืนค่าเป็น iterator ของแต่ละ element ในขณะที่ return
จะคืนค่าเป็น iterator ของทุก elements
ถ้าเราใช้ return
ผลลัพท์จะไม่ถูกต้องฮะ เช่นอันนี้ มันแสดงผลลัพท์เป็น key ของ dictionary เท่านั้นเลย
ตัวอย่าง 2 ที่ใช้ parameter
คราวนี้ ลอง advance อีกนิด ด้วยการที่เราจะ parse parameter ไปด้วย ตรงนี้ เราจำเป็นต้อง implement __init__()
ตอนนี้ class CSVToDictFn
ของเรามี __init__()
เพื่อรับตัวแปร schema
สำหรับแปลงค่า CSV ให้เป็น type ต่างๆ กัน
ตอนเรียกใช้ ก็เขียนแบบนี้ฮะ
เราเตรียม schema
ไว้ที่บรรทัด 8 และเรียกใช้ class ที่บรรทัด 23 พร้อมกับ parse schema
ไปพร้อมกัน
logic ของ CSVToDictFn
อันนี้ คือ เราจะมีค่าอยู่ 3 ค่า ได้แก่ field name, field type, และค่าจาก CSV จากนั้นเราจะใช้ zip
เพื่อ map เป็น dictionary
ตรงนี้ วาดเป็น diagram ได้ประมาณนี้ฮะ