Near real-time data pipeline in CTW

York | Posted on 01-19

The near real-time data pipeline solution

The existing data pipeline

As a game platform, g123 consumes lots of kinds of event data created from games.

Previously we used Firehose and Lambda to process the data transformation and dump the events into S3, since there may be duplicated events been produced by the client-side, we also need to have a deduplication and formating layer after the raw data. The workflow will look like the graph below.

flowchart LR
Source[Data Gateway] --> Kinesis[AWS Kinesis]
Kinesis --> Lambda[Data transformation Lambdas] 
Lambda --> Kinesis
Kinesis --> Firehose[AWS Firehose]
Firehose -- write as compressed raw data --> S3[Raw Data]
S3 --> Spark[Daily processing] 
Spark -- deduplicate, format --> S3_2[Formated Data]

In this solution we mainly used AWS-managed features, the code that needs to be maintained was only the code of Lambda functions, it helped us to deliver the pipeline as fast as possible.

Although this solution works pretty good in the past year, we still facing serval problems:

  1. The process of data transformation is asynchronous, we can hardly guarantee the processing of one particular event has been finished when we are able to query the data unless we create one more kinesis after the lambda function.

  2. Although the Kinesis Data Firehose provides us the functionality to dynamically partition the data if we need more complex partitioning logic we need to create lambda functions, and may take the resource quota from other services.

To solve the problems, we made a small change to the existing solution by replacing Firehose and the lambda function with Flink.

flowchart LR
Source[Data Gateway] --> Kinesis[AWS Kinesis]
Kinesis --> Flink[Flink] 
Flink -- transform--> S3_1[Row Data]
S3_1 --> Spark[Daily processing] 
Spark -- deduplicate, format --> S3_2[Formated Data]

For stability and manageability, we consider the system like

  • Run the Flink cluster on K8S(EKS) with application mode
  • Enable HA and external state storage for fault tolerance
  • Use Datadog metrics reporter to expose the job metrics
  • Process the transformation of one record before storing it into S3
  • Once there is any processing failure, redirect the event to the error output folder by using the side-output feature.

The flink job graph will be

flowchart LR
kinesis[Kinesis Source] --> Formator[Format event] 
Formator --> Processor[Transform processing]
Processor --> SideOuput[Error Output on S3]
Formator --> SideOuput
Processor --> Destination[Destination on S3]

There are two problems here which cost time

  1. We need to use fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider to let the s3 client in Hadoop lib use the credential of the service account.
  2. The official datadog metrics reporter lib will not send tags, we have to make a custom reporter

After finishing this version, we realized that, although we have migrated the data transformation into Flink we still need to do deduplication and formating in a separated process, we may find some solution to simplify the whole pipeline design by doing everything by Flink.

There are two possible ways to archive the goal for us

  1. Use redis, mysql for deduplication, parquet format for final output.
  2. Use Apache Hudi which is a data platform tool to provide record level deduplication/update.

We compared the two solutions, finally, we chose Hudi with the below reasons

  • Hudi manages the existing index in rocksdb and provides the method to reload the index from existing data, which makes the application roll up easily.
  • Hudi manages ACID transactions, guarantees schema evolution, and some other features like z-ordering which may be helpful with the query performance in the future.
flowchart LR
Source[Data Gateway] --> Kinesis[AWS Kinesis]
Kinesis --> Flink[Hudi on Flink] 
Flink -- transform, deduplicate, format --> S3[Formatted Data]

This is what the system looks like.

Migrating the previous version of the Flink application into this is not so easy.

Big points need to pay attention with:

  1. compatibility matrix, Hudi only works with a specific version of Flink
    • Hudi 0.9.0 <==> Flink 1.12.x
    • Hudi 0.10.0 <==> Flink 1.13.x
  2. The fs.s3a.aws.credentials.provider config needs to be also defined in the core-site.xml, or Hudi writing process will fail.
  3. Offical Hudi on Flink doc only provides Flink table api usage, it will be hard to work with datastream api.
  4. There is a memory leak issue in Hudi 0.9.0 while compacting data, we recommend using 0.10.0
  5. Since Hudi 0.10.0 the timestamp of instant changed from yyyyMMddHHmmss to yyyyMMddHHmmssSSS for concurrent operation, but if you are using RedShift, it will lead to data invisible problem refer to https://github.com/apache/hudi/issues/4283

There are two table types in Hudi

  • MOR(Merge on Read), high performance on writing
    • Calculate index based on incoming data, decide to insert or update.
    • Write incoming data to Avro format (Row-based) first.
    • Create a compaction plan.
    • Compact the data in Avro files into Parquet files by creating a new version.
  • COW(Copy on Write)
    • Calculate index based on incoming data, decide to insert or update.
    • Update existing Parquet files by creating a new version.

We compared the two types of table based on the data we had, here is the config of the COW table

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
CREATE TABLE table (
schema STRING,
type STRING,
uuid STRING,
`time` STRING,
id STRING,
game_id STRING,
meta STRING,
custom_meta STRING,
dt STRING,
region STRING
)
PARTITIONED BY (`dt`, `game_id`, `region`)
WITH (
'connector' = 'hudi',
'path' = 's3a://xxxxxx',
'hoodie.embed.timeline.server' = 'true',
'write.precombine.field' = 'time',
'index.partition.regex' = 'dt=(2022-01-14|2022-01-13|2022-01-12).*',
'hoodie.filesystem.view.type' = 'MEMORY',
'hoodie.parquet.max.file.size' = '62914560',
'index.bootstrap.enabled' = 'true',
'hoodie.parquet.block.size' = '62914560',
'hoodie.metadata.enable' = 'false',
'hoodie.datasource.write.recordkey.field' = 'uuid,ctwid,appid,type',
'write.tasks' = '2',
'hoodie.datasource.write.hive_style_partitioning' = 'true',
'index.state.ttl' = '1.5D',
'write.bucket_assign.tasks' = '1',
'read.streaming.enabled' = 'false',
'table.type' = 'COPY_ON_WRITE',
'index.global.enabled' = 'false'
)

The flink job graph will be

flowchart LR
kinesis[Kinesis Source] --> Formator[Format event] 
Formator --> Processor[Transform processing]
Processor --> SideOuput[Error Output on S3]
Formator --> SideOuput
Processor --> IndexBootstrap[index_bootstrap]
IndexBootstrap --> BucketAssigner[bucket_assigner]
BucketAssigner --> HoodieStreamWriter[hoodie_stream_write]
HoodieStreamWriter --> CleanCommit[clean_commit]

The MOR table config will like below

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
CREATE TABLE table (
schema STRING,
type STRING,
uuid STRING,
`time` STRING,
id STRING,
game_id STRING,
meta STRING,
custom_meta STRING,
dt STRING,
region STRING
)
PARTITIONED BY (`dt`, `game_id`, `region`)
WITH (
'connector' = 'hudi',
'path' = 's3a://xxxxxx',
'hoodie.embed.timeline.server' = 'true',
'write.precombine.field' = 'time',
'index.partition.regex' = 'dt=(2022-01-14|2022-01-13|2022-01-12).*',
'hoodie.filesystem.view.type' = 'MEMORY',
'hoodie.parquet.max.file.size' = '62914560',
'index.bootstrap.enabled' = 'true',
'hoodie.parquet.block.size' = '62914560',
'hoodie.metadata.enable' = 'false',
'hoodie.datasource.write.recordkey.field' = 'uuid,id,game_id,type',
'compaction.delta_commits' = '0',
'write.tasks' = '2',
'hoodie.datasource.write.hive_style_partitioning' = 'true',
'index.state.ttl' = '1.5D',
'write.bucket_assign.tasks' = '1',
'read.streaming.enabled' = 'false',
'table.type' = 'MERGE_ON_READ',
'index.global.enabled' = 'false',
'compaction.tasks' = '2',
'compaction.async.enabled' = 'true',
'hoodie.compaction.strategy' = 'org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy'
)

The flink job graph will be

flowchart LR
kinesis[Kinesis Source] --> Formator[Format event] 
Formator --> Processor[Transform processing]
Processor --> SideOuput[Error Output on S3]
Formator --> SideOuput
Processor --> IndexBootstrap[index_bootstrap]
IndexBootstrap --> BucketAssigner[bucket_assigner]
BucketAssigner --> HoodieStreamWriter[hoodie_stream_write]
HoodieStreamWriter --> CompactPlan[compact_plan_generate]
CompactPlan --> CompactTask[compact_task]
CompactTask --> CompactCommit[compact_commit]

After tested with the configs we got below rough results

Mode Existing Partition Count Time cost per write(second) Compaction mode
MOR 10 13 Async
MOR 7000 120 Async
MOR 10 45 Sync
MOR 7000 125 Sync
COW 10 43 NaN
COW 7000 45 NaN

We expect that MOR should give us more performance with asynchronous compaction, but the result is against it. By seeking into the source code of Hudi, the root reason caused the problem is that, once Hudi creates the compaction plan, it will load all partition paths and filter by the compaction settings, in this case, the more partitions you have, the more time you will cost on listing them up, whatever you run compaction asynchronously or synchronously.

According to Hudi doc, we can enable metatable to maintain a list of files in storage to avoid time cost on listing all partitions, but in our case, we choose the COW table because it shows the total time cost per checkpoint clearly.

Although Hudi on Flink is really fast to deliver data into S3, there still some tricks to make it faster on high load

  1. reduce hoodie.parquet.max.file.size and hoodie.parquet.block.size from 110 MB to less, it may affect query performance, but according to our experience, one file around 100 MB on S3, will cost about 25 - 30 seconds to be appended/upserted, reduce file size will help to cut down the maximum latency of processing time.

  2. Since Hudi 0.10.0, the default TTL of index becomes forever, changes to a reasonable value, and set the flink config state.backend.incremental: true to reduce the extra file copy within a checkpoint

About the detail of performance

As we are using

  • cpu: 4 cores
  • memory: 12 GB
  • kinesis shards: 4


According to the weekly graph, the pipeline can handle 100 - 250 TPS with stable latency,

Once testing with consuming remaining data, we can see the throughput was close to the kinesis reading capacity (4 * 1000 records per second)

When we look deeply into the checkpoint duration detail, we can see as the current file size increases, the processing time will also increase, but once a new file starts to be written, the processing time will decrease rapidly.

Summary

By using Hudi on Flink, we can synchronize the separated data processing logic, according to the performance testing result, the application is fast enough. Different from Hudi on Spark, Hudi on Flink will maintain state storage to record the index of records, which saves a lot of time consumption of loading index, but this is also a double-edged sword, depending on different business needs. In short, the reasonable use of Hudi on Flink solves our business problems in one shot.