มาใช้ Apache Beam กันเถอะ – ตอนที่ 7 IO ที่สร้างได้
in this series
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 1 สร้าง batch process แบบง่ายๆ
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 2 วาด Beam ให้เป็น flow
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 3 สร้าง function ไว้ใช้เอง
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 4 ได้เวลา Google Dataflow
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 5 Beam functions จัดให้แล้ว
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 6 IO สำเร็จรูป
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 7 IO ที่สร้างได้
- มาใช้ Apache Beam กันเถอะ – ตอนที่ 8 โพย side inputs และการติด tag
จาก blog ที่แล้ว ตอนที่ 6 - IO สำเร็จรูป เราก็รู้กันไปแล้วบ้างว่า Apache Beam มันมี packages ให้เราเลือกเชื่อมต่อ Input/Output มาพร้อมใช้ตั้งแต่แรกหลายตัวเลยฮะ
แต่ทีนี้ ถ้าตัวที่เราต้องการจะต่อไปอ่านค่า หรือเขียนค่า มันไม่มีใน Beam ให้ตั้งแต่แรกล่ะ เราก็จำเป็นต้องเขียนมันขึ้นมาเองนะฮะ
สำหรับ blog นี้ เราเลยมารู้จักกันว่า ถ้าเราจะต้องเขียน IO package ขึ้นมาเองเนี่ย จะทำยังไงได้บ้างฮะ
อธิบายกันก่อน
ใน blog นี้จะเล่าถึงการเขียน IO package เพื่ออ่านและเขียนค่าใน Google Firestore นะฮะ
สำหรับคนที่ยังไม่รู้จัก Google Firestore เป็น NoSQL database ของฝั่ง Google Cloud Platform มีจุดเด่นด้าน real-time sync และ integrate กับ Google Cloud Service ได้สะดวกมากๆ คนนิยมใช้กันหลากหลาย เช่น online gaming และพวก mobile app นั่นเองฮะ
อ่านค่าจาก Google Firestore
code snippet
เริ่มจากตัวอย่างง่ายๆ ก่อน คือ list แล้วอ่านทุก document ใน collection ที่ต้องการ
- บรรทัด 8: เชื่อมต่อ Firestore ที่ default database ด้วย default credentials
- บรรทัด 11:
stream()
เพื่อดึง document ทุกอันมาจาก collection ที่กำหนด - บรรทัด 13: print id และข้อมูลใน document ด้วย
to_dict()
อ่านด้วย Beam
ทีนี้ เรามาเขียนให้ Beam ทำงานเหมือนกับข้างต้น เราสามารถเขียน class ที่ inherit apache_beam.io.iobase.BoundedSource
เพื่อให้มันอ่านค่าจาก Firestore ได้ ซึ่งหลักการเขียนสามารถอ่านเพิ่มเติมได้ตามลิงก์ข้างล่างนี้ฮะ
- บรรทัด 22:
init()
class ขึ้นมาแล้วรับค่า collection ที่จะอ่าน, limit ที่จะอ่าน แล้วก็ database name - บรรทัด 33:
estimate_size()
เพื่อให้ Beam สามารถคำนวณ total size ในหน่วย byte ก่อนทำ decompression หรือ process อย่างอื่น - line 36:
get_range_tracker()
เพื่อรายงาน progress กับทำ dynamic splitting - line 44:
read()
เพื่ออ่านค่าข้อมูล โดยเราก็ code จากข้างบนนั่นแหละมาใส่ฮะ - line 61:
split()
เพื่อ split data เป็น bundle
ทีนี้ พอเราจะเรียกใช้งาน ก็เขียนว่าbeam.io.Read(FireStoreReaderFromCollection("collection_name"))
เขียนค่าลงใน Google Firestore
code snippet
เราใช้ batch write เพราะว่า Firestore มัน support การเขียน batch write ฮะ เพื่อประหยัดเวลาและค่าใช้จ่าย เขียนทีนึง 500 record ไปเล้ย
- บรรทัด 8: เชื่อมต่อกับ default database ด้วย default credential
- บรรทัด 11: กำหนดค่าว่า batch นึงเก็บ element มากที่สุดเท่าไหร่
- บรรทัด 12: ตั้งต้น batch เป็น array
- บรรทัด 14: internal method เพื่อ commit batch ไปยัง Firestore
- บรรทัด 16: iterate element ใน batch เพื่อเขียนลง Firestore ด้วย
batch.set()
- บรรทัด 21: commit batch
- บรรทัด 29: iterate element ใน data ต้นทาง แล้ว load ลง batch
- บรรทัด 31: call internal method เพื่อ commit batch เมื่อจำนวน element ใน batch มันถึง maximum ที่เรากำหนดในบรรทัด 11
- บรรทัด 33; call internal method อีกรอบนึง ถ้ายังมี element เหลืออยู่ใน batch กรณีที่มันไม่ถึง maximum
เขียนด้วย Beam
เราจะใช้ apache_beam.DoFn
กันนะฮะ
- บรรทัด 21:
init()
class ขึ้นมารับค่า collection ที่จะเขียน, database name แล้วก็ maximum elements ของ batch - บรรทัด 31:
setup()
จะทำงานอัตโนมัติ เมื่อ "the instance is deserialized" หรือก็คือDoFn
instance มันพร้อมทำงานหลังจากที่ worker มัน start up เรียบร้อยแล้วฮะ ตามเอกสารของ Beam แนะนำให้เรา setup connection กับ database หรือพวก network ที่ method นี้ - บรรทัด 40:
start_bundle()
ทำงานอัตโนมัติ เมื่อ element bundle กำลังจะถูก process โดยที่ element bundle คือ PCollection ที่แยกส่วนออกมาโดย runner เพื่อให้ทำงานได้อย่างมีประสิทธิภาพน่ะฮะ ใน method นี้แหละที่เราจะใช้ประกาศตัวแปร current batch และ clear ค่าของมันด้วย - บรรทัด 43:
finish_bundle()
ทำงานอัตโนมัติ เมื่อ element bundle กำลังจะเสร็จจาก process ซึ่งเราจะใช้เพื่อ commit batch - บรรทัด 47:
process()
ทำงานอัตโนมัติ เมื่อ element กำลังจะถูก process โดยจะถูกเรียกหลังจากstart_bundle()
และก่อนหน้าfinish_bundle()
ตรงนี้แหละที่เราจะใส่ element ลงไปใน batch ล่ะ - บรรทัด 52:
commit_batch()
commit current batch ให้ไปเขียนลง Firestore จากนั้นก็สั่ง clear current batch เพื่อทำงานใน bundle ต่อไป - บรรทัด 62:
teardown()
ทำงานอัตโนมัติ เมื่อ "the instance is shutting down" หรือก็คือDoFn
ทำงานจบแล้ว เอกสาร Beam แนะนำให้ทำปิดงาน เช่น close connection แต่ในตัวอย่างนี้ ไม่ต้องทำอะไรฮะ เลย pass ไปเฉยๆ แหละ
เรียกใช้งาน DoFn
อันนี้ แบบนี้ฮะ beam.ParDo(FireStoreWriterToCollection("collection_name"))
เอาทั้งคู่มาอยู่ใน Pipeline
เรามีทั้งเขียน ทั้งอ่านล่ะ ทีนี้ ลองเอามาทำเป็น pipeline จริงจัง ให้อ่านจาก collection นึงแล้ว copy ไปเขียนที่อีก collection นึง แบบ diagram นี้ฮะ
เราจะเรียง file แบบนี้ฮะ
แล้วสร้างmain.py
ขึ้นมา call class ทั้งสองตัว
ก่อนจะ run เรามีแค่ collection เดียว คือ customers
หลัง run เราจะได้ collection อันใหม่มาพร้อม document เหมือนกับอันแรก ด้วยชื่อ new_customers
Repo
References
- Developing I/O connectors for Python https://beam.apache.org/documentation/io/developing-io-python/#implementing-the-boundedsource-subclass
- Firestore: Get all documents in a collection https://firebase.google.com/docs/firestore/query-data/get-data#get_all_documents_in_a_collection
- Firestore: Batched writes https://firebase.google.com/docs/firestore/manage-data/transactions#batched-writes