
Data Pipeline for CDC data from MySQL DB to Amazon Kinesis Data Streams through Amazon Kinesis using Amazon DMS Serverless


aws-samples/aws-dms-serverless-to-kinesis-data-pipeline

Streaming Data to Amazon Kinesis Data Streams via AWS DMS Serverless

[Architecture diagram: dms_serverless-mysql-to-kinesis-arch]

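This is a data pipeline project that streams change data capture (CDC) data from a MySQL database to Amazon Kinesis Data Streams using AWS DMS Serverless. It is built with the AWS CDK in Python.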

The cdk.json file tells the CDK Toolkit how to execute your app.

This project is set up like a standard Python project. The initialization process also creates a virtualenv within this project, stored under the .venv directory. To create the virtualenv it assumes that there is a python3 (or python for Windows) executable in your path with access to the venv package. If for any reason the automatic creation of the virtualenv fails, you can create the virtualenv manually.

To manually create a virtualenv on MacOS and Linux:

$ python3 -m venv .venv

After the init process completes and the virtualenv is created, you can use the following step to activate your virtualenv.

$ source .venv/bin/activate

If you are on a Windows platform, activate the virtualenv like this:

% .venv\Scripts\activate.bat

Once the virtualenv is activated, you can install the required dependencies.

(.venv) $ pip install -r requirements.txt

To add additional dependencies, for example other CDK libraries, just add them to your setup.py file and rerun the pip install -r requirements.txt command.

Prerequisites

Set up cdk.context.json

Before deploying the CloudFormation stacks, set up the CDK context configuration file, cdk.context.json, properly.

For example,

{
  "db_cluster_name": "db-cluster-name",
  "dms_data_source": {
    "database_name": "testdb",
    "table_name": "retail_trans"
  },
  "kinesis_stream_name": "your-dms-target-kinesis-stream-name"
}
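
A missing context key usually surfaces only when the stacks are synthesized, so it can help to check the file first. The following is a minimal sketch, not part of this project: the helper name check_context is hypothetical, and treating every key from the example above as required is an assumption.

```python
import json  # used when loading cdk.context.json, as described below

# Keys taken from the example above; treating all of them as required is an assumption.
REQUIRED_KEYS = ("db_cluster_name", "dms_data_source", "kinesis_stream_name")
REQUIRED_SOURCE_KEYS = ("database_name", "table_name")


def check_context(context: dict) -> list:
    """Return a list of problems; an empty list means the context looks complete."""
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if not context.get(key)]
    source = context.get("dms_data_source")
    if not isinstance(source, dict):
        source = {}
    problems += [
        f"missing key: dms_data_source.{key}"
        for key in REQUIRED_SOURCE_KEYS
        if not source.get(key)
    ]
    return problems
```

To use it, load the file with json.load(open("cdk.context.json")) and pass the resulting dict to check_context before running cdk synth.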

Bootstrap AWS environment for AWS CDK app

Also, before any AWS CDK app can be deployed, you have to bootstrap your AWS environment to create certain AWS resources that the AWS CDK CLI (Command Line Interface) uses to deploy your AWS CDK app.

Run the cdk bootstrap command to bootstrap the AWS environment.

(.venv) $ cdk bootstrap

Now you can deploy the CloudFormation template for this code.

List all CDK Stacks

(.venv) $ export CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity --query Account --output text)
(.venv) $ export CDK_DEFAULT_REGION=$(aws configure get region)
(.venv) $ cdk list
DMSAuroraMysqlToKDSVPCStack
AuroraMysqlStack
AuroraMysqlBastionHost
DMSTargetKinesisDataStreamStack
DMSRequiredIAMRolesStack
DMSServerlessAuroraMysqlToKDSStack
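
The two exported variables above are what a CDK Python app commonly reads to decide which account and region to target. The sketch below illustrates that pattern under assumptions: resolve_env is a hypothetical helper, and whether this project's app.py reads the variables in exactly this way is not confirmed here.

```python
import os


def resolve_env(environ=os.environ) -> dict:
    """Collect the account and region provided by the exported variables."""
    return {
        "account": environ.get("CDK_DEFAULT_ACCOUNT"),
        "region": environ.get("CDK_DEFAULT_REGION"),
    }

# In a CDK app the result would typically be passed to each stack, for example:
#   env = aws_cdk.Environment(**resolve_env())
#   SomeStack(app, "SomeStack", env=env)   # SomeStack is a placeholder name
```

If either value comes back as None, the exports were not run in the current shell.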

At this point you can now synthesize the CloudFormation template for this code.

(.venv) $ cdk synth --all

We can provision each CDK stack shown above one at a time like this:

Create Aurora MySQL cluster

  (.venv) $ cdk deploy DMSAuroraMysqlToKDSVPCStack AuroraMysqlStack AuroraMysqlBastionHost
  

Confirm that binary logging is enabled

To set up Aurora MySQL, connect to the Aurora MySQL cluster from the EC2 bastion host.

ℹ️ The Aurora MySQL username and password are stored in AWS Secrets Manager under a name such as DatabaseSecret-xxxxxxxxxxxx.

To retrieve a secret (AWS console)

  • (Step 1) Open the Secrets Manager console at https://console.aws.amazon.com/secretsmanager/.
  • (Step 2) In the list of secrets, choose the secret you want to retrieve.
  • (Step 3) In the Secret value section, choose Retrieve secret value.
    Secrets Manager displays the current version (AWSCURRENT) of the secret. To see other versions of the secret, such as AWSPREVIOUS or custom labeled versions, use the AWS CLI.
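
The same secret can also be read programmatically. This is a minimal sketch using boto3's Secrets Manager client; the helper names are hypothetical, and the assumption that the secret is JSON with "username" and "password" keys should be checked against your own secret.

```python
import json


def parse_db_secret(secret_string: str) -> dict:
    """Extract the login fields from the JSON text stored in the secret."""
    secret = json.loads(secret_string)
    # "username" and "password" are assumed key names; verify them in the console.
    return {"user": secret["username"], "password": secret["password"]}


def fetch_db_secret(secret_id: str) -> dict:
    """Read the current version (AWSCURRENT) of the secret and parse it."""
    import boto3  # imported lazily so parse_db_secret works without boto3 installed

    client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId=secret_id)
    return parse_db_secret(response["SecretString"])
```

Calling fetch_db_secret with the full secret name (the one starting with DatabaseSecret-) returns the credentials to use at the mysql prompt.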

To confirm that binary logging is enabled

  1. Connect to the Aurora cluster writer node.

     $ BASTION_HOST_ID=$(aws cloudformation describe-stacks --stack-name AuroraMysqlBastionHost | \
     jq -r '.Stacks[0].Outputs | .[] | select(.OutputKey | endswith("EC2InstanceId")) | .OutputValue')
    
     $ aws ec2-instance-connect ssh --instance-id ${BASTION_HOST_ID} --os-user ec2-user
    
     [ec2-user@ip-172-31-7-186 ~]$ mysql -hdb-cluster-name.cluster-xxxxxxxxxxxx.region-name.rds.amazonaws.com -uadmin -p
     Enter password:
     Welcome to the MySQL monitor.  Commands end with ; or \g.
     Your MySQL connection id is 947748268
     Server version: 5.7.12-log MySQL Community Server (GPL)
    
     Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.
    
     Oracle is a registered trademark of Oracle Corporation and/or its
     affiliates. Other names may be trademarks of their respective
     owners.
    
     Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
    
     MySQL [(none)]>
    

    ℹ️ AuroraMysqlBastionHost is a CDK Stack to create the bastion host.

    ℹ️ You can connect to an EC2 instance using the EC2 Instance Connect CLI: aws ec2-instance-connect ssh. For more information, see Connect using the EC2 Instance Connect CLI.

  2. At the SQL prompt, run the command below to confirm that binary logging is enabled:

     MySQL [(none)]> SHOW GLOBAL VARIABLES LIKE "log_bin";
     +---------------+-------+
     | Variable_name | Value |
     +---------------+-------+
     | log_bin       | ON    |
     +---------------+-------+
     1 row in set (0.00 sec)
    
  3. Also set the binary log retention period so that AWS DMS has the binlog access it requires for replication:

     MySQL [(none)]> CALL mysql.rds_set_configuration('binlog retention hours', 24);
     Query OK, 0 rows affected (0.01 sec)
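
The jq filter in step 1 picks the bastion host's instance ID out of the stack outputs. For readers scripting this in Python, the same selection can be sketched as below; find_output is a hypothetical helper, and unlike the jq filter it returns only the first match.

```python
def find_output(describe_stacks_response: dict, key_suffix: str) -> str:
    """Return the OutputValue of the first stack output whose OutputKey ends with key_suffix."""
    outputs = describe_stacks_response["Stacks"][0].get("Outputs", [])
    for output in outputs:
        if output["OutputKey"].endswith(key_suffix):
            return output["OutputValue"]
    raise KeyError(f"no stack output ending with {key_suffix!r}")
```

The input is the response of boto3.client("cloudformation").describe_stacks(StackName="AuroraMysqlBastionHost"), and the suffix to pass is "EC2InstanceId".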
    

Create a sample database and table

  1. Run the commands below to create the sample database named testdb.
     MySQL [(none)]> SHOW DATABASES;
     +--------------------+
     | Database           |
     +--------------------+
     | information_schema |
     | mysql              |
     | performance_schema |
     | sys                |
     +--------------------+
     4 rows in set (0.00 sec)
    
     MySQL [(none)]> CREATE DATABASE IF NOT EXISTS testdb;
     Query OK, 1 row affected (0.01 sec)
    
     MySQL [(none)]> USE testdb;
     Database changed
     MySQL [testdb]> SHOW TABLES;
     Empty set (0.00 sec)
    
  2. Then run the following to create the sample table named retail_trans.
     MySQL [testdb]> CREATE TABLE IF NOT EXISTS testdb.retail_trans (
              trans_id BIGINT(20) AUTO_INCREMENT,
              customer_id VARCHAR(12) NOT NULL,
              event VARCHAR(10) DEFAULT NULL,
              sku VARCHAR(10) NOT NULL,
              amount INT DEFAULT 0,
              device VARCHAR(10) DEFAULT NULL,
              trans_datetime DATETIME DEFAULT CURRENT_TIMESTAMP,
              PRIMARY KEY(trans_id),
              KEY(trans_datetime)
            ) ENGINE=InnoDB AUTO_INCREMENT=0;
     Query OK, 0 rows affected, 1 warning (0.04 sec)
    
     MySQL [testdb]> SHOW TABLES;
     +------------------+
     | Tables_in_testdb |
     +------------------+
     | retail_trans     |
     +------------------+
     1 row in set (0.00 sec)
    
     MySQL [testdb]> DESC retail_trans;
     +----------------+-------------+------+-----+-------------------+-------------------+
     | Field          | Type        | Null | Key | Default           | Extra             |
     +----------------+-------------+------+-----+-------------------+-------------------+
     | trans_id       | bigint      | NO   | PRI | NULL              | auto_increment    |
     | customer_id    | varchar(12) | NO   |     | NULL              |                   |
     | event          | varchar(10) | YES  |     | NULL              |                   |
     | sku            | varchar(10) | NO   |     | NULL              |                   |
     | amount         | int         | YES  |     | 0                 |                   |
     | device         | varchar(10) | YES  |     | NULL              |                   |
     | trans_datetime | datetime    | YES  | MUL | CURRENT_TIMESTAMP | DEFAULT_GENERATED |
     +----------------+-------------+------+-----+-------------------+-------------------+
     7 rows in set (0.00 sec)
    
     MySQL [testdb]>
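
To see what rows fitting this schema might look like, here is a minimal sketch of a row generator. It is not the project's gen_fake_mysql_data.py: the event and device value lists are illustrative assumptions, and the %s placeholders assume a driver such as PyMySQL.

```python
import random
import string
from datetime import datetime

EVENTS = ["visit", "view", "list", "like", "cart", "purchase"]  # illustrative values
DEVICES = ["pc", "mobile", "tablet"]  # illustrative values

# trans_id is omitted because the column is AUTO_INCREMENT.
INSERT_SQL = (
    "INSERT INTO testdb.retail_trans "
    "(customer_id, event, sku, amount, device, trans_datetime) "
    "VALUES (%s, %s, %s, %s, %s, %s)"
)


def fake_row(rng: random.Random) -> tuple:
    """Build one parameter tuple for INSERT_SQL that fits the column widths."""
    customer_id = "".join(rng.choices(string.digits, k=12))  # VARCHAR(12)
    sku = (
        "".join(rng.choices(string.ascii_uppercase, k=2))
        + "".join(rng.choices(string.digits, k=4))
        + "".join(rng.choices(string.ascii_uppercase, k=4))
    )  # VARCHAR(10)
    amount = rng.randint(1, 100)
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # DATETIME literal
    return (customer_id, rng.choice(EVENTS), sku, amount, rng.choice(DEVICES), now)
```

With an open PyMySQL cursor, each row would be written with cursor.execute(INSERT_SQL, fake_row(random.Random())).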
    

After setting up Aurora MySQL, return to the terminal where you are deploying the stacks.

Create Amazon Kinesis Data Streams for AWS DMS target endpoint

  (.venv) $ cdk deploy DMSTargetKinesisDataStreamStack
  

Create AWS DMS Replication Task

In the previous step we already created the sample database (i.e. testdb) and table (retail_trans).

Now let's create a migration task.

  (.venv) $ cdk deploy DMSRequiredIAMRolesStack DMSServerlessAuroraMysqlToKDSStack
  

Run Test

  1. Start the DMS replication. The first command looks up the replication config ARN from the stack outputs; the second starts the replication.

    (.venv) $ DMS_REPLICATION_CONFIG_ARN=$(aws cloudformation describe-stacks --stack-name DMSServerlessAuroraMysqlToKDSStack \
    | jq -r '.Stacks[0].Outputs | map(select(.OutputKey == "DMSReplicationConfigArn")) | .[0].OutputValue')
    (.venv) $ aws dms start-replication \
                      --replication-config-arn ${DMS_REPLICATION_CONFIG_ARN} \
                      --start-replication-type start-replication
    
  2. Generate test data.

     $ BASTION_HOST_ID=$(aws cloudformation describe-stacks --stack-name AuroraMysqlBastionHost \
     | jq -r '.Stacks[0].Outputs | .[] | select(.OutputKey | endswith("EC2InstanceId")) |.OutputValue')
    
     $ aws ec2-instance-connect ssh --instance-id ${BASTION_HOST_ID} --os-user ec2-user
    
     [ec2-user@ip-172-31-7-186 ~]$ cat <<EOF >requirements-dev.txt
     > boto3
     > dataset==1.5.2
     > Faker==13.3.1
     > PyMySQL==1.0.2
     > EOF
     [ec2-user@ip-172-31-7-186 ~]$ pip install -r requirements-dev.txt
     [ec2-user@ip-172-31-7-186 ~]$ python3 gen_fake_mysql_data.py \
                    --database your-database-name \
                    --table your-table-name \
                    --user user-name \
                    --password password \
                    --host db-cluster-name.cluster-xxxxxxxxxxxx.region-name.rds.amazonaws.com \
                    --max-count 200
    
  3. Check the Data Viewer in the Amazon Kinesis Management Console to see the incoming records.

    • Insert
       {
          "data": {
             "trans_id": 6,
             "customer_id": "387378799012",
             "event": "list",
             "sku": "AI6161BEFX",
             "amount": 1,
             "device": "pc",
             "trans_datetime": "2023-01-16T06:18:32Z"
          },
          "metadata": {
             "timestamp": "2023-01-16T06:25:34.444953Z",
             "record-type": "data",
             "operation": "insert",
             "partition-key-type": "primary-key",
             "schema-name": "testdb",
             "table-name": "retail_trans",
             "transaction-id": 12884904641
          }
       }
       
    • Update
       {
          "data": {
             "trans_id": 6,
             "customer_id": "387378799012",
             "event": "list",
             "sku": "AI6161BEFX",
             "amount": 3,
             "device": "pc",
             "trans_datetime": "2023-01-16T06:18:32Z"
          },
          "metadata": {
             "timestamp": "2023-01-16T08:05:25.942777Z",
             "record-type": "data",
             "operation": "update",
             "partition-key-type": "primary-key",
             "schema-name": "testdb",
             "table-name": "retail_trans",
             "transaction-id": 12884973957
          }
       }
       
    • Delete
       {
          "data": {
             "trans_id": 6,
             "customer_id": "387378799012",
             "event": "list",
             "sku": "AI6161BEFX",
             "amount": 3,
             "device": "pc",
             "trans_datetime": "2023-01-16T06:18:32Z"
          },
          "metadata": {
             "timestamp": "2023-01-16T08:10:49.737891Z",
             "record-type": "data",
             "operation": "delete",
             "partition-key-type": "primary-key",
             "schema-name": "testdb",
             "table-name": "retail_trans",
             "transaction-id": 12884978099
          }
       }
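
A consumer of the stream typically dispatches on metadata.operation. The sketch below replays records like the three samples above against an in-memory dict keyed by trans_id; apply_change is a hypothetical helper, and handling "load" (full-load rows) the same way as "insert" is an assumption about how you want to treat them.

```python
import json


def apply_change(table: dict, raw_record: str) -> dict:
    """Apply one DMS change record (JSON text) to an in-memory copy of the table."""
    record = json.loads(raw_record)
    metadata = record["metadata"]
    if metadata.get("record-type") != "data":
        return table  # ignore anything that is not a row change
    row = record["data"]
    key = row["trans_id"]  # primary key of retail_trans
    operation = metadata["operation"]
    if operation in ("load", "insert", "update"):
        table[key] = row  # upsert the latest image of the row
    elif operation == "delete":
        table.pop(key, None)
    return table
```

When reading from Kinesis with boto3, each record's Data field is bytes, so decode it to text before passing it to apply_change.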
       

Clean Up

  1. Stop the DMS replication. The first command looks up the replication config ARN from the stack outputs; the second stops the replication.
    (.venv) $ DMS_REPLICATION_CONFIG_ARN=$(aws cloudformation describe-stacks --stack-name DMSServerlessAuroraMysqlToKDSStack \
    | jq -r '.Stacks[0].Outputs | map(select(.OutputKey == "DMSReplicationConfigArn")) | .[0].OutputValue')
    (.venv) $ aws dms stop-replication \
                      --replication-config-arn ${DMS_REPLICATION_CONFIG_ARN}
    
  2. Delete all the CloudFormation stacks by running the command below.
    (.venv) $ cdk destroy --force --all
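
The stop call in step 1 can also be issued from Python. This is a minimal sketch assuming a boto3 DMS client is passed in; the helper name is hypothetical, and the "Replication" / "Status" response fields are read defensively because their exact shape is an assumption here.

```python
def stop_replication(dms_client, replication_config_arn: str) -> str:
    """Ask DMS to stop the serverless replication and return the reported status."""
    response = dms_client.stop_replication(ReplicationConfigArn=replication_config_arn)
    # Response shape assumed; fall back to "unknown" if the fields are absent.
    return response.get("Replication", {}).get("Status", "unknown")
```

In practice dms_client would be boto3.client("dms"); taking the client as a parameter keeps the function easy to exercise with a stub before pointing it at a real account.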
    

Useful commands

  • cdk ls: list all stacks in the app
  • cdk synth: emit the synthesized CloudFormation template
  • cdk deploy: deploy this stack to your default AWS account/region
  • cdk diff: compare the deployed stack with the current state
  • cdk docs: open the CDK documentation

Enjoy!

Security

See CONTRIBUTING for more information.

License

This library is licensed under the MIT-0 License. See the LICENSE file.
