Ingest CSV data to BigQuery using Cloud Data Fusion - Batch ingestion

1. Introduction

12fb66cc134b50ef.png

Last Updated: 2020-02-28

This codelab demonstrates a data ingestion pattern for ingesting CSV-formatted healthcare data into BigQuery in bulk. We will use a Cloud Data Fusion batch data pipeline for this lab. Realistic healthcare test data has been generated and made available in the Google Cloud Storage bucket (gs://hcls_testing_data_fhir_10_patients/csv/) for you.

In this codelab you will learn:

  • How to ingest CSV data (batch-scheduled loading) from GCS to BigQuery using Cloud Data Fusion.
  • How to visually build a data integration pipeline in Cloud Data Fusion for loading, transforming and masking healthcare data in bulk.

What do you need to run this codelab?

  • You need access to a GCP Project.
  • You must be assigned an Owner role for the GCP Project.
  • Healthcare data in CSV format, including the header.

If you don't have a GCP Project, follow these steps to create a new GCP Project.

Healthcare data in CSV format has been pre-loaded into the GCS bucket at gs://hcls_testing_data_fhir_10_patients/csv/. Each resource CSV file has its own unique schema structure. For example, Patients.csv has a different schema than Providers.csv. Pre-loaded schema files can be found at gs://hcls_testing_data_fhir_10_patients/csv_schemas.

If you need a new dataset, you can always generate it using SyntheaTM. Then upload it to GCS instead of copying it from the bucket in the Copy input data step.

2. GCP Project Setup

Initialize shell variables for your environment.

To find the PROJECT_ID, refer to Identifying projects.

<!-- CODELAB: Initialize shell variables -->
<!-- Your current GCP Project ID -->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project - INPUT -->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT -->
export DATASET_ID=<DATASET_ID>

gsutil mb -l us gs://$BUCKET_NAME

Get access to the synthetic dataset.

  1. From the email address you are using to log in to Cloud Console, send an email to hcls-solutions-external+subscribe@google.com requesting to join.
  2. You will receive an email with instructions on how to confirm the action. 525a0fa752e0acae.png
  3. Use the option to reply to the email to join the group. DO NOT click the button.
  4. Once you receive the confirmation email, you can proceed to the next step in the codelab.

Copy input data.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Create a BigQuery Dataset.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID                      

3. Cloud Data Fusion Environment Setup

Follow these steps to enable the Cloud Data Fusion API and grant the required permissions:

Enable APIs.

  1. Go to the GCP Console API Library.
  2. From the projects list, select your project.
  3. In the API Library, select the API you want to enable. If you need help finding the API, use the search field and/or the filters.
  4. On the API page, click ENABLE. (Alternatively, the API can be enabled from the command line, as sketched below.)
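If you prefer the command line, the Cloud Data Fusion API can also be enabled with gcloud. A minimal sketch, assuming the PROJECT_ID variable set earlier:

# Enable the Cloud Data Fusion API for the current project
gcloud services enable datafusion.googleapis.com --project=${PROJECT_ID}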

Create a Cloud Data Fusion instance.

  1. In the GCP Console, select your ProjectID.
  2. Select Data Fusion from the left menu, then click the CREATE AN INSTANCE button in the middle of the page (first creation), or click the CREATE INSTANCE button in the top menu (additional creations).

a828690ff3bf3c46.png

8372c944c94737ea.png

  1. Provide the instance name. Select Enterprise.

5af91e46917260ff.png

  1. Click the CREATE button. (A command-line alternative is sketched below.)
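As a sketch of the command-line alternative: the instance name and region below are placeholders, and flag names and accepted values can differ between gcloud versions, so check gcloud beta data-fusion instances create --help before running it.

# Sketch only: create an Enterprise edition Data Fusion instance (name and region are placeholders)
gcloud beta data-fusion instances create my-cdf-instance \
    --location=us-central1 \
    --edition=enterprise \
    --project=${PROJECT_ID}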

Set up instance permissions.

After creating an instance, use the following steps to grant the service account associated with the instance permissions on your project:

  1. Navigate to the instance detail page by clicking the instance name.

76ad691f795e1ab3.png

  1. Copy the service account.

6c91836afb72209d.png

  1. Navigate to the IAM Page of your project.
  2. On the IAM permissions page, we will now add the service account as a new member and grant it the Cloud Data Fusion API Service Agent role. Click the Add button, then paste the service account in the New members field and select the Service Management -> Cloud Data Fusion API Service Agent role. (A gcloud alternative is sketched after this list.)
  3. ea68b28d917a24b1.png
  4. Click Save.
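If you prefer the command line, the same binding can be granted with gcloud. This is only a sketch: it assumes the service account address copied above has been exported as a SERVICE_ACCOUNT variable, and that roles/datafusion.serviceAgent is the role ID behind "Cloud Data Fusion API Service Agent".

# Sketch: grant the instance's service account the service agent role on the project
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${SERVICE_ACCOUNT}" \
    --role="roles/datafusion.serviceAgent"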

Once these steps are done, you can start using Cloud Data Fusion by clicking the View Instance link on the Cloud Data Fusion instances page, or the details page of an instance.

Set up the firewall rule.

  1. Navigate to GCP Console -> VPC Network -> Firewall rules to check if the default-allow-ssh rule exists or not.

102adef44bbe3a45.png

  1. If not, add a firewall rule that allows all ingress SSH traffic to the default network.

Using command line:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Using UI: Click Create Firewall Rule and fill out the information:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Build a Schema for transformation

Now that we have the Cloud Data Fusion environment in GCP, let's build a schema. We need this schema to transform the CSV data.

  1. In the Cloud Data Fusion window, click the View Instance link in the Action column. You will be redirected to another page. Click the provided URL to open the Cloud Data Fusion instance. It is your choice whether to click the "Start Tour" or "No, Thanks" button in the Welcome popup.
  2. Expand the "hamburger" menu, select Pipeline -> Studio.

6561b13f30e36c3a.png

  1. Under the Transform section in the Plugin palette on the left, double-click the Wrangler node, which will appear in the Data Pipelines UI.

aa44a4db5fe6623a.png

  1. Point to the Wrangler node and click Properties. Click the Wrangle button, then select a .csv source file (for example, patients.csv), which must have all the data fields needed to build the desired schema.
  2. Click the Down arrow (Column Transformations) next to each column name (for example, body). 802edca8a97da18.png
  3. By default, the initial import assumes there is only one column in your data file. To parse it as a CSV, choose ParseCSV, then select the delimiter and check the "Set first row as header" box as appropriate. Click the Apply button.
  4. Click the down arrow next to the body field and select Delete Column to remove the body field. Additionally, you can try out other transformations such as removing columns, changing the data type of some columns (the default is the "string" type), splitting columns, setting column names, etc.

e6d2cda51ff298e7.png

  1. The "Columns" and "Transformation steps" tabs show output schema and the Wrangler's recipe. Click Utilise at the upper right corner. Click the Validate button. The green "No errors institute" indicates success.

1add853c43f2abee.png

  1. In Wrangler Properties, click the Actions dropdown to Export the desired schema to your local storage for a future Import if needed. (Pre-built schema files are also available in GCS; see the sketch after these steps.)
  2. Save the Wrangler recipe for future use.
parse-as-csv :body ',' true
drop body
  1. To close the Wrangler Properties window, click the X button.
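If you would rather start from the pre-built schemas instead of exporting your own, the schema files mentioned earlier can be pulled straight from the public bucket. A sketch; the exact file name, such as "schema (Patients).json", is an assumption based on the path used later in this codelab:

# List the pre-built schema files and copy one locally for a later Import into Wrangler
gsutil ls gs://hcls_testing_data_fhir_10_patients/csv_schemas/
gsutil cp "gs://hcls_testing_data_fhir_10_patients/csv_schemas/schema (Patients).json" .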

5. Build nodes for the pipeline

In this section we will build the pipeline components.

  1. In the Data Pipelines UI, in the upper left, you should see that Data Pipeline - Batch is selected as the pipeline type.

af67c42ce3d98529.png

  1. The left panel contains sections such as Filter, Source, Transform, Analytics, Sink, Conditions and Actions, and Error Handlers and Alerts, where you can select a node or nodes for the pipeline.

c4438f7682f8b19b.png

Source node

  1. Select the Source node.
  2. Under the Source section in the Plugin palette on the left, double-click the Google Cloud Storage node, which appears in the Data Pipelines UI.
  3. Point to the GCS source node and click Properties.

87e51a3e8dae8b3f.png

  1. Fill in the required fields. Set the following fields:
  • Label = {any text}
  • Reference name = {any text}
  • Project ID = auto detect
  • Path = GCS URL to the bucket in your current project. For example, gs://$BUCKET_NAME/csv/ (a quick way to list these objects is sketched after this list)
  • Format = text
  • Path Field = filename
  • Path Filename Only = true
  • Read Files Recursively = true
  1. Add the field 'filename' to the GCS Output Schema by clicking the + button.
  2. Click Documentation for a detailed explanation. Click the Validate button. The green "No errors found" message indicates success.
  3. To close the GCS Properties, click the X button.
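The Path above points at the CSV objects you copied during project setup; their base names (patients.csv, providers.csv, and so on) are what the filename path field carries into the pipeline and what the Wrangler precondition will match against later. A quick check, assuming the BUCKET_NAME variable from earlier:

# List the input objects the GCS source node will read
gsutil ls gs://${BUCKET_NAME}/csv/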

Transform node

  1. Select the Transform node.
  2. Under the Transform section in the Plugin palette on the left, double-click the Wrangler node, which appears in the Data Pipelines UI. Connect the GCS source node to the Wrangler transform node.
  3. Point to the Wrangler node and click Properties.
  4. Click the Actions dropdown and select Import to import a saved schema (for example: gs://hcls_testing_data_fhir_10_patients/csv_schemas/schema (Patients).json), and paste the saved recipe from the previous section.
  5. Or, reuse the Wrangler node from the section Build a Schema for transformation.
  6. Fill in the required fields. Set the following fields:
  • Label = {any text}
  • Input field name = {*}
  • Precondition = {filename != "patients.csv"} to distinguish each input file (for example, patients.csv, providers.csv, allergies.csv, etc.) from the Source node.

2426f8f0a6c4c670.png

  1. Add a JavaScript node to execute user-provided JavaScript that further transforms the records. In this codelab, we use the JavaScript node to get a timestamp for each record update. Connect the Wrangler transform node to the JavaScript transform node. Open the JavaScript Properties and add the following function:

75212f9ad98265a8.png

function transform(input, emitter, context) {
  // Add an update timestamp in microseconds (getTime() returns milliseconds)
  input.TIMESTAMP = (new Date()).getTime() * 1000;
  emitter.emit(input);
}
  1. Add the field named TIMESTAMP to the Output Schema (if it doesn't exist) by clicking the + sign. Select timestamp as the data type.

4227389b57661135.png

  1. Click Documentation for a detailed explanation. Click the Validate button to validate all input data. The green "No errors found" message indicates success.
  2. To close the Transform Properties window, click the X button.

Data masking and de-identification

  1. You can select individual data columns by clicking the down arrow in the column and applying masking rules under the Mask data option as per your requirements (for example, the SSN column).

bb1eb067dd6e0946.png

  1. You can add more directives in the Recipe window of the Wrangler node. For example, use the hash directive with a hashing algorithm for de-identification purposes, following this syntax:
hash <column> <algorithm> <encode>

<column>: name of the column
<algorithm>: hashing algorithm (e.g. MD5, SHA-1, etc.)
<encode>: default is true (the hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

cbcc9a0932f53197.png

Sink node

  1. Select the sink node.
  2. Under the Sink section in the Plugin palette on the left, double-click the BigQuery node, which will appear in the Data Pipelines UI.
  3. Point to the BigQuery sink node and click Properties.

1be711152c92c692.png

  1. Fill in the required fields. Set the following fields:
  • Label = {any text}
  • Reference name = {any text}
  • Project ID = auto detect
  • Dataset = BigQuery dataset used in the current project (i.e., DATASET_ID)
  • Table = {table name}
  1. Click Documentation for a detailed explanation. Click the Validate button to validate all input data. The green "No errors found" message indicates success.

c5585747da2ef341.png

  1. To close the BigQuery Properties, click the X button.

6. Build Batch data pipeline

Connecting all nodes in a pipeline

  1. Drag a connection arrow (>) on the right edge of the source node and drop it on the left edge of the destination node.
  2. A pipeline can have multiple branches that get input files from the same GCS Source node.

67510ab46bd44d36.png

  1. Name the pipeline.

That's it. You've just created your first batch data pipeline and can deploy and run the pipeline.

Send pipeline alerts via email (optional)

To use the Pipeline Alert SendEmail feature, the configuration requires a mail server to be set up for sending mail from a virtual machine instance. See the reference link below for more information:

Sending email from an instance | Compute Engine Documentation

In this codelab, we set up a mail relay service through Mailgun using the following steps:

  1. Follow the instructions at Sending email with Mailgun | Compute Engine Documentation to set up an account with Mailgun and configure the email relay service. Additional modifications are below.
  2. Add all recipients' email addresses to Mailgun's authorized list. This list can be found under the Mailgun > Sending > Overview option on the left panel.

7e6224cced3fa4e0.png fa78739f1ddf2dc2.png

Once the recipients click "I Agree" on the email sent from support@mailgun.net, their email addresses are saved in the authorized list to receive pipeline alert emails.

72847c97fd5fce0f.png

  1. Step 3 of the "Before you begin" section: create a firewall rule as follows:

75b063c165091912.png

  1. Step 3 of "Configuring Mailgun as a mail relay with Postfix": select Internet Site or Internet with smarthost, instead of Local Only as mentioned in the instructions.

8fd8474a4ef18f16.png

  1. Step 4 of "Configuring Mailgun as a mail relay with Postfix": edit /etc/postfix/main.cf (for example, with vi) to add 10.128.0.0/9 at the end of mynetworks.

249fbf3edeff1ce8.png

  1. Edit /etc/postfix/main.cf to change the default SMTP port (25) to port 587. (An equivalent postconf sketch follows the screenshot below.)

86c82cf48c687e72.png
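The same two main.cf changes can also be made with postconf instead of an editor. This is only a sketch to run on the mail relay VM: the mynetworks line assumes the Debian defaults plus the subnet added above, and the relayhost value assumes Mailgun's SMTP endpoint on port 587, so adjust both to whatever your Mailgun setup actually uses.

# Sketch: append the subnet to mynetworks and point the relay at Mailgun on port 587
sudo postconf -e 'mynetworks = 127.0.0.0/8 [::1]/128 10.128.0.0/9'
sudo postconf -e 'relayhost = [smtp.mailgun.org]:587'
sudo systemctl restart postfix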

  1. In the upper-right corner of the Data Fusion Studio, click Configure. Click Pipeline alert and click the + button to open the Alerts window. Select SendEmail.

dc079a91f1b0da68.png

  1. Fill out the Email configuration form. Select completion, success, or failure from the Run Condition dropdown for each alert type. If Include Workflow Token = false, only the data from the Message field is sent. If Include Workflow Token = true, the data from the Message field and the detailed Workflow Token data is sent. You must use lowercase for Protocol. Use any "fake" email other than your company email address for Sender.

1fa619b6ce28f5e5.png

7. Configure, Deploy, Run/Schedule Pipeline

db612e62a1c7ab7e.png

  1. In the upper-right corner of the Data Fusion Studio, click Configure. Select Spark for the Engine Config. Click Save in the Configure window.

8ecf7c243c125882.png

  1. Click Preview to preview the data, and click Preview again to toggle back to the previous window. You can also Run the pipeline in Preview mode.

b3c891e5e1aa20ae.png

  1. Click Logs to view logs.
  2. Click Save to save all changes.
  3. Click Import to import a saved pipeline configuration when building a new pipeline.
  4. Click Export to export a pipeline configuration.
  5. Click Deploy to deploy the pipeline.
  6. Once deployed, click Run and wait for the pipeline to run to completion.

bb06001d46a293db.png

  1. You can duplicate the pipeline by selecting Duplicate under the Actions button.
  2. You can export the pipeline configuration by selecting Export under the Actions button.
  3. Click Inbound triggers or Outbound triggers on the left or right edge of the Studio window to set pipeline triggers if desired.
  4. Click Schedule to schedule the pipeline to run and load data periodically.

4167fa67550a49d5.png

  1. Summary shows charts of Run history, records, error logs and warnings.

8. Validation

  1. Validate that the pipeline was executed successfully.

7dee6e662c323f14.png

  1. Validate that the BigQuery dataset contains all the tables.
bq ls $PROJECT_ID:$DATASET_ID                      
     tableId       Type    Labels   Time Partitioning
 ----------------- ------- -------- -------------------
  Allergies         TABLE
  Careplans         TABLE
  Conditions        TABLE
  Encounters        TABLE
  Imaging_Studies   TABLE
  Immunizations     TABLE
  Medications       TABLE
  Observations      TABLE
  Organizations     TABLE
  Patients          TABLE
  Procedures        TABLE
  Providers         TABLE
  1. Receive alert emails (if configured).

Viewing the results

To view the results after the pipeline runs:

  1. Query the table in the BigQuery UI. GO TO THE BIGQUERY UI
  2. Update the query below to your own project name, dataset, and table. (A command-line version is sketched after the screenshot.)

e32bfd5d965a117f.png
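As a command-line alternative to the BigQuery UI, the same kind of check can be run with bq and standard SQL. This is only a sketch; it assumes the Patients table listed in the validation step and the PROJECT_ID and DATASET_ID variables from setup:

# Sketch: count the rows loaded into the Patients table
bq query --use_legacy_sql=false \
  "SELECT COUNT(*) AS patient_count FROM \`${PROJECT_ID}.${DATASET_ID}.Patients\`"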

9. Cleaning up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial:

After you've finished the tutorial, you can clean up the resources that you created on GCP so they won't take up your quota, and you won't be billed for them in the future. The following sections describe how to delete or turn off these resources.

Deleting the BigQuery dataset

Follow these instructions to delete the BigQuery dataset you created as part of this tutorial.

Deleting the GCS Bucket

Follow these instructions to delete the GCS bucket you created as part of this tutorial.

Deleting the Cloud Data Fusion example

Follow these instructions to delete your Cloud Data Fusion instance.

Deleting the project

The easiest way to eliminate billing is to delete the project that you created for the tutorial.

To delete the project:

  1. In the GCP Console, go to the Projects page. GO TO THE PROJECTS PAGE
  2. In the project list, select the project you want to delete and click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

10. Congratulations

Congratulations, you've successfully completed the codelab to ingest healthcare data into BigQuery using Cloud Data Fusion.

You imported CSV data from Google Cloud Storage into BigQuery.

You visually built a data integration pipeline for loading, transforming and masking healthcare data in bulk.

You now know the key steps required to begin your healthcare data analytics journey with BigQuery on Google Cloud Platform.