By Daniel Foley, Data Scientist

 

Introduction

 

Recently I have been trying to find better ways to improve my workflow as a data scientist. I tend to spend a decent chunk of my time modelling and building ETLs in my job. This has meant that, more and more, I need to rely on tools that handle large datasets reliably and efficiently. I quickly realised that using pandas to manipulate these datasets is not always a good approach, and this prompted me to look into alternatives.

In this post, I want to share some of the tools that I have been exploring recently and show you how I use them and how they helped improve the efficiency of my workflow. The two I will talk about in particular are Snowflake and Dask: two very different tools, but ones that complement each other well, especially as part of the ML lifecycle. My hope is that after reading this post you will have a good understanding of what Snowflake and Dask are and how they can be used effectively, and that you will be able to get up and running with your own use cases.

More specifically, I want to show you how you can build an ETL pipeline using Snowflake and Python to generate training data for a machine learning task. I then want to introduce Dask and Saturn Cloud and show you how you can take advantage of parallel processing in the cloud to really speed up the ML training process so you can increase your productivity as a data scientist.

 

Building ETLs in Snowflake and Python

 

Before we jump into coding I should briefly explain what Snowflake is. This is a question I asked myself recently when my team decided to start using it. At a high level, it is a data warehouse in the cloud. After playing around with it for a while I realised how powerful it was. For me, one of the most useful features is the virtual warehouses that you can use. A virtual warehouse gives you access to the same data but is completely independent of other virtual warehouses, so compute resources are not shared across teams. This has proven very useful as it removes any potential for performance issues caused by other users executing queries throughout the day. This has resulted in less frustration and less time wasted waiting for queries to run.

Since we are going to be using Snowflake I will briefly outline how you can set it up and start experimenting with it yourself. We need to do the following:

  • Get a Snowflake account set up
  • Get our data into Snowflake
  • Write and test our queries using SQL and the Snowflake UI
  • Write a Python class that can execute our queries to generate our final dataset for modelling

Setting up an account is as easy as signing up for a free trial on their website. Once you have done that you can download the SnowSQL CLI (snowsql) from the Snowflake website. This will make it straightforward to add data to Snowflake. After following these steps we can try to connect to Snowflake using our credentials and the command line.

snowsql -a <account_name> -u <user_name>

 

You can find your account name in the URL when you log in to the Snowflake UI. It should look something like this: xxxxx.europe-west2.gcp. Ok, let’s move on to the next step and get our data into Snowflake. There are a few steps we need to follow here, namely:

  • Create our virtual warehouse
  • Create a database
  • Define and create our tables
  • Create a stage for our CSV files
  • Copy the data into our tables

Luckily this isn’t too difficult and we can do this entirely using the snowsql CLI. For this project, I will be using a smaller dataset than I would like, but unfortunately I cannot use any of my company’s data and it can be pretty difficult to find large suitable datasets online. I did, however, find some transaction data from Dunnhumby which is freely available on Kaggle. Just for kicks, though, I created a much larger synthetic dataset from this data to test how well Dask handles the challenge compared to sklearn.

First of all, we need to set up a virtual warehouse and a database using the following commands in the Snowflake UI.

create or replace warehouse analytics_wh with
warehouse_size='X-SMALL'
auto_suspend=180
auto_resume=true
initially_suspended=true;

create or replace database dunnhumby;

Our data consists of 6 CSVs which we will convert into 6 tables. I won’t spend too much time going over the dataset as this post is more about using Snowflake and Dask rather than interpreting data.

Below are the commands we can use to create our tables. All you will need to know in advance is what columns and data types you will be working with.

create or replace table campaign_desc ( 
description string, 
campaign number,
start_day number,
end_day number );

create or replace table campaign_table ( 
description string, 
Household_key number, 
campaign number );

create or replace table coupon ( 
COUPON_UPC number, 
product_id number, 
campaign number );

create or replace table coupon_redempt ( 
household_key number, 
day number, 
coupon_upc number, 
campaign number );

create or replace table transactions ( 
household_key number, 
BASKET_ID number, 
day number, 
product_id number, 
quantity number, 
sales_value number, 
store_id number, 
retail_disc decimal, 
trans_time number, 
week_no number, 
coupon_disc decimal, 
coupon_match_disc decimal );

create or replace table demographic_data ( 
age_dec string, 
marital_status_code string, 
income_desc string, 
homeowner_desc string, 
hh_comp_desc string, 
household_size_desc string, 
kid_category_desc string, 
Household_key number);

 

Now that we have our tables created we can start thinking about how to get data into them. For this, we will need to stage our CSV files. This is basically just an intermediary step so Snowflake can directly load the files from our stage into our tables. We can use the PUT command to put local files in our stage and then the COPY INTO command to instruct Snowflake where to put this data. Note that PUT gzip-compresses files by default, which is why the staged file names in the COPY INTO commands end in .csv.gz.

use database dunnhumby;

create or replace stage dunnhumby_stage;

PUT file://campaigns_table.csv @dunnhumby.public.dunnhumby_stage;

PUT file://campaign_desc.csv @dunnhumby.public.dunnhumby_stage;

PUT file://coupon.csv @dunnhumby.public.dunnhumby_stage;

PUT file://coupon_redempt.csv @dunnhumby.public.dunnhumby_stage;

PUT file://transaction_data.csv @dunnhumby.public.dunnhumby_stage;

PUT file://demographics.csv @dunnhumby.public.dunnhumby_stage;

 

As a quick check, you can run this command to check what is in the staging area.

ls @dunnhumby.public.dunnhumby_stage;

 

Now we just need to copy the data into our tables using the queries below. You can execute these either in the Snowflake UI or in the command line after logging into Snowflake.

copy into campaign_table 
from @dunnhumby.public.dunnhumby_stage/campaigns_table.csv.gz 
file_format = ( type = csv
skip_header=1 
error_on_column_count_mismatch = false 
field_optionally_enclosed_by='"');

copy into campaign_desc 
from @dunnhumby.public.dunnhumby_stage/campaign_desc.csv.gz 
file_format = ( type = csv
skip_header=1 
error_on_column_count_mismatch = false 
field_optionally_enclosed_by='"');

copy into coupon 
from @dunnhumby.public.dunnhumby_stage/coupon.csv.gz 
file_format = ( type = csv
skip_header=1 
error_on_column_count_mismatch = false 
field_optionally_enclosed_by='"');

copy into coupon_redempt 
from @dunnhumby.public.dunnhumby_stage/coupon_redempt.csv.gz 
file_format = ( type = csv
skip_header=1 
error_on_column_count_mismatch = false 
field_optionally_enclosed_by='"');

copy into transactions 
from @dunnhumby.public.dunnhumby_stage/transaction_data.csv.gz 
file_format = ( type = csv
skip_header=1 
error_on_column_count_mismatch = false 
field_optionally_enclosed_by='"');

copy into demographic_data 
from @dunnhumby.public.dunnhumby_stage/demographics.csv.gz 
file_format = ( type = csv skip_header=1 
error_on_column_count_mismatch = false 
field_optionally_enclosed_by='"');

 

Ok great, with any luck we have our data in our tables on the first try. Oh, if only it were that simple. This whole process took me a few tries to get right (beware of spelling things wrong, as file and table names have to match exactly). Hopefully, you can follow along with this and be good to go. We are getting closer to the interesting stuff, but the steps above are a vital part of the process so make sure you understand each of them.

 

Writing our Pipeline in SQL

 
In this next step, we will be writing the queries to generate our target, our features and then finally produce a training data set. One approach to creating a dataset for modelling is to read this data into memory and use pandas to create new features and join all the data frames together. This is typically the approach you see on Kaggle and in other online tutorials. The issue with this is that it is not very efficient, particularly when you are working with any reasonably sized datasets. For this reason, it is a much better idea to outsource the heavy lifting to something like Snowflake which handles massive datasets extremely well and will likely save you a huge amount of time. I won’t be spending much time diving into the specifics of our dataset here as it isn’t really vital for what I am trying to show. In general, though, you would want to spend a considerable amount of time exploring and understanding your data before you start modelling. The goal of these queries will be to preprocess the data and create some simple features which we can later use in our models.

 

Target Definition

 
Obviously, a vital component of supervised machine learning is defining an appropriate target to predict. For our use case, we will be predicting churn by calculating whether or not a user makes another visit within two weeks after a cutoff week. The choice of 2 weeks is pretty arbitrary and will depend on the specific problem we are trying to solve but let’s just assume that it is fine for this project. In general, you would want to carefully analyse your customers to understand the distribution in gaps between visits to arrive at a suitable definition of churn.

The main idea here is that for each table we want to have one row per household_key containing values for each of our features.
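The original query is not reproduced here, so as a rough sketch of the target definition described above, the snippet below flags a household as churned when it has no transaction in the two weeks after the cutoff week. It assumes the transactions table created earlier. Python’s built-in sqlite3 module is used purely as a local stand-in for Snowflake, and the query text, the build_target helper and the sample rows are all hypothetical.

```python
import sqlite3

# Hypothetical target query: one row per household seen up to the cutoff week,
# with churned = 1 if it has no transaction in the following two weeks.
TARGET_QUERY = """
select
    household_key,
    case when sum(case when week_no > {cutoff_week}
                        and week_no <= {cutoff_week} + 2
                       then 1 else 0 end) > 0
         then 0 else 1 end as churned
from transactions
group by household_key
having min(week_no) <= {cutoff_week}
order by household_key
"""


def build_target(conn, cutoff_week):
    """Format the query with the cutoff week and return (household_key, churned) rows."""
    query = TARGET_QUERY.format(cutoff_week=int(cutoff_week))
    return conn.execute(query).fetchall()


# Made-up sample data, only to show the shape of the result.
conn = sqlite3.connect(":memory:")
conn.execute("create table transactions (household_key integer, week_no integer)")
conn.executemany(
    "insert into transactions values (?, ?)",
    [(1, 100), (1, 103), (2, 101), (2, 105), (3, 103)],
)

print(build_target(conn, 102))  # [(1, 0), (2, 1)]
```

Household 1 returns in week 103 so it is not churned, household 2 does not return until week 105 so it is, and household 3 is excluded because it was first seen after the cutoff.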

 

Campaign Features

 

 

Transaction Features

 
Below we create some simple metrics based on aggregate statistics such as the average, the max and standard deviation.
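The query itself is not shown here, so the following is only a sketch of what aggregate features of this kind could look like, assuming the transactions table defined earlier. It runs against sqlite3 as a local stand-in for Snowflake. Because SQLite has no standard deviation function, a small Python aggregate is registered under the name stddev; in Snowflake a built-in STDDEV should be available instead. The feature names and sample rows are hypothetical.

```python
import sqlite3
import statistics


class StdDev:
    """Sample standard deviation aggregate, registered only because SQLite lacks one."""

    def __init__(self):
        self.values = []

    def step(self, value):
        if value is not None:
            self.values.append(value)

    def finalize(self):
        # Undefined for fewer than two values, so return NULL in that case.
        return statistics.stdev(self.values) if len(self.values) > 1 else None


# Hypothetical feature query: one row per household, using data up to the cutoff week.
TRANSACTION_FEATURES_QUERY = """
select
    household_key,
    count(distinct basket_id) as n_baskets,
    avg(sales_value)          as avg_sales_value,
    max(sales_value)          as max_sales_value,
    stddev(sales_value)       as std_sales_value
from transactions
where week_no <= {cutoff_week}
group by household_key
order by household_key
"""

conn = sqlite3.connect(":memory:")
conn.create_aggregate("stddev", 1, StdDev)
conn.execute(
    "create table transactions "
    "(household_key integer, basket_id integer, week_no integer, sales_value real)"
)
conn.executemany(
    "insert into transactions values (?, ?, ?, ?)",
    [(1, 10, 100, 2.0), (1, 11, 101, 4.0), (1, 11, 101, 6.0),
     (1, 12, 103, 100.0), (2, 20, 102, 5.0)],
)

features = conn.execute(TRANSACTION_FEATURES_QUERY.format(cutoff_week=102)).fetchall()
for row in features:
    print(row)
```

Filtering on the cutoff week inside the feature query keeps information from the prediction window out of the features.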

 

Demographic Features

 
This dataset has lots of missing data so I decided to use imputation here. There are plenty of techniques out there for missing data from dropping the missing data, to advanced imputation methods. I have just made life easy for myself here and replaced missing values with the mode. I wouldn’t necessarily recommend taking this approach in general as understanding why this data is missing is really important in deciding how to deal with it but for the purposes of this example, I will go ahead and take the easy approach. We first compute the mode for each of our features and then use coalesce to replace each row with the mode if data is missing.
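As an illustration of the mode-plus-coalesce idea, here is a minimal sketch for a single column (income_desc) of the demographic_data table. It again uses sqlite3 as a local stand-in for Snowflake, and the query and sample values are hypothetical rather than the original code. Snowflake also offers a MODE aggregate that may make the first step shorter.

```python
import sqlite3

# Hypothetical imputation query: compute the most frequent non-null value once,
# then use coalesce to fill it in wherever the column is missing.
IMPUTE_QUERY = """
with income_mode as (
    select income_desc as mode_value
    from demographic_data
    where income_desc is not null
    group by income_desc
    order by count(*) desc, income_desc
    limit 1
)
select
    d.household_key,
    coalesce(d.income_desc, m.mode_value) as income_desc
from demographic_data d
cross join income_mode m
order by d.household_key
"""

conn = sqlite3.connect(":memory:")
conn.execute("create table demographic_data (household_key integer, income_desc text)")
conn.executemany(
    "insert into demographic_data values (?, ?)",
    [(1, "50-74K"), (2, None), (3, "50-74K"), (4, "Under 15K")],
)

imputed = conn.execute(IMPUTE_QUERY).fetchall()
print(imputed)
# [(1, '50-74K'), (2, '50-74K'), (3, '50-74K'), (4, 'Under 15K')]
```

The same pattern would be repeated for each demographic column, with one mode per column.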

 

Training Data

 
Finally, we build a query for our training data by joining our main tables together and end up with a table containing our target, our campaign, transactions and demographic features which we can use to build a model.

As a brief aside, for those interested in learning more about the features and nuances of Snowflake I would recommend the following book: Snowflake Cookbook. I started reading this book and it is full of really helpful information on how to use Snowflake and goes into far more detail than I do here.

 

Python Code for ETL

 
The final piece we require for this ETL is to write a script to execute it. Now, this is only really required if you plan on running an ETL like this regularly but this is good practice and makes it much easier to run the ETL as and when needed.

Let’s briefly discuss the main components of our EtlTraining class. Our class takes one input, which is the cutoff week. We use a week number because that is how time is recorded in our dataset; ordinarily, this would be a date corresponding to the cutoff we want to choose for generating training data.

We initialise a list of our queries so we can easily loop through these and execute them. We also create a dictionary containing our parameters which we pass to our Snowflake connection. Here we use environment variables that we set up in Saturn Cloud. Here is a guide on how to do this. It is not too difficult to connect to Snowflake, all we need to do is use the Snowflake connector and pass in our dictionary of credentials. We implement this in the Snowflake connect method and return this connection as an attribute.

To make these queries a little bit easier to run I save each query as a python string variable in the ml_query_pipeline.py file. The execute_etl method does exactly what it says on the tin. We loop through each query, format it, execute it and finish off by closing the Snowflake connection.
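The class itself is not reproduced here, so the following is a minimal sketch of what a class along these lines might look like, not the original implementation. The environment variable names, the connect argument (added so the class can be tried without a Snowflake account) and the RecordingConnection stand-in are all assumptions. The only Snowflake call used is snowflake.connector.connect, and it is imported lazily.

```python
import os


class EtlTraining:
    """Run a list of SQL query templates for a given cutoff week."""

    def __init__(self, cutoff_week, queries, connect=None):
        self.cutoff_week = int(cutoff_week)
        self.queries = list(queries)
        # Hypothetical environment variable names; use whatever your setup defines.
        self.params = {
            "user": os.environ.get("SNOWFLAKE_USER"),
            "password": os.environ.get("SNOWFLAKE_PASSWORD"),
            "account": os.environ.get("SNOWFLAKE_ACCOUNT"),
            "warehouse": "analytics_wh",
            "database": "dunnhumby",
            "schema": "public",
        }
        self._connect = connect or self._default_connect
        self.conn = None

    def _default_connect(self):
        # Imported lazily so the class can be exercised without the connector installed.
        import snowflake.connector
        return snowflake.connector.connect(**self.params)

    def snowflake_connect(self):
        """Open a connection and keep it as an attribute."""
        self.conn = self._connect()
        return self.conn

    def execute_etl(self):
        """Format and execute each query in order, then close the connection."""
        conn = self.snowflake_connect()
        try:
            cursor = conn.cursor()
            for query in self.queries:
                cursor.execute(query.format(cutoff_week=self.cutoff_week))
        finally:
            conn.close()


class RecordingConnection:
    """Tiny stand-in connection that records executed SQL instead of running it."""

    def __init__(self):
        self.executed = []
        self.closed = False

    def cursor(self):
        return self

    def execute(self, sql):
        self.executed.append(sql)

    def close(self):
        self.closed = True


fake = RecordingConnection()
etl = EtlTraining(
    102,
    ["select * from transactions where week_no <= {cutoff_week}"],
    connect=lambda: fake,
)
etl.execute_etl()
print(fake.executed)  # ['select * from transactions where week_no <= 102']
```

Passing the connection factory in keeps the credentials handling separate from the query loop, which also makes the loop easy to test.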

To run this ETL we can simply type the command below into the terminal (where ml_pipeline is the name of the script above).

python -m ml_pipeline -w 102 -j 'train'
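For the -w and -j flags to work, the script needs to parse them. One way this might be done is with argparse, as in the sketch below. The long option names (--week, --job) and the parse_args helper are assumptions for illustration; only the short flags come from the command above.

```python
import argparse


def parse_args(argv=None):
    """Parse the cutoff week (-w) and job name (-j) from the command line."""
    parser = argparse.ArgumentParser(description="Run the training-data ETL.")
    parser.add_argument("-w", "--week", type=int, required=True,
                        help="cutoff week, e.g. 102")
    parser.add_argument("-j", "--job", default="train",
                        help="name of the job to run, e.g. train")
    return parser.parse_args(argv)


# Passing a list mimics: python -m ml_pipeline -w 102 -j 'train'
args = parse_args(["-w", "102", "-j", "train"])
print(args.week, args.job)  # 102 train
```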

 

As a brief aside, you will probably want to run an ETL like this at regular intervals. For example, if you want to make daily predictions then you will need to generate a dataset like this each day to pass to your model so you can identify which of your customers are likely to churn. I won’t go into this in detail here, but in my job we use Airflow to orchestrate our ETLs, so I would recommend checking it out if you are interested. In fact, I recently bought a book, ‘Data Pipelines with Apache Airflow’, which I think is great and gives some solid examples and advice on how to use Airflow.

 

Dask and Modeling

 

Now that we have our data pipeline built, we can begin to think about modelling. The other main goal I have for this post is to highlight the advantages of using Dask as part of the ML development process and show you guys how easy it is to use.

For this part of the project, I also used Saturn Cloud, a really nice tool I came across recently that allows us to harness the power of Dask across a cluster of computers in the cloud. The main advantages of using Saturn for me are that it is really easy to share your work, it is super simple to scale up your compute as and when you need it, and it has a free tier option. Model development in general is a really good use case for Dask as we usually want to train a bunch of different models and see what works best. The faster we can do this the better, as we have more time to focus on other important aspects of model development. Similar to Snowflake, you just need to sign up and you can very quickly spin up an instance of JupyterLab and start experimenting with it yourself.

Now, I realise at this point I have mentioned Dask a few times but have never really explained what it is. So let me take a moment to give you a very high-level overview of Dask and why I think it is awesome. Very simply, Dask is a python library that takes advantage of parallel computing to allow you to process and perform operations on very large datasets. And, the best part is, if you are already familiar with Python, then Dask should be very straightforward as the syntax is very similar.

The graph below highlights the main components of Dask.



Source: Dask Documentation

Collections allow us to create a graph of tasks which can then be executed across multiple computers. Some of these data structures, such as arrays and data frames, probably sound pretty familiar. They are similar to what you would find in NumPy and pandas but with some important differences. For example, you can think of a Dask data frame as a bunch of pandas data frames built in such a way that allows us to perform operations in parallel.

Moving on from collections we have the scheduler. Once we create the task graph the scheduler handles the rest for us. It manages the workflow and sends these tasks to either a single machine or distributes them across a cluster. Hopefully, that gives you a very brief overview of how Dask works. For more info, I suggest checking out the documentation or this book. Both are very good resources to dig deeper into this topic.
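To make the collections-plus-scheduler idea a little more concrete, here is a toy sketch that uses only the Python standard library. It is not Dask’s API or how Dask is implemented; it just mimics the pattern of splitting data into partitions, describing the work lazily, and only running it on a pool of workers when asked. The function names are made up for this illustration, and it assumes a non-empty list of numbers.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def partition(rows, n_parts):
    """Split a list into roughly equal chunks (the 'small data frames')."""
    size = max(1, -(-len(rows) // n_parts))  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]


def partial_sum(chunk):
    """Work done independently on each partition."""
    return sum(chunk), len(chunk)


def lazy_mean(rows, n_parts=4):
    """Build the tasks without running them; return a function that computes the mean."""
    tasks = [partial(partial_sum, chunk) for chunk in partition(rows, n_parts)]

    def compute(max_workers=4):
        # The 'scheduler': run every task on a worker, then combine the results.
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            results = list(pool.map(lambda task: task(), tasks))
        total = sum(s for s, _ in results)
        count = sum(n for _, n in results)
        return total / count

    return compute


mean_task = lazy_mean(list(range(1, 101)))  # nothing has been computed yet
print(mean_task())  # 50.5
```

In Dask the graph can be far richer and the workers can live on different machines, but the split, describe, then compute flow is the part worth keeping in mind.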

 

Python Code for Modelling

 
When modelling, I tend to have a small number of go-to algorithms that I will always try out first. This will generally give me a good idea of what might be suited to the specific problem I have. These models are Logistic Regression, Random Forest and Gradient Boosting. In my experience, when working with tabular data these algorithms will usually give you pretty good results. Below we build a sklearn modelling pipeline using these 3 models. The exact models we use here are not really important, as the pipeline should work for any sklearn classification model; this is just my preference.

Without further ado, let’s dive into the code. Luckily we outsourced most of our preprocessing to Snowflake so we don’t have to mess around with our training data too much here but we will add a few additional steps using sklearn pipelines.

The first code snippet below shows the pipeline when using sklearn. Notice our dataset is a plain old pandas data frame and our preprocessing steps are all carried out using sklearn methods. There is nothing particularly out of the ordinary going on here. We are reading in our data from the table produced by our Snowflake ETL and passing this into a sklearn pipeline. The usual modelling steps apply here. We split the dataset into train and test and do some preprocessing, namely impute missing values using the median, scale the data and one-hot encode our categorical data. I am a big fan of sklearn pipelines and basically use them whenever I develop models nowadays, they really facilitate clean and concise code.

How does this pipeline perform on a dataset with about 2 million rows? Well, running this model without any hyperparameter tuning takes about 34 minutes. Ouch, kinda slow. You can imagine how prohibitively long this would take if we wanted to do any type of hyperparameter tuning. Ok, so not ideal but let’s see how Dask handles the challenge.

 

Dask ML Python Code

 
Our goal here is to see if we can beat the sklearn pipeline above, spoiler alert, we definitely can. The cool thing about Dask is that the barrier to entry when you are already familiar with python is pretty low. We can get this pipeline up and running in Dask with only a few changes.

The first change you probably will notice is that we have some different imports. One of the key differences between this pipeline and the previous one is that we will be using a Dask data frame instead of a pandas data frame to train our model. You can think of a Dask data frame as a bunch of pandas data frames where we can perform computations on each one at the same time. This is the core of Dask’s parallelism and is what is going to reduce the training time for this pipeline.

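Notice we use @dask.delayed as a decorator on our load_training_data function. This makes the function lazy: calling it adds a task to the graph instead of running it straight away, which lets Dask schedule it in parallel with other tasks when we ask for the result.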

We are also going to import some preprocessing and pipeline methods from Dask and most importantly, we will need to import SaturnCluster which will allow us to create a cluster for training our models. Another key difference with this code is that we use dask.persist after our train test split. Before this point, none of our functions has actually been computed due to Dask’s lazy evaluation. Once we use the persist method though we are telling Dask to send our data to the workers and execute the tasks we have created up until this point and leave these objects on the cluster.

Finally, we train our models using the delayed method. Again, this enables us to create our pipeline in a lazy way. The pipeline is not executed until we reach this code:

fit_pipelines = dask.compute(*pipelines_)

 

This time it only took us around 10 minutes to run this pipeline on the exact same dataset. That is a speedup by a factor of 3.4, not too shabby. Now, if we wanted to, we could speed this up even more by scaling up our compute resources at the touch of a button in Saturn.

 

Deploying our Pipeline

 

I mentioned earlier that you will probably want to run a pipeline like this quite regularly using something like Airflow. It just so happens that if you don’t want the initial hassle of setting everything up for Airflow, Saturn Cloud offers a simple alternative with Jobs. Jobs allow us to package up our code and run it at regular intervals or as needed. All you need to do is go to an existing project and click on create a job. Once we do that, it should look like the following:



Source: Saturn

 

From here, all we need to do is make sure our Python files above are in the directory shown in the image, and then we can enter the same Python command as before:

python -m ml_pipeline -w 102 -j 'train'

 

We can also set up a schedule using cron syntax to run the ETL on a daily basis if we like. For those interested, here is a Tutorial that goes into all the nitty-gritty.

 

Conclusions and Takeaways

 

Well, we have reached the end of our project at this point. Now obviously I have left out some key parts of the ML development cycle such as hyperparameter tuning and deploying our model but perhaps I will leave that for another day. Do I think you should try Dask? I am no expert by any means but from what I have seen so far it certainly seems really useful and I am super excited to experiment more with it and find more opportunities to incorporate it into my daily work as a data scientist. Hopefully, you found this useful and you too can see some of the advantages of Snowflake and Dask and you will start experimenting with them on your own.

 

Some of my other posts you may find interesting

 
Let’s Build a Streaming Data Pipeline

 
Gaussian Mixture Modelling (GMM)
 

A Bayesian Approach to Time Series Forecasting
 

Note: Some of the links in this post are affiliate links.

 
Bio: Daniel Foley is a former Economist turned Data Scientist working in the mobile gaming industry.

Original. Reposted with permission.
