How to migrate your on-premise PySpark jobs to GCP using Dataproc Workflow Templates, with production-grade best practices
What will you learn/find in this article?
- Some context and history of how and why we are shifting to the cloud paradigm
- A complete pattern example of how to migrate (or create from scratch) your PySpark jobs to GCP with Dataproc Workflow Templates (you can use the same logic for Spark and Hadoop migrations; some further references are given as well)
- A GitHub repo that you can copy and adapt for your own migration
Who might this article be useful for?
- Anyone who wants to migrate their on-premise Spark/Hadoop infrastructure to GCP, or who just wants to implement their Spark/Hadoop workflows on GCP with Dataproc Workflow Templates
- Anyone curious
Introduction: a bit of history & context
From vertical to horizontal scaling
Ali Ghodsi, co-founder of Databricks and one of the original creators of Apache Spark, explains in an episode of the Invest Like the Best podcast how the Big Data paradigm shift moved the compute race from vertical scaling (you scale by improving your code's execution performance and the machine's hardware) to horizontal scaling (you scale by distributing your computation across multiple nodes).
The sparkle
One big initiator of this change was Google's paper introducing the MapReduce computational model. Hadoop is a first-generation implementation of this paradigm; it came with its own challenges and limitations, and the model proposed by Spark's RDDs tackled some of them. As an overview, Spark handles in-memory data better, supports both streaming and batch processing (whereas Hadoop supports only batch processing), and can be up to 100x faster than Hadoop.
Yes to big data, no to infrastructure maintenance costs
This Big Data paradigm shift pushed many companies to implement their Hadoop/Spark clusters on premise. They did, but soon found themselves with the undesired additional responsibility and cost of continuously maintaining, monitoring, and managing these infrastructures, even when their needs were only occasional.
Then came another paradigm: the serverless cloud, where you have at your disposal fully managed, ephemeral, robust, monitored infrastructure. You pay only for what you consume, with no extra cost of hiring an IT team to maintain the infrastructure.
Dataproc Workflow Templates
Introducing & defining Dataproc Workflow Templates:
Dataproc is a fully managed and highly scalable service that allows you, among other things, to set up a cluster and run your PySpark/Spark/Hadoop jobs on Google Cloud Platform.
Dataproc Workflow Templates (DWT) is an interface layer on top of Dataproc that takes responsibility for setting up a new Dataproc cluster (or selecting an existing one if desired), executing the jobs it has been configured with, and then deleting the cluster after execution.
PS: the jobs are represented as a DAG by the DWT.
The benefits of using DWT
- It abstracts away having to set up a Dataproc cluster yourself and delete the resource after execution
- It can be created from a YAML file, so it's flexible to modify: you can add/delete a job or change the cluster configuration (environment/hardware)
- It’s easy to schedule
See Google's use-case recommendations
PS: You can schedule your DWT on GCP by:
- Using a Cloud Function/Cloud Run, see ref (a minimal Python sketch follows right after this list)
- Using Cloud Scheduler, see ref
- Using Cloud Composer, see ref or this video
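For the first option, here is a minimal, hypothetical sketch of a Python function (for example, the body of a Cloud Function) that triggers a DWT programmatically. It assumes the google-cloud-dataproc client library; the project, region and template IDs are placeholders, not values from the repo:

# Hypothetical trigger, e.g. deployed as a Cloud Function.
# Assumes: pip install google-cloud-dataproc
from google.cloud import dataproc_v1

PROJECT_ID = "my-project"          # placeholder
REGION = "europe-west1"            # placeholder
TEMPLATE_ID = "my-dwt-template"    # placeholder

def trigger_workflow(event=None, context=None):
    # Dataproc clients must target a regional endpoint.
    client = dataproc_v1.WorkflowTemplateServiceClient(
        client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
    )
    name = f"projects/{PROJECT_ID}/regions/{REGION}/workflowTemplates/{TEMPLATE_ID}"
    # Instantiating the template creates the managed cluster, runs the
    # configured jobs, then deletes the cluster.
    operation = client.instantiate_workflow_template(name=name)
    operation.result()  # optionally block until the workflow finishes

In practice you would wire such a function to a Cloud Scheduler or Pub/Sub trigger, or simply call the equivalent gcloud command from wherever your scheduler lives.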
DWT Tutorial step by step
In this section we are going to explore a project directory pattern, where we will see:
- How to organize your PySpark jobs into selectable modules behind a main.py entry point
- How to create a DWT and add a cluster to it
- How to package and submit a job to a DWT
- How we can create a DWT from a YAML file
- How to handle 3rd party dependencies or shared modules between jobs
Prerequisites:
As prerequisites you should have:
- gcloud installed and authorized
- gsutil installed
- A storage bucket created in your GCP project
PS: the GitHub repository of the example is linked at the end.
1) How to organize your PySpark jobs into selectable modules behind a main.py entry point
Folder structure:
Let's consider the following project structure:
gcp-dataproc-workflow-template-pyspark-migration-example/
┣ src/
┃ ┣ jobs/
┃ ┃ ┣ load_loans/
┃ ┃ ┃ ┣ __init__.py
┃ ┃ ┃ ┗ main.py
┃ ┃ ┗ __init__.py
┃ ┗ main.py
┣ workflow_templates/
┃ ┗ temp.yaml
┣ .env
┣ .gitignore
┣ LICENSE
┣ Makefile
┗ README.md
How jobs are organized:
The src folder contains 2 important elements:
- A main.py CLI entry point that takes 2 arguments: job=[which job to run] and job-args=[the arguments that the job needs]
- A jobs module folder that contains the job submodules (for example, in the tree structure above, the submodule load_loans)
Each submodule exposes an analyze function:
def analyze(spark, **kwargs):
    pass
Then main.py loads the selected job module and passes the arguments to the job_module.analyze function.
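The repo's actual main.py is not reproduced here; as a minimal, hypothetical sketch of such an entry point (argparse is used purely for illustration, the real repo may rely on a different CLI library), it could look roughly like this:

# src/main.py -- hypothetical sketch of the CLI entry point
import argparse
import importlib

from pyspark.sql import SparkSession


def parse_job_args(pairs):
    # Turns ["gcs_input_path=gs://...", "gcs_output_path=gs://..."] into a dict.
    return dict(pair.split("=", 1) for pair in pairs)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Select and run a PySpark job")
    parser.add_argument("--job", required=True, help="name of the job submodule to run")
    parser.add_argument("--job-args", action="append", default=[],
                        help="job argument as key=value; can be repeated")
    args = parser.parse_args()

    spark = SparkSession.builder.appName(args.job).getOrCreate()

    # Load jobs.<job_name> (which re-exports analyze in its __init__.py)
    # and hand over the parsed arguments.
    job_module = importlib.import_module(f"jobs.{args.job}")
    job_module.analyze(spark, **parse_job_args(args.job_args))

    spark.stop()

With this in place, adding a new job is just adding a new submodule under jobs/ that exposes analyze; the entry point itself never changes, only its arguments do.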
So let's see what the analyze function of load_loans looks like.
The content of this job is a treatment taken from Gary A. Stafford's tutorial; the analyze function takes an argument gcs_input_path, which contains the input data path, and gcs_output_path, the path where the result of the treatment is written.
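The real treatment follows Gary A. Stafford's tutorial and is not reproduced here; the following is only an illustrative sketch of the function's shape (the column names and the aggregation are placeholders, not the tutorial's actual logic):

# src/jobs/load_loans/main.py -- illustrative sketch only
from pyspark.sql.functions import col, sum as sum_


def analyze(spark, gcs_input_path=None, gcs_output_path=None, **kwargs):
    # Read the raw IBRD loans CSV from GCS.
    df = (spark.read
          .option("header", "true")
          .option("inferSchema", "true")
          .csv(gcs_input_path))

    # Placeholder treatment: total disbursed amount per country.
    summary = (df.groupBy("Country")
                 .agg(sum_(col("Disbursed Amount")).alias("total_disbursed"))
                 .orderBy(col("total_disbursed").desc()))

    # Write the result back to GCS.
    summary.write.mode("overwrite").parquet(gcs_output_path)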
In what follows we will go step by step, using the Makefile of our GitHub repo, to create a DWT, add a cluster to it, submit a job, and generate the DWT's YAML.
PS: you should create a .env file at the root directory; take the .env_example file at the root directory as an example.
2) How to create a DWT and add a cluster to it
How we create a DWT:
After having created and filled your .env, and with gcloud and gsutil set up, you can use the Makefile to execute this command:
make create_template
Which executes the following target:
create_template:
	gcloud dataproc workflow-templates create ${TEMPLATE_ID} \
		--region ${REGION}
This command creates a template in GCP. ${TEMPLATE_ID} is the name and identifier of your DWT; you should be able to see your DWT in the console under the WORKFLOW TEMPLATES section.
If you double-click on it, you will see that it does not contain any configuration yet:
How to add a Dataproc cluster to a DWT:
Now that we have created our DWT, we should add a cluster to it by executing the command:
make add_cluster
Which executes the following target:
add_cluster:
	gcloud dataproc workflow-templates set-managed-cluster ${TEMPLATE_ID} \
		--region ${REGION} \
		--zone ${ZONE} \
		--cluster-name three-node-cluster \
		--master-machine-type n1-standard-2 \
		--master-boot-disk-size 500 \
		--worker-machine-type n1-standard-2 \
		--worker-boot-disk-size 500 \
		--num-workers 2 \
		--image-version 1.3-deb9
Here we are creating the configuration for a Dataproc cluster. As indicated in the documentation, the gcloud dataproc workflow-templates set-managed-cluster command inherits its flags from gcloud dataproc clusters create.
Now if you double-click on the DWT, you will see that the cluster configuration has been taken into account:
Here we have a cluster with 3 nodes of machine type n1-standard-2, each with a 500 GB boot disk. If you want to see more ways to customize your cluster, check the ref. Let's see next how we submit our jobs.
3) How to package and submit a job to a DWT
Generating the distribution
Recalling from the first section how jobs are organized, all we have to do for our example is reproduce main.py and the jobs module in a distribution. So we run the command:
make build
Which executes the following target:
build: clean
	mkdir ./dist
	cp ./src/main.py ./dist
	cd ./src && zip -x main.py -r ../dist/jobs.zip .
The command creates a dist directory; main.py is copied into it, and the jobs module is archived (excluding main.py) into jobs.zip. This should generate the following for you:
dist/
┣ jobs.zip
┗ main.py
The archive will be passed via the --py-files argument. Even though it's not clearly mentioned in the Spark ref or the Google ref, it seems that the archive is extracted at the same level as main.py, which reproduces the folder structure shown above.
Make your distribution available on GCP
Now that we have built our distribution, we should make it available to our DWT. To do that, we push it to an existing GCS bucket by executing the command:
make copy_build
Which executes the following target:
copy_build:
	gsutil cp -r ${ROOTH_PATH}/dist ${BUCKET_NAME}
This command copies your generated dist into your bucket; you should see the dist folder in your bucket:
Job submission
Now that our main.py and jobs.zip are available on GCP, we can submit our job by running the command:
make submit_job
Which executes the following target:
submit_job:
	gcloud dataproc workflow-templates add-job pyspark ${BUCKET_NAME}/dist/main.py \
		--step-id ${STEP_ID} \
		--py-files=${BUCKET_NAME}/dist/jobs.zip \
		--workflow-template ${TEMPLATE_ID} \
		--region ${REGION} \
		-- --job=${JOB_NAME} \
		--job-args="gcs_input_path=${BUCKET_NAME}/data/ibrd-statement-of-loans-historical-data.csv" \
		--job-args="gcs_output_path=${BUCKET_NAME}/data/ibrd-summary-large-python"
In the UI, you should be able to see the job added under the Job details section:
Copy data to GCP
Before running our job, we will make the IBRD dataset (downloaded from Kaggle) available on GCP. For that purpose we run the command:
make copy_irbd_dataset_2gcp
Which executes the following target:
copy_irbd_dataset_2gcp:
	cd ${ROOTH_PATH}/data && unzip irbd_data.zip; \
	gsutil cp ${ROOTH_PATH}/data/ibrd-statement-of-loans-historical-data.csv ${BUCKET_NAME}/data/ibrd-statement-of-loans-historical-data.csv
We unzip our dataset, then push it to our bucket on GCP.
Running our Job
Now that we have submitted our job to our DWT and made the data available on GCP, we can instantiate our workflow and run the jobs it contains, either from the UI, the command line, Cloud Run/Cloud Functions, or Cloud Composer.
In the UI click on RUN:
Then, in the WORKFLOWS tab, you should see your job running:
Once finished, it will write the job's output to ${BUCKET}/data/ibrd-summary-large-python.
Here we have submitted one job and it ran successfully; we could have submitted more before launching the run.
Unless you specify an order (for example with prerequisite steps via the --start-after flag of add-job), the jobs run concurrently, as mentioned in the documentation.
Also keep in mind that if one job fails, the whole workflow fails and the jobs are stopped.
Let's see in the next section how we can use YAML files to define our DWT.
4) How we can create a DWT from a YAML file
Our DWT is already defined, so let's export it as YAML to see what it looks like. For that we run the command:
make export_yaml
Which executes the following target:
export_yaml:
	gcloud dataproc workflow-templates export ${TEMPLATE_ID} \
		--destination workflow_templates/temp.yaml \
		--region ${REGION}
This command exports our DWT into the file workflow_templates/temp.yaml.
Once exported, our workflow_templates/temp.yaml contains the definition of our PySpark jobs (here just one); the jobs section of this YAML is shown, with an extra archive added, in section 5 below. You can add other jobs following the same pattern, just changing the parameters for each new job.
Once you have made your modifications, you can update your DWT by importing the file back, as specified in the documentation.
You can also directly instantiate your workflow and launch your jobs from the YAML, as referenced.
5) How to handle 3rd party dependencies or shared modules between jobs
Up to now, what we have seen is a pattern for packaging jobs as modules and submitting them to a DWT. But any serious job will probably require internal (shared modules) and 3rd-party dependencies.
Shared modules:
The jobs might use the same functions/helpers, and it makes no sense to repeat their definitions in each job's module. The way to go is to define a shared module that is zipped into jobs.zip and placed at the same level as the jobs folder. Consider the following src folder structure:
src/
┣ jobs/
┃ ┣ load_loans/
┃ ┃ ┣ __init__.py
┃ ┃ ┗ main.py
┃ ┗ __init__.py
┣ utils/
┃ ┣ __init__.py
┃ ┗ hello.py
┗ main.py
The command you ran earlier:
make build
will generate a dist whose jobs.zip contains the utils module. To make sure of that, run:
unzip dist/jobs.zip -d ./dist
Then run ls and look at the result:
jobs jobs.zip main.py utils
Just as your job module load_loans is able, in its __init__.py, to make the import
from jobs.load_loans.main import analyze
your job will be able to make this import just as easily:
from utils.hello import say_hello
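For concreteness, here is a minimal, hypothetical version of the shared helper and how a job could use it (the repo's actual hello.py may differ):

# src/utils/hello.py -- hypothetical shared helper
def say_hello(name="world"):
    return f"Hello, {name}!"

# and inside src/jobs/load_loans/main.py the job can then simply do:
#
# from utils.hello import say_hello
#
# def analyze(spark, **kwargs):
#     print(say_hello("load_loans"))
#     ...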
Nearly the same logic applies to external 3rd-party dependencies; let's see how it works.
3rd-party dependencies:
Let's say we have a dependency on a library, click for example, and our requirements.txt looks like this:
click==7.1.2
After having tested our dependency locally, we can install it into a directory by running the command:
pip install -r requirements.txt -t src/libs
This makes click available in the libs directory:
src/
┣ jobs/
┃ ┣ load_loans/
┃ ┗ __init__.py
┣ libs/
┃ ┣ click/
┃ ┗ click-7.1.2.dist-info/
┣ utils/
┃ ┣ __init__.py
┃ ┗ hello.py
┗ main.py
But if we follow the exact same logic as for utils and jobs, we will be obliged to write the import like this:
from libs import click
It's not desirable to have imports that contain libs, so the clever idea proposed by Eran Kampf in his excellent Medium article is to generate a zip archive whose root folder is not src but libs itself. Let's see the result.
At the root directory, run the command:
cd ./src/libs && zip -r ../../dist/libs.zip .
You will see that your dist now looks like this:
dist/
┣ jobs.zip
┣ libs.zip
┗ main.py
When you unzip jobs.zip and libs.zip and run ls, you get:
click click-7.1.2.dist-info jobs jobs.zip libs.zip main.py utils
The click dependency, and any other dependency, will now behave import-wise just like the utils or jobs modules.
So let's run another Makefile build command, this time one that takes the 3rd-party dependencies into account.
Run the command:
make build_with_deps
Which executes the following target:
build_with_deps: clean
	mkdir ./dist
	cp ./src/main.py ./dist
	pip install -r requirements.txt -t src/libs
	cd ./src && zip -x main.py -x \*libs\* -r ../dist/jobs.zip .
	cd ./src/libs && zip -r ../../dist/libs.zip .
The target installs the requirements into libs; when libs.zip and jobs.zip are unzipped, the dependencies are just modules present in the directory.
Just don't forget to add the new archive to your jobs. For example, in your DWT YAML, under the jobs key, you should add gs://dfwt-example/dist/libs.zip to pythonFileUris:
jobs:
- pysparkJob:
    args:
    - --job=load_loans
    - --job-args=gcs_input_path=gs://dfwt-example/data/ibrd-statement-of-loans-historical-data.csv
    - --job-args=gcs_output_path=gs://dfwt-example/data/ibrd-summary-large-python
    mainPythonFileUri: gs://dfwt-example/dist/main.py
    pythonFileUris:
    - gs://dfwt-example/dist/jobs.zip
    - gs://dfwt-example/dist/libs.zip
  stepId: ibrd-large-pyspark
NB: There is another way proposed by Google to handle dependencies, by configuring the cluster environment, but I clearly prefer this method since it is not tied to a specific GCP product.
Conclusion
We have seen how we can package our PySpark jobs into modules with their dependencies and have a CLI interface to select them.
This pattern allows us to scale to multiple jobs while keeping, without overhead, the same entry point; all that changes is the entry point's arguments.
We have also seen that the DWT, once configured, takes responsibility for instantiating a Dataproc cluster and running the jobs. You can find the jobs' logs in Stackdriver (or in Cloud Composer, if you run the workflow from Cloud Composer).
If you want to see how you can adapt this pattern to Spark/Hadoop workflows, see Gary A. Stafford's tutorial again.
If you want to check how you can test your PySpark jobs, take another look at Eran Kampf's article.
I hope this tutorial was helpful for you on your journey! Thanks for reading; here, again, is the GitHub repo containing all the sources for the examples.