มาใช้ Apache Beam กันเถอะ – ตอนที่ 8 โพย side inputs และการติด tag
in this series
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 1 สร้าง batch process แบบง่ายๆ
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 2 วาด Beam ให้เป็น flow
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 3 สร้าง function ไว้ใช้เอง
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 4 ได้เวลา Google Dataflow
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 5 Beam functions จัดให้แล้ว
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 6 IO สำเร็จรูป
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 7 IO ที่สร้างได้
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 8 โพย side inputs และการติด tag
คิดว่า ในสถานการณ์จริง เราน่าจะต้องมีเขียน logic ยุ่บยั่บอะไรในงานเราแน่ๆ ฮะ โลกเรามันไม่ได้ตรงไปตรงมาอะไรขนาดนั้น และเช่นเดียวกัน Apache Beam pipeline ของเราก็อาจจะต้องเติม condition ซับซ้อนก็เป็นได้
ดังนั้น ใน blog นี้ เราจะมาดูกันว่า ถ้าเราต้องออกแบบ pipeline ที่ต้องใช้ logic ซับซ้อนซ่อนเงื่อน เราจะทำยังไงได้บ้าง ให้มันมีผลลัพท์ที่ถูกต้อง และอ่านง่าย maintain ได้ง่าย
โจทย์ของเรา
ตั้งต้นด้วย เรามีข้อมูลหนังสือใน CSV:
แล้วก็ยังมี ISBN ของหนังสือที่โดน ban:
ให้เราแยกกลุ่มหนังสือเป็นสี่กลุ่มตามนี้:
- "BANNED": หนังสือเล่มไหนที่โดน ban อยู่ และ publish หลังปี 1970 ให้อยู่ในกลุ่มนี้ เพื่อจัดส่งไปห้องสมุด "Midnight library" แต่ถ้าเก่ากว่านั้นให้ปลด ban ได้เลย
- "ANTIQUE": เล่มไหนที่ publish ก่อนหน้าจนถึงปี 1970 ให้อยู่ในกลุ่มนี้เพื่อส่งไปที่ "Archive common library"
- "MODERATE": เล่มไหนที่ publish ช่วงปี 1971 ถึง 2017 ให้อยู่ในกลุ่มนี้ ส่งไปยัง "Central library"
- "MODERN": เล่มไหนที่ publish ตั้งแต่ปี 2018 ให้อยู่กลุ่มนี้ และจะส่งไป "New Bloom library"
Side inputs
เรามาทำความรู้จักกับ side inputs กันสักนิดนะฮะ
Side inputs คือ ชุดข้อมูลที่เราเติมเข้าไปให้ PTranform แบบตรงๆ เลย ใช้ในกรณีที่เรามีค่าคงที่ หรือ fixed dataset อะไรสักอย่าง แล้วใช้ประโยชน์จากมันในตอนทำ transformation ฮะ
ทีนี้ เราจะใช้ข้อมูลหนังสือโดน ban เนี่ยแหละ มาเป็น side inputs ล่ะ
เราสามารถเขียนแบบนี้ ให้ function สามารถรับ side inputs ได้ฮะ
เราต้องการให้เติม key is_banned
ขึ้นมาใน PCollection ถ้าเกิดว่า ISBN มันไปอยู่ใน banned_book_list
เพื่อบอกว่าเล่มนี้น่ะโดน ban นะ
จากนั้นก็เรียกใช้ ทำได้ 2 วิธี
- ประกาศ side inputs ก่อนเริ่ม pipeline
- ประกาศ side inputs ข้างใน pipeline
beam.pvalue.AsIter()
ถ้าเป็นตัวแปร iterator เช่น list หรือ beam.pvalue.AsSingleton()
ถ้าเป็นตัวแปรเดี่ยว เช่น integer เพราะ side inputs ไม่สามารถเป็น PCollection ได้หลังเรียกใช้แล้ว เราควรจะได้ผลลัพท์แนวๆ นี้ฮะ
Tagging
ปัก flag ban ลงในหนังสือแต่ละเล่มแล้ว ทีนี้ก็ได้เวลาจัดกลุ่มมันแล้วล่ะฮะ
Beam มี feature ให้เราติด tag ใน element ของ PCollection ได้ตามที่เราต้องการเลย
ใช้คำสั่ง yield beam.pvalue.TaggedOutput(self.<tag>, element)
เพื่อ return tag และ element ที่เรา transform แล้ว ทีนี้ เวลาเรียกใช้ก็เขียนทำนองนี้ฮะ
ดูบรรทัดที่ 12-16 นะฮะ เราใช้ syntaxbeam.ParDo(<DoFn>).with_outputs(<tags>)
เพื่อบอกว่า ทำ DoFn
แล้วมี tag พวกนี้นะ
จากนั้น บรรทัด 21 ก็เป็นตัวอย่างว่า เราสามารถเลือกสัก tag นึงมาทำอะไรต่อไป ด้วย syntax PCollection[<tag>]
พอเราทำเสร็จ หน้าตาของ element จะเป็นแบบนี้
เขียนลง file
เสร็จสิ้นกระบวนการทั้ง side inputs และ tagging เราเอาไปเขียนลง file ดีกว่า
ตัวอย่างหนังสือกลุ่ม "BANNED"
ตัวอย่างหนังสือกลุ่ม "MODERN"
Repo
References
- Side input patterns https://beam.apache.org/documentation/patterns/side-inputs/
- apache_beam.pvalue module https://beam.apache.org/releases/pydoc/2.29.0/apache_beam.pvalue.html