จาก blog ที่แล้ว ตอนที่ 6 - IO สำเร็จรูป เราก็รู้กันไปแล้วบ้างว่า Apache Beam มันมี packages ให้เราเลือกเชื่อมต่อ Input/Output มาพร้อมใช้ตั้งแต่แรกหลายตัวเลยฮะ

แต่ทีนี้ ถ้าตัวที่เราต้องการจะต่อไปอ่านค่า หรือเขียนค่า มันไม่มีใน Beam ให้ตั้งแต่แรกล่ะ เราก็จำเป็นต้องเขียนมันขึ้นมาเองนะฮะ

สำหรับ blog นี้ เราเลยมารู้จักกันว่า ถ้าเราจะต้องเขียน IO package ขึ้นมาเองเนี่ย จะทำยังไงได้บ้างฮะ


อธิบายกันก่อน

ใน blog นี้จะเล่าถึงการเขียน IO package เพื่ออ่านและเขียนค่าใน Google Firestore นะฮะ

สำหรับคนที่ยังไม่รู้จัก Google Firestore เป็น NoSQL database ของฝั่ง Google Cloud Platform มีจุดเด่นด้าน real-time sync และ integrate กับ Google Cloud Service ได้สะดวกมากๆ คนนิยมใช้กันหลากหลาย เช่น online gaming และพวก mobile app นั่นเองฮะ

Firestore | Firebase
Use our flexible, scalable NoSQL cloud database, built on Google Cloud infrastructure, to store and sync data for client- and server-side development.

อ่านค่าจาก Google Firestore

code snippet

เริ่มจากตัวอย่างง่ายๆ ก่อน คือ list แล้วอ่านทุก document ใน collection ที่ต้องการ

  • บรรทัด 8: เชื่อมต่อ Firestore ที่ default database ด้วย default credentials
  • บรรทัด 11: stream() เพื่อดึง document ทุกอันมาจาก collection ที่กำหนด
  • บรรทัด 13: print id และข้อมูลใน document ด้วย to_dict()

อ่านด้วย Beam

ทีนี้ เรามาเขียนให้ Beam ทำงานเหมือนกับข้างต้น เราสามารถเขียน class ที่ inherit apache_beam.io.iobase.BoundedSource เพื่อให้มันอ่านค่าจาก Firestore ได้ ซึ่งหลักการเขียนสามารถอ่านเพิ่มเติมได้ตามลิงก์ข้างล่างนี้ฮะ

Apache Beam: Developing I/O connectors for Python
Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the…
  • บรรทัด 22: init() class ขึ้นมาแล้วรับค่า collection ที่จะอ่าน, limit ที่จะอ่าน แล้วก็ database name
  • บรรทัด 33: estimate_size() เพื่อให้ Beam สามารถคำนวณ total size ในหน่วย byte ก่อนทำ decompression หรือ process อย่างอื่น
  • line 36: get_range_tracker() เพื่อรายงาน progress กับทำ dynamic splitting
  • line 44: read() เพื่ออ่านค่าข้อมูล โดยเราก็ code จากข้างบนนั่นแหละมาใส่ฮะ
  • line 61: split() เพื่อ split data เป็น bundle

ทีนี้ พอเราจะเรียกใช้งาน ก็เขียนว่าbeam.io.Read(FireStoreReaderFromCollection("collection_name"))


เขียนค่าลงใน Google Firestore

code snippet

เราใช้ batch write เพราะว่า Firestore มัน support การเขียน batch write ฮะ เพื่อประหยัดเวลาและค่าใช้จ่าย เขียนทีนึง 500 record ไปเล้ย

  • บรรทัด 8: เชื่อมต่อกับ default database ด้วย default credential
  • บรรทัด 11: กำหนดค่าว่า batch นึงเก็บ element มากที่สุดเท่าไหร่
  • บรรทัด 12: ตั้งต้น batch เป็น array
  • บรรทัด 14: internal method เพื่อ commit batch ไปยัง Firestore
  • บรรทัด 16: iterate element ใน batch เพื่อเขียนลง Firestore ด้วย batch.set()
  • บรรทัด 21: commit batch
  • บรรทัด 29: iterate element ใน data ต้นทาง แล้ว load ลง batch
  • บรรทัด 31: call internal method เพื่อ commit batch เมื่อจำนวน element ใน batch มันถึง maximum ที่เรากำหนดในบรรทัด 11
  • บรรทัด 33; call internal method อีกรอบนึง ถ้ายังมี element เหลืออยู่ใน batch กรณีที่มันไม่ถึง maximum

เขียนด้วย Beam

เราจะใช้ apache_beam.DoFn กันนะฮะ

  • บรรทัด 21: init() class ขึ้นมารับค่า collection ที่จะเขียน, database name แล้วก็ maximum elements ของ batch
  • บรรทัด 31: setup() จะทำงานอัตโนมัติ เมื่อ "the instance is deserialized" หรือก็คือ  DoFn instance มันพร้อมทำงานหลังจากที่ worker มัน start up เรียบร้อยแล้วฮะ ตามเอกสารของ Beam แนะนำให้เรา setup connection กับ database หรือพวก network ที่ method นี้
  • บรรทัด 40: start_bundle() ทำงานอัตโนมัติ เมื่อ element bundle กำลังจะถูก process โดยที่ element bundle คือ PCollection ที่แยกส่วนออกมาโดย runner เพื่อให้ทำงานได้อย่างมีประสิทธิภาพน่ะฮะ ใน method นี้แหละที่เราจะใช้ประกาศตัวแปร current batch และ clear ค่าของมันด้วย
  • บรรทัด 43: finish_bundle() ทำงานอัตโนมัติ เมื่อ element bundle กำลังจะเสร็จจาก process ซึ่งเราจะใช้เพื่อ commit batch
  • บรรทัด 47: process() ทำงานอัตโนมัติ เมื่อ element กำลังจะถูก process โดยจะถูกเรียกหลังจาก start_bundle() และก่อนหน้า finish_bundle() ตรงนี้แหละที่เราจะใส่ element ลงไปใน batch ล่ะ
  • บรรทัด 52: commit_batch() commit current batch ให้ไปเขียนลง Firestore จากนั้นก็สั่ง clear current batch เพื่อทำงานใน bundle ต่อไป
  • บรรทัด 62: teardown() ทำงานอัตโนมัติ เมื่อ "the instance is shutting down" หรือก็คือ DoFn ทำงานจบแล้ว เอกสาร Beam แนะนำให้ทำปิดงาน เช่น close connection แต่ในตัวอย่างนี้ ไม่ต้องทำอะไรฮะ เลย pass ไปเฉยๆ แหละ

เรียกใช้งาน DoFn อันนี้ แบบนี้ฮะ beam.ParDo(FireStoreWriterToCollection("collection_name"))


เอาทั้งคู่มาอยู่ใน Pipeline

เรามีทั้งเขียน ทั้งอ่านล่ะ ทีนี้ ลองเอามาทำเป็น pipeline จริงจัง ให้อ่านจาก collection นึงแล้ว copy ไปเขียนที่อีก collection นึง แบบ diagram นี้ฮะ

เราจะเรียง file แบบนี้ฮะ

แล้วสร้างmain.py ขึ้นมา call class ทั้งสองตัว

ก่อนจะ run เรามีแค่ collection เดียว คือ customers

หลัง run เราจะได้ collection อันใหม่มาพร้อม document เหมือนกับอันแรก ด้วยชื่อ new_customers


Repo

sample-beam/07-custom-io at main · bluebirz/sample-beam
Contribute to bluebirz/sample-beam development by creating an account on GitHub.

References