มาใช้ Apache Beam กันเถอะ – ตอนที่ 1 สร้าง batch process แบบง่ายๆ
in this series
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 1 สร้าง batch process แบบง่ายๆ
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 2 วาด Beam ให้เป็น flow
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 3 สร้าง function ไว้ใช้เอง
สวัสดีคร้าบ~
ขึ้นซีรี่ส์ใหม่ของ "มาใช้กันเถอะ" ฮะ เราจะมาคุยกันถึงเจ้าเครื่องมือทำ transformation ที่หลายๆ ที่นิยมใช้กัน นั่นก็คือ Apache Beam นี่เองงงงงงง
Apache Beam คือ ?
Apache Beam เป็น tool ตัวนึงจาก Apache ไว้ใช้ประมวลผลข้อมูลได้ทั้งแบบ batch และ real-time ฮะ จุดเด่นของมัน ความที่มันเป็น open-source และใช้งานได้ในหลายภาษาหลัก เช่น Java, Python, และ Go
มันอยู่ในตัว "T" - Transform ของ "ETL" นั่นล่ะฮะ แบบ transform จากไข่ไก่ เป็นไข่ดาวให้ลูกค้าเราทานนั่นเอง

และแน่นอน เราจะมาลองใช้กัน ในภาษา Python
Concept ของตอนนี้
จำเป็นต้องรู้จักพื้นฐานกันนิดนึงฮะ
Terms
Apache Beam มี term อยู่สามตัวหลักๆ
Pipeline
: คือกระบวนการทำงานทั้งหมดที่เรากำหนดPCollection
: เป็นชุดข้อมูล แบ่งได้เป็น bounded, คือมีจำนวนข้อมูลที่จำกัด นั่นคือ batch และอีกแบบนึงคือ unbounded, เป็นข้อมูลที่มีจำนวนไม่จำกัด หรือก็คือ streaming นั่นเองฮะPTranform
: คือ operation ที่เราเอาไว้ใช้ transform ฮะ
Pipeline
จะเริ่มต้นด้วย PCollection
ไม่ว่าจะเป็นแบบ bounded หรือ unbounded datasource ก็ตาม จากนั้นก็มีทำ transform ด้วย PTransform
ไปจนจบกระบวนการฮะ

Runner
Apache Beam ต้องการ "runner" เพื่อใช้for execute task ฮะ ตัว runner เองก็มีหลายตัว ขึ้นอยู่กับ platform และ engine ที่เราใช้ run
รายละเอียดอยู่ในลิงก์นี้ฮะ

ในตอนนี้ เราจะใช้ Python Direct Runner ซึ่งเป็นตัวที่ใช้งานง่ายมากๆ ตัวนึงล่ะฮะ
เริ่มลงมือ
โจทย์ของตอนนี้ คือ อ่าน CSV file และทำ transformation ฮะ
1. ติดตั้ง Apache Beam
เริ่มจากเตรียม environment และติดตั้ง Apache Beam ก่อนเลย
pip install apache-beam
2. เตรียม CSV file
ติ๊ต่างไว้ว่าข้อมูลใน CSV เป็นแบบนี้ฮะ
3. อ่าน file
ทีนี้ เราก็สามารถสั่ง Apache Beam ให้อ่าน file นั่น เขียนประมาณนี้ฮะ
เพราะเราต้องการให้มันอ่านบนเครื่องเราเอง เลยกำหนดให้ parameter runner
เป็น DirectRunner()
หรือจะใช้ string 'DirectRunner'
ตรงๆ เลยก็ได้ฮะ
จากนั้น เราก็เพิ่ม step ต่อไปด้วยการใช้ pipe ( |
) ตามด้วยbeam.io.ReadFromText()
เพื่ออ่านข้อความจาก file
ส่วนตัวผมจะใช้ beam.Map(print)
เพื่อ debug ฮะ ตรงนี้คือการสั่งให้ Beam ไปทำ PTransform
ในที่นี้คือ print
เพื่อแสดงค่า ผ่านคำสั่ง beam.Map()
และมันจะรับ PCollection
จาก step ก่อหน้ามาเป็น parameter ฮะ ผลก็คือ มันจะแสดงค่าที่เราอ่านจาก file นั่นเองฮะ

4. Map to dict
ทีนี้ เรามี function mapToDict()
ซึ่งรับ string มาแปลงเป็น dict เราก็เรียกมันด้วย beam.Map(mapToDict)
แล้ว print ค่าดูอีกทีนึง

5. Filter
เอาล่ะ เราต้องการเฉพาะผู้หญิง (F
) เลยเติมไปอีกหนึ่ง step beam.Filter()
เราก็จะได้ PCollection
ตามที่กำหนดเงื่อนไขเอาไว้ฮะ

6. Map to CSV row
ตอนนี้เราพอใจล่ะ ต้องการจะเขียนผลลัพท์ลง file เลยเขียน function เพื่อ transform dict
เป็น str
ในรูปแบบ CSV
method mapToCSVRow
มีหน้าที่แปลงเป็น string ตามที่ว่าฮะ

7. เขียนลง CSV file
สุดท้ายแล้ว เราก็ใช้ beam.io.WriteToText()
สั่งให้เขียน PCollection
จาก PTransform
อันล่าสุดลงไปใน file
ตรงนี้เราจะเพิ่ม shard_name_template=""
เพื่อไม่ให้โปรแกรมมันเติม sharding name ฮะ ไม่งั้นเราจะเจอชื่อ file มีต่อท้ายด้วย "-00000-of-00001" แน่ๆ