In the first part of this blog series, we took a high-level look at the Snowpark offering from Snowflake and how it compares to other similar technologies in the market.
In this second part, we will explore some specific Snowpark APIs related to data transformation. As we know, Snowpark provides a Spark-like programming API where users can process data that is loaded in Snowflake and then perform data science workloads on top of that data. It follows a lazy execution strategy and works very similarly to the Spark APIs.
Below are some important Snowpark API constructs used for data transformation.
Session
This is the top-level abstraction provided in Snowpark. Everything is initiated and driven by the Session object. The Session lets the developer work with Snowflake objects and files on stages, execute queries, and retrieve the results.
Internally, the Snowpark Session API uses the Snowflake Connector for Python library to connect to Snowflake.
e.g.
session = Session.builder.configs(snowflake_conn_prop).create()
session.sql(f"USE ROLE {rolename}").collect()
Note the .collect() method at the end of the second statement. This forces immediate execution of the command by Snowflake. Readers familiar with Spark constructs will find this very similar to what they are used to in Spark pipelines. It's not necessary to add a collect() at the end of every statement: because Snowpark evaluates lazily, it caches all the statements until it encounters an action (such as collect()), then finds the most optimal way to combine them and executes the result on the warehouse.
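As a minimal sketch of this lazy behaviour (the CUSTOMERS table and its columns here are purely illustrative, not from the original example), only the final collect() sends a single combined query to the warehouse:
from snowflake.snowpark.functions import col

df = session.table("CUSTOMERS")           # no query issued yet
df = df.filter(col("COUNTRY") == "US")    # still lazy: only extends the query plan
df = df.select("CUSTOMERID", "COUNTRY")   # still lazy
rows = df.collect()                       # action: one combined SQL query runs in the warehouse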
As we covered in the earlier post, Snowpark has native support for Python packages via the Anaconda repository. The Session API provides three important methods that allow us to add any Python package or library and make it available to our Snowpark code when it runs in a virtual warehouse. The library can either come from an internal stage or be available as part of the Anaconda repository. A short sketch of how they are used follows the list below.
- add_import(path[, import_path]): Registers a remote file on a stage, or a local file, as an import of a user-defined function (UDF).
- add_packages(*packages): Adds third-party Python packages (resolved from the Anaconda repository) as dependencies.
- add_requirements(file_path): Lets you specify a requirements.txt file listing all the Python packages needed by your Snowpark UDFs.
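A hedged sketch of how these three methods are typically called (the package names, stage path and requirements file shown here are illustrative):
# third-party packages resolved from the Anaconda repository
session.add_packages("numpy", "pandas")

# a helper module previously uploaded to an internal stage (illustrative path)
session.add_import("@mystage/helpers.py")

# or list every dependency in a requirements file
session.add_requirements("requirements.txt")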
Dataframe
This is another important abstraction in Snowpark. It provides a lazily-evaluated relational dataset that contains a collection of rows organized into named, typed columns. The DataFrame API has a rich set of methods, broadly categorized into two types:
- Transformations: These methods produce a new DataFrame from one or more existing DataFrames. Note that transformations are lazy and don't cause the DataFrame to be evaluated until an action is encountered.
- join, group_by, filter, distinct, drop and flatten are some of the commonly used ones (see the example and the additional sketch below).
e.g.
dfDemo = session.table("DEMOGRAPHICS")
dfServ = session.table("SERVICES")
dfJoin = dfDemo.join(
    dfServ, dfDemo.col("CUSTOMERID") == dfServ.col("CUSTOMERID")
).select(dfDemo.CUSTOMERID.alias('CUSTOMERID'), '*')
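As an additional sketch of a couple of the other transformations listed above (the MONTHLYCHARGES and CONTRACT columns are illustrative, assumed to exist on the SERVICES table), note that these calls are still lazy:
from snowflake.snowpark.functions import col, count

dfFiltered = dfServ.filter(col("MONTHLYCHARGES") > 50)    # lazy filter
dfByContract = dfFiltered.group_by("CONTRACT").agg(       # lazy aggregation
    count("CUSTOMERID").alias("NUM_CUSTOMERS"))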
- Actions: These methods cause the DataFrame and the related commands to be evaluated in Snowflake. When you call a method that performs an action, Snowpark executes the SQL queries it has cached up to that point.
e.g.
dfServ.count()
or
dfServ.to_pandas()
dfServ.show()
Input/Output
This category of Snowpark APIs lets you interact with files stored on Snowflake stages. This is useful for processing data stored in your data lake (such as S3 or Azure Blob Storage). Using this API you can load files (via a Snowflake stage) and make them available as a DataFrame, write your transformed data back to your data lake if needed, or keep it in Snowflake tables. The input/output API has native support for standard file types, viz. JSON, CSV, Parquet, XML, Avro and ORC, and works with both compressed and uncompressed files.
e.g.
filename = "raw_telco_data.parquet"
stagename = "rawdata"
rawtable = "RAW_PARQUET_DATA"
dfRaw = session.read.option("compression","snappy").parquet(f"@{stagename}/{filename}")
dfRaw.copy_into_table(rawtable, FORCE=True)
dfRaw.show()
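The write path works in a similar way. As a hedged sketch (the target stage path is illustrative), the DataFrameWriter can unload the transformed data back to a stage, and from there to your data lake, as Parquet:
# unload the DataFrame back to the stage as Parquet files
dfRaw.write.copy_into_location(f"@{stagename}/processed/",
                               file_format_type="parquet",
                               header=True)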
Combining the various methods in the above three Snowpark API categories, developers can build end-to-end data transformation modules for both file-based data sources (ETL) and data that's already loaded into Snowflake tables (ELT). The fact that all of this happens inside virtual warehouses, without your data ever leaving Snowflake, is very attractive.
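Putting the three categories together, a minimal ELT-style sketch might look like the following (the filter and drop columns and the target table name are illustrative, not taken from the original notebook):
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col

session = Session.builder.configs(snowflake_conn_prop).create()

# Input/Output: read raw Parquet files from a stage
dfRaw = session.read.option("compression", "snappy").parquet("@rawdata/raw_telco_data.parquet")

# Transformations: lazy until an action is called
dfClean = dfRaw.filter(col("TOTALCHARGES").is_not_null()).drop("GENDER")

# Action: materialize the result as a Snowflake table
dfClean.write.mode("overwrite").save_as_table("TELCO_CLEAN")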
Please go through the example Jupyter notebook (telco_preprocess.ipynb) provided in my GitHub repo here. Note that the code is borrowed from the Churn Prediction example provided in the Snowpark examples repository.
In the last part of this blog series, we will go over the machine learning support that Snowpark provides. Using it, all your machine learning code, including running predictions, can be executed directly inside Snowflake. That is, without doubt, the most interesting and exciting feature of Snowpark. Stay tuned!