The near real-time data pipeline solution
The existing data pipeline
As a game platform, g123 consumes many kinds of event data generated by games.
Previously, we used Firehose and Lambda to transform the data and dump the events into S3. Since the client side may produce duplicated events, we also needed a deduplication and formatting layer on top of the raw data. The workflow looked like the graph below.
```mermaid
flowchart LR
    Source[Data Gateway] --> Kinesis[AWS Kinesis]
    Kinesis --> Lambda[Data transformation Lambdas]
    Lambda --> Kinesis
    Kinesis --> Firehose[AWS Firehose]
    Firehose -- write as compressed raw data --> S3[Raw Data]
    S3 --> Spark[Daily processing]
    Spark -- deduplicate, format --> S3_2[Formatted Data]
```
In this solution we mainly used AWS-managed services; the only code we had to maintain was the Lambda functions, which helped us deliver the pipeline as fast as possible.
Although this solution worked pretty well over the past year, we were still facing several problems:
- The data transformation is asynchronous, so we can hardly guarantee that a particular event has finished processing by the time the data becomes queryable, unless we add one more Kinesis stream after the Lambda function.
- Although Kinesis Data Firehose can dynamically partition the data, more complex partitioning logic requires additional Lambda functions, which may eat into the resource quota of other services.
To solve these problems, we made a small change to the existing solution, replacing Firehose and the Lambda functions with Flink.
New data pipeline built with Flink
```mermaid
flowchart LR
    Source[Data Gateway] --> Kinesis[AWS Kinesis]
    Kinesis --> Flink[Flink]
    Flink -- transform --> S3_1[Raw Data]
    S3_1 --> Spark[Daily processing]
    Spark -- deduplicate, format --> S3_2[Formatted Data]
```
For stability and manageability, we designed the system as follows:
- Run the Flink cluster on Kubernetes (EKS) in application mode
- Enable HA and external state storage for fault tolerance
- Use Datadog metrics reporter to expose the job metrics
- Transform each record before storing it in S3
- If processing fails for an event, redirect it to the error output folder using the side-output feature, as sketched below.
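For reference, here is a minimal sketch of how the side-output routing can be wired up with the DataStream API. The `transform` logic, the sample events, and the print sinks are placeholders (the real job reads from Kinesis and writes to S3 file sinks), not our production code.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class TransformJobSketch {

    // Tag for records that fail transformation; they end up in the error output.
    private static final OutputTag<String> ERROR_TAG = new OutputTag<String>("error-output") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; in the real job this is the Kinesis consumer.
        DataStream<String> rawEvents = env.fromElements("{\"type\":\"login\"}", "not-json");

        SingleOutputStreamOperator<String> transformed = rawEvents
            .process(new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out) {
                    try {
                        out.collect(transform(value));    // hypothetical transformation logic
                    } catch (Exception e) {
                        ctx.output(ERROR_TAG, value);     // redirect the failed record to the side output
                    }
                }
            });

        // In the real job both streams go to S3 sinks; printing keeps the sketch self-contained.
        transformed.print("formatted");
        transformed.getSideOutput(ERROR_TAG).print("error");

        env.execute("transform-with-side-output");
    }

    private static String transform(String raw) {
        if (!raw.startsWith("{")) {
            throw new IllegalArgumentException("malformed event: " + raw);
        }
        return raw.trim();
    }
}
```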
The Flink job graph looks like this:

```mermaid
flowchart LR
    kinesis[Kinesis Source] --> Formator[Format event]
    Formator --> Processor[Transform processing]
    Processor --> SideOutput[Error Output on S3]
    Formator --> SideOutput
    Processor --> Destination[Destination on S3]
```
There were two problems here that cost us time:

- We needed to set `fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider` so that the S3 client in the Hadoop library uses the credentials of the service account (see the config excerpt below).
- The official Datadog metrics reporter library does not send tags, so we had to build a custom reporter.
  - We referred to the logic here: https://github.com/aroch/flink-metrics-dogstatsd/blob/master/src/main/java/com/aroch/flink/metrics/dogstatsd/DogStatsDReporter.java#L229-L230
  - It is better to package the reporter as a separate jar, since the class has to be placed in `{YOUR_PATH}/flink/lib`, not `{YOUR_PATH}/flink/userlib`.
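For reference, this is roughly what the relevant part of `flink-conf.yaml` can look like. The credentials-provider key is the one mentioned above; the reporter name, class, and host/port keys are placeholders for the custom reporter and depend on how it is implemented, so treat them as assumptions rather than exact settings.

```yaml
# Make the Hadoop S3A client pick up the service account's web identity token.
fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider

# Custom Datadog/DogStatsD reporter (class name and options are hypothetical).
# The jar containing this class must sit in {YOUR_PATH}/flink/lib, not flink/userlib.
metrics.reporter.dogstatsd.class: com.example.metrics.TaggedDogStatsDReporter
metrics.reporter.dogstatsd.host: localhost
metrics.reporter.dogstatsd.port: 8125
```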
After finishing this version, we realized that although we had migrated the data transformation into Flink, we still needed to run deduplication and formatting in a separate process. We wanted to find a way to simplify the whole pipeline design by doing everything in Flink.
There were two possible ways for us to achieve this goal:
- Use Redis or MySQL for deduplication and the Parquet format for the final output.
- Use Apache Hudi, a data platform that provides record-level deduplication/updates.
We compared the two solutions and finally chose Hudi for the following reasons:
- Hudi keeps the existing index in RocksDB state and provides a way to reload the index from existing data, which makes rolling out the application easy.
- Hudi provides ACID transactions, schema evolution, and other features such as Z-ordering that may help query performance in the future.
New data pipeline built with Hudi on Flink
```mermaid
flowchart LR
    Source[Data Gateway] --> Kinesis[AWS Kinesis]
    Kinesis --> Flink[Hudi on Flink]
    Flink -- transform, deduplicate, format --> S3[Formatted Data]
```
This is what the system looks like.
Migrating the previous version of the Flink application to this design was not so easy.
The main points to pay attention to:
- Compatibility matrix: Hudi only works with specific versions of Flink
  - Hudi 0.9.0 <==> Flink 1.12.x
  - Hudi 0.10.0 <==> Flink 1.13.x
- The `fs.s3a.aws.credentials.provider` config also needs to be defined in `core-site.xml`, or the Hudi writing process will fail (see the `core-site.xml` sketch after this list).
- The official Hudi on Flink documentation only covers the Flink Table API; it is hard to make it work with the DataStream API.
- There is a memory leak issue in Hudi 0.9.0 when compacting data; we recommend using 0.10.0.
- Since Hudi 0.10.0, the instant timestamp format changed from `yyyyMMddHHmmss` to `yyyyMMddHHmmssSSS` to support concurrent operations. If you are using Redshift, this leads to a data-invisibility problem; see https://github.com/apache/hudi/issues/4283.
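Here is a minimal `core-site.xml` sketch for the point above about the Hudi writers; only the credentials-provider property comes from our setup, the rest is boilerplate.

```xml
<?xml version="1.0"?>
<!-- core-site.xml: must be on the classpath of the Flink/Hudi job so the Hudi
     writers use the same credentials provider as the Flink S3A filesystem. -->
<configuration>
  <property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>com.amazonaws.auth.WebIdentityTokenCredentialsProvider</value>
  </property>
</configuration>
```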
There are two table types in Hudi:

- MOR (Merge on Read): high write performance
  - Calculate the index based on incoming data and decide whether to insert or update.
  - Write incoming data to Avro (row-based) files first.
  - Create a compaction plan.
  - Compact the data in the Avro files into Parquet files by creating a new version.
- COW (Copy on Write)
  - Calculate the index based on incoming data and decide whether to insert or update.
  - Update existing Parquet files by creating a new version.
We compared the two table types based on the data we had. Here is the config of the COW table:

```sql
CREATE TABLE table (
    schema STRING,
    type STRING,
    uuid STRING,
    `time` STRING,
    id STRING,
    game_id STRING,
    meta STRING,
    custom_meta STRING,
    dt STRING,
    region STRING
)
PARTITIONED BY (`dt`, `game_id`, `region`)
WITH (
    'connector' = 'hudi',
    'path' = 's3a://xxxxxx',
    'hoodie.embed.timeline.server' = 'true',
    'write.precombine.field' = 'time',
    'index.partition.regex' = 'dt=(2022-01-14|2022-01-13|2022-01-12).*',
    'hoodie.filesystem.view.type' = 'MEMORY',
    'hoodie.parquet.max.file.size' = '62914560',
    'index.bootstrap.enabled' = 'true',
    'hoodie.parquet.block.size' = '62914560',
    'hoodie.metadata.enable' = 'false',
    'hoodie.datasource.write.recordkey.field' = 'uuid,id,game_id,type',
    'write.tasks' = '2',
    'hoodie.datasource.write.hive_style_partitioning' = 'true',
    'index.state.ttl' = '1.5D',
    'write.bucket_assign.tasks' = '1',
    'read.streaming.enabled' = 'false',
    'table.type' = 'COPY_ON_WRITE',
    'index.global.enabled' = 'false'
)
```
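To give an idea of how such a table is used from a Flink job, here is a hedged sketch with the Table API (the API the Hudi docs cover). The table and column lists are trimmed down, the path and source are placeholders (a `datagen` table instead of our Kinesis stream), and the Hudi Flink bundle is assumed to be on the classpath.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HudiCowSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Hudi commits once per successful checkpoint, so checkpointing must be enabled.
        env.enableCheckpointing(60_000);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register a trimmed-down version of the COW table above (path is a placeholder).
        tableEnv.executeSql(
            "CREATE TABLE events_hudi (uuid STRING, type STRING, `time` STRING, dt STRING) "
          + "PARTITIONED BY (`dt`) WITH ("
          + " 'connector' = 'hudi',"
          + " 'path' = 's3a://your-bucket/events',"
          + " 'table.type' = 'COPY_ON_WRITE',"
          + " 'write.precombine.field' = 'time',"
          + " 'hoodie.datasource.write.recordkey.field' = 'uuid,type')");

        // Placeholder source; in the real job this is the transformed Kinesis stream.
        tableEnv.executeSql(
            "CREATE TABLE events_source (uuid STRING, type STRING, `time` STRING, dt STRING) "
          + "WITH ('connector' = 'datagen', 'rows-per-second' = '10')");

        // Continuously upsert the stream into the Hudi table.
        tableEnv.executeSql(
            "INSERT INTO events_hudi SELECT uuid, type, `time`, dt FROM events_source")
            .await();
    }
}
```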
The Flink job graph looks like this:

```mermaid
flowchart LR
    kinesis[Kinesis Source] --> Formator[Format event]
    Formator --> Processor[Transform processing]
    Processor --> SideOutput[Error Output on S3]
    Formator --> SideOutput
    Processor --> IndexBootstrap[index_bootstrap]
    IndexBootstrap --> BucketAssigner[bucket_assigner]
    BucketAssigner --> HoodieStreamWriter[hoodie_stream_write]
    HoodieStreamWriter --> CleanCommit[clean_commit]
```
The MOR table config looks like this:

```sql
CREATE TABLE table (
    schema STRING,
    type STRING,
    uuid STRING,
    `time` STRING,
    id STRING,
    game_id STRING,
    meta STRING,
    custom_meta STRING,
    dt STRING,
    region STRING
)
PARTITIONED BY (`dt`, `game_id`, `region`)
WITH (
    'connector' = 'hudi',
    'path' = 's3a://xxxxxx',
    'hoodie.embed.timeline.server' = 'true',
    'write.precombine.field' = 'time',
    'index.partition.regex' = 'dt=(2022-01-14|2022-01-13|2022-01-12).*',
    'hoodie.filesystem.view.type' = 'MEMORY',
    'hoodie.parquet.max.file.size' = '62914560',
    'index.bootstrap.enabled' = 'true',
    'hoodie.parquet.block.size' = '62914560',
    'hoodie.metadata.enable' = 'false',
    'hoodie.datasource.write.recordkey.field' = 'uuid,id,game_id,type',
    'compaction.delta_commits' = '0',
    'write.tasks' = '2',
    'hoodie.datasource.write.hive_style_partitioning' = 'true',
    'index.state.ttl' = '1.5D',
    'write.bucket_assign.tasks' = '1',
    'read.streaming.enabled' = 'false',
    'table.type' = 'MERGE_ON_READ',
    'index.global.enabled' = 'false',
    'compaction.tasks' = '2',
    'compaction.async.enabled' = 'true',
    'hoodie.compaction.strategy' = 'org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy'
)
```
The Flink job graph looks like this:

```mermaid
flowchart LR
    kinesis[Kinesis Source] --> Formator[Format event]
    Formator --> Processor[Transform processing]
    Processor --> SideOutput[Error Output on S3]
    Formator --> SideOutput
    Processor --> IndexBootstrap[index_bootstrap]
    IndexBootstrap --> BucketAssigner[bucket_assigner]
    BucketAssigner --> HoodieStreamWriter[hoodie_stream_write]
    HoodieStreamWriter --> CompactPlan[compact_plan_generate]
    CompactPlan --> CompactTask[compact_task]
    CompactTask --> CompactCommit[compact_commit]
```
After testing with these configs, we got the rough results below:
| Mode | Existing Partition Count | Time cost per write (seconds) | Compaction mode |
|------|--------------------------|-------------------------------|-----------------|
| MOR  | 10                       | 13                            | Async           |
| MOR  | 7000                     | 120                           | Async           |
| MOR  | 10                       | 45                            | Sync            |
| MOR  | 7000                     | 125                           | Sync            |
| COW  | 10                       | 43                            | N/A             |
| COW  | 7000                     | 45                            | N/A             |
We expected MOR to perform better thanks to asynchronous compaction, but the results say otherwise. Digging into the Hudi source code, the root cause is that when Hudi creates a compaction plan, it loads all partition paths and then filters them by the compaction settings. The more partitions you have, the more time is spent listing them, whether compaction runs asynchronously or synchronously.
According to the Hudi docs, we could enable the metadata table to maintain a list of files in storage and avoid the cost of listing all partitions, but in our case we chose the COW table because it shows the total time cost per checkpoint clearly.
Although Hudi on Flink delivers data into S3 really quickly, there are still some tricks to make it faster under high load:

- Reduce `hoodie.parquet.max.file.size` and `hoodie.parquet.block.size` from 110 MB to something smaller. This may affect query performance, but in our experience a file of around 100 MB on S3 takes about 25 - 30 seconds to be appended/upserted, so reducing the file size helps cut down the maximum processing latency.
- Since Hudi 0.10.0 the default TTL of the index is `forever`; change it to a reasonable value, and set the Flink config `state.backend.incremental: true` to reduce the extra file copies within a checkpoint (see the excerpt below).
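As a concrete example of the second point, this is roughly the `flink-conf.yaml` excerpt we mean; the assumption here is that the RocksDB state backend is in use, since incremental checkpoints are only supported there. The file-size and index-TTL knobs are the table options already shown in the `WITH` clauses above (`hoodie.parquet.max.file.size`, `hoodie.parquet.block.size`, `index.state.ttl`).

```yaml
# Incremental checkpoints upload only changed SST files instead of the whole
# RocksDB state, which cuts down the extra file copies per checkpoint.
state.backend: rocksdb
state.backend.incremental: true
```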
About the details of performance
We are running with:
- CPU: 4 cores
- Memory: 12 GB
- Kinesis shards: 4
According to the weekly graph, the pipeline can handle 100 - 250 TPS with stable latency.
When testing by consuming the remaining backlog, we could see the throughput was close to the Kinesis read capacity (4 * 1000 records per second).
Looking more closely at the checkpoint duration, we can see that as the current file grows, the processing time also increases, but once a new file starts to be written, the processing time drops rapidly.
Summary
By using Hudi on Flink, we were able to unify the previously separated data processing logic, and according to the performance testing results the application is fast enough. Unlike Hudi on Spark, Hudi on Flink maintains the record index in state storage, which saves a lot of the time spent loading the index, but this is also a double-edged sword depending on your business needs. In short, the reasonable use of Hudi on Flink solved our business problems in one shot.