How does it work
Astrato appends metadata to the end of each query, subsequently allowing this to be viewed in Snowflake. Snowflake already contains data about which user has accessed each table, warehouse etc.
In this article we will cover how to store this data in a new database, using only one block of code.
The data that Astrato appends is shown below.
--
{
"objectId":"f4ed7820-9884-45ba-b078-86fc260d5511",
"sheetId":"3c2fa835-05ce-47f8-9489-3b99a76adc0b",
"workbookId":"d8ccb7b8-970e-477f-a093-6e862a431e21"
}
To supplement this, you can export the lookup data that associates workbooks, sheets and users from the administration section: https://app.astrato.io/administration/exports. The code in the step below creates the tables for this data, making it easy to upload. Currently, this is a manual upload for security purposes, since user data contains PII.
Storing the logs
To set up a storage area, you need to create a database called ASTRATO_QUERY_LOGS and populate it weekly. The code to do this is below. Altering the structure means that Astrato will not be able to provide a compatible analytics workbook. We recommend reviewing the scheduled task to ensure it runs at an appropriate time.
✏️ Edit the warehouse name in the code below, since MY_WH may not exist in your account.
-- CREATE STRUCTURE: create the target database, schema & tables if they don't exist.
CREATE DATABASE IF NOT EXISTS ASTRATO_QUERY_LOGS;
-- Fully qualified so the schema is created inside ASTRATO_QUERY_LOGS and does not depend on
-- which database happens to be current in the session (e.g. when the script is re-run).
CREATE SCHEMA IF NOT EXISTS ASTRATO_QUERY_LOGS.ASTRATO_LOGS;
-- Make the target schema current so that objects created later in this script without a
-- qualified name (the stored procedure and the task) are created in it.
USE SCHEMA ASTRATO_QUERY_LOGS.ASTRATO_LOGS;
-- Log table filled by STORE_ASTRATO_QUERY_HISTORY() with rows copied from
-- SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY plus the Astrato metadata parsed from the query text.
-- IF NOT EXISTS keeps the script re-runnable and leaves already-collected rows untouched.
-- The column order must stay aligned with the structure Astrato's analytics workbook expects.
CREATE TABLE IF NOT EXISTS ASTRATO_QUERY_LOGS.ASTRATO_LOGS.QUERY_HISTORY (
    QUERY_ID VARCHAR(16777216),
    QUERY_TAG VARCHAR(16777216),
    QUERY_TEXT VARCHAR(16777216),
    EXECUTION_TIMESTAMP TIMESTAMP_LTZ(6),   -- populated with the same value as START_TIME
    USER_NAME VARCHAR(16777216),
    WAREHOUSE_NAME VARCHAR(16777216),
    DATABASE_NAME VARCHAR(16777216),
    SCHEMA_NAME VARCHAR(16777216),
    START_TIME TIMESTAMP_LTZ(6),
    END_TIME TIMESTAMP_LTZ(6),
    ERROR_MESSAGE VARCHAR(16777216),
    ROWS_PRODUCED NUMBER(38,0),
    ROWS_INSERTED NUMBER(38,0),
    PERCENTAGE_SCANNED_FROM_CACHE FLOAT,
    EXECUTION_TIME NUMBER(38,0),
    COMPILATION_TIME NUMBER(38,0),
    EXTERNAL_FUNCTION_TOTAL_INVOCATIONS NUMBER(38,0),
    BYTES_SPILLED_TO_LOCAL_STORAGE NUMBER(38,0),
    BYTES_SPILLED_TO_REMOTE_STORAGE NUMBER(38,0),
    BYTES_SENT_OVER_THE_NETWORK NUMBER(38,0),
    CREDITS_USED_CLOUD_SERVICES FLOAT,
    WAREHOUSE_SIZE VARCHAR(16777216),
    WAREHOUSE_TYPE VARCHAR(16777216),
    ROLE_NAME VARCHAR(16777216),
    ASTRATO_QUERY_META_DATA VARCHAR(16777216),  -- text after the last '--' in QUERY_TEXT
    ASTRATO_USER_ID VARCHAR(16777216),          -- "userId" from the metadata JSON
    ASTRATO_OBJECT_ID VARCHAR(16777216),        -- "objectId" from the metadata JSON
    ASTRATO_SHEET_ID VARCHAR(16777216),         -- "sheetId" from the metadata JSON
    ASTRATO_WORKBOOK_ID VARCHAR(16777216),      -- "workbookId" from the metadata JSON
    BYTES_SCANNED NUMBER(38,0),
    QUEUED_PROVISIONING_TIME NUMBER(38,0),
    QUEUED_OVERLOAD_TIME NUMBER(38,0)
);
-- Lookup tables for the data exported from the Astrato administration section and uploaded
-- manually. They are created with IF NOT EXISTS (rather than CREATE OR REPLACE) so that
-- re-running this setup script does not drop data that has already been uploaded.

-- Collections and the workbooks they contain.
CREATE TABLE IF NOT EXISTS ASTRATO_QUERY_LOGS.ASTRATO_LOGS.COLLECTIONS (
    ID VARCHAR(16777216),
    COLLECTIONNAME VARCHAR(16777216),
    WORKBOOKNAME VARCHAR(16777216)
);

-- Groups and their member users.
CREATE TABLE IF NOT EXISTS ASTRATO_QUERY_LOGS.ASTRATO_LOGS.GROUPS_AND_MEMBERS (
    GROUPID VARCHAR(16777216),
    GROUPNAME VARCHAR(16777216),
    DESCRIPTION VARCHAR(16777216),
    USERID VARCHAR(16777216),
    EMAIL VARCHAR(16777216)
);

-- Sheets and the workbook each one belongs to.
CREATE TABLE IF NOT EXISTS ASTRATO_QUERY_LOGS.ASTRATO_LOGS.WORKBOOK_SHEETS (
    ID VARCHAR(16777216),
    SHEETNAME VARCHAR(16777216),
    WORKBOOKNAME VARCHAR(16777216)
);

-- Workbooks.
CREATE TABLE IF NOT EXISTS ASTRATO_QUERY_LOGS.ASTRATO_LOGS.WORKBOOKS (
    ID VARCHAR(16777216),
    NAME VARCHAR(16777216),
    DESCRIPTION VARCHAR(16777216)
);

-- Users (contains PII: e-mail addresses and names).
CREATE TABLE IF NOT EXISTS ASTRATO_QUERY_LOGS.ASTRATO_LOGS.USERS (
    ID VARCHAR(16777216),
    EMAIL VARCHAR(16777216),
    NAME VARCHAR(16777216),
    FIRSTNAME VARCHAR(16777216),
    LASTNAME VARCHAR(16777216)
);
-- One row per schema.table reference found in a logged query's text; filled by
-- STORE_ASTRATO_QUERY_HISTORY(). CREATE OR REPLACE drops any existing rows.
CREATE OR REPLACE TABLE ASTRATO_QUERY_LOGS.ASTRATO_LOGS.QUERIES_PER_TABLE (
    QUERY_ID VARCHAR(16777216),
    DATABASE_NAME VARCHAR(16777216),
    SCHEMA_NAME VARCHAR(16777216),
    TABLE_NAME VARCHAR(16777216),
    TOTAL_MATCHES NUMBER(18,0),                 -- number of references found in the query
    MATCH_POSITION NUMBER(38,0),                -- 1-based occurrence index within the query
    ESTIMATED_CREDITS_USED_PER_TABLE FLOAT      -- query's cloud-services credits / TOTAL_MATCHES
);
-- PROCEDURE: create the stored procedure that refreshes the Astrato log tables.
--
-- STORE_ASTRATO_QUERY_HISTORY() takes no arguments and returns a status string.
-- Each call performs four steps:
--   1. Appends queries from SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY that are not yet logged.
--   2. Rebuilds QUERIES_PER_TABLE (schema.table references found in each logged query).
--   3. Rebuilds WORKBOOK_SESSIONS (activity sessions per user and workbook).
--   4. Rebuilds SESSIONS (overall sessions per user, spanning workbooks).
-- All tables are referenced by their fully qualified names so the result does not depend on
-- the schema the procedure itself was created in.
CREATE OR REPLACE PROCEDURE STORE_ASTRATO_QUERY_HISTORY()
RETURNS STRING
LANGUAGE SQL
AS $$
BEGIN
    ---------------------------------------------------------------------------------------
    -- 1. Insert new query history into the target table.
    --    The explicit column list keeps the insert correct even if the table's column
    --    order ever differs from the SELECT order.
    ---------------------------------------------------------------------------------------
    INSERT INTO ASTRATO_QUERY_LOGS.ASTRATO_LOGS.QUERY_HISTORY (
        QUERY_ID,
        QUERY_TAG,
        QUERY_TEXT,
        EXECUTION_TIMESTAMP,
        USER_NAME,
        WAREHOUSE_NAME,
        DATABASE_NAME,
        SCHEMA_NAME,
        START_TIME,
        END_TIME,
        ERROR_MESSAGE,
        ROWS_PRODUCED,
        ROWS_INSERTED,
        PERCENTAGE_SCANNED_FROM_CACHE,
        EXECUTION_TIME,
        COMPILATION_TIME,
        EXTERNAL_FUNCTION_TOTAL_INVOCATIONS,
        BYTES_SPILLED_TO_LOCAL_STORAGE,
        BYTES_SPILLED_TO_REMOTE_STORAGE,
        BYTES_SENT_OVER_THE_NETWORK,
        CREDITS_USED_CLOUD_SERVICES,
        WAREHOUSE_SIZE,
        WAREHOUSE_TYPE,
        ROLE_NAME,
        ASTRATO_QUERY_META_DATA,
        ASTRATO_USER_ID,
        ASTRATO_OBJECT_ID,
        ASTRATO_SHEET_ID,
        ASTRATO_WORKBOOK_ID,
        BYTES_SCANNED,
        QUEUED_PROVISIONING_TIME,
        QUEUED_OVERLOAD_TIME
    )
    SELECT
        src.QUERY_ID,
        src.QUERY_TAG,
        src.QUERY_TEXT,
        src.START_TIME AS EXECUTION_TIMESTAMP,
        src.USER_NAME,
        src.WAREHOUSE_NAME,
        src.DATABASE_NAME,
        src.SCHEMA_NAME,
        src.START_TIME,
        src.END_TIME,
        src.ERROR_MESSAGE,
        src.ROWS_PRODUCED,
        src.ROWS_INSERTED,
        src.PERCENTAGE_SCANNED_FROM_CACHE,
        src.EXECUTION_TIME,
        src.COMPILATION_TIME,
        src.EXTERNAL_FUNCTION_TOTAL_INVOCATIONS,
        src.BYTES_SPILLED_TO_LOCAL_STORAGE,
        src.BYTES_SPILLED_TO_REMOTE_STORAGE,
        src.BYTES_SENT_OVER_THE_NETWORK,
        src.CREDITS_USED_CLOUD_SERVICES,
        src.WAREHOUSE_SIZE,
        src.WAREHOUSE_TYPE,
        src.ROLE_NAME,
        -- Astrato appends its metadata after a trailing '--', so take the last '--' segment.
        SPLIT_PART(src.QUERY_TEXT, '--', -1) AS astrato_query_meta_data,
        -- TRY_PARSE_JSON yields NULL when that segment is not valid JSON (non-Astrato queries).
        TRY_PARSE_JSON(astrato_query_meta_data):"userId"::STRING AS astrato_user_id,
        TRY_PARSE_JSON(astrato_query_meta_data):"objectId"::STRING AS astrato_object_id,
        TRY_PARSE_JSON(astrato_query_meta_data):"sheetId"::STRING AS astrato_sheet_id,
        TRY_PARSE_JSON(astrato_query_meta_data):"workbookId"::STRING AS astrato_workbook_id,
        src.BYTES_SCANNED,
        src.QUEUED_PROVISIONING_TIME,
        src.QUEUED_OVERLOAD_TIME
    FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY AS src
    WHERE
        --src.QUERY_TAG = 'Astrato' AND -- UNCOMMENT this line to store ONLY Astrato activity
        -- Skip queries that are already logged (NOT EXISTS is safe even if an id were NULL).
        NOT EXISTS (
            SELECT 1
            FROM ASTRATO_QUERY_LOGS.ASTRATO_LOGS.QUERY_HISTORY AS logged
            WHERE logged.QUERY_ID = src.QUERY_ID
        )
        -- Earliest query date to import.
        AND DATE(src.START_TIME) >= '2024-06-24';

    ---------------------------------------------------------------------------------------
    -- 2. Rebuild QUERIES_PER_TABLE from the full log.
    --    INSERT OVERWRITE replaces the previous contents; a plain INSERT would add a
    --    duplicate copy of every row on each run, because the source is the whole log.
    --    Note: the pattern captures the first two dot-separated identifiers after
    --    FROM/JOIN, so unqualified table names are not matched.
    ---------------------------------------------------------------------------------------
    INSERT OVERWRITE INTO ASTRATO_QUERY_LOGS.ASTRATO_LOGS.QUERIES_PER_TABLE (
        QUERY_ID,
        DATABASE_NAME,
        SCHEMA_NAME,
        TABLE_NAME,
        TOTAL_MATCHES,
        MATCH_POSITION,
        ESTIMATED_CREDITS_USED_PER_TABLE
    )
    -- Recursive CTE: the anchor takes the 1st FROM/JOIN match of every query, and each
    -- recursion step takes the next occurrence until no further match exists.
    WITH ExtractedTables AS (
        SELECT
            QUERY_ID,
            DATABASE_NAME,
            CREDITS_USED_CLOUD_SERVICES,
            QUERY_TEXT,
            REGEXP_SUBSTR(QUERY_TEXT, '(FROM|JOIN)\\s+([A-Za-z0-9_]+)\\.([A-Za-z0-9_]+)', 1, 1, 'i') AS match_part,
            1 AS match_position
        FROM ASTRATO_QUERY_LOGS.ASTRATO_LOGS.QUERY_HISTORY
        UNION ALL
        SELECT
            et.QUERY_ID,
            et.DATABASE_NAME,
            et.CREDITS_USED_CLOUD_SERVICES,
            et.QUERY_TEXT,
            REGEXP_SUBSTR(et.QUERY_TEXT, '(FROM|JOIN)\\s+([A-Za-z0-9_]+)\\.([A-Za-z0-9_]+)', 1, et.match_position + 1, 'i') AS match_part,
            et.match_position + 1
        FROM ExtractedTables AS et
        WHERE REGEXP_SUBSTR(et.QUERY_TEXT, '(FROM|JOIN)\\s+([A-Za-z0-9_]+)\\.([A-Za-z0-9_]+)', 1, et.match_position + 1, 'i') IS NOT NULL
    ),
    -- Split each "FROM|JOIN a.b" match into its two identifiers.
    MatchedTables AS (
        SELECT
            QUERY_ID,
            DATABASE_NAME,
            CREDITS_USED_CLOUD_SERVICES,
            TRIM(REGEXP_SUBSTR(match_part, '([A-Za-z0-9_]+)\\.([A-Za-z0-9_]+)', 1, 1, 'i', 1)) AS schema_name,
            TRIM(REGEXP_SUBSTR(match_part, '([A-Za-z0-9_]+)\\.([A-Za-z0-9_]+)', 1, 1, 'i', 2)) AS table_name,
            match_position
        FROM ExtractedTables
        WHERE match_part IS NOT NULL   -- drops queries without any FROM/JOIN a.b reference
    )
    SELECT
        QUERY_ID,
        DATABASE_NAME,
        schema_name,
        table_name,
        COUNT(*) OVER (PARTITION BY QUERY_ID) AS total_matches, -- total matches per QUERY_ID
        match_position,
        -- Spread the query's cloud-services credits evenly over the tables it references.
        (CREDITS_USED_CLOUD_SERVICES / COUNT(*) OVER (PARTITION BY QUERY_ID)) AS estimated_credits_used_per_table
    FROM MatchedTables;

    ---------------------------------------------------------------------------------------
    -- 3. WORKBOOK SESSIONS: rebuild the per-user, per-workbook sessions.
    --    A new session starts when more than 5 minutes pass between the end of the previous
    --    query and the start of the next one.
    ---------------------------------------------------------------------------------------
    CREATE OR REPLACE TABLE ASTRATO_QUERY_LOGS.ASTRATO_LOGS.WORKBOOK_SESSIONS AS
    (
        -- Flag each query that opens a new workbook session (gap > 5 minutes).
        WITH SessionizedData AS (
            SELECT
                ASTRATO_USER_ID,
                ASTRATO_WORKBOOK_ID,
                START_TIME,
                END_TIME,
                CASE
                    WHEN TIMESTAMPDIFF(
                             'minute',
                             LAG(END_TIME) OVER (
                                 PARTITION BY ASTRATO_USER_ID, ASTRATO_WORKBOOK_ID
                                 ORDER BY START_TIME
                             ),
                             START_TIME
                         ) > 5 THEN 1
                    ELSE 0   -- also covers the first query, where LAG() is NULL
                END AS NEW_SESSION_FLAG
            FROM ASTRATO_QUERY_LOGS.ASTRATO_LOGS.QUERY_HISTORY
        ),
        -- Running sum of the flags numbers the sessions within each user + workbook.
        SessionGroups AS (
            SELECT
                ASTRATO_USER_ID,
                ASTRATO_WORKBOOK_ID,
                START_TIME,
                END_TIME,
                SUM(NEW_SESSION_FLAG) OVER (
                    PARTITION BY ASTRATO_USER_ID, ASTRATO_WORKBOOK_ID
                    ORDER BY START_TIME
                ) AS SESSION_GROUP
            FROM SessionizedData
        ),
        -- Collapse each workbook session to its start and end time.
        SessionMetrics AS (
            SELECT
                ASTRATO_USER_ID,
                ASTRATO_WORKBOOK_ID,
                SESSION_GROUP,
                MIN(START_TIME) AS WORKBOOK_SESSION_START_TIME,
                MAX(END_TIME) AS WORKBOOK_SESSION_END_TIME
            FROM SessionGroups
            GROUP BY ASTRATO_USER_ID, ASTRATO_WORKBOOK_ID, SESSION_GROUP
        ),
        -- For each workbook session, look up when the user's previous one (any workbook) ended.
        LaggedSessionMetrics AS (
            SELECT
                ASTRATO_USER_ID,
                ASTRATO_WORKBOOK_ID,
                WORKBOOK_SESSION_START_TIME,
                WORKBOOK_SESSION_END_TIME,
                LAG(WORKBOOK_SESSION_END_TIME) OVER (
                    PARTITION BY ASTRATO_USER_ID
                    ORDER BY WORKBOOK_SESSION_START_TIME ASC
                ) AS PREVIOUS_WORKBOOK_SESSION_END_TIME
            FROM SessionMetrics
        ),
        -- Flag workbook sessions that open a new overall session (gap > 5 minutes).
        OverallSessionGroups AS (
            SELECT
                ASTRATO_USER_ID,
                ASTRATO_WORKBOOK_ID,
                WORKBOOK_SESSION_START_TIME,
                WORKBOOK_SESSION_END_TIME,
                CASE
                    WHEN TIMESTAMPDIFF('minute', PREVIOUS_WORKBOOK_SESSION_END_TIME, WORKBOOK_SESSION_START_TIME) > 5 THEN 1
                    ELSE 0
                END AS NEW_OVERALL_SESSION_FLAG
            FROM LaggedSessionMetrics
        ),
        -- Running sum of the flags numbers the overall sessions within each user.
        OverallSessionIDs AS (
            SELECT
                ASTRATO_USER_ID,
                ASTRATO_WORKBOOK_ID,
                WORKBOOK_SESSION_START_TIME,
                WORKBOOK_SESSION_END_TIME,
                SUM(NEW_OVERALL_SESSION_FLAG) OVER (
                    PARTITION BY ASTRATO_USER_ID -- Scoped by user
                    ORDER BY WORKBOOK_SESSION_START_TIME
                ) AS OVERALL_SESSION_ID
            FROM OverallSessionGroups
        ),
        -- Final shape. The workbook id is carried through the chain above, so no join back
        -- to SessionMetrics is needed; joining on (user, start, end) could duplicate rows
        -- when two workbook sessions of one user share the same start and end time.
        ReadableSessions AS (
            SELECT
                os.ASTRATO_USER_ID,
                os.ASTRATO_WORKBOOK_ID,
                MD5(CONCAT(
                    os.ASTRATO_USER_ID, '_',
                    os.ASTRATO_WORKBOOK_ID, '_',
                    TO_CHAR(os.WORKBOOK_SESSION_START_TIME, 'YYYYMMDDHH24MISS')
                )) AS WORKBOOK_SESSION_ID,
                MD5(CONCAT(
                    os.ASTRATO_USER_ID, '_',
                    os.OVERALL_SESSION_ID
                )) AS OVERALL_SESSION_ID, -- Scoped by user and unique
                os.WORKBOOK_SESSION_START_TIME,
                os.WORKBOOK_SESSION_END_TIME,
                TIMESTAMPDIFF('second', os.WORKBOOK_SESSION_START_TIME, os.WORKBOOK_SESSION_END_TIME) AS WORKBOOK_SESSION_LENGTH_SECONDS
            FROM OverallSessionIDs AS os
        )
        SELECT *
        FROM ReadableSessions
        -- Keep only Astrato-attributed sessions that lasted longer than 5 seconds.
        WHERE ASTRATO_USER_ID IS NOT NULL AND WORKBOOK_SESSION_LENGTH_SECONDS > 5
    );

    ---------------------------------------------------------------------------------------
    -- 4. OVERALL SESSIONS: rebuild one row per user and overall session.
    ---------------------------------------------------------------------------------------
    CREATE OR REPLACE TABLE ASTRATO_QUERY_LOGS.ASTRATO_LOGS.SESSIONS AS
    SELECT
        ASTRATO_USER_ID,
        OVERALL_SESSION_ID AS SESSION_ID,
        MIN(WORKBOOK_SESSION_START_TIME) AS OVERALL_SESSION_START_TIME,
        MAX(WORKBOOK_SESSION_END_TIME) AS OVERALL_SESSION_END_TIME,
        TIMESTAMPDIFF(
            'second',
            MIN(WORKBOOK_SESSION_START_TIME),
            MAX(WORKBOOK_SESSION_END_TIME)
        ) AS OVERALL_SESSION_LENGTH_SECONDS,
        COUNT(DISTINCT ASTRATO_WORKBOOK_ID) AS WORKBOOKS_IN_SESSION
    FROM ASTRATO_QUERY_LOGS.ASTRATO_LOGS.WORKBOOK_SESSIONS
    GROUP BY ASTRATO_USER_ID, OVERALL_SESSION_ID;

    RETURN 'Astrato Query history updated successfully';
END;
$$;
--CALL STORE_ASTRATO_QUERY_HISTORY();--Uncomment for initial run (recommended)
-- TASK: schedule the stored procedure using Snowflake tasks (optional).
-- Replace MY_WH with a warehouse that exists in your account.
CREATE OR REPLACE TASK QUERY_HISTORY_TASK
    WAREHOUSE = MY_WH
    SCHEDULE = 'USING CRON 0 0 * * 0 UTC' -- Runs weekly on Sunday at midnight (UTC)
AS
    CALL STORE_ASTRATO_QUERY_HISTORY();
-- A newly created (or replaced) task is suspended; resume it so the schedule actually runs.
ALTER TASK QUERY_HISTORY_TASK RESUME;
-- VIEW DATA: preview up to 15 rows of the stored query history.
SELECT *
FROM ASTRATO_QUERY_LOGS.ASTRATO_LOGS.QUERY_HISTORY
LIMIT 15;
Installing the workbook
Once you have completed the above, create a data connection in Astrato which connects to the database ASTRATO_QUERY_LOGS. Now you're ready to add/install the workbook.
β