สวัสดีคร้าบ~

ขึ้นซีรี่ส์ใหม่ของ "มาใช้กันเถอะ" ฮะ เราจะมาคุยกันถึงเจ้าเครื่องมือทำ transformation ที่หลายๆ ที่นิยมใช้กัน นั่นก็คือ Apache Beam นี่เองงงงงงง


Apache Beam คือ ?

Apache Beam เป็น tool ตัวนึงจาก Apache ไว้ใช้ประมวลผลข้อมูลได้ทั้งแบบ batch และ real-time ฮะ จุดเด่นของมัน ความที่มันเป็น open-source และใช้งานได้ในหลายภาษาหลัก เช่น Java, Python, และ Go

มันอยู่ในตัว "T" - Transform ของ "ETL" นั่นล่ะฮะ แบบ transform จากไข่ไก่ เป็นไข่ดาวให้ลูกค้าเราทานนั่นเอง

Apache Beam®
Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the…

และแน่นอน เราจะมาลองใช้กัน ในภาษา Python


Concept ของตอนนี้

จำเป็นต้องรู้จักพื้นฐานกันนิดนึงฮะ

Terms

Apache Beam มี term อยู่สามตัวหลักๆ

  1. Pipeline: คือกระบวนการทำงานทั้งหมดที่เรากำหนด
  2. PCollection: เป็นชุดข้อมูล แบ่งได้เป็น bounded, คือมีจำนวนข้อมูลที่จำกัด นั่นคือ batch และอีกแบบนึงคือ unbounded, เป็นข้อมูลที่มีจำนวนไม่จำกัด หรือก็คือ streaming นั่นเองฮะ
  3. PTranform: คือ operation ที่เราเอาไว้ใช้ transform ฮะ

Pipeline จะเริ่มต้นด้วย PCollection ไม่ว่าจะเป็นแบบ bounded หรือ unbounded datasource ก็ตาม จากนั้นก็มีทำ transform ด้วย PTransform ไปจนจบกระบวนการฮะ

Runner

Apache Beam ต้องการ "runner" เพื่อใช้for execute task ฮะ ตัว runner เองก็มีหลายตัว ขึ้นอยู่กับ platform และ engine ที่เราใช้ run

รายละเอียดอยู่ในลิงก์นี้ฮะ

Apache Beam Capability Matrix
Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the…

ในตอนนี้ เราจะใช้ Python Direct Runner ซึ่งเป็นตัวที่ใช้งานง่ายมากๆ ตัวนึงล่ะฮะ


เริ่มลงมือ

โจทย์ของตอนนี้ คือ อ่าน CSV file และทำ transformation ฮะ

1. ติดตั้ง Apache Beam

เริ่มจากเตรียม environment และติดตั้ง Apache Beam ก่อนเลย

pip install apache-beam

2. เตรียม CSV file

ติ๊ต่างไว้ว่าข้อมูลใน CSV เป็นแบบนี้ฮะ

3. อ่าน file

ทีนี้ เราก็สามารถสั่ง Apache Beam ให้อ่าน file นั่น เขียนประมาณนี้ฮะ

เพราะเราต้องการให้มันอ่านบนเครื่องเราเอง เลยกำหนดให้ parameter runner เป็น DirectRunner() หรือจะใช้ string 'DirectRunner' ตรงๆ เลยก็ได้ฮะ

จากนั้น เราก็เพิ่ม step ต่อไปด้วยการใช้ pipe ( | ) ตามด้วยbeam.io.ReadFromText() เพื่ออ่านข้อความจาก file

ส่วนตัวผมจะใช้ beam.Map(print) เพื่อ debug ฮะ ตรงนี้คือการสั่งให้ Beam ไปทำ PTransform ในที่นี้คือ print เพื่อแสดงค่า ผ่านคำสั่ง beam.Map() และมันจะรับ PCollection จาก step ก่อหน้ามาเป็น parameter ฮะ ผลก็คือ มันจะแสดงค่าที่เราอ่านจาก file นั่นเองฮะ

4. Map to dict

ทีนี้ เรามี function mapToDict() ซึ่งรับ string มาแปลงเป็น dict เราก็เรียกมันด้วย beam.Map(mapToDict) แล้ว print ค่าดูอีกทีนึง

5. Filter

เอาล่ะ เราต้องการเฉพาะผู้หญิง (F) เลยเติมไปอีกหนึ่ง step beam.Filter() เราก็จะได้ PCollection ตามที่กำหนดเงื่อนไขเอาไว้ฮะ

6. Map to CSV row

ตอนนี้เราพอใจล่ะ ต้องการจะเขียนผลลัพท์ลง file เลยเขียน function เพื่อ transform dict เป็น strในรูปแบบ CSV

method mapToCSVRow มีหน้าที่แปลงเป็น string ตามที่ว่าฮะ

7. เขียนลง CSV file

สุดท้ายแล้ว เราก็ใช้ beam.io.WriteToText() สั่งให้เขียน PCollection จาก PTransform อันล่าสุดลงไปใน file

ตรงนี้เราจะเพิ่ม shard_name_template="" เพื่อไม่ให้โปรแกรมมันเติม sharding name ฮะ ไม่งั้นเราจะเจอชื่อ file มีต่อท้ายด้วย "-00000-of-00001" แน่ๆ

8. ได้ CSV file มาแล้ว