ครั้งกระโน้น เคยเล่าถึงการตั้งเวลารันโปรแกรม (เรียกย่อๆ ว่ารันจ๊อบ) ซึ่งก็มีพูดถึงไปแล้วสามตัวนะฮะ ได้แก่

  1. Task scheduler ใน Windows (Data Integration (EP 3 จบ) - ได้แต่รันอยู่ซ้ำๆ)
  2. Crontab ใน UNIX (Data Integration (EP 3 จบ) - ได้แต่รันอยู่ซ้ำๆ)
  3. Rundeck ใน UNIX (ลองใช้ Rundeck เพื่องาน deployment อัตโนมัติ)

แต่บล็อกนี้จะพูดถึง Apache Airflow ที่เป็นโปรแกรมตั้งเวลารันที่ผมและทีมใช้งานเป็นหลักฮะ

อรัมภบท

Apache Airflow เป็นโปรแกรมของ Apache ที่ทำหน้าที่รันโปรแกรมตามการออกแบบของเราเอง เราจะให้คำสั่งไหน รันก่อน รันหลัง ตามด้วยโปรแกรมอะไร มีเงื่อนไขยังไงบ้าง เราก็สามารถเขียนให้มันทำงานเป็น flow ได้เลยฮะ ซึ่ง flow ที่ว่าเนี่ย มันจะเรียกว่า DAG ย่อมาจาก Directed Acyclic Graph หรือแปลตรงตัวว่า แผนผังที่มีทิศทางและไม่เป็นวงย้อนกลับนั่นเองฮะ

Credit: https://airflow.apache.org

ความดีงามของ Apache Airflow คือ มันเป็น open-source นั่นคือ เราสามารถใช้งานได้โดยไม่เสียค่าใช้จ่ายนั่นเองฮะ และมันก็เป็นเครื่องมือที่เราๆ เหล่า Data Engineer นิยมใช้งานกันด้วยความที่มันใช้งานง่ายฮะ

ตัว Apache Airflow รองรับการใช้งานบนระบบ Unix ทำให้สามารถรัน Bash script และมันถูกพัฒนาด้วย Python ทำให้เราเขียน Python script ไปวางแล้วรันได้เลยนั่นเองฮะ

เรามาใช้งานจริงกันเลยดีกว่า

ติดตั้ง Airflow Docker

เพื่อความง่าย เราเลยมาเริ่มหัดใช้งานกันด้วย Docker image ของ Airflow กันก่อนนะฮะ

docker pull puckel/docker-airflow
docker run -d -p 8080:8080 – name airflow -v /path/in/my/machine/:/usr/local/airflow/dags puckel/docker-airflow webserver

อธิบาย command กันนิดนึงนะฮะ หลังจากที่เรารันบรรทัดแรกเพื่อ pull image ของ Puckel/docker-airflow (Unofficial image ที่ใช้งานแพร่หลายก่อนจะมี official image ออกมา) ก็ใช้คำสั่ง docker run เพื่อสร้าง container จาก image ที่ว่า มีรายละเอียดตามนี้ฮะ

  • -d (detach) รันเป็นเบื้องหลัง เพื่อที่เราอาจจะทำคำสั่งอื่นต่อไป
  • -p 8080:8080 (publish) เชื่อมต่อ port ของ container เข้ากับเครื่องของเรา โดย 8080 ตัวแรกคือ port ของเครื่องเรา และ 8080 ตัวหลังคือ port ของ container ฮะ
  • --name airflow กำหนดชื่อให้กับ container ตัวนี้ มันก็จะมีชื่อว่า airflow
  • -v /path/in/my/machine/:/usr/local/airflow/dags (volume) mount path ของเครื่องเรา เข้ากับ path ของ container เคสนี้คือ เราจะ mount path ไปยัง /airflow/dags ซึ่งเป็น folder หลักของ airflow ที่จะรัน script นะฮะ
  • puckel/docker-airflow ระบุชื่อ image ที่เราจะใช้สร้าง container
  • webserver กำหนด service ที่เราจะรัน container เพราะเราต้องการใช้งาน Apache Airflow ซึ่งมันทำงานเป็น Web-based เราจึงต้องสั่งให้มันรัน web service ฮะ

เริ่มต้นใช้งาน Airflow Web

เมื่อเราสร้าง container สำเร็จ และลองเช็คแล้วว่า container ใช้งานอยู่ ด้วยคำสั่ง

docker ps -a
Container กำลังทำงานอยู่ล่ะ

แล้วถ้าเห็นว่า มันมีสถานะเป็น Up ไม่ใช่ Exited ก็แปลว่าเราสามารถเข้าใช้งาน Airflow ได้แล้วฮะ โดยเข้าไปที่ http://localhost:8080/admin/ แล้ว login ด้วย username/password ว่า airflow/airflow ฮะ ซึ่งเป็นค่าเริ่มต้น

รันตัวอย่าง DAG

เรามาเริ่มวาง DAG script กันฮะ โดยอ้างอิงตัวอย่างจาก Official doc ของ Airflow นะฮะ https://airflow.apache.org/docs/stable/tutorial.html

DAG script จะต้องเป็น Python script เราก็ก๊อปปี้ source code ข้างบนมาสร้างเป็นไฟล์ใหม่ (ผมกำหนดชื่อเป็น “tutorial.py”) แล้ววางไว้ที่ path ที่เรา mount ข้างบน

รีเฟรชหน้าเว็บ Airflow เราก็จะเห็นว่ามีแถวใหม่ขึ้นมา มันก็คือ DAG ที่เราวางไว้ตะกี้ฮะ

และเมื่อเรากดที่ชื่อ DAG ก็จะเจอกับ Tree view ของ DAG ตัวนั้นๆ ให้เห็นว่ามีกี่ขั้นตอนใน DAG ตัวนี้ วางลำดับยังไง และมีประวัติการรันผ่านไม่ผ่านเป็นอย่างไรบ้าง

ถ้ากดดูที่ Code เราจะเห็น DAG script ของเราฮะ

ลองรัน DAG

การรัน DAG สามารถทำได้สองวิธีฮะ อย่างแรกคือตามใจฉัน ก็แค่กดปุ่ม trigger DAG ที่เป็นรูปปุ่ม Play ที่คอลัมน์ Links กับอีกอย่างคือรอให้มันรันตาม schedule แต่ไม่ว่าจะทำวิธีไหน ก็ต้องเปิดใช้งาน DAG ก่อนโดยการกดปุ่ม OFF เป็น ON ใกล้ๆ ชื่อ DAG นะฮะ

เมื่อ DAG ถูกรันขึ้นมา มันจะไปเริ่มต้นรันที่ตัวแปร DAG() ก่อนฮะ จากตัวอย่างก็จะเป็นตรงนี้

dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

'tutorial' จะเป็นชื่อ DAG และเราต้องตั้งชื่อด้วยอักขระ ตัวเลข และ underscore เท่านั้นฮะ

Operators

เรากำหนดให้ DAG มีกี่ขั้นตอนก็ได้ โดยแต่ละขั้นจะต้องกำหนดด้วย Operator กลับมาดูที่ tutorial.py จะเห็นว่ามีตัวแปร t1, t2, t3 ที่เป็น BashOperator ทั้งหมด ก็คือตัวแปรที่ใช้ Bash script ฮะ

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

ตัวอย่าง t1 ข้างบนนี้ บอกว่า นี่คือ BashOperator ที่จะรันคำสั่ง date เพื่อแสดงวันที่ออกมา โดยกำหนด task_id ให้เป็น print_date (ตั้งชื่อด้วยอักขระ ตัวเลข และ underscore เท่านั้น) ตรงนี้จะไปแสดงผลที่หน้า Graph View และ Tree View ส่วน dag= ก็เพื่อผูก Operator ตัวนี้เข้ากับตัวแปร DAG()

เมื่อกำหนด Operator เสร็จแล้ว เราก็จะโยงลำดับการรันแต่ละ Operator ตามตัวอย่างนี้

t1 >> [t2, t3]

มันแปลว่า ถ้ารัน t1 เสร็จแล้ว ให้รัน t2 และ t3 ต่อไปแบบขนานกัน เราจึงเห็น Graph View ตามแบบนี้

สมมติว่าเราต้องการให้ t1 เสร็จแล้ว t2 แล้วค่อยรัน t3 หลังสุด ก็จะเขียนแบบนี้

t1 >> t2 >> t3

ส่วนมี Operator อะไรอีกบ้างนั้น กดเข้าไปดูได้ที่ https://airflow.apache.org/docs/stable/howto/operator/index.html ฮะ

Schedule

มันถือเป็นจุดเด่นของ Airflow และก็เป็นจุดน่าเวียนหัวของ Airflow เช่นกันฮะ

เราจะต้องกำหนด Schedule ของ DAG ให้เป็น crontab format (ทดลองได้ที่ https://crontab.guru/)

Apache Airflow จะมองว่าทุกๆ DAG ที่รันผ่าน schedule จะเป็นอดีตทั้งหมด ดังนั้นวันที่ที่เราเห็นจะเป็นรอบการรันที่ย้อนไปหนึ่งรอบ ตัวอย่างเช่น

รันทุกวันตอนสองทุ่มครึ่ง

มันจะมองว่าวันที่กำลังรันเป็นเมื่อวานเสมอ จากรูปข้างล่าง รันจริงตอนสองทุ่มครึ่งวันที่ 27 (Started: 2020-09-27) แต่ Airflow จะระบุวันที่รันเป็นวันที่ 26 (Run: 2020-09-26)

รันทุกชั่วโมง ณ นาทีที่ 19

เช่นเดียวกัน มันจะมองเวลารันเป็นหนึ่งชั่วโมงที่แล้ว ตัวอย่างรันจริงตอนสี่ทุ่ม (Started: 2020-10-10T23:19:20) แต่ Airflow ระบุเวลารันเป็นหนึ่งชั่วโมงก่อนหน้า (Run: 2020-10-10T22:19:00)

เหตุผลที่เป็นแบบนี้ก็เพราะ Airflow มองที่วันเวลาของข้อมูล โดยทั่วไป การทำ batch ETL (Extract-Transform-Load) จะทำกับข้อมูลในอดีต เช่น เรารันวันนี้แต่ข้อมูลที่เราใช้เป็นของเมื่อวาน นั่นคือ Airflow ถือว่าการรันเป็นของเมื่อวานฮะ

อ้างอิง https://stackoverflow.com/questions/39612488/airflow-trigger-dag-execution-date-is-the-next-day-why

แต่ถ้าเรากดรันเอาเองเนี่ย วันที่รันจะเป็นเวลาปัจจุบันนะฮะ และมีค่า run_id ขึ้นต้นด้วยคำว่า manual_ บอกชัดเจนเลย

System variables

Airflow เองก็มีตัวแปรที่กำหนดมาตั้งแต่ต้น ดูทั้งหมดได้จาก https://airflow.apache.org/docs/stable/macros-ref เพียงแต่ว่านะ เท่าที่เคยใช้ มีแค่ {{ds}} ฮะ เพราะงานเกือบทั้งหมดอ้างอิงค่าของวันนี้มาใช้ต่อ

อันนี้เป็นตัวอย่างการใช้ variables นะฮะ โดยผมใช้ source code ข้างล่างนี้

t4 = BashOperator(
    task_id="test_t4",
    depends_on_past=False,
    bash_command="echo {{ds}}",
    dag=dag
)

มันคือการสั่งให้รัน echo {{ds}} ก็จะแสดงวันที่ออกมา ตามรูปข้างล่างนี้


เป็นตัวอย่างเล็กๆ น้อยๆ ของ Apache Airflow ที่ทีมผมใช้งานกันเป็นหลัก อย่างหนักหน่วงเลยฮะ