GCP Data Processing
BigQuery
Analytical Workload - Storage+ Processing
DataFlow - Apache Beam
DataProc - Spark, Hadoop
Data Fusion
Cloud Composer
Data Prep
Cloud PubSub
BigQuery - Data warehousing solution for analytical workloads, which can be used for both storage and processing, with SQL queries.
Dataflow - Managed Apache Beam inside GCP; with one unified programming model you can write your complete data pipeline job and execute it on Dataflow.
Dataproc - Apache Spark and Hadoop solution inside Google Cloud; whenever you want to lift and shift your Spark and Hadoop jobs from on-premises to GCP, Dataproc is your choice.
Data Fusion - Build your complete pipeline without a single line of code; with simple drag and drop you can create your data pipeline with Data Fusion.
Cloud Composer - Managed Airflow, to author, schedule, and monitor your workflows.
Dataprep - Data preparation, like data cleaning, data wrangling, and data transformation, with intelligent suggestions provided automatically; ETL-style preparation for ML/AI pipelines comes under this.
Cloud Pub/Sub - Asynchronous communication to decouple different systems.
Cloud BigQuery
Data warehouse solution in GCP
Like Relational database - SQL schema
Serverless
Built on Google's internal infrastructure (Dremel query engine + Colossus storage)
BigQuery is Columnar storage
This is an analytical database
- not for transactional (OLTP) purposes
Exabyte scale
Query using
- Standard SQL
- legacy SQL
BigQuery can query external data sources
- Cloud Storage, Cloud SQL, Bigtable
BigQuery can load data from various sources
- CSV, JSON, Avro, Parquet, and many more
Queries can be expensive
approx. $5 for 1 TB of data scanned
Before executing a query, do a dry run to estimate the bytes it will scan (see the sketch below).
Alternative to open-source Apache Hive.
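A minimal sketch of a dry run with the BigQuery Python client, using the public iris table referenced later in these notes (the client picks up the default project from the environment):

from google.cloud import bigquery

client = bigquery.Client()

# dry_run=True estimates the bytes the query would scan without running it
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)

query = """
SELECT sepal_length, sepal_width
FROM `bigquery-public-data.ml_datasets.iris`
"""
job = client.query(query, job_config=job_config)
print(f"This query would process {job.total_bytes_processed} bytes.")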
How to access BigQuery
- Cloud console
- bq - command line tool
- Client libraries - available in C#, Go, Java, Node.js, PHP, Python, and Ruby
Project - Datasets - Tables - Jobs
In the BigQuery console, add the project and star it, then use the datasets under that project.
Search for the public project bigquery-public-data, then the dataset ml_datasets and the table iris.
In the above:
Project - bigquery-public-data
Dataset - ml_datasets
Table - iris
How to run a query
Whenever you run a query, BigQuery creates a temporary (cached) results table with an expiry of 24 hours; after that the temp table is removed.
If you run the same query again within 24 hours it is served from this cache and not charged; after the temp table expires, the same query scans the whole data again and you are charged for it.
In any warehouse solution it is advisable to never use SELECT *;
if you do, it incurs too much cost because every column is scanned.
Lets try another dataset
bigquery-public-data.covid19_open_data.covid19_open_data
Project - bigquery-public-data
Dataset - covid19_open_data
Table - covid19_open_data
SELECT country_name, date, new_confirmed FROM `bigquery-public-data.covid19_open_data.covid19_open_data` LIMIT 10;
create a data file bq.csv
cat bq.csv
name,age,data_join
ankit,32,1987-05-25
john,19,1990-12-06
manjeet,32,1991-07-19
Create a dataset and load bq.csv into a table
Query it in BigQuery
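A minimal sketch of loading bq.csv and querying it with the BigQuery Python client; the dataset name demo_ds and table name people are assumptions, not from the notes (the dataset must already exist):

from google.cloud import bigquery

client = bigquery.Client()
table_id = f"{client.project}.demo_ds.people"  # assumed dataset and table names

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,  # skip the header row of bq.csv
    autodetect=True,      # infer the schema from the CSV
)

with open("bq.csv", "rb") as f:
    load_job = client.load_table_from_file(f, table_id, job_config=job_config)
load_job.result()  # wait for the load to finish

for row in client.query(f"SELECT name, age FROM `{table_id}`").result():
    print(row.name, row.age)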
Pubsub
Synchronous
Application - Notification Service - DB
Asynchronous
Application - Notification Service - Pubsub - DB
So here things are decoupled: the application and notification service on one side, Pub/Sub in between, and the DB on the other end.
Even if the DB side is down, you still will not lose data; it remains in Pub/Sub, and once the DB recovers it can consume it from Pub/Sub.
How PubSub works
Fully-managed asynchronous
- messaging-service
Scale to billions of message per day
Publisher - the app sends messages to a Topic
Push & pull ways to access messages
- Pull - the subscriber pulls messages
- Push - messages are sent to the subscriber via a webhook
One topic - Multiple Subscriber
One subscriber - Multiple Topic
Google Pubsub
Fully-managed Pubsub system inside GCP
Serverless
Auto-scaling and auto-provisioning with support from zero to hundreds of GB/second
Topic - a named resource to which publishers send messages
Publisher send message to topic at pubsub.googleapis.com
Push - Pull way to access message
Once the subscriber receives and processes a message, an ack is sent.
Cloud Pub/Sub acts as a staging area for many GCP services.
Advantages of Pub/Sub
Durability of data increases
Highly scalable
Decoupling between the two systems (publisher & subscriber)
- The application doesn't synchronously communicate with the notification service
- The application (publisher) is not dependent on the notification service (subscriber)
Create Topic
After you create the topic,
by default the console also creates one subscription for it.
You can create additional subscriptions if needed.
After creating the subscription,
go to the topic and publish a message,
then go to the subscription
and pull the message.
Pubsub via Python
To see the authorized user for cloud shell
gcloud auth list
install pubsub python module
pip3 install google-cloud-pubsub
$ python3
from google.cloud import pubsub_v1

# create a publisher client
publisher = pubsub_v1.PublisherClient()
topic_name = 'projects/atomic-matrix-401102/topics/my-first-topic'

# Pub/Sub message payloads must be bytes
data = '{"name" : "Ankit"}'
msg = bytes(data, 'utf-8')
publisher.publish(topic_name, msg)
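A minimal sketch of pulling and acknowledging that message from Python; the subscription name my-first-topic-sub is an assumption, use the subscription created in the console:

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
# assumed subscription name - replace with the one created in the console
subscription = 'projects/atomic-matrix-401102/subscriptions/my-first-topic-sub'

# synchronously pull up to 5 messages
response = subscriber.pull(request={"subscription": subscription, "max_messages": 5})
for received in response.received_messages:
    print(received.message.data.decode('utf-8'))

# acknowledge so the messages are not redelivered
ack_ids = [r.ack_id for r in response.received_messages]
if ack_ids:
    subscriber.acknowledge(request={"subscription": subscription, "ack_ids": ack_ids})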
pubsub with schema (Schema enforcement)
create schema
{
  "type": "record",
  "name": "Avro",
  "fields": [
    {
      "name": "firstname",
      "type": "string"
    },
    {
      "name": "age",
      "type": "int"
    }
  ]
}
Test a message against the schema:
{
  "firstname": "ankit",
  "age": 34
}
Once the message is valid, create the schema.
Now create a topic with the schema we created above.
If you try to publish a plain message like "hello" to the schema topic, it will give an invalid-format error.
Now try with the test message below; it matches the schema, so the message will be published:
{
  "firstname": "manjeet",
  "age": 34
}
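A minimal sketch of publishing a schema-conforming message from Python, assuming the schema topic was created with JSON message encoding and is named my-schema-topic (an assumed name):

import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# assumed topic name - replace with the schema topic created above
topic_name = 'projects/atomic-matrix-401102/topics/my-schema-topic'

# with JSON message encoding, the payload is the JSON-serialized record
record = {"firstname": "manjeet", "age": 34}
future = publisher.publish(topic_name, json.dumps(record).encode('utf-8'))
print(future.result())  # prints the message ID if the message passes schema validation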
Pubsub with gcloud
to see topics list
gcloud pubsub topics list
create new topic
gcloud pubsub topics create topic-from-gcloud
subscribe to a topic
gcloud pubsub subscriptions create sub-from-gcloud --topic=topic-from-gcloud
publish a msg to a topic
gcloud pubsub topics publish topic-from-gcloud --message="msg-from-gcloud"
fetch msg from a topic
gcloud pubsub subscriptions pull sub-from-gcloud --auto-ack
Cloud Dataflow
Managed service for a variety of data processing needs
An advanced, unified programming model to implement batch and streaming data processing jobs that run on various execution engines/runners
Cloud version of Apache Beam = (Batch + Stream)
Serverless, Fully managed
Horizontal autoscaling of worker
Jobs can be created with:
- Pre-defined templates
- Notebook instances
- Data pipeline jobs written in Java, Python, or SQL,
  from Cloud Shell or a local machine
How Dataflow works
Write the job in Java, Python, or Go
Unified API for both batch + stream processing
- No need to separately handle Batch & streaming data
Execution
- Direct Runner
- Apache Flink
- Apache Spark
- Cloud DataFlow
Apache Beam
pipeline
- A pipeline is a graph of transformations that a user constructs, which defines the data processing they want to do.
A series of data processing steps is nothing but a pipeline: sequentially, the output of one transformation feeds the next, and so on.
IO-Transform
- https://beam.apache.org/documentation/io/built-in/
Pcollection
- Fundamental data type in Beam
Ptransform
- The operations executed within a pipeline
- https://beam.apache.org/documentation/programming-guide/#transforms
Runner - Execution engine
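A minimal sketch tying these concepts together, run locally with the Direct Runner (the element values are made up for illustration):

import apache_beam as beam

# with no runner specified, the pipeline runs on the Direct Runner
with beam.Pipeline() as p:
    (p
     | 'create' >> beam.Create(['apple', 'banana', 'cherry'])  # a PCollection from in-memory data
     | 'upper'  >> beam.Map(str.upper)                         # a PTransform applied element-wise
     | 'print'  >> beam.Map(print))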
Hands on
- Pre-defined Template
- Notebook Instance
- Execute Job From Shell with Direct Runner & DataFlow
Pre-defined Template
create bucket
manjeet-bigdata-255
create 2 folder
output
temp
Create job
word count
After it completes, go to the bucket,
check under output,
and see what result is there.
Another Job
Create another job - Text Files on Cloud Storage to Pub/Sub
Notebook Instance
Create an instance from the Workbench option
steps
https://www.cs.uic.edu/~sloan/CLASSES/java/KeyboardReader.java
Download the file from the above URL and
upload it to GCS.
Create a function that will find only those lines which start with an import statement
and return only those lines.
In the notebook, use the code below:
import apache_beam as beam

p = beam.Pipeline()

def my_grep(line, term):
    # yield only lines that start with the search term
    if line.startswith(term):
        yield line

searchTerm = 'import'
input1 = 'gs://manjeet-bigdata-255/KeyboardReader.java'
output1 = 'gs://manjeet-bigdata-255/upload/'

(p
 | 'getinput'   >> beam.io.ReadFromText(input1)
 | 'findimport' >> beam.FlatMap(lambda line: my_grep(line, searchTerm))
 | 'write'      >> beam.io.WriteToText(output1)
)

p.run()
After running the above code,
go to the bucket
and check the output file;
you should see the matching lines there:
import
import
Submit custom Job
CODE
import apache_beam as beam

argv = [
    '--project=atomic-matrix-401102',
    '--job_name=dfjob-from-cli',
    '--save_main_session',
    '--staging_location=gs://manjeet-bigdata-255/temp/',
    '--temp_location=gs://manjeet-bigdata-255/temp/',
    '--region=us-central1',
    '--runner=DataflowRunner'
]

p = beam.Pipeline(argv=argv)

def my_grep(line, term):
    # yield only lines that start with the search term
    if line.startswith(term):
        yield line

searchTerm = 'import'
input1 = 'gs://manjeet-bigdata-255/KeyboardReader.java'
output1 = 'gs://manjeet-bigdata-255/upload/'

(p
 | 'getinput'   >> beam.io.ReadFromText(input1)
 | 'findimport' >> beam.FlatMap(lambda line: my_grep(line, searchTerm))
 | 'write'      >> beam.io.WriteToText(output1)
)

p.run()
Save the above code as dataflow-demo.py and upload it to Cloud Shell.
Also check the account name in the shell with the command below, and give it permission to create Dataflow jobs.
Check the account in the shell:
gcloud auth list
Also check whether you have the apache_beam module:
python3
import apache_beam
If apache_beam is not available, install it:
pip3 install 'apache-beam[gcp]'
now run the python file
python3 dataflow-demo.py
Check the Dataflow UI; it should show you the job dfjob-from-cli.
Cloud DataProc
Managed Hadoop & Spark Service inside GCP
Lift and shift existing Hadoop/Spark-based jobs
Cluster type
- Standard(1 master, N workers)
- Single Node(1 master, 0 workers)
- High Availability(3 masters, N workers)
Worker nodes can be regular VMs or preemptible VMs (cost reduction)
Job Supported:
- Hadoop, SparkR, Spark, SparkSQL, Hive, Pig, PySpark
Demo
- Spark,PySpark,Notebook Instance
Create Cloud DataProc Cluster
- Spark JOB
- Submit Pyspark Job
Spark Job
We would like to calculate the value of pi.
The job will calculate the value of pi on multiple machines in a distributed fashion.
main Class or jar
org.apache.spark.examples.SparkPi
Jar files
file:///usr/lib/spark/examples/jars/spark-examples.jar
Arguments
1000 - run 1000 iterations to calculate the value of pi
Submit
How to submit a PySpark job
Write the code:
cat pyspark-job.py
#!/usr/bin/python
import pyspark
sc = pyspark.SparkContext()
rdd = sc.parallelize(['Hello','world!'])
words = sorted(rdd.collect())
print(words)
Upload it to a GCS bucket.
In the job, give the path:
Main python file
gs://manjeet-bigdata-255/pyspark_job.py
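As an alternative to the console, a minimal sketch of submitting the same PySpark job with the Dataproc Python client; the cluster name my-dataproc-cluster and the region are assumptions:

from google.cloud import dataproc_v1

project_id = 'atomic-matrix-401102'
region = 'us-central1'                 # assumed region - must match the cluster's region
cluster_name = 'my-dataproc-cluster'   # assumed cluster name

# the client must point at the regional Dataproc endpoint
job_client = dataproc_v1.JobControllerClient(
    client_options={'api_endpoint': f'{region}-dataproc.googleapis.com:443'}
)

job = {
    'placement': {'cluster_name': cluster_name},
    'pyspark_job': {'main_python_file_uri': 'gs://manjeet-bigdata-255/pyspark_job.py'},
}

operation = job_client.submit_job_as_operation(
    request={'project_id': project_id, 'region': region, 'job': job}
)
result = operation.result()  # wait for the job to finish
print(result.driver_output_resource_uri)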
How to submit a PySpark job from the VM directly?
Log in to the VM via SSH.
In the shell, type
pyspark - it will start a Spark session.
Here you need to use the same code as above:
cat pyspark-job.py
#!/usr/bin/python
import pyspark
sc = pyspark.SparkContext()
rdd = sc.parallelize(['Hello','world!'])
words = sorted(rdd.collect())
print(words)
The moment we enter the pyspark shell, a Spark context is automatically created in the sc variable, so we do not need to execute sc = pyspark.SparkContext().
import pyspark
rdd = sc.parallelize(['Hello','world!'])
words = sorted(rdd.collect())
print(words)
Then see the output in the console.
How to submit a job in a Notebook instance
Create a Notebook instance from the Workbench option
Open a Jupyter notebook
and run the code below:
import pyspark
sc
rdd = sc.parallelize(['Hello','world!'])
words = sorted(rdd.collect())
print(words)
Cloud Data Fusion
Fully managed, cloud-native solution for quickly building data pipelines
Code-free, drag-and-drop tool
150+ preconfigured connectors & transformations
Built on open-source CDAP
3 editions are available:
- Developer
- Basic
- Enterprise
Pricing:
- https://cloud.google.com/data-fusion/pricing#cloud-data-fusion-pricing
Create a Fusion instance
Get CSV data
Upload the data to a GCS bucket.
Log in to the Fusion instance.
Check the GCS connection under connections.
Select the bucket and select the CSV data file.
You will see the data on the Cloud Fusion screen.
With the data in the Fusion instance, do the transformation steps on it.
Create a pipeline - select batch pipeline.
Select the sink - BigQuery - and connect it to the source.
In the BigQuery sink properties, fill in the info as below:
Dataset Project ID
atomic-matrix-401102
Dataset
fusion_ds
Table
fusion_out
Validate from the top corner; if there are no errors, click the cross icon and deploy.
Run the pipeline. It will take some time; once it succeeds, check in BigQuery whether the sink data is there in the table.
Cloud Composer
Fully managed Apache Airflow within GCP
Airflow is a workflow & orchestration engine
With Airflow, one can programmatically schedule and monitor workflows
Workflows are defined as directed acyclic graphs(DAGs)
DAGs are written in Python 3.x
Built-in integration with other GCP services
- Google BigQuery,
- Cloud Dataflow & Dataproc,
- Cloud Datastore
- Cloud Storage,
- Cloud Pub/Sub, and Cloud ML Engine
A Cloud Composer instance is not serverless
In GCP, Composer is installed on a Kubernetes (GKE) cluster and VM instances.
Open Composer - create an instance.
After creating the Composer environment,
you will see a Composer cluster under Kubernetes clusters.
You will also see a storage bucket for the DAGs.
Write the first DAG in Cloud Composer
For any additional DAG, you need to upload it to the same bucket at the same location,
and Airflow will automatically detect it.
DAG code:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

dag = DAG('hello_world_dag',
          description='Hello world example',
          schedule_interval='*/1 * * * *',
          start_date=datetime(2021, 1, 1),
          catchup=False)

def print_hello():
    return "Hello World!"

dummy_task_1 = DummyOperator(
    task_id='dummy_task',
    retries=0,
    dag=dag
)

hello_task_2 = PythonOperator(
    task_id='hello_task',
    python_callable=print_hello,
    dag=dag
)

# run dummy_task first, then hello_task
dummy_task_1 >> hello_task_2
You can incorporate different operators that interact with different GCP services like BigQuery and GCS, and create your own DAG for a complete pipeline workflow; see the sketch below.
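For example, a minimal sketch of a DAG task that runs a BigQuery query, assuming the Google provider package is available in the Composer environment (the DAG name, task_id, and query are made up for illustration):

from datetime import datetime
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG('bq_example_dag',
         start_date=datetime(2021, 1, 1),
         schedule_interval='@daily',
         catchup=False) as dag:

    # runs a query job in BigQuery; the SQL here is only an illustration
    count_iris_rows = BigQueryInsertJobOperator(
        task_id='count_iris_rows',
        configuration={
            'query': {
                'query': 'SELECT COUNT(*) AS n FROM `bigquery-public-data.ml_datasets.iris`',
                'useLegacySql': False,
            }
        },
    )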
Data Loss Prevention (DLP) API
Fully managed service designed to help you discover, classify, and protect your most sensitive data.
PII data
- Person's name, Credit Card Number, SSN
Apply the API to Cloud Storage and BigQuery data
DLP works on free-form text, structured data, and unstructured data (images)
What to do with this Data
- Identify sensitive data
- De-identify data
Masking and Encryption
- Re-identify (in case you want to recover the original data)
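A minimal sketch of identifying sensitive data with the DLP Python client (the sample text is made up; the info types are the built-in names):

import google.cloud.dlp_v2

dlp = google.cloud.dlp_v2.DlpServiceClient()
parent = 'projects/atomic-matrix-401102'

item = {'value': 'My email is test@example.com and my SSN is 123-45-6789'}
inspect_config = {
    'info_types': [{'name': 'EMAIL_ADDRESS'}, {'name': 'US_SOCIAL_SECURITY_NUMBER'}],
    'include_quote': True,  # return the matched text in each finding
}

response = dlp.inspect_content(
    request={'parent': parent, 'inspect_config': inspect_config, 'item': item}
)
for finding in response.result.findings:
    print(finding.info_type.name, finding.quote, finding.likelihood)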
De-identification of data
Redaction - remove sensitive data
Replacement - replace with a token (like the infoType name)
Masking - replace one or more characters with some other character (the most common type)
Encryption - Encrypt Sensitive Data
DLP API Demo
https://cloud.google.com/dlp/demo/#!/
Templates, Infotypes & Match Likelihood
Templates
Configuration which define for
- Inspection of Jobs
- De-identification Jobs
Once Template defined, can be reused for other Jobs
INFOTYPES
what to scan for
- Like Credit Card
- SSN
- Age
Type of INFOTYPES
- Built-in
US_SOCIAL_SECURITY_NUMBER, EMAIL_ADDRESS
120+ built-in type defined
- Stored
Custom Infotype
Based on fixed words, regular expressions, or a custom dictionary
Hands on
Create INFO_TYPE
- Built-in infotypes
- Stored infotypes
Create inspect Template
go to configuration
create template
Define template
template type
Below is the inspect template using the built-in info types EMAIL and SSN
Create Job for Inspection
Job ID
DLP-JOB-1
Storage type - GCS
Location - bucket path to the file we downloaded from the video and uploaded to GCS
Configure detection
Template name - the inspect template you created earlier
projects/atomic-matrix-401102/locations/global/inspectTemplates/EMAIL_SSN_INSPECT
infoTypes - remove all
By default it will select some, but since you are using a template you can unselect all of them
Add action
check
Save to BigQuery
Project ID
Dataset ID
Table ID - if you don't give one, it will be created automatically
Schedule
Create
Create Template for De-identification
Template ID
MASK_SSN
Display name
mask first 3 characters of SSN
Transformation method
Transformation
Mask with character
Masking character
X
Number of characters to mask
3
InfoTypes to transform
InfoTypes - MANAGE INFOTYPES
Create
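A minimal sketch of the same masking transformation via the DLP Python client - masking character X, first 3 characters, SSN infotype (the sample text is made up):

import google.cloud.dlp_v2

dlp = google.cloud.dlp_v2.DlpServiceClient()
parent = 'projects/atomic-matrix-401102'

item = {'value': 'SSN: 123-45-6789'}
inspect_config = {'info_types': [{'name': 'US_SOCIAL_SECURITY_NUMBER'}]}

# mask the first 3 characters of each SSN finding with 'X'
deidentify_config = {
    'info_type_transformations': {
        'transformations': [{
            'primitive_transformation': {
                'character_mask_config': {
                    'masking_character': 'X',
                    'number_to_mask': 3,
                }
            }
        }]
    }
}

response = dlp.deidentify_content(
    request={
        'parent': parent,
        'deidentify_config': deidentify_config,
        'inspect_config': inspect_config,
        'item': item,
    }
)
print(response.item.value)  # SSN: XXX-45-6789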
Data Catalog
Most organizations today are dealing with a large and growing number of data assets.
Data stakeholders (consumers, producers, and administrators) face a number of challenges:
- Searching for insightful data
- Understanding data
- Making data useful
Data Catalog
- A fully managed and highly scalable data discovery and metadata management service.
- A single place to discover all data assets across all projects
Using Data catalog
- Search data assets (see the sketch below)
- tag data
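A minimal sketch of searching data assets from Python with the Data Catalog client (the query string is only an illustration):

from google.cloud import datacatalog_v1

datacatalog = datacatalog_v1.DataCatalogClient()

# limit the search scope to one project
scope = datacatalog_v1.SearchCatalogRequest.Scope()
scope.include_project_ids.append('atomic-matrix-401102')

# find BigQuery tables whose name contains 'iris'
results = datacatalog.search_catalog(scope=scope, query='type=table name:iris')
for result in results:
    print(result.relative_resource_name)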
How Data Catalog works
Suppose you have data in BigQuery, Pub/Sub, etc.
As you make changes in BigQuery or Pub/Sub, within a matter of seconds or minutes
Data Catalog finds that there is a new table or an update in BigQuery or Pub/Sub.
For bigger organizations this tool is very useful
for data asset management.
Metadata
Technical metadata
- For BigQuery and Pub/Sub, this metadata resides inside the individual products
- Technical metadata is registered by Data Catalog automatically
Business Metadata
- Attach tags to existing data assets
- Define a tag template and attach business metadata