Let's try Apache Beam part 5 — transform it with Beam functions
in this series
- Let's try: Apache Beam part 1 – simple batch
- Let's try: Apache Beam part 2 – draw the graph
- Let's try: Apache Beam part 3 - my own functions
- Let's try: Apache Beam part 4 - live on Google Dataflow
- Let's try: Apache Beam part 5 - transform it with Beam functions
Apache Beam ships with many transforms out of the box. In this post we will look at some of the built-in functions we can apply to our problems, so that we don't waste time writing them ourselves.
Preparations
In this post we will use a CSV file of 30 people.
It contains ID, name, gender, occupation, team, and age.
Also prepare the same method as before to transform a CSV line into a dict.
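Here is a minimal sketch of what such a method might look like. The dict keys, the function name parse_line, the conversion of age to an integer and the sample row are all assumptions for illustration, so adjust them to the real file.

```python
import csv

# Assumed column order, following the description above.
FIELDS = ["id", "name", "gender", "occupation", "team", "age"]


def parse_line(line):
    """Turn one CSV line into a dict keyed by FIELDS."""
    # csv.reader copes with quoted values that contain commas.
    values = next(csv.reader([line]))
    row = dict(zip(FIELDS, values))
    row["age"] = int(row["age"])  # assumption: age is a whole number
    return row


# Made-up sample row, not taken from the real file.
sample = '1,Jane Doe,F,"Administrator, charities/voluntary organisations",A,34'
print(parse_line(sample))
```

Inside a pipeline this function can be called from the process() method of a beam.DoFn used with beam.ParDo(). If the file has a header row, the skip_header_lines argument of beam.io.ReadFromText() can skip it.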
Example 1: Women in teams
We want to list the women in each team.

Steps of thought:
- Read a file and use beam.ParDo() to transform each line into a dict.
- Use beam.Filter() to keep only the women.
- Use beam.GroupBy() to group them by "team".
- Use beam.Map() with the function custom_print(), which displays each team and the people in it.
beam.GroupBy() requires a function that determines which property of an element is the key to group by.
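The steps above can be sketched as follows. To stay self-contained, the sketch uses beam.Create() on a few made-up rows instead of reading the CSV file, and it assumes the gender column holds "F" and "M". The helper names is_female and team_of and the body of custom_print() are my own guesses, not the original code.

```python
# Made-up rows standing in for the parsed CSV (not the real data).
PEOPLE = [
    {"name": "Ann", "gender": "F", "team": "A"},
    {"name": "Bob", "gender": "M", "team": "A"},
    {"name": "Cat", "gender": "F", "team": "B"},
    {"name": "Dee", "gender": "F", "team": "A"},
]


def is_female(person):
    """Predicate for beam.Filter(); assumes "F" marks women."""
    return person["gender"] == "F"


def team_of(person):
    """Key function for beam.GroupBy()."""
    return person["team"]


def custom_print(group):
    """Print one (team, members) pair produced by beam.GroupBy()."""
    team, members = group
    names = sorted(member["name"] for member in members)
    line = f"{team}: {', '.join(names)}"
    print(line)
    return line


def run():
    # Imported here so the helpers above can be tried without Beam installed.
    import apache_beam as beam

    with beam.Pipeline() as p:
        (
            p
            | "Create people" >> beam.Create(PEOPLE)
            | "Only women" >> beam.Filter(is_female)
            | "Group by team" >> beam.GroupBy(team_of)
            | "Print teams" >> beam.Map(custom_print)
        )
```

Calling run() executes the pipeline on the default local runner. The order in which the teams are printed is not guaranteed.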
Example 2: How many men and women?
We want to count the men and women in the list.

Steps of thought:
- Read a file and use beam.ParDo() to transform each line into a dict.
- Split them into groups based on "gender" using beam.Partition().
- For the men, count the male group with beam.combiners.Count.Globally() and print the result.
- For the women, do the same.
beam.Partition() requires a function that returns the partition number of each element, together with the total number of partitions.
In this case we take the number from the position of the value in a list via list.index(), here genders.index().
beam.combiners.Count.Globally() returns the number of elements in the PCollection.
We branch the pipeline here by wrapping each branch in parentheses and starting it from a different PCollection of the previous step.
As we can see, we have the male_people and female_people PCollections after the second PTransform. After that we use each of them as the initial PCollection of a later block.
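A rough sketch of this branching is shown below. It runs on made-up rows through beam.Create() rather than the CSV file, and the gender codes "M" and "F", the list name GENDERS and the function name by_gender are assumptions.

```python
# Made-up rows standing in for the parsed CSV (not the real data).
PEOPLE = [
    {"name": "Ann", "gender": "F"},
    {"name": "Bob", "gender": "M"},
    {"name": "Cat", "gender": "F"},
]

# Assumed gender codes; the position in this list is the partition number.
GENDERS = ["M", "F"]


def by_gender(person, num_partitions):
    """Partition function: beam.Partition() passes the element and the partition count."""
    return GENDERS.index(person["gender"])


def run():
    # Imported here so the helpers above can be tried without Beam installed.
    import apache_beam as beam

    with beam.Pipeline() as p:
        people = p | "Create people" >> beam.Create(PEOPLE)

        # beam.Partition() returns one PCollection per partition.
        male_people, female_people = (
            people | "Split by gender" >> beam.Partition(by_gender, len(GENDERS))
        )

        # Each branch starts from its own PCollection.
        (
            male_people
            | "Count men" >> beam.combiners.Count.Globally()
            | "Print men" >> beam.Map(lambda n: print("men:", n))
        )
        (
            female_people
            | "Count women" >> beam.combiners.Count.Globally()
            | "Print women" >> beam.Map(lambda n: print("women:", n))
        )
```

Every step label has to be unique within a pipeline, which is why the two branches use different names for the same transforms.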
This pipeline can be rendered as the graph below.

See part 2 for how to generate a DAG like the one above.
Example 3: Numbers of occupation
We want to list the single-word occupations and how many times each one occurs.

Steps of thought:
- Read a file and use beam.ParDo() to transform each line into a dict.
- Split "occupation" into words with beam.FlatMap() and str.split().
- Cleanse all non-alphanumeric characters with beam.Regex.replace_all().
- Transform to lowercase with beam.Map().
- Keep only occupation words with beam.Regex.matches().
- Group and count the occurrences using beam.combiners.Count.PerElement().
beam.FlatMap() accepts a function that returns an iterable, and each item of that iterable becomes an element of the output PCollection.
For example, we have the string "Administrator, charities/voluntary organisations". str.split() turns it into ["Administrator,", "charities/voluntary", "organisations"] (the leftover comma is removed in the next step), and the items of this list become elements of the PCollection.
beam.Regex.replace_all() replaces every match of a Regex in an element with a given string.
In this case we replace all non-alphanumeric characters (r"[^\w\d]") with an empty string, which removes them.
beam.Regex.matches() filters elements based on a Regex.
In this case we accept words that start with 4 or more characters and end with "ist", "er", "or", "ian", or "ant".
beam.combiners.Count.PerElement() returns a PCollection of unique elements with their number of occurrences.
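Below is a sketch of how these transforms could be chained. The rows are made up and fed through beam.Create() instead of the CSV file. The occupation pattern is my reading of the description above rather than the original expression, and preview() is a hypothetical plain-Python helper for checking the patterns on a single word.

```python
import re

# Made-up rows standing in for the parsed CSV (not the real data).
PEOPLE = [
    {"occupation": "Administrator, charities/voluntary organisations"},
    {"occupation": "Research scientist (maths)"},
    {"occupation": "Systems administrator"},
]

NON_WORD = r"[^\w\d]"  # pattern quoted in the text above
# Assumed pattern: 4 or more word characters, then one of the listed endings.
OCCUPATION = r"^\w{4,}(ist|er|or|ian|ant)$"


def split_words(person):
    """Return the occupation as a list of words for beam.FlatMap()."""
    return person["occupation"].split()


def preview(word):
    """Plain-Python equivalent of the cleanse, lowercase and match steps."""
    cleaned = re.sub(NON_WORD, "", word).lower()
    return cleaned if re.match(OCCUPATION, cleaned) else None


def run():
    # Imported here so the helpers above can be tried without Beam installed.
    import apache_beam as beam

    with beam.Pipeline() as p:
        (
            p
            | "Create people" >> beam.Create(PEOPLE)
            | "Split words" >> beam.FlatMap(split_words)
            | "Cleanse" >> beam.Regex.replace_all(NON_WORD, "")
            | "Lowercase" >> beam.Map(lambda word: word.lower())
            | "Only occupations" >> beam.Regex.matches(OCCUPATION)
            | "Count words" >> beam.combiners.Count.PerElement()
            | "Print counts" >> beam.Map(print)
        )
```

Note that \w already covers digits and the underscore, so the \d in the first pattern is redundant and underscores are not removed.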
To read more about Regex, follow the link below.
Example 4: The oldest in a group
We want to see the age of the oldest person per team and gender.

Steps of thought:
- Read a file and use beam.ParDo() to transform each line into a dict.
- Transform with beam.Map() into tuples in the format (team-gender, age).
- Group with beam.CombinePerKey() and supply max to return the max age of each group.
beam.CombinePerKey() requires a PCollection of key-value pairs.
In this case the key is team-gender and the value is age. The function groups identical keys together and executes max on their values, then returns each unique key with the max of age.
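As a sketch, the whole example could look like this. The rows are made up and come from beam.Create() instead of the CSV file, the ages are assumed to be integers already, and the name to_key_value is my own.

```python
# Made-up rows standing in for the parsed CSV (not the real data).
PEOPLE = [
    {"name": "Ann", "gender": "F", "team": "A", "age": 34},
    {"name": "Dee", "gender": "F", "team": "A", "age": 41},
    {"name": "Bob", "gender": "M", "team": "A", "age": 29},
    {"name": "Cat", "gender": "F", "team": "B", "age": 52},
]


def to_key_value(person):
    """Build the (team-gender, age) tuple expected by beam.CombinePerKey()."""
    return (f'{person["team"]}-{person["gender"]}', person["age"])


def run():
    # Imported here so the helper above can be tried without Beam installed.
    import apache_beam as beam

    with beam.Pipeline() as p:
        (
            p
            | "Create people" >> beam.Create(PEOPLE)
            | "To key and value" >> beam.Map(to_key_value)
            | "Max age per key" >> beam.CombinePerKey(max)
            | "Print oldest" >> beam.Map(print)
        )
```

If the ages were still strings, max would compare them as text, so it is worth converting them to numbers before this step.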
Repo
I have collected the source code in the repo below.
Reference
