Let's try: Apache Beam part 8 - Tags & Side inputs
In this series
- Let's try: Apache Beam part 1 – simple batch
- Let's try: Apache Beam part 2 – draw the graph
- Let's try: Apache Beam part 3 - my own functions
- Let's try: Apache Beam part 4 - live on Google Dataflow
- Let's try: Apache Beam part 5 - transform it with Beam functions
- Let's try: Apache Beam part 6 - instant IO
- Let's try: Apache Beam part 7 - custom IO
- Let's try: Apache Beam part 8 - Tags & Side inputs
Sometimes we have to apply complex conditions in our Beam pipeline. In this blog, let's see together how we can turn those complex ideas into a simple, readable, yet powerful workflow.
Quest today
We have these books prepared in a CSV file:
However, we also have a list of banned books' ISBNs:
We have to put each book into one of these groups:
- "BANNED": any book that is in the ban list and was published after 1970 goes into this group. These books will be sent to "Midnight library". Banned books published in or before 1970 are unbanned.
- "ANTIQUE": any book published in or before 1970 goes into this group and will be sent to "Archive common library".
- "MODERATE": any book published between 1971 and 2017 (inclusive) goes into this group and will be sent to "Central library".
- "MODERN": any book published in 2018 or later goes into this group and will be sent to "New Bloom library".
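Before bringing Beam in, the four rules above can be pinned down as a small plain-Python function. This is a sketch, not the original code: the function name classify, the LIBRARIES mapping, and the keys published_year and is_banned are assumptions about how each book row is represented. Note how the rule order matters: a banned book from 1970 or earlier falls through to ANTIQUE, while a banned book from after 1970 lands only in BANNED.

```python
# Sketch of the grouping rules as plain Python (hypothetical names).
# Assumption: each book is a dict with "published_year" (int) and
# "is_banned" (bool); the real CSV columns may be named differently.


def classify(book: dict) -> str:
    """Return the group name for one book, following the rules above."""
    year = book["published_year"]
    # A banned book stays banned only if it was published after 1970.
    if book["is_banned"] and year > 1970:
        return "BANNED"
    # Everything up to and including 1970 (banned or not) is antique.
    if year <= 1970:
        return "ANTIQUE"
    # 1971 to 2017 inclusive.
    if year <= 2017:
        return "MODERATE"
    # 2018 onwards.
    return "MODERN"


# Destination library for each group, as listed in the rules.
LIBRARIES = {
    "BANNED": "Midnight library",
    "ANTIQUE": "Archive common library",
    "MODERATE": "Central library",
    "MODERN": "New Bloom library",
}
```

For example, classify({"published_year": 1965, "is_banned": True}) gives "ANTIQUE", because the 1970 cutoff unbans it.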
Side inputs
First things first, we need to know which books are in the ban list. One way to add a ban status to the book list is to use "side inputs".
Side inputs are additional inputs that we pass directly into a PTransform, alongside its main input PCollection.
With side inputs, a function can receive extra parameters besides the element itself. For example:
Here we add a new key, is_banned, which tells whether the book's ISBN is in the banned_book_list parameter.
To call this DoFn, there are two ways:
- Prepare the side input before the pipeline, as a plain Python value
- Prepare the side input inside the pipeline, as a PCollection
For the second way, we wrap the PCollection with beam.pvalue.AsIter() for many values (e.g. a list) or beam.pvalue.AsSingleton() for a single value (e.g. an integer). After the transformation with these side inputs, we can see output like this:
Tagging
Now we have a ban flag in the book list. Next is to classify each book into its group.
It is time to use the tagging feature. It lets a single DoFn attach a tag to each element it emits, so one PCollection can be split into several tagged outputs.
We put the grouping logic here and, in each if-clause, emit the (possibly transformed) element together with its tag in the format yield beam.pvalue.TaggedOutput(self.<tag>, element).
The tagging DoFn is ready, and we call it like this:
In the pipeline, adding beam.ParDo(<DoFn>).with_outputs(<tags>) executes that DoFn and returns its elements grouped by tag. We can then pick a tag and apply further PTransforms to it by indexing the result with that tag, result[<tag>], which gives back the PCollection for that tag.
An example element after this step looks like this:
Write to files
With side inputs and tagging applied, the books are ready to be written out to files.
Example "BANNED" books:
Example "MODERN" books:
Repo