คิดว่า ในสถานการณ์จริง เราน่าจะต้องมีเขียน logic ยุ่บยั่บอะไรในงานเราแน่ๆ ฮะ โลกเรามันไม่ได้ตรงไปตรงมาอะไรขนาดนั้น และเช่นเดียวกัน Apache Beam pipeline ของเราก็อาจจะต้องเติม condition ซับซ้อนก็เป็นได้

ดังนั้น ใน blog นี้ เราจะมาดูกันว่า ถ้าเราต้องออกแบบ pipeline ที่ต้องใช้ logic ซับซ้อนซ่อนเงื่อน เราจะทำยังไงได้บ้าง ให้มันมีผลลัพท์ที่ถูกต้อง และอ่านง่าย maintain ได้ง่าย


โจทย์ของเรา

ตั้งต้นด้วย เรามีข้อมูลหนังสือใน CSV:

แล้วก็ยังมี ISBN ของหนังสือที่โดน ban:

ให้เราแยกกลุ่มหนังสือเป็นสี่กลุ่มตามนี้:

  • "BANNED": หนังสือเล่มไหนที่โดน ban อยู่ และ publish หลังปี 1970 ให้อยู่ในกลุ่มนี้ เพื่อจัดส่งไปห้องสมุด "Midnight library" แต่ถ้าเก่ากว่านั้นให้ปลด ban ได้เลย
  • "ANTIQUE": เล่มไหนที่ publish ก่อนหน้าจนถึงปี 1970 ให้อยู่ในกลุ่มนี้เพื่อส่งไปที่ "Archive common library"
  • "MODERATE": เล่มไหนที่ publish ช่วงปี 1971 ถึง 2017 ให้อยู่ในกลุ่มนี้ ส่งไปยัง "Central library"
  • "MODERN": เล่มไหนที่ publish ตั้งแต่ปี 2018 ให้อยู่กลุ่มนี้ และจะส่งไป "New Bloom library"

Side inputs

เรามาทำความรู้จักกับ side inputs กันสักนิดนะฮะ

Side inputs คือ ชุดข้อมูลที่เราเติมเข้าไปให้ PTranform แบบตรงๆ เลย ใช้ในกรณีที่เรามีค่าคงที่ หรือ fixed dataset อะไรสักอย่าง แล้วใช้ประโยชน์จากมันในตอนทำ transformation ฮะ

ทีนี้ เราจะใช้ข้อมูลหนังสือโดน ban เนี่ยแหละ มาเป็น side inputs ล่ะ

เราสามารถเขียนแบบนี้ ให้ function สามารถรับ side inputs ได้ฮะ

เราต้องการให้เติม key is_banned ขึ้นมาใน PCollection ถ้าเกิดว่า ISBN มันไปอยู่ใน banned_book_list เพื่อบอกว่าเล่มนี้น่ะโดน ban นะ

จากนั้นก็เรียกใช้ ทำได้ 2 วิธี

  1. ประกาศ side inputs ก่อนเริ่ม pipeline
  1. ประกาศ side inputs ข้างใน pipeline
✳️
ถ้าเราเลือกประกาศข้างใน pipeline ให้ใช้ beam.pvalue.AsIter() ถ้าเป็นตัวแปร iterator เช่น list หรือ beam.pvalue.AsSingleton() ถ้าเป็นตัวแปรเดี่ยว เช่น integer เพราะ side inputs ไม่สามารถเป็น PCollection ได้

หลังเรียกใช้แล้ว เราควรจะได้ผลลัพท์แนวๆ นี้ฮะ


Tagging

ปัก flag ban ลงในหนังสือแต่ละเล่มแล้ว ทีนี้ก็ได้เวลาจัดกลุ่มมันแล้วล่ะฮะ

Beam มี feature ให้เราติด tag ใน element ของ PCollection ได้ตามที่เราต้องการเลย

ใช้คำสั่ง yield beam.pvalue.TaggedOutput(self.<tag>, element) เพื่อ return tag และ element ที่เรา transform แล้ว ทีนี้ เวลาเรียกใช้ก็เขียนทำนองนี้ฮะ

ดูบรรทัดที่ 12-16 นะฮะ เราใช้ syntaxbeam.ParDo(<DoFn>).with_outputs(<tags>) เพื่อบอกว่า ทำ DoFn แล้วมี tag พวกนี้นะ

จากนั้น บรรทัด 21 ก็เป็นตัวอย่างว่า เราสามารถเลือกสัก tag นึงมาทำอะไรต่อไป ด้วย syntax PCollection[<tag>]

พอเราทำเสร็จ หน้าตาของ element จะเป็นแบบนี้


เขียนลง file

เสร็จสิ้นกระบวนการทั้ง side inputs และ tagging เราเอาไปเขียนลง file ดีกว่า

ตัวอย่างหนังสือกลุ่ม "BANNED"

ตัวอย่างหนังสือกลุ่ม "MODERN"


Repo

sample-beam/08-tag-n-side-inputs at main · bluebirz/sample-beam
Contribute to bluebirz/sample-beam development by creating an account on GitHub.

References