2 minute read

Recently I needed to backload a few years of json data from an API to Snowflake. This felt like a great opportunity to test out Snowpark for Python!

The script below loops over a specified date range, calls an API for that day, and inserts a record into a Snowflake VARIANT table. The first thing we’ll do is import our modules and authenticate to Snowpark:

import os, json, requests

from datetime import date, timedelta
from snowflake.snowpark import Session

from dotenv import load_dotenv
load_dotenv()

account = os.getenv('SNOWFLAKE_ACCT')
user = os.getenv('SNOWFLAKE_USER')
password = os.getenv('SNOWFLAKE_PASSWORD')
role = 'SYSADMIN'
warehouse = 'MY_WH'
database = 'DEV'
schema = 'MY_SCHEMA'
target_table = 'MY_TABLE'

api_key = os.getenv('MY_API_KEY')

def snowpark_cnxn(account, user, password, role, warehouse, database, schema):
    connection_parameters = {
    'account': account,
    'user': user,
    'password': password,
    'role': role,
    'warehouse': warehouse,
    'database': database,
    'schema': schema
   }
    session = Session.builder.configs(connection_parameters).create()
    return session

session = snowpark_cnxn(account, 
                        user, 
                        password, 
                        role, 
                        warehouse, 
                        database, 
                        schema)

print(session.sql('''SELECT CURRENT_WAREHOUSE(), 
                    CURRENT_DATABASE(), 
                    CURRENT_SCHEMA()''').collect())

In the previous block of code, my Snowflake credentials are hidden in a .env file, and I load them into the python script using the python-dotenv library.

Since the .env file contains your Snowflake secrets, you don’t want to add it to your cloud repo! Make sure you add .env to your gitignore by running the following command in a terminal:

cd <your_repo>
echo '.env' >> .gitignore

Next, I defined a function that will allow us to loop over a date range, incremented by days:

def daterange(start_date, end_date):
    for n in range(int((end_date - start_date).days)):
        yield start_date + timedelta(n)

Finally, we can execute the for loop. In this case I simply needed to pass the API key as a header in the API call, your mileage will definitely vary depending on the security requirements of your API:

headers = {'APIKey': f'{api_key}'}       
start_date = date(2019, 1, 1)
end_date = date(2022, 12, 31)

for date in daterange(start_date, end_date):
    url = f'https://api.mywebsite.com/api/data?&startDate={start_date}&endDate={end_date}'
    response = requests.request('GET', url, headers=headers)
    
    formatted_json = json.loads(response.text)
    formatted_json = json.dumps(formatted_json, indent = 4)
    
    # insert to Snowflake
    session.sql(f"""INSERT INTO {target_table} (JSON_DATA, INSERT_DATE)
                    SELECT PARSE_JSON('{formatted_json}'),
                    CURRENT_TIMESTAMP();""").collect()

Note that this did take ~15 minutes to run. For performance reasons, I’d probably look into placing the json file in an internal stage next time, and executing the COPY INTO command to speed things up.

Here is the script in its entirety:

# Import modules
import os, json, requests

from datetime import date, timedelta
from snowflake.snowpark import Session

from dotenv import load_dotenv
load_dotenv()

# Establish Snowflake Connection using Snowpark
account = os.getenv('SNOWFLAKE_ACCT')
user = os.getenv('SNOWFLAKE_USER')
password = os.getenv('SNOWFLAKE_PASSWORD')
role = 'SYSADMIN'
warehouse = 'MY_WH'
database = 'DEV'
schema = 'MY_SCHEMA'
target_table = 'MY_TABLE'

api_key = os.getenv('MY_API_KEY')

def snowpark_cnxn(account, user, password, role, warehouse, database, schema):
    connection_parameters = {
    'account': account,
    'user': user,
    'password': password,
    'role': role,
    'warehouse': warehouse,
    'database': database,
    'schema': schema
   }
    session = Session.builder.configs(connection_parameters).create()
    return session

session = snowpark_cnxn(account, 
                        user, 
                        password, 
                        role, 
                        warehouse, 
                        database, 
                        schema)

print(session.sql('''SELECT CURRENT_WAREHOUSE(), 
                    CURRENT_DATABASE(), 
                    CURRENT_SCHEMA()''').collect())

def daterange(start_date, end_date):
    for n in range(int((end_date - start_date).days)):
        yield start_date + timedelta(n)

# variables
headers = {'APIKey': f'{api_key}'}       
start_date = date(2019, 1, 1)
end_date = date(2022, 12, 31)

# Loop through 4 years worth of API data, insert into Snowflake VARIANT table
for date in daterange(start_date, end_date):
    url = f'https://api.mywebsite.com/api/data?&startDate={start_date}&endDate={end_date}'
    response = requests.request('GET', url, headers=headers)
    
    formatted_json = json.loads(response.text)
    formatted_json = json.dumps(formatted_json, indent = 4)
    
    # insert to Snowflake
    session.sql(f"""INSERT INTO {target_table} (JSON_DATA, INSERT_DATE)
                    SELECT PARSE_JSON('{formatted_json}'),
                    CURRENT_TIMESTAMP();""").collect()