Friday, October 7, 2022

Ingest streaming data to Apache Hudi tables using AWS Glue and Apache Hudi DeltaStreamer

In today’s world of technology modernization, the need for near-real-time streaming use cases has increased exponentially. Many customers are continuously consuming data from different sources, including databases, applications, IoT devices, and sensors. Organizations may need to ingest that streaming data into data lakes built on Amazon Simple Storage Service (Amazon S3). You may also need to achieve analytics and machine learning (ML) use cases in near-real time. To ensure consistent results in these near-real-time streaming use cases, incremental data ingestion and atomicity, consistency, isolation, and durability (ACID) properties on data lakes have been a common ask.

To address such use cases, one approach is to use Apache Hudi and its DeltaStreamer utility. Apache Hudi is an open-source data management framework designed for data lakes. It simplifies incremental data processing by enabling ACID transactions and record-level inserts, updates, and deletes of streaming ingestion on data lakes built on top of Amazon S3. Hudi is integrated with well-known open-source big data analytics frameworks, such as Apache Spark, Apache Hive, Presto, and Trino, as well as with various AWS analytics services like AWS Glue, Amazon EMR, Amazon Athena, and Amazon Redshift. The DeltaStreamer utility provides an easy way to ingest streaming data from sources like Apache Kafka into your data lake.

This post describes how to run the DeltaStreamer utility on AWS Glue to read streaming data from Amazon Managed Streaming for Apache Kafka (Amazon MSK) and ingest the data into S3 data lakes. AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, ML, and application development. With AWS Glue, you can create Spark, Spark Streaming, and Python shell jobs to extract, transform, and load (ETL) data. You can create AWS Glue Spark streaming ETL jobs using either Scala or PySpark that run continuously, consuming data from Amazon MSK, Apache Kafka, and Amazon Kinesis Data Streams and writing it to your target.

Solution overview

To demonstrate the DeltaStreamer utility, we use fictional product data that represents product inventory including product name, category, quantity, and last updated timestamp. Let’s assume we stream the data from data sources to an MSK topic. Now we want to ingest this data coming from the MSK topic into Amazon S3 so that we can run Athena queries to analyze business trends in near-real time.

The following diagram provides the overall architecture of the solution described in this post.

To simulate application traffic, we use Amazon Elastic Compute Cloud (Amazon EC2) to send sample data to an MSK topic. Amazon MSK is a fully managed service that makes it easy to build and run applications that use Apache Kafka to process streaming data. To consume the streaming data from Amazon MSK, we set up an AWS Glue streaming ETL job that uses the Apache Hudi Connector 0.10.1 for AWS Glue 3.0, with the DeltaStreamer utility to write the ingested data to Amazon S3. The Apache Hudi Connector 0.9.0 for AWS Glue 3.0 also supports the DeltaStreamer utility.

As the data is being ingested, the AWS Glue streaming job writes the data into the Amazon S3 base path. The data in Amazon S3 is cataloged using the AWS Glue Data Catalog. We then use Athena, an interactive query service, to query and analyze the data using standard SQL.


We use an AWS CloudFormation template to provision some resources for our solution. The template requires you to select an EC2 key pair. This key is configured on an EC2 instance that lives in the public subnet. We use this EC2 instance to ingest data to the MSK cluster running in a private subnet. Make sure you have a key in the AWS Region where you deploy the template. If you don’t have one, you can create a new key pair.

Create the Apache Hudi connection

To add the Apache Hudi Connector for AWS Glue, complete the following steps:

  1. On the AWS Glue Studio console, choose Connectors.
  2. Choose Go to AWS Marketplace.
  3. Search for and choose Apache Hudi Connector for AWS Glue.
  4. Choose Continue to Subscribe.
  5. Review the terms and conditions, then choose Accept Terms.

    After you accept the terms, it takes some time to process the request.
    When the subscription is complete, you see the Effective date populated next to the product.
  6. Choose Continue to Configuration.
  7. For Fulfillment option, choose Glue 3.0.
  8. For Software version, choose 0.10.1.
  9. Choose Continue to Launch.
  10. Choose Usage instructions, and then choose Activate the Glue connector from AWS Glue Studio.

    You’re redirected to AWS Glue Studio.
  11. For Name, enter Hudi-Glue-Connector.
  12. Choose Create connection and activate connector.

A message appears confirming that the connection was successfully created. Verify that the connection is visible on the AWS Glue Studio console.

Launch the CloudFormation stack

For this post, we provide a CloudFormation template to create the following resources:

  • VPC, subnets, security groups, and VPC endpoints
  • AWS Identity and Access Management (IAM) roles and policies with required permissions
  • An EC2 instance running in a public subnet within the VPC with Kafka 2.12 installed and with the source data initial load and source data incremental load JSON files
  • An Amazon MSK server running in a private subnet within the VPC
  • An AWS Glue streaming DeltaStreamer job to consume the incoming data from the Kafka topic and write it to Amazon S3
  • Two S3 buckets: one bucket stores code and config files, and the other is the target for the AWS Glue streaming DeltaStreamer job

To create the resources, complete the following steps:

  1. Choose Launch Stack:
  2. For Stack name, enter hudi-deltastreamer-glue-blog.
  3. For ClientIPCIDR, enter the IP address of the client that you use to connect to the EC2 instance.
  4. For HudiConnectionName, enter the AWS Glue connection you created earlier (Hudi-Glue-Connector).
  5. For KeyName, choose the name of the EC2 key pair that you created as a prerequisite.
  6. For VpcCIDR, leave as is.
  7. Choose Next.
  8. Choose Next.
  9. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  10. Choose Create stack.

After the CloudFormation template is complete and the resources are created, the Outputs tab shows the following information:

  • HudiDeltastreamerGlueJob – The AWS Glue streaming job name
  • MSKCluster – The MSK cluster ARN
  • PublicIPOfEC2InstanceForTunnel – The public IP of the EC2 instance for the tunnel
  • TargetS3Bucket – The S3 bucket name

Create a topic in the MSK cluster

Next, SSH to the EC2 instance using the key pair you created and run the following commands:

  1. SSH to the EC2 instance as ec2-user:
    ssh -i <KeyName> ec2-user@<PublicIPOfEC2InstanceForTunnel>

    You can get the KeyName value on the Parameters tab and the public IP of the EC2 instance for the tunnel on the Outputs tab of the CloudFormation stack.

  2. For the next command, retrieve the bootstrap server endpoint of the MSK cluster by navigating to msk-source-cluster on the Amazon MSK console and choosing View client information.
  3. Run the following command to create the topic hudi-deltastream-demo in the MSK cluster:
    ./kafka_2.12-2.6.2/bin/kafka-topics.sh --create \
    --topic hudi-deltastream-demo \
    --bootstrap-server "<replace with the private endpoint value from MSK>" \
    --partitions 1 \
    --replication-factor 2 \
    --command-config ./config_file.txt

  4. Ingest the initial data from the deltastreamer_initial_load.json file into the Kafka topic:
    ./kafka_2.12-2.6.2/bin/kafka-console-producer.sh \
    --broker-list "<replace with the private endpoint value from MSK>" \
    --topic hudi-deltastream-demo \
    --producer.config ./config_file.txt < deltastreamer_initial_load.json

The following is the schema of a record ingested into the Kafka topic:

   [{
      "name": "id",
      "type": "int"
   }, {
      "name": "category",
      "type": "string"
   }, {
      "name": "ts",
      "type": "string"
   }, {
      "name": "name",
      "type": "string"
   }, {
      "name": "quantity",
      "type": "int"
   }]
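Since DeltaStreamer rejects records that don’t match the schema supplied by the schema provider, it can help to validate payloads before producing them to Kafka. The following is a minimal Python sketch (not part of the original job) that checks a JSON line against the five fields above:

```python
import json

# The five fields from the schema above, mapped to the expected Python types
SCHEMA_FIELDS = {
    "id": int,
    "category": str,
    "ts": str,
    "name": str,
    "quantity": int,
}

def validate_record(line: str) -> bool:
    """Return True if a JSON line has exactly the expected field names and types."""
    record = json.loads(line)
    if set(record) != set(SCHEMA_FIELDS):
        return False
    return all(isinstance(record[f], t) for f, t in SCHEMA_FIELDS.items())

good = '{"id": 1, "category": "Apparel", "ts": "2022-01-02 10:29:00", "name": "ABC shirt", "quantity": 4}'
bad = '{"id": "one", "category": "Apparel", "ts": "2022-01-02 10:29:00", "name": "ABC shirt", "quantity": 4}'
print(validate_record(good))  # True
print(validate_record(bad))   # False
```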

The schema uses the following parameters:

  • id – The product ID
  • category – The product category
  • ts – The timestamp when the record was inserted or last updated
  • name – The product name
  • quantity – The available quantity of the product in the inventory

The following code gives an example of a record:

    {
        "id": 1,
        "category": "Apparel",
        "ts": "2022-01-02 10:29:00",
        "name": "ABC shirt",
        "quantity": 4
    }
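The initial load file is newline-delimited JSON, which kafka-console-producer sends as one message per line. A sketch of generating such a file follows; only the first record comes from this post, and the second is a hypothetical stand-in for the rest of the initial inventory:

```python
import json

# Sample inventory records: the first is from the post, the second is hypothetical
records = [
    {"id": 1, "category": "Apparel", "ts": "2022-01-02 10:29:00", "name": "ABC shirt", "quantity": 4},
    {"id": 2, "category": "Apparel", "ts": "2022-01-02 10:30:00", "name": "GHI pants", "quantity": 6},
]

# One compact JSON document per line, matching the console producer's
# one-message-per-line input format
ndjson = "\n".join(json.dumps(r) for r in records)
print(ndjson)
```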

Start the AWS Glue streaming job

To start the AWS Glue streaming job, complete the following steps:

  1. On the AWS Glue Studio console, find the job with the value for HudiDeltastreamerGlueJob.
  2. Choose the job to review the script and job details.
  3. On the Job details tab, replace the value of the --KAFKA_BOOTSTRAP_SERVERS key with the Amazon MSK bootstrap server’s private endpoint.
  4. Choose Save to save the job settings.
  5. Choose Run to start the job.

When the AWS Glue streaming job runs, the records from the MSK topic are consumed and written to the target S3 bucket created by AWS CloudFormation. To find the bucket name, check the stack’s Outputs tab for the TargetS3Bucket key value.

The data in Amazon S3 is stored in Parquet file format. In this example, the data written to Amazon S3 isn’t partitioned, but you can enable partitioning by specifying hoodie.datasource.write.partitionpath.field=<column_name> as the partition field and setting hoodie.datasource.write.hive_style_partitioning to True in the Hudi configuration properties in the AWS Glue job script.

In this post, we write the data to a non-partitioned table, so we set the following two Hudi configurations:

  • hoodie.datasource.hive_sync.partition_extractor_class is set to org.apache.hudi.hive.NonPartitionedExtractor
  • hoodie.datasource.write.keygenerator.class is set to org.apache.hudi.keygen.NonpartitionedKeyGenerator
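Collected into a plain options map, as they might be assembled in a job script before being passed to the writer, the non-partitioned setup looks like the following sketch. This is illustrative only, not the exact script shipped with the CloudFormation stack; the commented-out keys show the partitioned variant described above.

```python
# Hudi options for a non-partitioned table, assembled as plain key-value pairs
# (a sketch, not the exact options map from the Glue job script)
hudi_options = {
    "hoodie.datasource.hive_sync.partition_extractor_class":
        "org.apache.hudi.hive.NonPartitionedExtractor",
    "hoodie.datasource.write.keygenerator.class":
        "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
    # To partition the table instead, you would set (per the section above):
    # "hoodie.datasource.write.partitionpath.field": "category",
    # "hoodie.datasource.write.hive_style_partitioning": "true",
}

print(len(hudi_options))  # 2
```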

DeltaStreamer options and configuration

DeltaStreamer has several options available; the following are the options set in the AWS Glue streaming job used in this post:

  • continuous – DeltaStreamer runs in continuous mode, running source-fetch in a loop.
  • enable-hive-sync – Enables table sync to the Apache Hive Metastore.
  • schemaprovider-class – Defines the class for the schema provider to attach schemas to the input and target table data.
  • source-class – Defines the source class to read data and has many built-in options.
  • source-ordering-field – The field used to break ties between records with the same key in input data. Defaults to ts (the Unix timestamp of the record).
  • target-base-path – Defines the path for the target Hudi table.
  • table-type – Indicates the Hudi storage type to use. In this post, it’s set to COPY_ON_WRITE.
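Put together on a command line, these options would look roughly like the following sketch. The bucket name and S3 paths are hypothetical placeholders, and the schema provider and source classes shown are the common Hudi choices for a file-based Avro schema and JSON Kafka input, not necessarily the exact values in the Glue job.

```python
# Sketch of a DeltaStreamer argument list; bucket names and paths are hypothetical
target_bucket = "my-target-bucket"  # placeholder, not from the post

args = [
    "--continuous",
    "--enable-hive-sync",
    "--schemaprovider-class", "org.apache.hudi.utilities.schema.FilebasedSchemaProvider",
    "--source-class", "org.apache.hudi.utilities.sources.JsonKafkaSource",
    "--source-ordering-field", "ts",
    "--target-base-path", f"s3://{target_bucket}/product_table",
    "--table-type", "COPY_ON_WRITE",
]

print(args[-1])  # COPY_ON_WRITE
```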

The following are some of the important DeltaStreamer configuration properties set in the AWS Glue streaming job:

# Schema provider props (change to absolute path based on your installation)
hoodie.deltastreamer.schemaprovider.source.schema.file=s3://" + args("CONFIG_BUCKET") + "/artifacts/hudi-deltastreamer-glue/config/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=s3://" + args("CONFIG_BUCKET") + "/artifacts/hudi-deltastreamer-glue/config/schema.avsc

# Kafka Source
hoodie.deltastreamer.source.kafka.topic=hudi-deltastream-demo

# Kafka props
bootstrap.servers=<Amazon MSK bootstrap server's private endpoint>
auto.offset.reset=earliest

The configuration contains the following details:

  • hoodie.deltastreamer.schemaprovider.source.schema.file – The schema of the source record
  • hoodie.deltastreamer.schemaprovider.target.schema.file – The schema for the target record
  • hoodie.deltastreamer.source.kafka.topic – The source MSK topic name
  • bootstrap.servers – The Amazon MSK bootstrap server’s private endpoint
  • auto.offset.reset – The consumer’s behavior when there is no committed position or when an offset is out of range
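These properties follow the standard Java .properties format (key=value lines, # comments). A small Python sketch of parsing such a file, which can be handy for sanity-checking values before uploading the config to S3 (the parser and sample text below are illustrative, not part of the Glue job):

```python
# Minimal parser for Java-style .properties text: skips blanks and comments,
# splits each remaining line on the first '='
def parse_properties(text: str) -> dict:
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

sample = """
# Kafka props
hoodie.deltastreamer.source.kafka.topic=hudi-deltastream-demo
auto.offset.reset=earliest
"""

props = parse_properties(sample)
print(props["hoodie.deltastreamer.source.kafka.topic"])  # hudi-deltastream-demo
```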

Hudi configuration

The following are some of the important Hudi configuration options, which enable us to achieve in-place updates for the generated schema:

  • hoodie.datasource.write.recordkey.field – The record key field. This is the unique identifier of a record in Hudi.
  • hoodie.datasource.write.precombine.field – When two records have the same record key value, Apache Hudi picks the one with the largest value for the precombine field.
  • hoodie.datasource.write.operation – The operation on the Hudi dataset. Possible values include UPSERT, INSERT, and BULK_INSERT.
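The precombine behavior can be illustrated in plain Python: among records sharing a record key, only the record with the largest precombine value (here, ts) survives. The following is a simplified model of those semantics, not Hudi's actual implementation:

```python
# Simplified model of Hudi's precombine: for each record key, keep only
# the record with the largest value of the precombine field
def precombine(records, key_field="id", precombine_field="ts"):
    latest = {}
    for rec in records:
        key = rec[key_field]
        if key not in latest or rec[precombine_field] > latest[key][precombine_field]:
            latest[key] = rec
    return list(latest.values())

# Two versions of product ID 1 in the same batch; the later ts wins.
# Timestamps in "YYYY-MM-DD HH:MM:SS" form compare correctly as strings.
batch = [
    {"id": 1, "ts": "2022-01-02 10:29:00", "quantity": 4},
    {"id": 1, "ts": "2022-01-02 10:45:00", "quantity": 2},
]

result = precombine(batch)
print(result)  # [{'id': 1, 'ts': '2022-01-02 10:45:00', 'quantity': 2}]
```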

AWS Glue Data Catalog table

The AWS Glue job creates a Hudi table in the Data Catalog mapped to the Hudi dataset on Amazon S3. Because the hoodie.datasource.hive_sync.table configuration parameter is set to product_table, the table is visible under the default database in the Data Catalog.

The following screenshot shows the Hudi table column names in the Data Catalog.

Query the data using Athena

With the Hudi datasets available in Amazon S3, you can query the data using Athena. Let’s use the following query:

SELECT * FROM "default"."product_table";

The following screenshot shows the query output. The table product_table has four records from the initial ingestion: two records for the category Apparel, one for Cosmetics, and one for Footwear.

Load incremental data into the Kafka topic

Now suppose that the store sold some quantity of apparel and footwear and added a new product to its inventory, as shown in the following code. The store sold two items of product ID 1 (Apparel) and one item of product ID 3 (Footwear). The store also added a new product in the Cosmetics category, with product ID 5.

{"id": 1, "category": "Apparel", "ts": "2022-01-02 10:45:00", "name": "ABC shirt", "quantity": 2}
{"id": 3, "category": "Footwear", "ts": "2022-01-02 10:50:00", "name": "DEF shoes", "quantity": 5}
{"id": 5, "category": "Cosmetics", "ts": "2022-01-02 10:55:00", "name": "JKL Lip gloss", "quantity": 7}
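Before producing this batch, you can preview what the UPSERT operation will do: records whose key already exists in product_table replace the old version, and records with a new key are inserted. The following sketch models that merge in plain Python; only product ID 1’s initial record appears in this post, so the initial table below holds just that record.

```python
# Model of an UPSERT: existing record keys are replaced, new keys are inserted
def upsert(table, batch, key_field="id"):
    merged = {rec[key_field]: rec for rec in table}
    for rec in batch:
        merged[rec[key_field]] = rec
    return merged

# Initial state (only product ID 1's initial record is shown in the post)
initial = [
    {"id": 1, "category": "Apparel", "ts": "2022-01-02 10:29:00", "name": "ABC shirt", "quantity": 4},
]

# The incremental batch from above
incremental = [
    {"id": 1, "category": "Apparel", "ts": "2022-01-02 10:45:00", "name": "ABC shirt", "quantity": 2},
    {"id": 3, "category": "Footwear", "ts": "2022-01-02 10:50:00", "name": "DEF shoes", "quantity": 5},
    {"id": 5, "category": "Cosmetics", "ts": "2022-01-02 10:55:00", "name": "JKL Lip gloss", "quantity": 7},
]

table = upsert(initial, incremental)
print(table[1]["quantity"])  # 2 (product ID 1 updated in place)
print(sorted(table))         # [1, 3, 5]
```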

Let’s ingest the incremental data from the deltastreamer_incr_load.json file to the Kafka topic and query the data from Athena:

./kafka_2.12-2.6.2/bin/kafka-console-producer.sh \
--broker-list "<replace with the private endpoint value from MSK>" \
--topic hudi-deltastream-demo \
--producer.config ./config_file.txt < deltastreamer_incr_load.json

Within a few seconds, you should see a new Parquet file created in the target S3 bucket under the product_table prefix. The following screenshot from Athena, taken after the incremental data ingestion, shows the latest updates.

Additional considerations

There are some hard-coded Hudi options in the AWS Glue streaming job scripts. These options are set for the sample table that we created for this post, so update the options based on your workload.

Clean up

To avoid incurring future charges, delete the CloudFormation stack, which deletes all the underlying resources created by this post, except for the product_table table created in the default database. Manually delete the product_table table under the default database from the Data Catalog.


Conclusion

In this post, we illustrated how you can add the Apache Hudi Connector for AWS Glue and perform streaming ingestion into an S3 data lake using Apache Hudi DeltaStreamer with AWS Glue. You can use the Apache Hudi Connector for AWS Glue to create a serverless streaming pipeline using AWS Glue streaming jobs with the DeltaStreamer utility to ingest data from Kafka. We demonstrated this by reading the latest updated data using Athena in near-real time.

As always, AWS welcomes feedback. If you have any comments or questions about this post, please share them in the comments.

About the authors

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey at AWS, Vishal helped customers implement business intelligence, data warehouse, and data lake projects in the US and Australia.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He enjoys learning about different use cases from customers and sharing knowledge about big data technologies with the wider community.

Anand Prakash is a Senior Solutions Architect at AWS Data Lab. Anand focuses on helping customers design and build AI/ML, data analytics, and database solutions to accelerate their path to production.


