มาใช้ Apache Beam กันเถอะ – ตอนที่ 5 Beam functions จัดให้แล้ว
in this series
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 1 สร้าง batch process แบบง่ายๆ
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 2 วาด Beam ให้เป็น flow
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 3 สร้าง function ไว้ใช้เอง
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 4 ได้เวลา Google Dataflow
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 5 Beam functions จัดให้แล้ว
Apache beam มี transformation มาให้เราใช้หลายอย่างตั้งแต่แรกแล้วฮะ ดังนั้น สำหรับ blog นี้ เรามาดูกันว่า มี function ไหนน่าสนใจ เอาไปใช้กับงานของเราได้บ้างนะ
เตรียมของก่อน
ใน blog นี้เราจะใช้ CSV ข้อมูลคน 30 คนฮะ
เป็นข้อมูล ID, ชื่อ, เพศ, อาชีพ, ทีม, และอายุ
แล้วก็ reuse ของเก่า เป็น method transform CSV to dict ฮะ
ตัวอย่าง 1: สาวๆ อยู่ทีมไหน
เราอยากรู้ว่า แต่ละทีมมีผู้หญิงเป็นใครบ้าง

ลำดับความคิดของโจทย์นี้
- อ่าน file แล้ว
beam.ParDo()
เพื่อ transform เป็น dict - ใช้
beam.Filter()
เอาเฉพาะผู้หญิง beam.GroupBy()
เพื่อ group ตาม "team"beam.Map()
กับ functioncustom_print()
เพื่อแสดงผลทีมและคนในทีม
beam.GroupBy()
ต้องการ function เพื่อกำหนดว่า element นี้จะจัด group จาก property ไหนฮะ
ตัวอย่าง 2: หญิงเท่าไหร่ ชายกี่คน
เราอยากรู้ว่า ใน list อันนี้ มีผู้ชายและผู้หญิงอย่างละกี่คนกันนะ

ลำดับความคิดของโจทย์นี้
- อ่าน file แล้ว
beam.ParDo()
เพื่อ transform เป็น dict - Group จากเพศด้วย
beam.Partition()
- สำหรับกลุ่มผู้ชาย นับจำนวนด้วย
beam.combiners.Count.Globally()
แล้วแสดงผล - ทำเหมือนกันกับกลุ่มผู้หญิง
beam.Partition()
ต้องการ function เพื่อกำหนดตัวเลขให้แต่ละ element ว่าจะไปอยู่ใน partition ไหน
สำหรับเคสนี้ เราให้ตัวเลขอิงจาก index ของตัวแปรเพศ นั่นก็คือgenders.index()
beam.combiners.Count.Globally()
คืนค่าเป็นตัวเลขจำนวน element ใน PCollection
เราใช้ branching ในตัวอย่างนี้ ซึ่งทำได้โดยใส่วงเล็บแยกกับ PCollection ที่เป็นผลลัพท์จาก block ก่อนหน้า
นั่นคือ male_people
กับ female_people
PCollection จาก PTransform ก่อนหน้าเนี่ย เราใช้มันเป็น PCollection เริ่มต้นของ block ต่อไปฮะ
วาด DAG ได้แบบนี้นะ

วิธีสร้าง DAG graph แบบนี้ หาอ่านได้ที่ตอนที่ 2 นะฮะ
ตัวอย่าง 3: อาชีพไหนฮิต
เราอยากรู้ว่า อาชีพไหน มีจำนวนเท่าไหร่ โดยเอาแค่ชื่ออาชีพแค่คำเดียว

ลำดับความคิดของโจทย์นี้
- อ่าน file แล้ว
beam.ParDo()
เพื่อ transform เป็น dict - จากค่า "อาชีพ" ให้แยก (split) คำย่อยๆ ด้วย
beam.FlatMap()
และstr.split()
- เลือกแค่ตัวอักษรและตัวเลข (alphanumeric) ด้วย
beam.Regex.replace_all()
- แปลงเป็นตัวพิมพ์เล็กด้วย
beam.Map()
- คัดคำที่มีความหมายเป็นอาชีพ ด้วย
beam.Regex.matches()
- Group และนับด้วย
beam.combiners.Count.PerElement()
beam.FlatMap()
รับ function ที่คืนค่าเป็น iterable แล้วแปลงเป็น PCollection
จากตัวอย่าง เรามี string"Administrator, charities/voluntary organisations"
แล้วแยกเป็น["Administrator", "charities/voluntary", "organisations"]
มันก็จะกลายเป็น PCollectionbeam.Regex.replace_all()
ใช้ Regex เพื่อแทนที่ทุกคำในข้อความต้นฉบับให้เป็นค่าที่ต้องการ
ในตัวอย่างนี้ เราแทนที่ทุกอักขระที่ไม่ใช่ตัวเลขหรือตัวอักษร (non-alphanumeric) ด้วย regexr"[^\w\d]"
ให้เป็น empty string ผลลัพท์ก็เหมือนเราลบค่าพวกนั้นทิ้งไปฮะbeam.Regex.matches()
เลือก element ด้วย Regex
ตัวอย่างนี้เราต้องการชื่ออาชีพ เลยคัดที่ขึ้นต้นด้วยตัวอักษร 4 ตัวแรก แล้วจบด้วย "ist", "er", "or", "ian", หรือ "ant" ฮะ-
beam.combiners.Count.PerElement()
คืนค่าเป็น PCollection ของ element ที่ไม่ซ้ำกัน (unique) พร้อมนับจำนวนให้ (number of occurrences)
มีบล็อกเกี่ยวกับ Regex หากสนใจฮะ
ตัวอย่าง 4: ผู้อาวุโสประจำกลุ่ม
เราอยากรู้ว่า ในแต่ละทีม ใครแก่สุด แยกชายหญิง

ลำดับความคิดของโจทย์นี้
- อ่าน file แล้ว
beam.ParDo()
เพื่อ transform เป็น dict - แปลงเป็น tuple ด้วย
beam.Map()
ใน format(team-gender, age)
- Group ด้วย
beam.CombinePerKey()
และให้ functionmax
เพื่อหาage
ที่มากที่สุด
beam.CombinePerKey()
ต้องการ PCollection ที่มี key และ value
ในตัวอย่างนี้ เรากำหนด key เป็นteam-gender
และให้ value คือage
จากนั้นตัว function จะ group key เข้าด้วยกัน แล้วคำนวณหาmax
ของ value จะได้เป็น unique keys ที่มีmax
ของage
นั่นเองฮะ
Repo
ตัวอย่างทั้งหมด มีใน repo ตามลิงก์ด้านล่างนี้ฮะ
Reference
