Monday, December 11, 2023

GCP Data Processing



BigQuery - Analytical workload (storage + processing)
DataFlow - Apache Beam
DataProc - Spark, Hadoop
Data Fusion
Cloud Composer
Data Prep
Cloud Pub/Sub



BigQuery - Warehousing solution for analytical workloads, which can be used for both storage and processing, queried with SQL.

DataFlow - The Apache Beam offering inside GCP; with one unified programming model you can write your complete data pipeline job and execute it inside Dataflow.

DataProc - Apache Spark/Hadoop solution inside Google. Whenever you want to lift and shift your Spark and Hadoop jobs from on-premises to GCP, DataProc is your choice.

Data Fusion - Build your complete pipeline without a single line of code; with simple drag and drop you can create your data pipeline with Data Fusion.

Cloud Composer - Managed Airflow, to author, monitor, and schedule your workflows.

Data Prep - Data preparation, like data cleaning, data wrangling, and data transformation, with intelligent suggestions provided automatically (ML/AI powered); ETL preparation work comes under this part.

Cloud Pub/Sub - Asynchronous communication to decouple different systems.

Cloud BigQuery


 

Data warehouse solution in GCP
Like a relational database - SQL schema
Serverless
Built using BigTable + GCP infrastructure
BigQuery uses columnar storage
This is an analytical database

  •  not for transactional purposes

Exabyte scale
Query using

  •  Standard SQL
  •  Legacy SQL

BigQuery can query external data sources

  •  Cloud Storage, Cloud SQL, BigTable

BigQuery can load data from various sources

  •  CSV, JSON, Avro, SQL and many more

Queries can be very expensive
Approx. $5 for 1 TB of data scanned
Before query execution, do a dry run (see the sketch below).
Alternative to open-source Apache Hive.
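A quick way to do that dry run from code is the Python client library; a minimal sketch, assuming google-cloud-bigquery is installed and using the public iris table that appears later in these notes:

from google.cloud import bigquery

client = bigquery.Client()

# dry_run=True validates the query and estimates bytes scanned without running it
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)

sql = "SELECT sepal_length, sepal_width FROM `bigquery-public-data.ml_datasets.iris`"
query_job = client.query(sql, job_config=job_config)

# Nothing is billed for a dry run; this only reports what a real run would scan
print("This query would process", query_job.total_bytes_processed, "bytes")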
How to access BigQuery

  •  Cloud console
  •  bq - command line tool
  •  Client library - written in C#,GO,Java,Node.js,PHP,Python, and Ruby
hierarchy
Project - Datasets - Tables - Jobs

In the BigQuery console, add the project and star it, and use the dataset under that project.

Search project - search the public project - bigquery-public-data - ml_datasets - iris

In the above:

Project - bigquery-public-data

Dataset - ml_datasets

Table - iris

How to write a query -

SELECT * FROM `bigquery-public-data.ml_datasets.iris` LIMIT 10;


 

Whenever you run a query, BigQuery writes the result to a temporary table, and that temp table has an expiry of 24 hours, after which it is removed.

If you run the same query again within 24 hours, the cached result is reused and you are not charged; if you run the same query after 24 hours, after the temp table has expired, it will scan the whole data again and it will charge you for that.
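A quick way to see whether a result came from that 24-hour cache is the cache_hit flag on the query job; a small sketch with the Python client, assuming google-cloud-bigquery is installed:

from google.cloud import bigquery

client = bigquery.Client()
sql = "SELECT sepal_length, sepal_width FROM `bigquery-public-data.ml_datasets.iris` LIMIT 10"

job = client.query(sql)
job.result()  # wait for the query to finish

# True when the result was served from the cache (no bytes billed for the run)
print("cache hit:", job.cache_hit, "bytes processed:", job.total_bytes_processed)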

For any kind of warehouse solution, it is advisable to never use SELECT *;
if you do, it will incur too much cost.


Let's try another dataset
bigquery-public-data.covid19_open_data.covid19_open_data
Project -  bigquery-public-data
Dataset - covid19_open_data
Table - covid19_open_data

SELECT date, country_name, new_confirmed FROM `bigquery-public-data.covid19_open_data.covid19_open_data` LIMIT 10;

(The selected columns must exist in this table's schema; the sepal_* columns belong to the iris table, not to covid19_open_data.)

create a data file bq.csv
cat bq.csv
name,age,date_join
ankit,32,1987-05-25
john,19,1990-12-06
manjeet,32,1991-07-19

create dataset

 

 Query on Bigquery
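The dataset creation, load, and query steps above were done in the console; as an alternative, here is a minimal sketch of the same flow with the google-cloud-bigquery Python client (the dataset name my_dataset and table name people are just examples):

from google.cloud import bigquery

client = bigquery.Client()

# Create the dataset (name is an example)
client.create_dataset("my_dataset", exists_ok=True)

# Load bq.csv into a table, letting BigQuery detect the schema from the header row
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True,
)
with open("bq.csv", "rb") as f:
    load_job = client.load_table_from_file(f, "my_dataset.people", job_config=job_config)
load_job.result()  # wait for the load to finish

# Query the loaded table
for row in client.query("SELECT name, age FROM my_dataset.people").result():
    print(row.name, row.age)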

 

Pubsub

Synchronous
Application - Notification Service - DB

Asynchronous
Application - Notification Service - Pubsub - DB 

 

So here things are decoupled: the application and notification service on one side, Pub/Sub in between, and the DB on the other end.
So even if the DB side is down, you still will not lose data; it will remain in Pub/Sub, and once the DB recovers it can get the messages from Pub/Sub.

How PubSub works

Fully-managed asynchronous

  •  messaging service

Scales to billions of messages per day
Publisher - app sends messages to a Topic
Push & Pull ways to access messages

  •  Pull - the subscriber pulls messages
  •  Push - messages are sent to the subscriber via webhook

One topic - multiple subscribers
One subscriber - multiple topics

Google Pubsub
Fully-managed Pubsub system inside GCP
Serverless
Auto-scaling and auto-provisioning, with support from zero to hundreds of GB/second
Topic - storage reference
Publishers send messages to a topic at pubsub.googleapis.com
Push & Pull ways to access messages
Once the subscriber receives a message, an ack is sent.
Cloud Pub/Sub acts as a staging environment for many GCP services

Advantages of Pub/Sub
Durability of data increases
Highly scalable
Decoupling b/w both systems (publisher & subscriber)

  •  The application doesn't synchronously communicate with the notification service
  •  The application (publisher) is not dependent on the notification service (subscriber)


Create Topic
When you create a topic,
by default it creates 1 subscription itself.

You can also create your own subscription.

After creating the subscription,
go to the topic and publish a msg.

After publishing the msg, go to the subscription
and pull the msg.

Pubsub via Python


To see the authorized user for cloud shell
gcloud auth list

install pubsub python module
pip3 install google-cloud-pubsub

$ python3
from google.cloud import pubsub_v1

# Client used to publish messages
publisher = pubsub_v1.PublisherClient()

# Full topic path: projects/<project-id>/topics/<topic-name>
topic_name = 'projects/atomic-matrix-401102/topics/my-first-topic'

# Pub/Sub messages are raw bytes, so encode the JSON string
data = '{"name" : "Ankit"}'
msg = bytes(data,'utf-8')

# publish() returns a future; result() blocks until the message id comes back
future = publisher.publish(topic_name,msg)
print(future.result())
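The notes above only publish from Python; a matching pull sketch with the same client library follows. The subscription path is an assumption (use whatever your subscription is actually named):

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription = 'projects/atomic-matrix-401102/subscriptions/my-first-topic-sub'  # example name

# Synchronously pull up to 10 messages
response = subscriber.pull(request={"subscription": subscription, "max_messages": 10})

for received in response.received_messages:
    print(received.message.data.decode('utf-8'))

# Ack the messages so Pub/Sub does not redeliver them
ack_ids = [received.ack_id for received in response.received_messages]
if ack_ids:
    subscriber.acknowledge(request={"subscription": subscription, "ack_ids": ack_ids})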

pubsub with schema (Schema enforcement)

create schema
{
  "type": "record",
  "name": "Avro",
  "fields": [
    {
      "name": "firstname",
      "type": "string"
    },
    {
      "name": "age",
      "type": "int"
    }
  ]
}


test msg against schema
{
"firstname":"ankit",
"age": 34
}
Once the msg is valid, create the schema.

Now create a topic with the schema we created above

Try to publish a plain msg like
"hello from the schema topic" - it will give you an invalid format error.


 

Now try with the below test msg; that will work and the msg will be published.
{
"firstname":"manjeet",
"age": 34
}
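A sketch of publishing a schema-valid message from Python, assuming the topic was created with the Avro schema above and JSON message encoding; the topic name my-schema-topic is an example:

import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_name = 'projects/atomic-matrix-401102/topics/my-schema-topic'  # example topic name

# The payload must validate against the topic's Avro schema
payload = {"firstname": "manjeet", "age": 34}

# With JSON message encoding, the body is simply the JSON document as bytes
future = publisher.publish(topic_name, json.dumps(payload).encode('utf-8'))
print(future.result())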

 

Pubsub with gcloud


to see topics list
gcloud pubsub topics list

create new topic
gcloud pubsub topics create topic-from-gcloud

subscribe to a topic
gcloud pubsub subscriptions create sub-from-gcloud --topic=topic-from-gcloud

publish a msg to a topic
gcloud pubsub topics publish topic-from-gcloud --message="msg-from-gcloud"

fetch msg from a topic
gcloud pubsub subscriptions pull sub-from-gcloud --auto-ack 


Cloud Dataflow

Managed service for a variety of data processing
An advanced unified programming model to implement batch and streaming data processing jobs that run on various execution engines/runners
Cloud version of Apache Beam (Beam = Batch + Stream)
Serverless, fully managed
Horizontal autoscaling of workers
Jobs can be created with
 Pre-defined templates
 Notebook instance
  write the data pipeline job in Java, Python, SQL
 From Cloud Shell / local machine

 

How Dataflow works
Write the job in Java, Python, or Go
Unified API for both batch + stream processing

  •  No need to separately handle batch & streaming data

Execution (runners)

  •  Direct Runner (runs locally; has scaling issues)
  •  Apache Flink
  •  Apache Spark
  •  Cloud Dataflow

Apache Beam


Pipeline

  •  A pipeline is a graph of transformations that a user constructs that defines the data processing they want to do.

A series of data processing steps is nothing but a pipeline: sequentially, one transformation feeds the next transformation, and so on.

IO-Transform

  •  https://beam.apache.org/documentation/io/built-in/

 


PCollection

  •  Fundamental data type in Beam (a distributed dataset that the pipeline operates on)

PTransform

  •  The operations executed within a pipeline
  •  https://beam.apache.org/documentation/programming-guide/#transforms

Runner - execution engine (see the minimal sketch below)
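A minimal sketch tying these terms together; it runs locally because no runner is specified, so Beam falls back to the Direct Runner (the element values are just placeholders):

import apache_beam as beam

# Pipeline: the graph of transformations
with beam.Pipeline() as p:
    (
        p
        | 'create' >> beam.Create(['alpha', 'beta', 'gamma'])  # PTransform producing a PCollection
        | 'upper' >> beam.Map(str.upper)                       # PTransform applied element-wise
        | 'print' >> beam.Map(print)                           # simple sink for the demo
    )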

 

Hands on

  • Pre-defined Template
  • Notebook Instance
  • Execute Job From Shell with Direct Runner & DataFlow

Pre-defined Template
Create a bucket
manjeet-bigdata-255
Create 2 folders
output
temp

Create a job
Word Count
After it completes, go to the bucket and
check under output -
what result is there?

 

Another Job

Create another job - Text Files on Cloud Storage to Pub/Sub


 Notebook Instance

Install instance from Workbench option

 

 



Steps
https://www.cs.uic.edu/~sloan/CLASSES/java/KeyboardReader.java

Download the file from the above URL and
upload it to GCS.
Create a function that will find only those lines which start with an import statement
and return only those lines.

In the notebook use the below code


import apache_beam as beam

p = beam.Pipeline()

# Yield only the lines that start with the given term
def my_grep(line, term):
   if line.startswith(term):
      yield line

searchTerm = 'import'

input1 = 'gs://manjeet-bigdata-255/KeyboardReader.java'
output1 = 'gs://manjeet-bigdata-255/upload/'

(p
| 'getinput' >> beam.io.ReadFromText(input1)
| 'findimport' >> beam.FlatMap(lambda line : my_grep(line, searchTerm))
| 'write' >> beam.io.WriteToText(output1)
)

p.run()



 

After running the above code,
go to the bucket
and check the output file;
you should see lines like
import
import
there.

 Submit custom Job

CODE

import apache_beam as beam

# Pipeline options passed as command-line style arguments
argv = [
      '--project=atomic-matrix-401102',
      '--job_name=dfjob-from-cli',
      '--save_main_session',
      '--staging_location=gs://manjeet-bigdata-255/temp/',
      '--temp_location=gs://manjeet-bigdata-255/temp/',
      '--region=us-central1',
      '--runner=DataflowRunner'
   ]

p = beam.Pipeline(argv=argv)

# Yield only the lines that start with the given term
def my_grep(line, term):
   if line.startswith(term):
      yield line

searchTerm = 'import'

input1 = 'gs://manjeet-bigdata-255/KeyboardReader.java'
output1 = 'gs://manjeet-bigdata-255/upload/'

(p
| 'getinput' >> beam.io.ReadFromText(input1)
| 'findimport' >> beam.FlatMap(lambda line : my_grep(line, searchTerm))
| 'write' >> beam.io.WriteToText(output1)
)

p.run()

Save the above code and upload it to Cloud Shell.

Also check the account name in the shell with the below cmd, and make sure that account has permission to create Dataflow jobs.

Check the account in the shell
gcloud auth list

Also check if you have the apache_beam module
python3
import apache_beam

If apache_beam is not available, install it
pip3 install 'apache-beam[gcp]'

Now run the Python file
python3 dataflow-demo.py

Check the Dataflow UI; it should show you the job dfjob-from-cli

 

Cloud DataProc

Managed Hadoop & Spark service inside GCP
Lift/shift existing Hadoop/Spark based jobs
Cluster types

  •  Standard (1 master, N workers)
  •  Single Node (1 master, 0 workers)
  •  High Availability (3 masters, N workers)

Worker nodes are regular VMs or preemptible VMs (cost reduction)
Jobs supported:

  •  Hadoop, SparkR, Spark, SparkSQL, Hive, Pig, PySpark

Demo

  •  Spark,PySpark,Notebook Instance

 

Hands On
Create Cloud DataProc Cluster
  •  Spark JOB
  •  Submit Pyspark Job


 

Spark Job

We would like to calculate the value of pi.
The job will calculate the value of pi on multiple machines in a distributed fashion (a rough PySpark sketch of the idea follows below).

Main class or jar
org.apache.spark.examples.SparkPi

Jar files
file:///usr/lib/spark/examples/jars/spark-examples.jar

Arguments
1000   - run 1000 iterations to calculate the value of pi

Submit
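For intuition, here is a rough PySpark sketch of the Monte Carlo idea SparkPi is based on; it is an illustrative equivalent, not the SparkPi source itself:

import random
import pyspark

sc = pyspark.SparkContext()

num_samples = 1000 * 1000  # more samples -> better estimate

def inside(_):
    # Pick a random point in the unit square and test whether it falls inside the unit circle
    x, y = random.random(), random.random()
    return x * x + y * y <= 1.0

# Distribute the samples across the cluster and count the hits
count = sc.parallelize(range(num_samples)).filter(inside).count()
print("Pi is roughly", 4.0 * count / num_samples)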

 

How to submit pyspark job


write a code
cat pyspark-job.py
#!/usr/bin/python
import pyspark
sc = pyspark.SparkContext()
rdd = sc.parallelize(['Hello','world!'])
words = sorted(rdd.collect())
print(words)

upload to GCS bucket

In the job, give the path
Main python file
gs://manjeet-bigdata-255/pyspark-job.py

 

 

 

How to submit a pyspark job from the VM machine directly?
Log in to the VM via SSH.

In the shell type
pyspark        it will start a Spark session
Here you use the same code as above
cat pyspark-job.py
#!/usr/bin/python
import pyspark
sc = pyspark.SparkContext()
rdd = sc.parallelize(['Hello','world!'])
words = sorted(rdd.collect())
print(words)

The moment we enter the pyspark shell, a Spark context is automatically created in the sc variable, so we do not need to execute sc = pyspark.SparkContext():
import pyspark
rdd = sc.parallelize(['Hello','world!'])
words = sorted(rdd.collect())
print(words)

Then check the console output -

How to submit a job in a Notebook Instance

Create a Notebook instance from Workbench.

Open the Jupyter notebook
and run the below code
import pyspark

sc   # the PySpark kernel pre-creates the Spark context in the sc variable

rdd = sc.parallelize(['Hello','world!'])
words = sorted(rdd.collect())
print(words)



Cloud Fusion

Fully managed, cloud-native solution for quickly building data pipelines
Code-free, drag-n-drop tool
150+ preconfigured connectors & transformations
Built on open-source CDAP
3 editions are available

  •  Developer
  •  Basic
  •  Enterprise

Pricing:

  •  https://cloud.google.com/data-fusion/pricing#cloud-data-fusion-pricing

Create a Fusion instance

Get csv data and
upload the data to a GCS bucket.

Log in to the Fusion instance,
check the connections for GCS,
select the bucket - select the csv data file.
You will see the data on the Cloud Fusion screen.


 

With the data in the Fusion instance, do the transformation steps on it.

Create a pipeline - select batch pipeline.

Select the sink - BigQuery - and connect (align) it to the source.

In the BigQuery sink properties, fill in the required info as below

Dataset Project ID
atomic-matrix-401102

Dataset
fusion_ds

Table
fusion_out


Validate from the top corner; if there is no error, click on the cross icon and deploy.


Run the pipeline; it will take some time. Once it succeeds, check in BigQuery whether the sink data is there in the table.


Cloud Composer   

Fully managed Apache Airflow in GCP
Airflow is a workflow & orchestration engine
With Airflow, one can programmatically author, schedule and monitor workflows
Workflows are defined as directed acyclic graphs (DAGs)
DAGs are written in Python 3.x
Built-in integration with other GCP services

  •  Google BigQuery,
  •  Cloud Dataflow & Dataproc,
  •  Cloud Datastore
  •  Cloud Storage,
  •  Cloud Pub/Sub, and Cloud ML Engine

A Cloud Composer instance is not serverless;
in GCP, Composer is installed on a Kubernetes (GKE) cluster and instances.

Open Composer - create an instance.

After creating the Composer environment,
if you look at the Kubernetes clusters, you will see the Composer cluster there.


Another thing you will see is a storage bucket for DAGs.

Write the first DAG in Cloud Composer

For a new DAG, you need to upload it to that same bucket at the same location,
and Airflow will automatically detect it.

DAG code

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

# Run every minute; catchup=False skips back-filling runs since start_date
dag = DAG('hello_world_dag', description='Hello world example', schedule_interval='*/1 * * * *', start_date=datetime(2021, 1, 1), catchup=False)

def print_hello():
    return "Hello World!"

# A no-op placeholder task
dummy_task_1 = DummyOperator(
    task_id='dummy_task',
    retries=0,
    dag=dag
)

# Runs the print_hello Python callable
hello_task_2 = PythonOperator(
    task_id='hello_task',
    python_callable=print_hello,
    dag=dag
)

# dummy_task runs first, then hello_task
dummy_task_1 >> hello_task_2



You can incorporate different operators that interact with different GCP services like BigQuery and GCS, and create your own DAG for a complete pipeline workflow, as in the sketch below.
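A hedged sketch of such a task, assuming the Google provider package is available in the Composer environment; the bucket, file, dataset and table names below are placeholders:

from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from datetime import datetime

dag = DAG('gcs_to_bq_dag', schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False)

# Load a CSV file from GCS into a BigQuery table (all names are examples)
load_csv = GCSToBigQueryOperator(
    task_id='load_csv_to_bq',
    bucket='manjeet-bigdata-255',
    source_objects=['bq.csv'],
    destination_project_dataset_table='atomic-matrix-401102.my_dataset.people',
    source_format='CSV',
    skip_leading_rows=1,
    autodetect=True,
    write_disposition='WRITE_TRUNCATE',
    dag=dag,
)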


Data loss prevention API

Fully managed service designed to help you discover, classify, and protect your most sensitive data.
PII data

  •  Person's name, Credit Card Number, SSN

Apply the API on Cloud Storage and BigQuery data
DLP works on free-form text, structured & unstructured data (images)
What can you do with this data

  •  Identify sensitive data
  •  De-identify data

             Masking and encryption

  •  Re-identify (in case you want to recover the original data)

There are 120+ built-in infoTypes (identification types).
 

 

De-identification of data


Redaction - remove sensitive data
Replacement - replace with some token (like the infoType name)
Masking - replace one or more characters with some other character (the most common type; see the sketch below)
Encryption - encrypt sensitive data
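A minimal masking sketch with the Python DLP client (google-cloud-dlp), masking the first 3 characters of any SSN found in a string; the sample text is made up:

import google.cloud.dlp_v2

dlp = google.cloud.dlp_v2.DlpServiceClient()
parent = "projects/atomic-matrix-401102"

item = {"value": "Customer SSN is 372-99-1234"}  # made-up sample text
inspect_config = {"info_types": [{"name": "US_SOCIAL_SECURITY_NUMBER"}]}

# Mask the first 3 characters of each finding with 'X'
deidentify_config = {
    "info_type_transformations": {
        "transformations": [{
            "primitive_transformation": {
                "character_mask_config": {"masking_character": "X", "number_to_mask": 3}
            }
        }]
    }
}

response = dlp.deidentify_content(request={
    "parent": parent,
    "deidentify_config": deidentify_config,
    "inspect_config": inspect_config,
    "item": item,
})
print(response.item.value)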

 

DLP API Demo

https://cloud.google.com/dlp/demo/#!/

 

 Templates, Infotypes & Match Likelihood


Templates
Configuration defined for

  •  Inspection jobs
  •  De-identification jobs

Once a template is defined, it can be reused for other jobs.

 

 

INFOTYPES

what to scan for

  •  Like Credit Card
  •  SSN
  •  Age


Types of INFOTYPES

  •  Built-in

            US_SSN, EMAIL_ADDRESS
            120+ built-in types defined

  •  Stored

           Custom infoType
           Based on fixed words, regular expressions, custom dictionaries (see the sketch below)
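A sketch of inspecting text with a custom regex-based infoType via the Python DLP client; the EMPLOYEE_ID infoType, its pattern, and the sample text are all made up for illustration:

import google.cloud.dlp_v2

dlp = google.cloud.dlp_v2.DlpServiceClient()
parent = "projects/atomic-matrix-401102"

# Hypothetical custom infoType: employee ids that look like E-12345
inspect_config = {
    "custom_info_types": [{
        "info_type": {"name": "EMPLOYEE_ID"},
        "regex": {"pattern": r"E-\d{5}"},
    }],
    "include_quote": True,
}

item = {"value": "Ticket raised by employee E-10293 yesterday"}

response = dlp.inspect_content(request={
    "parent": parent,
    "inspect_config": inspect_config,
    "item": item,
})
for finding in response.result.findings:
    print(finding.info_type.name, finding.quote)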

Hands on

Create INFOTYPE

  •  Built-in infoTypes

  •  Stored infoTypes

Create an inspect template:
go to Configuration,
create a template,
define the template,
choose the template type.

Below is the inspect template using the built-in infoTypes EMAIL and SSN.



Create Job for Inspection
Job ID
DLP-JOB-1

Storage type - GCS
Location - bucket path to the file we downloaded from video and uploaded in GCS

Configure detection
Template name          inspect template you created earlier
projects/atomic-matrix-401102/locations/global/inspectTemplates/EMAIL_SSN_INSPECT

infoTypes - remove all
By default it will select some, but since you are using a template you can unselect all of them.

Add action
check
Save to BigQuery
Project ID
Dataset ID
Table ID - if you don't give one, it will be created automatically

Schedule

Create



Create Template for De-identification


Template ID
MASK_SSN

Display name
mask first 3 char of SSN

Transformation method
Transformation
Mask with character

Masking character
X

Number of characters to mask
3

InfoTypes to transform
InfoTypes - MANAGE INFOTYPES

 

 

     Create


Data Catalog

Most organizations today are dealing with a large and growing number of data assets.
Data stakeholders (consumers, producers, and administrators) face a number of challenges:

  •  Searching for insightful data
  •  Understanding data
  •  Making data useful

Data Catalog

  •  A fully managed and highly scalable data discovery and metadata management service.
  •  Single place to discover all data assets across all projects

Using Data Catalog

  •  Search data assets (see the sketch below)
  •  Tag data
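A sketch of the search use case with the Python client (google-cloud-datacatalog); the project id and query string are examples:

from google.cloud import datacatalog_v1

client = datacatalog_v1.DataCatalogClient()

# Restrict the search to one project; the query looks for tables whose name contains "iris"
request = {
    "scope": {"include_project_ids": ["atomic-matrix-401102"]},
    "query": "type=table name:iris",
}

for result in client.search_catalog(request=request):
    print(result.linked_resource)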

How Data Catalog works


So suppose you have data in BigQuery, Pub/Sub etc.
As you make changes in BigQuery or Pub/Sub, within a matter of seconds or minutes
Data Catalog finds that there is a new table or an update in BigQuery or Pub/Sub.

For bigger organizations this tool is very useful
for data asset management.


Metadata
Technical metadata

  •  For BigQuery, Pub/Sub, this metadata resides inside the individual products
  •  Technical metadata is registered by the catalog automatically

Business Metadata

  •  Attach tags to existing data assets
  •  Define a tag template and attach metadata



