How does it work
Astrato appends metadata to the end of each query, subsequently allowing this to be viewed in Snowflake. Snowflake already contains data about which user has accessed each table, warehouse etc.
In this article we will cover how to store this data in a new database, using only one block of code.
The data that Astrato appends is shown below.
--
{
"objectId":"f4ed7820-9884-45ba-b078-86fc260d5511",
"sheetId":"3c2fa835-05ce-47f8-9489-3b99a76adc0b",
"workbookId":"d8ccb7b8-970e-477f-a093-6e862a431e21"
}
To supplement this, you can export the lookup associating workbooks, sheets and users, from the administration section https://app.astrato.io/administration/exports. In the step below, code can be run to create these table, enabling an easy upload of your data. Currently, this is a manual upload for security purposes, since user data contains PII.
Storing the logs
To setup a storage area, you need to create a database called ASTRATO_QUERY_LOGS and populate it weekly. The code to do this is below. Altering the structure means that Astrato will not be able to provide a compatible analytics workbook. We recommend reviewing the scheduled task to ensure this is run at an appropriate time.
βΉοΈ Edit the warehouse name
-- CREATE STRUCTURE: create the target database, schema and tables if they do not exist.
-- Every statement here is idempotent, so the script can be re-run without
-- failing and without dropping collected query history or uploaded lookup data.
CREATE DATABASE IF NOT EXISTS ASTRATO_QUERY_LOGS;
-- Fully qualified so the schema is always created inside ASTRATO_QUERY_LOGS,
-- whatever database the session happens to be using.
CREATE SCHEMA IF NOT EXISTS ASTRATO_QUERY_LOGS.ASTRATO_LOGS;
-- Pin the session context so that the unqualified objects created later in this
-- script (the stored procedure and the task) land next to these tables.
USE SCHEMA ASTRATO_QUERY_LOGS.ASTRATO_LOGS;

-- One row per Astrato-tagged query, copied from SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
-- plus the Astrato metadata parsed from the trailing query comment.
CREATE TABLE IF NOT EXISTS ASTRATO_QUERY_LOGS.ASTRATO_LOGS.QUERY_HISTORY (
    QUERY_ID VARCHAR(16777216),
    QUERY_TAG VARCHAR(16777216),
    QUERY_TEXT VARCHAR(16777216),
    EXECUTION_TIMESTAMP TIMESTAMP_LTZ(6),
    USER_NAME VARCHAR(16777216),
    WAREHOUSE_NAME VARCHAR(16777216),
    DATABASE_NAME VARCHAR(16777216),
    SCHEMA_NAME VARCHAR(16777216),
    START_TIME TIMESTAMP_LTZ(6),
    END_TIME TIMESTAMP_LTZ(6),
    ERROR_MESSAGE VARCHAR(16777216),
    ROWS_PRODUCED NUMBER(38,0),
    ROWS_INSERTED NUMBER(38,0),
    PERCENTAGE_SCANNED_FROM_CACHE FLOAT,
    EXECUTION_TIME NUMBER(38,0),
    COMPILATION_TIME NUMBER(38,0),
    EXTERNAL_FUNCTION_TOTAL_INVOCATIONS NUMBER(38,0),
    BYTES_SPILLED_TO_LOCAL_STORAGE NUMBER(38,0),
    BYTES_SPILLED_TO_REMOTE_STORAGE NUMBER(38,0),
    BYTES_SENT_OVER_THE_NETWORK NUMBER(38,0),
    CREDITS_USED_CLOUD_SERVICES FLOAT,
    WAREHOUSE_SIZE VARCHAR(16777216),
    WAREHOUSE_TYPE VARCHAR(16777216),
    ROLE_NAME VARCHAR(16777216),
    ASTRATO_QUERY_META_DATA VARCHAR(16777216),
    ASTRATO_USER_ID VARCHAR(16777216),
    ASTRATO_OBJECT_ID VARCHAR(16777216),
    ASTRATO_SHEET_ID VARCHAR(16777216),
    ASTRATO_WORKBOOK_ID VARCHAR(16777216)
);

-- Lookup tables populated by manually uploading the Astrato administration exports.
-- IF NOT EXISTS (rather than OR REPLACE) keeps previously uploaded rows on re-run.
CREATE TABLE IF NOT EXISTS ASTRATO_QUERY_LOGS.ASTRATO_LOGS.COLLECTIONS (
    ID VARCHAR(16777216),
    COLLECTIONNAME VARCHAR(16777216),
    WORKBOOKNAME VARCHAR(16777216)
);

CREATE TABLE IF NOT EXISTS ASTRATO_QUERY_LOGS.ASTRATO_LOGS.GROUPS_AND_MEMBERS (
    GROUPID VARCHAR(16777216),
    GROUPNAME VARCHAR(16777216),
    DESCRIPTION VARCHAR(16777216),
    USERID VARCHAR(16777216),
    EMAIL VARCHAR(16777216)
);

CREATE TABLE IF NOT EXISTS ASTRATO_QUERY_LOGS.ASTRATO_LOGS.WORKBOOK_SHEETS (
    ID VARCHAR(16777216),
    SHEETNAME VARCHAR(16777216),
    WORKBOOKNAME VARCHAR(16777216)
);

CREATE TABLE IF NOT EXISTS ASTRATO_QUERY_LOGS.ASTRATO_LOGS.WORKBOOKS (
    ID VARCHAR(16777216),
    NAME VARCHAR(16777216),
    DESCRIPTION VARCHAR(16777216)
);

CREATE TABLE IF NOT EXISTS ASTRATO_QUERY_LOGS.ASTRATO_LOGS.USERS (
    ID VARCHAR(16777216),
    EMAIL VARCHAR(16777216),
    NAME VARCHAR(16777216),
    FIRSTNAME VARCHAR(16777216),
    LASTNAME VARCHAR(16777216)
);

-- One row per table reference found in a logged query's text, with an even
-- share of that query's cloud-services credits.
CREATE TABLE IF NOT EXISTS ASTRATO_QUERY_LOGS.ASTRATO_LOGS.QUERIES_PER_TABLE (
    QUERY_ID VARCHAR(16777216),
    DATABASE_NAME VARCHAR(16777216),
    SCHEMA_NAME VARCHAR(16777216),
    TABLE_NAME VARCHAR(16777216),
    TOTAL_MATCHES NUMBER(18,0),
    MATCH_POSITION NUMBER(38,0),
    ESTIMATED_CREDITS_USED_PER_TABLE FLOAT
);
-- PROCEDURE: create the stored procedure that refreshes the Astrato query logs.
--
-- STORE_ASTRATO_QUERY_HISTORY() performs two incremental loads:
--   1. Copies Astrato-tagged queries that are not yet stored from
--      SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY into
--      ASTRATO_QUERY_LOGS.ASTRATO_LOGS.QUERY_HISTORY, parsing the JSON metadata
--      found after the last '--' in the query text.
--   2. Extracts the "FROM/JOIN <identifier>.<identifier>" references from each
--      stored query that has no rows yet in QUERIES_PER_TABLE and writes one row
--      per reference.
-- Returns: a fixed success message (STRING).
CREATE OR REPLACE PROCEDURE STORE_ASTRATO_QUERY_HISTORY()
RETURNS STRING
LANGUAGE SQL
AS $$
BEGIN
    -- Step 1: insert new query history into the target table.
    -- The explicit column list keeps the load correct even if the column order
    -- of the target table ever differs from the SELECT order.
    INSERT INTO ASTRATO_QUERY_LOGS.ASTRATO_LOGS.QUERY_HISTORY (
        QUERY_ID,
        QUERY_TAG,
        QUERY_TEXT,
        EXECUTION_TIMESTAMP,
        USER_NAME,
        WAREHOUSE_NAME,
        DATABASE_NAME,
        SCHEMA_NAME,
        START_TIME,
        END_TIME,
        ERROR_MESSAGE,
        ROWS_PRODUCED,
        ROWS_INSERTED,
        PERCENTAGE_SCANNED_FROM_CACHE,
        EXECUTION_TIME,
        COMPILATION_TIME,
        EXTERNAL_FUNCTION_TOTAL_INVOCATIONS,
        BYTES_SPILLED_TO_LOCAL_STORAGE,
        BYTES_SPILLED_TO_REMOTE_STORAGE,
        BYTES_SENT_OVER_THE_NETWORK,
        CREDITS_USED_CLOUD_SERVICES,
        WAREHOUSE_SIZE,
        WAREHOUSE_TYPE,
        ROLE_NAME,
        ASTRATO_QUERY_META_DATA,
        ASTRATO_USER_ID,
        ASTRATO_OBJECT_ID,
        ASTRATO_SHEET_ID,
        ASTRATO_WORKBOOK_ID
    )
    SELECT
        src.QUERY_ID,
        src.QUERY_TAG,
        src.QUERY_TEXT,
        src.START_TIME AS EXECUTION_TIMESTAMP,
        src.USER_NAME,
        src.WAREHOUSE_NAME,
        src.DATABASE_NAME,
        src.SCHEMA_NAME,
        src.START_TIME,
        src.END_TIME,
        src.ERROR_MESSAGE,
        src.ROWS_PRODUCED,
        src.ROWS_INSERTED,
        src.PERCENTAGE_SCANNED_FROM_CACHE,
        src.EXECUTION_TIME,
        src.COMPILATION_TIME,
        src.EXTERNAL_FUNCTION_TOTAL_INVOCATIONS,
        src.BYTES_SPILLED_TO_LOCAL_STORAGE,
        src.BYTES_SPILLED_TO_REMOTE_STORAGE,
        src.BYTES_SENT_OVER_THE_NETWORK,
        src.CREDITS_USED_CLOUD_SERVICES,
        src.WAREHOUSE_SIZE,
        src.WAREHOUSE_TYPE,
        src.ROLE_NAME,
        -- Everything after the last '--' in the query text is the metadata
        -- comment; the aliases below reuse this column alias.
        SPLIT_PART(src.QUERY_TEXT, '--', -1) AS astrato_query_meta_data,
        -- TRY_PARSE_JSON yields NULL instead of an error when the trailing text
        -- is not valid JSON, so these columns are NULL for such queries.
        TRY_PARSE_JSON(astrato_query_meta_data):"userId"::STRING AS astrato_user_id,
        TRY_PARSE_JSON(astrato_query_meta_data):"objectId"::STRING AS astrato_object_id,
        TRY_PARSE_JSON(astrato_query_meta_data):"sheetId"::STRING AS astrato_sheet_id,
        TRY_PARSE_JSON(astrato_query_meta_data):"workbookId"::STRING AS astrato_workbook_id
    FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY AS src
    WHERE src.QUERY_TAG = 'Astrato'
        -- Hard-coded lower bound for the history that is collected.
        AND DATE(src.START_TIME) >= '2024-06-24'
        -- Skip queries already stored. NOT EXISTS (unlike NOT IN) still works
        -- if the target table ever contains a NULL QUERY_ID.
        AND NOT EXISTS (
            SELECT 1
            FROM ASTRATO_QUERY_LOGS.ASTRATO_LOGS.QUERY_HISTORY AS stored
            WHERE stored.QUERY_ID = src.QUERY_ID
        );

    -- Step 2: record the table references found in each stored query.
    INSERT INTO ASTRATO_QUERY_LOGS.ASTRATO_LOGS.QUERIES_PER_TABLE (
        QUERY_ID,
        DATABASE_NAME,
        schema_name,
        table_name,
        total_matches,
        match_position,
        estimated_credits_used_per_table
    )
    -- Self-referencing (recursive) CTE: the anchor takes the 1st FROM/JOIN match
    -- of every query, and each recursion step takes the next occurrence until
    -- REGEXP_SUBSTR finds no further match.
    WITH ExtractedTables AS (
        SELECT
            qh.QUERY_ID,
            qh.DATABASE_NAME,
            qh.credits_used_cloud_services,
            qh.query_text,
            REGEXP_SUBSTR(qh.query_text, '(FROM|JOIN)\\s+([A-Za-z0-9_]+)\\.([A-Za-z0-9_]+)', 1, 1, 'i') AS match_part,
            1 AS match_position
        FROM ASTRATO_QUERY_LOGS.ASTRATO_LOGS.QUERY_HISTORY AS qh
        -- Only analyse queries with no rows in QUERIES_PER_TABLE yet; without
        -- this filter every scheduled run would re-insert all earlier rows.
        WHERE NOT EXISTS (
            SELECT 1
            FROM ASTRATO_QUERY_LOGS.ASTRATO_LOGS.QUERIES_PER_TABLE AS done
            WHERE done.QUERY_ID = qh.QUERY_ID
        )
        UNION ALL
        SELECT
            et.QUERY_ID,
            et.DATABASE_NAME,
            et.credits_used_cloud_services,
            et.query_text,
            REGEXP_SUBSTR(et.query_text, '(FROM|JOIN)\\s+([A-Za-z0-9_]+)\\.([A-Za-z0-9_]+)', 1, et.match_position + 1, 'i') AS match_part,
            et.match_position + 1
        FROM ExtractedTables et
        WHERE REGEXP_SUBSTR(et.query_text, '(FROM|JOIN)\\s+([A-Za-z0-9_]+)\\.([A-Za-z0-9_]+)', 1, et.match_position + 1, 'i') IS NOT NULL
    ),
    -- Split each "FROM a.b" / "JOIN a.b" match into its two identifiers.
    -- NOTE(review): only unquoted two-part names are recognised; for a
    -- three-part name (db.schema.table) the first two parts would be captured
    -- as schema_name/table_name. Confirm this matches the SQL Astrato generates.
    MatchedTables AS (
        SELECT
            QUERY_ID,
            DATABASE_NAME,
            credits_used_cloud_services,
            TRIM(REGEXP_SUBSTR(match_part, '([A-Za-z0-9_]+)\\.([A-Za-z0-9_]+)', 1, 1, 'i', 1)) AS schema_name,
            TRIM(REGEXP_SUBSTR(match_part, '([A-Za-z0-9_]+)\\.([A-Za-z0-9_]+)', 1, 1, 'i', 2)) AS table_name,
            match_position,
            query_text
        FROM ExtractedTables
        -- Queries without any FROM/JOIN match yield a NULL anchor row; drop it.
        WHERE match_part IS NOT NULL
    )
    SELECT
        QUERY_ID,
        DATABASE_NAME,
        schema_name,
        table_name,
        COUNT(*) OVER (PARTITION BY QUERY_ID) AS total_matches, -- Count total matches per QUERY_ID
        match_position,
        -- Cloud-services credits of the query split evenly across its matches.
        (credits_used_cloud_services / COUNT(*) OVER (PARTITION BY QUERY_ID)) AS estimated_credits_used_per_table
    FROM MatchedTables;

    RETURN 'Astrato Query history updated successfully';
END;
$$;
-- CALL STORE_ASTRATO_QUERY_HISTORY(); -- Uncomment for initial run (recommended)

-- TASK: schedule the stored procedure using Snowflake tasks (optional).
-- Replace MY_WH with the warehouse that should run the weekly refresh.
CREATE OR REPLACE TASK QUERY_HISTORY_TASK
    WAREHOUSE = MY_WH
    SCHEDULE = 'USING CRON 0 0 * * 0 UTC' -- Runs weekly on Sunday at midnight (UTC)
AS
    CALL STORE_ASTRATO_QUERY_HISTORY();

-- Snowflake creates tasks in a suspended state; the schedule only takes effect
-- once the task is resumed.
ALTER TASK QUERY_HISTORY_TASK RESUME;
-- VIEW DATA: preview up to 15 rows of the stored Astrato query history.
SELECT *
FROM ASTRATO_QUERY_LOGS.ASTRATO_LOGS.QUERY_HISTORY
LIMIT 15;