How to migrate your on-premise PySpark jobs to GCP using Dataproc Workflow Templates, with production-grade best practices

What will you find in this article ?

  • Some context and history of how and why we are shifting to the cloud paradigm

Who might find this article useful ?

  • Anyone who wants to migrate their on-premise Spark/Hadoop infrastructure to GCP, or who just wants to implement their Spark/Hadoop workflows on GCP with Dataproc Workflow Templates

Introduction : a bit of history & context

From vertical to horizontal scaling

Ali Ghodsi, co-founder of Databricks and one of the original creators of Apache Spark, explains in this episode of the Invest Like the Best podcast how the Big Data paradigm shift has moved the compute race from vertical scaling (you scale by improving the code’s execution performance and the machine’s hardware) to horizontal scaling (you scale by distributing your computation across multiple nodes).

The sparkle

One big initiator of this change was Google’s paper introducing the MapReduce computational model. Hadoop is a first-generation implementation of this paradigm, and it came with its challenges and limitations; the RDD model proposed by Spark tackled some of them. As an overview, Spark makes better use of memory, supports both streaming and batch processing (where Hadoop MapReduce supports only batch processing), and can be up to 100x faster than Hadoop MapReduce on in-memory workloads.

Yes for big data, No for infrastructure cost maintenance

This Big Data paradigm shift pushed many companies to set up their own Hadoop/Spark clusters on premise. They did, but soon found themselves with the unwanted additional responsibility and cost of continuously maintaining, monitoring and managing this infrastructure, even when their needs were only occasional.

Then came another paradigm, the serverless cloud paradigm, where you have at your disposal fully managed, ephemeral, robust and monitored infrastructure: you pay only for what you consume, with no extra cost of hiring an IT team to maintain the infrastructure.

Dataproc Workflow Templates

Introducing & defining Dataproc Workflow Templates :

Dataproc is a fully managed and highly scalable service that allows you, among other things, to set up a cluster and run your PySpark/Spark/Hadoop jobs on Google Cloud Platform.

Dataproc Workflow Templates (DWT) is an interface layer on top of Dataproc that takes responsibility for setting up a new Dataproc cluster (or selecting an existing one if desired), executing the jobs it has been configured to run, then deleting the cluster it created after execution.

PS : DWT represents the jobs as a DAG

The benefits of using DWT

  • It abstracts away having to set up a Dataproc cluster and then delete it after execution

See Google’s use case recommendation

PS : You can schedule your DWT on GCP by :

  • Using a cloud function/cloud run, see ref

DWT Tutorial step by step

In this section we are going to explore a directory pattern structure, where we will see :

  1. How to organize your pyspark jobs into selectable modules by a main.py entrypoint

Pre-requisite :

As a pre-requisite you should have :

PS : The GitHub repository of the example will be linked at the end

1 ) How to organize your pyspark jobs into selectable modules by a main.py entrypoint

Folder structure :

Let’s consider the following project structure :

gcp-dataproc-workflow-template-pyspark-migration-example/
┣ src/
┃ ┣ jobs/
┃ ┃ ┣ load_loans/
┃ ┃ ┃ ┣ __init__.py
┃ ┃ ┃ ┗ main.py
┃ ┃ ┗ __init__.py
┃ ┗ main.py
┣ workflow_templates/
┃ ┗ temp.yaml
┣ .env
┣ .gitignore
┣ LICENSE
┣ Makefile
┗ README.md

How jobs are organized :

The src folder contains 2 important elements :

  • A main.py entry point CLI that takes 2 arguments, job=[which job to run] and job-args=[the arguments that the job needs]

Each submodule exposes a function analyze :

def analyze(spark, **kwargs):
    pass

Then main.py loads the module and passes the arguments to the job_module.analyze function :
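The dispatch described here might look roughly like the following. This is a minimal sketch, not the repository’s actual main.py: the helper names parse_job_args, build_parser and run_job are hypothetical, and it assumes each job is a package jobs.<job_name> exposing analyze(spark, **kwargs), with --job-args repeated once per key=value pair.

```python
import argparse
import importlib


def parse_job_args(pairs):
    """Turn ["key=value", ...] into a dict, splitting on the first "=" only."""
    parsed = {}
    for pair in pairs or []:
        key, sep, value = pair.partition("=")
        if not sep or not key:
            raise ValueError(f"expected key=value, got {pair!r}")
        parsed[key] = value
    return parsed


def build_parser():
    parser = argparse.ArgumentParser(description="Run a PySpark job by name")
    parser.add_argument("--job", required=True, help="package name under jobs/")
    # --job-args may be given several times, once per key=value pair
    parser.add_argument("--job-args", action="append", default=[], dest="job_args")
    return parser


def run_job(spark, job_name, job_kwargs):
    """Import jobs.<job_name> and call its analyze function."""
    job_module = importlib.import_module(f"jobs.{job_name}")
    return job_module.analyze(spark, **job_kwargs)


def main(argv=None):
    # Imported here so the helpers above stay usable without Spark installed
    from pyspark.sql import SparkSession

    args = build_parser().parse_args(argv)
    spark = SparkSession.builder.appName(args.job).getOrCreate()
    try:
        run_job(spark, args.job, parse_job_args(args.job_args))
    finally:
        spark.stop()
```

In a real main.py you would end the file by calling main() under the usual `if __name__ == "__main__":` guard. Splitting on the first "=" only matters here because the values are gs:// paths that could themselves contain "=".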

So let’s see what the analyze function of load_loans looks like :

The content of this job is a treatment taken from Gary A. STAFFORD’s tutorial. The analyze function takes an argument gcs_input_path, the path of the input data, and an argument gcs_output_path, the path where the result of the treatment is written.
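To make the shape of such a function concrete, here is a minimal sketch with the same two arguments. It is not the job from the tutorial: the transformation in the middle (dropDuplicates) is a placeholder chosen only for illustration, and the CSV options are assumptions about the input file.

```python
def analyze(spark, gcs_input_path, gcs_output_path, **kwargs):
    """Read a CSV from gcs_input_path, transform it, write CSV to gcs_output_path."""
    df = (
        spark.read
        .option("header", "true")       # assumption: the file has a header row
        .option("inferSchema", "true")  # assumption: let Spark guess column types
        .csv(gcs_input_path)
    )

    # Placeholder transformation, for illustration only: the real job applies
    # the treatment from the tutorial mentioned above.
    result = df.dropDuplicates()

    (
        result.write
        .mode("overwrite")
        .option("header", "true")
        .csv(gcs_output_path)
    )
    return result
```

Because the function only receives a SparkSession and keyword arguments, it can be called from the entrypoint with whatever key=value pairs were passed on the command line.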

In what follows we will go step by step, using the Makefile of our GitHub repo, to create a DWT, add a cluster to it, submit a job and generate the DWT’s yaml.

PS : you should create a .env file at the root directory, using the .env_example file at the root directory as an example
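As a rough guide to what the .env needs to define, the sketch below checks for the variables that the Makefile targets in this article reference. The list is collected from those targets, not from the repository’s .env_example, so treat it as an assumption; missing_vars is a hypothetical helper.

```python
import os

# Variable names referenced by the Makefile targets in this article
# (assumption: the repository's .env_example may define more or fewer)
REQUIRED_VARS = (
    "TEMPLATE_ID",
    "REGION",
    "ZONE",
    "BUCKET_NAME",
    "ROOTH_PATH",
    "STEP_ID",
    "JOB_NAME",
)


def missing_vars(env=None):
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


print(missing_vars({"TEMPLATE_ID": "my-template", "REGION": "europe-west1"}))
```

Calling missing_vars() with no argument checks the current process environment, which is handy before running the make targets.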

2) How to create a DWT and add a cluster to it

How we create a DWT :

After having created and filled in your .env, and with gsutil and gcloud set up, you can use the Makefile to execute this command :

make create_template

Which executes the following target :

create_template:
	gcloud dataproc workflow-templates create \
		${TEMPLATE_ID} --region ${REGION}

This command creates a template in GCP. ${TEMPLATE_ID} is the name and identifier of your DWT; you should be able to see your DWT in the WORKFLOW TEMPLATES section of the console

If you double-click on it, you will see that it does not contain any configuration yet :

How to add a Dataproc cluster to a DWT :

Now that we have created our DWT, we should add a cluster to it by executing the command :

make add_cluster

Which executes the following target :

add_cluster:
	gcloud dataproc workflow-templates set-managed-cluster \
		${TEMPLATE_ID} \
		--region ${REGION} \
		--zone ${ZONE} \
		--cluster-name three-node-cluster \
		--master-machine-type n1-standard-2 \
		--master-boot-disk-size 500 \
		--worker-machine-type n1-standard-2 \
		--worker-boot-disk-size 500 \
		--num-workers 2 \
		--image-version 1.3-deb9

Here we are creating a configuration for a Dataproc cluster. As indicated in the documentation, the command gcloud dataproc workflow-templates set-managed-cluster inherits its flags from gcloud dataproc clusters create.

Now if you double-click on the DWT you will see that the cluster configuration has been taken into account :

Here we have a cluster with 3 nodes (1 master and 2 workers) of machine type n1-standard-2, each with a 500 GB boot disk. If you want to see how else you can customize your cluster, check the ref. Next, let’s see how we submit our jobs

3) How to package and submit a job to a DWT

Generating distribution

Recalling from the first section how jobs are organized, all we have to do for our example is reproduce main.py and the jobs module in a distribution, by running the command :

make build

Which executes the following target :

build: clean
	mkdir ./dist
	cp ./src/main.py ./dist
	cd ./src && zip -x main.py -r ../dist/jobs.zip .

The command creates a directory dist, copies main.py into it, and archives the jobs module (excluding main.py) into jobs.zip, which should generate the following :

dist/
┣ jobs.zip
┗ main.py

The archive will be passed in the argument --py-files. Although neither the Spark ref nor the Google ref states it explicitly, the archive’s contents behave as if they were extracted at the same level as main.py, which reproduces the folder structure shown above
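One way to see why a zipped package is importable at all: Python can import straight from a zip file that sits on sys.path. The sketch below involves no Spark; it builds a throwaway jobs.zip locally and imports from it. That Spark places --py-files archives on the Python path is my reading of the spark-submit help text, so check it against the documentation for your version. The helper names are hypothetical.

```python
import importlib
import os
import sys
import tempfile
import zipfile


def build_demo_zip(directory):
    """Create a tiny jobs.zip containing a jobs.load_loans package."""
    zip_path = os.path.join(directory, "jobs.zip")
    with zipfile.ZipFile(zip_path, "w") as archive:
        archive.writestr("jobs/__init__.py", "")
        archive.writestr(
            "jobs/load_loans/__init__.py",
            "def analyze(spark, **kwargs):\n    return sorted(kwargs)\n",
        )
    return zip_path


def import_from_zip(zip_path, module_name):
    """Put the archive on sys.path and import a module from inside it."""
    if zip_path not in sys.path:
        sys.path.insert(0, zip_path)
    importlib.invalidate_caches()
    return importlib.import_module(module_name)


with tempfile.TemporaryDirectory() as tmp:
    job = import_from_zip(build_demo_zip(tmp), "jobs.load_loans")
    # Prints the sorted keyword names the stand-in analyze received
    print(job.analyze(None, gcs_output_path="out", gcs_input_path="in"))
```

The archive is never unpacked in this demo, which is why the paths inside the zip (jobs/...) must match the import path you want.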

Make your distribution available on GCP

Now that we have built our distribution, we should make it available to our DWT. To do that, we push it to an existing GCS bucket by executing the command :

make copy_build

Which executes the following target :

copy_build:
	gsutil cp -r ${ROOTH_PATH}/dist ${BUCKET_NAME}

This command copies the generated dist into your bucket; you should see your dist folder in your bucket :

Job submission

Now that our main.py and jobs.zip are available on GCP, we can submit our job to the DWT by running the command :

make submit_job

Which executes the following target :

submit_job:
	gcloud dataproc workflow-templates add-job pyspark ${BUCKET_NAME}/dist/main.py \
		--step-id ${STEP_ID} \
		--py-files=${BUCKET_NAME}/dist/jobs.zip \
		--workflow-template ${TEMPLATE_ID} \
		--region ${REGION} \
		-- --job=${JOB_NAME} \
		--job-args="gcs_input_path=${BUCKET_NAME}/data/ibrd-statement-of-loans-historical-data.csv" \
		--job-args="gcs_output_path=${BUCKET_NAME}/data/ibrd-summary-large-python"

In the UI you should be able to see the added job in the job detail section

Copy data to GCP

Before running our job, we will make the IBRD dataset downloaded from Kaggle available on GCP. For that purpose we run the command :

make copy_irbd_dataset_2gcp

Which will execute the following target :

copy_irbd_dataset_2gcp:
	cd ${ROOTH_PATH}/data && unzip irbd_data.zip;\
	gsutil cp ${ROOTH_PATH}/data/ibrd-statement-of-loans-historical-data.csv ${BUCKET_NAME}/data/ibrd-statement-of-loans-historical-data.csv

We unzip our dataset, then push it to our bucket on GCP

Running our Job

Now that we have submitted our job to our DWT and made the data available on GCP, we can instantiate our workflow and run the jobs it contains, either from the UI, the command line, Cloud Run/Cloud Functions, or Cloud Composer.

In the UI click on RUN:

Then in WORKFLOWS tab you should see your job running :

Once finished, the job’s output will be written to ${BUCKET_NAME}/data/ibrd-summary-large-python

Here we have submitted one job, and it ran successfully. We could have submitted more before launching the run.

Unless you specify an order, the jobs run concurrently, as mentioned in the documentation.

Also keep in mind that if one job fails, the whole workflow fails and its jobs are stopped.
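To make the ordering rule concrete, here is a small sketch that is not Dataproc code: it uses Python’s graphlib to group steps into waves that could run concurrently, given the steps each one must wait for. The step IDs are hypothetical. In a template, to my knowledge, such dependencies are declared per job through prerequisiteStepIds (the --start-after flag of add-job in gcloud).

```python
from graphlib import TopologicalSorter


def execution_waves(prerequisites):
    """Group steps into waves; a wave holds steps whose prerequisites are all done.

    prerequisites maps a step ID to the list of step IDs it must wait for.
    """
    sorter = TopologicalSorter(prerequisites)
    sorter.prepare()  # raises graphlib.CycleError if the dependencies form a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


# Hypothetical step IDs: two independent loads, then a summary that needs both
steps = {
    "load_loans": [],
    "load_rates": [],
    "summarize": ["load_loans", "load_rates"],
}
print(execution_waves(steps))
# prints [['load_loans', 'load_rates'], ['summarize']]
```

With no prerequisites at all, every step lands in the first wave, which matches the default concurrent behaviour described above.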

So let’s see in the next section how we can use yaml files to define our DWT

4) How we can create a DWT from a yaml file

Our DWT is already defined, so a good way to see what it looks like is to export it as yaml. For that we run the command :

make export_yaml

Which executes the following target :

export_yaml:
	gcloud dataproc workflow-templates export ${TEMPLATE_ID} \
		--destination workflow_templates/temp.yaml \
		--region ${REGION}

This command exports our DWT into the file workflow_templates/temp.yaml

Once exported, our workflow_templates/temp.yaml will look like this :

Here you can see that we are defining our pyspark jobs (just one here). You can add other ones following the same pattern; just change the parameters for your new jobs.
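Since every job entry follows the same pattern, it can help to think of one entry as a small data structure. The sketch below builds such an entry as a plain Python dict, using the field names that appear in the template yaml in this article (pysparkJob, args, mainPythonFileUri, pythonFileUris, stepId). The function pyspark_step and the step ID in the example are hypothetical, and turning the dict into yaml is left to whichever YAML library you use.

```python
import json


def pyspark_step(step_id, job_name, job_args, dist_uri):
    """Build one entry of the template's jobs list as a plain dict."""
    return {
        "pysparkJob": {
            # Arguments handed to main.py: the job name, then one
            # --job-args=key=value per job argument
            "args": [f"--job={job_name}"]
            + [f"--job-args={key}={value}" for key, value in job_args.items()],
            "mainPythonFileUri": f"{dist_uri}/main.py",
            "pythonFileUris": [f"{dist_uri}/jobs.zip"],
        },
        "stepId": step_id,
    }


step = pyspark_step(
    step_id="my-new-step",  # hypothetical step ID
    job_name="load_loans",
    job_args={"gcs_output_path": "gs://dfwt-example/data/ibrd-summary-large-python"},
    dist_uri="gs://dfwt-example/dist",
)
print(json.dumps(step, indent=2))
```

Adding a second job to the template then amounts to appending another such entry, with a different stepId, to the jobs list.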

Once you have made your modifications, you can update your DWT by importing the file, as specified in the documentation

You can also instantiate your workflow and launch your jobs directly from your yaml, as referenced

5) How to handle 3rd party dependencies or shared modules between jobs

Up to now, what we have seen is a pattern for packaging jobs as modules and submitting them to a DWT. But any serious job will probably require internal (shared modules) and 3rd party dependencies

Shared modules :

The jobs might use the same functions/helpers, and it makes no sense to repeat their definitions in each job’s module. The way to go is to define a shared module, placed at the same level as the jobs folder, that will be zipped into jobs.zip. Consider the following src folder structure :

src/
┣ jobs/
┃ ┣ load_loans/
┃ ┃ ┣ __init__.py
┃ ┃ ┗ main.py
┃ ┗ __init__.py
┣ utils/
┃ ┣ __init__.py
┃ ┗ hello.py
┗ main.py

The command you ran earlier :

make build 

will generate a dist that contains the utils module inside jobs.zip. To be sure of that, run :

unzip dist/jobs.zip -d ./dist

Run an ls and see the result :

jobs jobs.zip main.py utils

Just as your job module load_loans is able, in its __init__.py, to make the import

from jobs.load_loans.main import analyze

your job will be able to make this import just as easily

from utils.hello import say_hello

Nearly the same logic applies to external 3rd party dependencies, so let’s see how it works

3rd party dependencies :

Let’s say we depend on a library, click for example, and our requirements.txt looks like this

click==7.1.2

After having tested our dependency locally, we can install it into a directory by running the command :

pip install -r requirements.txt -t src/libs

This will make click available in directory libs

src/
┣ jobs/
┃ ┣ load_loans/
┃ ┗ __init__.py
┣ libs/
┃ ┣ click/
┃ ┗ click-7.1.2.dist-info/
┣ utils/
┃ ┣ __init__.py
┃ ┗ hello.py
┗ main.py

But if we follow the exact same logic as for utils and jobs, we will be obliged to write the import like this

from libs import click

An import that contains libs is not desirable, so the clever idea proposed by Eran Kampf in his excellent Medium article is to generate a zip archive whose root is not src but libs. Let’s see the result :

At the root directory, run the command :

cd ./src/libs && zip -r ../../dist/libs.zip .

you will see your dist now looks like this :

dist/
┣ jobs.zip
┣ libs.zip
┗ main.py

when you unzip jobs.zip and libs.zip and run an ls :

click click-7.1.2.dist-info jobs jobs.zip libs.zip main.py utils

The click dependency and any other dependencies will then be importable in the same way as the utils or jobs modules
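The effect of choosing a different root for each archive can be sketched in plain Python with zipfile. This is an illustration of the idea, not the Makefile’s implementation: zip_directory and archive_names are hypothetical helpers, and the files are empty stand-ins for the real sources and the installed click package.

```python
import os
import tempfile
import zipfile


def zip_directory(root, zip_path, exclude=()):
    """Zip everything under root, storing paths relative to root."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for folder, _dirs, files in os.walk(root):
            for name in files:
                full_path = os.path.join(folder, name)
                arcname = os.path.relpath(full_path, root).replace(os.sep, "/")
                # Skip top-level entries listed in exclude (e.g. main.py, libs)
                if arcname.split("/")[0] in exclude:
                    continue
                archive.write(full_path, arcname)


def archive_names(zip_path):
    with zipfile.ZipFile(zip_path) as archive:
        return sorted(archive.namelist())


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "src")
    # Empty stand-in files mirroring the src layout
    for rel in ("main.py", "jobs/__init__.py", "utils/hello.py", "libs/click/__init__.py"):
        path = os.path.join(src, rel)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        open(path, "w").close()

    jobs_zip = os.path.join(tmp, "jobs.zip")
    libs_zip = os.path.join(tmp, "libs.zip")
    zip_directory(src, jobs_zip, exclude=("main.py", "libs"))  # rooted at src
    zip_directory(os.path.join(src, "libs"), libs_zip)         # rooted at libs

    jobs_names = archive_names(jobs_zip)
    libs_names = archive_names(libs_zip)

print(jobs_names)  # prints ['jobs/__init__.py', 'utils/hello.py']
print(libs_names)  # prints ['click/__init__.py']
```

Because libs.zip starts at click/ and not at libs/click/, the import stays `import click`.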

So let’s run another Makefile build command, but this time one that takes the 3rd party dependencies into account.

Run the command :

make build_with_deps

Which will execute the following target :

build_with_deps: clean
	mkdir ./dist
	cp ./src/main.py ./dist
	pip install -r requirements.txt -t src/libs
	cd ./src && zip -x main.py -x \*libs\* -r ../dist/jobs.zip .
	cd ./src/libs && zip -r ../../dist/libs.zip .

The target installs the requirements into libs, and when libs.zip and jobs.zip are unzipped, the dependencies will simply be modules present in the directory

Just don’t forget to add the new archive to your jobs. For example, in your DWT yaml, under the jobs key, you should add gs://dfwt-example/dist/libs.zip to pythonFileUris :

jobs:
- pysparkJob:
    args:
    - --job=load_loans
    - --job-args=gcs_input_path=gs://dfwt-example/data/ibrd-statement-of-loans-historical-data.csv
    - --job-args=gcs_output_path=gs://dfwt-example/data/ibrd-summary-large-python
    mainPythonFileUri: gs://dfwt-example/dist/main.py
    pythonFileUris:
    - gs://dfwt-example/dist/jobs.zip
    - gs://dfwt-example/dist/libs.zip
  stepId: ibrd-large-pyspark

NB : Google proposes another way to handle dependencies, by configuring the cluster environment, but I clearly prefer the method above, since it is not tied to a GCP product.

Conclusion

We have seen how we can package our pyspark jobs into modules with their dependencies and have a CLI interface to select them.

This pattern lets us scale to multiple jobs that share the same entrypoint without overhead; all that changes is the entrypoint’s arguments

We have also seen that the DWT, once configured, takes responsibility for instantiating a Dataproc cluster and running the jobs. You can find the jobs’ logs in Stackdriver (or in Cloud Composer if you run it from Cloud Composer)

If you want to see how you can adapt this pattern for Spark/Hadoop workflows, please see again Gary A. STAFFORD’s tutorial

If you want to check how you can test your pyspark jobs, have another look at Eran Kampf’s article

I hope this tutorial was helpful for you in your journey! Thanks for reading; here again is the GitHub repo containing all the sources for the examples

Full stack devops based in Paris, python advocate, let’s connect on Linkedin : https://www.linkedin.com/in/hamza-senhaji-rhazi-72170678/