Car buying is fun with Snowflake Part 3

Parag Shah · Published in Analytics Vidhya · Nov 9, 2020 · 4 min read


In Part 2, the entire pipeline to ingest data into Snowflake was automated using Azure Logic Apps and Snowpipe. The JSON data was loaded into a Snowflake landing table with a single column called JSON_DATA.

Ideally, there should also be a datetime column containing the date and time when the data was loaded into the landing table. However, a limitation in Snowpipe means it will not allow any additional columns when using the JSON file format. If you try, you will get an error.
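This is why the first landing table has only the single VARIANT column. For reference, a minimal sketch of what that Landing Zone 1 table from Part 2 might look like (the table name comes from this series; the exact DDL in Part 2 may differ):

//Landing Zone 1: a single VARIANT column, which is what Snowpipe expects for JSON loads
create or replace TABLE "VMS"."PUBLIC"."VMS_AZURE_BLOB_LZ1" (
JSON_DATA VARIANT
);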

Snowflake’s Streams and Tasks features can be leveraged to move this data into a second landing table with additional columns such as LOAD_DTTM (load date time).

A Snowflake Stream helps with CDC (change data capture). It works somewhat like a Kafka topic and contains one row per change in its base table, in this case VMS_AZURE_BLOB_LZ1 (landing zone 1).

//Create a stream on the VMS_AZURE_BLOB_LZ1 table
CREATE OR REPLACE STREAM VMS_AZURE_BLOB_LZ1_STREAM ON TABLE "VMS"."PUBLIC"."VMS_AZURE_BLOB_LZ1";
//Verify using
SHOW STREAMS;
//Verify that the stream works by invoking the REST API to load some sample data into LZ1 and then running a SELECT on the stream
SELECT * FROM VMS_AZURE_BLOB_LZ1_STREAM;

The next step is to insert the data present in the stream into the Landing Zone 2 table. It is a simple SQL INSERT like this:

//Create the sequence used to generate auto-incremented ids
CREATE OR REPLACE SEQUENCE VMS.PUBLIC.VMS_AZURE_BLOB_LZ2_SEQ;
//Create a 2nd landing table
create or replace TABLE VMS_AZURE_BLOB_LZ2 (
SEQ_ID NUMBER(38,0) NOT NULL DEFAULT VMS.PUBLIC.VMS_AZURE_BLOB_LZ2_SEQ.NEXTVAL,
LOAD_DTTM TIMESTAMP_NTZ(9) NOT NULL DEFAULT CURRENT_TIMESTAMP(),
JSON_DATA VARIANT NOT NULL
)
COMMENT='Data will be inserted from stream and task';
//Test and verify that the SELECT from the stream works as intended before using it in the Task
INSERT INTO "VMS"."PUBLIC"."VMS_AZURE_BLOB_LZ2" (JSON_DATA) (
SELECT
JSON_DATA
FROM
VMS_AZURE_BLOB_LZ1_STREAM
WHERE
"METADATA$ACTION" = 'INSERT'
);

Remember, the whole idea is to automate, so the INSERT needs to run automatically. This can be done using a Snowflake Task, which basically works as a task scheduler using the cron time format. In this case it is set to 30 20 * * * to run at 8:30 PM PT, after all the dealerships close.

CREATE OR REPLACE TASK VMS_AZURE_BLOB_MOVE_LZ1_TO_LZ2_TASK
WAREHOUSE = TASKS_WH //A specific WH created to be used for Tasks only to show up on bill as separate line item
SCHEDULE = 'USING CRON 30 20 * * * America/Vancouver' //Process new records every night at 20:30HRS
WHEN
SYSTEM$STREAM_HAS_DATA('VMS_AZURE_BLOB_LZ1_STREAM')
AS
INSERT INTO "VMS"."PUBLIC"."VMS_AZURE_BLOB_LZ2" (JSON_DATA) (
SELECT
JSON_DATA
FROM
VMS_AZURE_BLOB_LZ1_STREAM
WHERE
"METADATA$ACTION" = 'INSERT'
);

Notice that in the above DDL the warehouse specified is TASKS_WH. It was created specifically to run Tasks, so it appears as a separate line item in the monthly bill that Snowflake generates. This makes it easier to monitor and track the cost of Tasks, and aligning their schedules so that the same compute can run a few of them together results in cost savings.
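For completeness, a warehouse like that could be created with a statement along these lines. Only the name TASKS_WH comes from this article; the size and auto-suspend settings are assumptions.

//Hypothetical settings; only the warehouse name is taken from this article
CREATE WAREHOUSE IF NOT EXISTS TASKS_WH
WAREHOUSE_SIZE = 'XSMALL' //smallest size is usually enough for a nightly insert
AUTO_SUSPEND = 60 //suspend quickly so the task only pays for the compute it uses
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED = TRUE;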

Once the task is created, remember to RESUME it, otherwise the Task won’t run; by default it is created in a suspended state. The following statement changes the task to the resumed state.

ALTER TASK VMS_AZURE_BLOB_MOVE_LZ1_TO_LZ2_TASK RESUME;
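To confirm the task is resumed and actually firing on schedule, one option (not part of the original pipeline) is to check its state and recent run history:

//Optional check: task state and recent runs
SHOW TASKS LIKE 'VMS_AZURE_BLOB_MOVE_LZ1_TO_LZ2_TASK';
SELECT NAME, STATE, SCHEDULED_TIME, COMPLETED_TIME, ERROR_MESSAGE
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'VMS_AZURE_BLOB_MOVE_LZ1_TO_LZ2_TASK'))
ORDER BY SCHEDULED_TIME DESC;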

Once a row from the Snowflake stream is processed (inserted) into its final destination, it is removed from the stream, much like popping an item off a queue. This ensures that the same data is not processed again.
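A quick way to see this behaviour, assuming the task has already run at least once, is to check the stream again; both statements below should indicate there is nothing left to consume.

//After the task has consumed the stream, this should return FALSE ...
SELECT SYSTEM$STREAM_HAS_DATA('VMS_AZURE_BLOB_LZ1_STREAM');
//... and this should return no rows
SELECT * FROM VMS_AZURE_BLOB_LZ1_STREAM;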

The data is now in its final resting place. In this table, each row has a LOAD_DTTM value recording when that row was loaded.
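A simple sanity check, using only the columns defined in the LZ2 DDL above:

//Most recently loaded rows and their load timestamps
SELECT SEQ_ID, LOAD_DTTM
FROM "VMS"."PUBLIC"."VMS_AZURE_BLOB_LZ2"
ORDER BY LOAD_DTTM DESC
LIMIT 5;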

Finally, a view parses this complex JSON object and returns tabular data.

create or replace view VMS_AZURE_BLOB_LZ2_VIEW as
select
value:ad_id::text as id, value:stock_number::text as stock_number, value:vin::text as vin,
value:days_on_lot::int as days_on_lot, value:sale_class::text as sale_class, value:demo::text as demo,
value:sale_expiry::text as sale_expiry, value:vehicle_class::text as vehicle_class, value:year::text as year,
value:make::text as make, value:model::text as model, value:trim::text as trim, value:passenger::text as passenger,
value:retail_price::double as retail_price, value:lowest_price::double as lowest_price,
value:asking_price::double as asking_price, value:internet_price::double as internet_price,
value:final_price::double as final_price, value:wholesale_price::double as wholesale_price,
value:sales_tax::double as sales_tax,
value:odometer::int as odometer, value:fuel_type::text as fuel_type, value:transmission::text as transmission,
value:engine::text as "ENGINE", value:drive_train::text as drive_train, value:doors::text as doors,
value:exterior_color::text as exterior_color, value:vdp_url::text as vdp_url,
value:image:image_original::text as image_original, value:manu_program::text as manu_program,
value:manu_exterior_color::text as manu_exterior_color, value:body_style::text as body_style,
value:certified::int as is_certified,
value:company_data:company_name::text as company_name, value:company_data:company_city::text as company_city,
value:company_data:company_province::text as company_province,
value:company_data:company_sales_email::text as company_sales_email,
value:company_data:company_sales_phone::text as company_sales_phone
from vms.public.VMS_AZURE_BLOB_LZ2
,LATERAL FLATTEN(input => json_data:results);

Query the view to get the data in tabular form.
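For example, something like the query below; the column names come from the view definition above, while the filter and its value are just illustrative guesses at the data.

//Example: ten longest-sitting vehicles (the sale_class value is a guess)
SELECT make, model, year, odometer, asking_price, days_on_lot, company_name
FROM VMS_AZURE_BLOB_LZ2_VIEW
WHERE sale_class = 'Used'
ORDER BY days_on_lot DESC
LIMIT 10;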

At this point, the entire pipeline is automated: a timer in a Logic App fetches data from the REST API and dumps it into Blob storage, Snowpipe loads it into Snowflake, and finally the JSON data is transformed into tabular format for easier consumption.

I believe this data is now prepped and ready for Snowflake’s Data Sharing feature :)

Parag Shah
Analytics Vidhya

I live in Vancouver, Canada. I am an AI Engineer and Azure Solutions Architect. I enjoy good coffee and the outdoors. LinkedIn: https://bit.ly/3cbD9gW