
By Manjit Bhattacharyya, Solutions Architecture

With information often spread across several different databases and applications, engineers and developers need to synchronize data reliably and with minimal latency and downtime. 

At the 2021 Google Cloud Data Summit, Google Cloud introduced Datastream, a serverless change data capture (CDC) and replication service that is available in preview. This new tool helps teams seamlessly replicate data streams in real time from Oracle and MySQL databases to Google Cloud services such as BigQuery, Cloud SQL, and Cloud Spanner, or consume the event stream directly from Cloud Storage.

Main benefits include:

  • Easy-to-use setup and monitoring experiences
  • Synchronizes data streams across different databases and applications
  • Services scale up and down automatically as needed with minimal downtime
  • Private connectivity options for enhanced security on top of Google Cloud security
  • Transparent status reporting and processing around data and schema changes
  • Integrates with Dataflow, Cloud Data Fusion, Pub/Sub, BigQuery, and other Google Cloud data services
  • Powers analytics, database replication, and synchronization for migrations and hybrid-cloud configurations

But how does Datastream work exactly?

Let’s take a look by loading data from Oracle Database hosted on AWS RDS into BigQuery as an example.

Use Case: Loading Data from Oracle Hosted on AWS RDS into BigQuery

Say you have an application running on Oracle Database (DB) hosted on AWS RDS (or on-premises or any other cloud platform, including Google Cloud). You have an analytical workload running on BigQuery that requires a data sync with the Oracle application. That means any data changes on the Oracle DB application end should also be reflected in BigQuery.

In this walkthrough, we’ll see how Datastream can be helpful to solve this use case. We’ll set up a real-time CDC workflow that will use Oracle DB hosted on AWS RDS to load data into BigQuery. 

Set up an Oracle DB on AWS RDS

First, let’s set up an Oracle DB on AWS RDS, the source from which changes will be captured for replication (CDC). Before getting started, ensure the endpoint for the Oracle DB RDS instance is accessible from Google Cloud. Note: I’m using SQL Developer to connect to Oracle DB in this example; Oracle DB installation isn’t covered here.

Enable ARCHIVELOG

Oracle DB on AWS RDS enables ARCHIVELOG by default, but you can double-check it’s switched on with the following command:

SELECT LOG_MODE FROM V$DATABASE;

If it says ARCHIVELOG, move on to the next step. If it says NOARCHIVELOG, follow the commands below for enabling ARCHIVELOG mode in RDS and non-RDS deployments.

RDS Deployments

With automated backups enabled, the RDS instance already runs in ARCHIVELOG mode; what you configure here is how long archived redo logs are retained (96 hours in this example) so Datastream has time to read them:

exec rdsadmin.rdsadmin_util.set_configuration('archivelog retention hours', 96);
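If you want to confirm the retention setting, you can optionally print the current configuration (a quick sanity check, assuming you're connected as the RDS master user):

-- Show the current rdsadmin settings, including 'archivelog retention hours'
SET SERVEROUTPUT ON;
EXEC rdsadmin.rdsadmin_util.show_configuration;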

Non-RDS Deployments

SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;

Then define a data retention policy for your database by running the following RMAN command:

CONFIGURE RETENTION POLICY TO RECOVERY WINDOW OF 4 DAYS;

Enable Supplemental Logging

Once ARCHIVELOG is enabled, you also need supplemental logging so the redo logs capture enough column data for CDC. If you plan to replicate only a subset of tables, run the following command for each table you want to replicate:

 

ALTER TABLE [SCHEMA].[TABLE] ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

 

Alternatively, you can enable supplemental log data for the entire database. For a self-hosted deployment, run:

 

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

 

For an Oracle deployment on Amazon RDS, run the following command at the SQL prompt:

 

exec rdsadmin.rdsadmin_util.alter_supplemental_logging('ADD','ALL');
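If you'd like to double-check that supplemental logging is active at the database level, you can query V$DATABASE; SUPPLEMENTAL_LOG_DATA_ALL should return YES when ALL-columns logging is enabled:

SELECT SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_ALL FROM V$DATABASE;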

 

Grant Privileges

Grant the necessary privileges to the user account that Datastream will use to connect to your database. For a self-hosted deployment, execute the following commands:


GRANT EXECUTE_CATALOG_ROLE TO [user];
GRANT CONNECT TO [user];
GRANT CREATE SESSION TO [user];
GRANT SELECT ON SYS.V_$DATABASE TO [user];
GRANT SELECT ON SYS.V_$ARCHIVED_LOG TO [user];
GRANT SELECT ON SYS.V_$LOGMNR_CONTENTS TO [user];
GRANT SELECT ON SYS.V_$LOGMNR_LOGS TO [user];
GRANT EXECUTE ON DBMS_LOGMNR TO [user];
GRANT EXECUTE ON DBMS_LOGMNR_D TO [user];
GRANT SELECT ANY TRANSACTION TO [user];
GRANT SELECT ANY TABLE TO [user];

For an RDS-hosted deployment, run the following commands:


GRANT EXECUTE_CATALOG_ROLE TO [user];
GRANT CONNECT TO [user];
GRANT CREATE SESSION TO [user];
exec rdsadmin.rdsadmin_util.grant_sys_object('V_$DATABASE','[USER]','SELECT');
exec rdsadmin.rdsadmin_util.grant_sys_object('V_$ARCHIVED_LOG','[USER]','SELECT');
exec rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGMNR_LOGS','[USER]','SELECT');
exec rdsadmin.rdsadmin_util.grant_sys_object('V_$LOGMNR_CONTENTS','[USER]','SELECT');
exec rdsadmin.rdsadmin_util.grant_sys_object('DBMS_LOGMNR','[USER]','EXECUTE');
exec rdsadmin.rdsadmin_util.grant_sys_object('DBMS_LOGMNR_D','[USER]','EXECUTE');
GRANT SELECT ANY TRANSACTION TO [user];
GRANT SELECT ANY TABLE TO [user];

If your source database is Oracle 12c or newer, you’ll need to grant an additional privilege with the following command:

GRANT LOGMINING TO [user];
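Optionally, you can connect as that user and run a quick sanity check to confirm the grants took effect (a minimal check; adjust it to your environment):

-- Run these while connected as the Datastream user
SELECT * FROM SESSION_PRIVS;            -- lists the privileges available to the current session
SELECT COUNT(*) FROM V$ARCHIVED_LOG;    -- succeeds only if the V$ view grants were applied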

This completes the Oracle DB setup.

Test the Source

Now, using the schema shown below, we’ll create a sample table labeled CDC_TEST in a CDC schema to perform a CDC test:

Example CDC test
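The exact column definitions are in the screenshot; as a rough sketch, a minimal version of the table could look like this (the ITEM_NAME column is illustrative, since only ITEM_ID and ITEM_PRICE appear later in this walkthrough):

-- Sample table in the CDC schema used for the CDC test
CREATE TABLE CDC.CDC_TEST (
  ITEM_ID    NUMBER PRIMARY KEY,
  ITEM_NAME  VARCHAR2(100),   -- illustrative column
  ITEM_PRICE NUMBER(10,2)
);

-- If you enabled supplemental logging per table, include the new table as well
ALTER TABLE CDC.CDC_TEST ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;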

Create a Google Cloud Storage Target and Grant Permissions

Next, create a Google Cloud Storage target bucket and grant the following IAM roles to your Datastream service account (service-[project_number]@gcp-sa-datastream.iam.gserviceaccount.com) so it can write to the destination bucket:

  • roles/storage.objectViewer 
  • roles/storage.objectCreator
  • roles/storage.legacyBucketReader

Configure the Datastream

Now, you’re ready to start building the Datastream configuration in the Google Cloud Console. Go to Datastream under the Big Data section in the navigation menu, or search for Datastream directly in the products and resources search bar. Once you’re on the Datastream page, complete the following:

  • Configure the stream name and stream ID. The stream’s region should be the same as the region of the connection profile. Note: at the time of writing, Datastream only supports the following regions: us-central1, europe-west1, and asia-east1.
  • Cross-check the Oracle DB prerequisites listed under the Oracle source section. (For this use case, we’ve already completed these in the Oracle DB setup on AWS RDS above.)
Create the datastream

Get started and define the stream details.

Next, we’ll configure the source and destination, both of which will need a connection profile. 

Add the following details to configure the source:

  • Connection profile name 
  • Connection profile ID
  • Hostname or IP (provide AWS RDS instance endpoint here)
  • Username and password for the Datastream (use Oracle DB username and password)

For connectivity, you can choose from several methods:

  • Private connectivity (VPC Peering)
  • IP whitelisting
  • Proxy tunneling

Choose a connectivity method and then test it.

For this use case, we’re using IP whitelisting, so we need to make sure inbound traffic to the Oracle AWS RDS instance is allowed from the reserved IPs used by Datastream: 34.67.6.157, 34.67.234.134, 34.72.28.29, 34.72.239.218, and 34.71.242.81.

Once connectivity is established, you can test the connection to ensure it doesn’t have any blockers.

Define connectivity method

Run Test to test connectivity to the Oracle source.

Next, we’ll choose which tables and schemas to include in the Datastream. We’re selecting the CDC_TEST table from the CDC schema we created earlier.

Configure the source

Configure the source and decide which tables and schemas to include.

To sync existing historical data in addition to new changes, enable Backfill Historical Data under Modify Historical Data Backfill. Next, configure the connection profile for the destination, where we only need to enter the name of the Cloud Storage bucket that will store our data.

Validate and Create the Datastream

The final step of creating a Datastream is choosing a file format for storing the output file. Datastream supports both JSON and Avro file formats. For this use case, we’ll use Avro.

Configure the destination

Define the location and file size strategy.

Now, we can validate the Datastream and see if our workflow was successful. 

Review & create the datastream

Validate the datastream and see if your workflow was successful.

We’ll create a Dataflow streaming job using one of Google Cloud’s Dataflow templates. Google offers several templates within Dataflow that interact with Datastream; these templates can be used to load the data into BigQuery, PostgreSQL, or Cloud Spanner, as shown in the image below. For this use case, we’ll use the Cloud Datastream to BigQuery template.

Use a Dataflow template

Use one of Google’s Dataflow templates to create the streaming process.

Provide the following parameters to create the Dataflow process:

  • File location of the Datastream output in Google Cloud Storage
  • Name of the dataset that will contain the staging tables
  • Name of the dataset that will contain the replica tables
Create the Dataflow process

Enter the following parameters to create the Dataflow process.

Once the Dataflow job is running, we can test the overall process end to end.

First, we’ll enter two records into the source Oracle DB table. 

Test scenario #1

Entering the records into the Oracle DB source table.
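The screenshot shows the actual values; as an illustration, the two inserts look something like this (the names and prices are hypothetical):

INSERT INTO CDC.CDC_TEST (ITEM_ID, ITEM_NAME, ITEM_PRICE) VALUES (101, 'Item A', 10.00);
INSERT INTO CDC.CDC_TEST (ITEM_ID, ITEM_NAME, ITEM_PRICE) VALUES (102, 'Item B', 20.00);
COMMIT;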

Datastream will quickly identify these new records and write them to the Google Cloud Storage bucket as an Avro file. Once the file is in the bucket, the streaming Dataflow job will immediately process the records and load them into BigQuery.

Since we’re using the CDC_TEST table, a new table named CDC_TEST_log is generated automatically in the target BigQuery dataset with the following information. The CDC_TEST_log table contains one row per stream event and captures the change type, such as INSERT, UPDATE, or DELETE.

Preview the Dataflow

Let’s test another scenario where we insert a new record and update an existing one. Here, we’re inserting a new record for ITEM_ID = 103 and updating ITEM_PRICE for ITEM_ID = 102. Once complete, the source table information will look like this:

Test scenario #2

Insert a new record and update an existing one.
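In SQL terms, the change looks roughly like this (again with illustrative names and prices):

-- Insert a new record and change the price of an existing one
INSERT INTO CDC.CDC_TEST (ITEM_ID, ITEM_NAME, ITEM_PRICE) VALUES (103, 'Item C', 30.00);
UPDATE CDC.CDC_TEST SET ITEM_PRICE = 25.00 WHERE ITEM_ID = 102;
COMMIT;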

Datastream will quickly identify these changes and export the new/updated information into the Google Cloud Storage bucket. The Dataflow process will immediately replicate this information in the CDC_TEST_log table in BigQuery.

Preview the Dataflow

The Dataflow process will pick up on these new changes.

Now that operational data is being replicated into BigQuery, we can run queries directly on BigQuery and connect to various BI tools to visualize and analyze data.
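For example, a simple query against the replica table, assuming a hypothetical project called my-project and a replica dataset called oracle_replica, might look like this:

-- Query the replicated table directly in BigQuery
SELECT ITEM_ID, ITEM_NAME, ITEM_PRICE
FROM `my-project.oracle_replica.CDC_TEST`
ORDER BY ITEM_ID;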

I hope you found this overview helpful. Let me know your thoughts in the comments below.