This approach creates a query history layer on top of BigQuery using the built-in INFORMATION_SCHEMA.JOBS_BY_PROJECT and JOBS_BY_USER views. It captures executed SQL statements, execution metadata, and performance metrics, and makes them queryable as a structured dataset.
Unlike aggregated monitoring tools, this method provides row-level visibility into historical queries, which can be enriched with Astrato metadata embedded in SQL comments. This enables direct correlation between BigQuery workload, users, dashboards, and business activity.
How does it work?
Astrato appends metadata to the end of each query, allowing it to be viewed later in BigQuery. BigQuery already records which database user has accessed each table, warehouse, etc.
In this article we will cover how to store this data in a new dataset, using only one block of code.
The data that Astrato appends is shown below.
--
{
"objectId":"f4ed7820-9884-45ba-b078-86fc260d5511",
"sheetId":"3c2fa835-05ce-47f8-9489-3b99a76adc0b",
"workbookId":"d8ccb7b8-970e-477f-a093-6e862a431e21"
}
To supplement this, you can export the lookup associating workbooks, sheets and users, from the administration section https://app.astrato.io/administration/exports.
Currently, this is a manual upload for security purposes, since user data contains PII. The tables can be directly uploaded to BigQuery. These tables/exports from Astrato are required to determine the names of users, workbooks and groups.
-- 2) Create the history table (partitioned by creation_time).
-- One row per BigQuery job (keyed by job_id). Partitioning by day keeps the
-- nightly MERGE cheap; clustering matches the example query filters below.
CREATE TABLE IF NOT EXISTS `ASTRATO.query_history` (
  -- Job metadata copied straight from INFORMATION_SCHEMA.JOBS_BY_PROJECT.
  job_id STRING,
  project_id STRING,
  user_email STRING,
  creation_time TIMESTAMP,
  start_time TIMESTAMP,
  end_time TIMESTAMP,
  state STRING,
  statement_type STRING,
  priority STRING,
  total_bytes_processed INT64,
  total_bytes_billed INT64,
  total_slot_ms INT64,
  cache_hit BOOL,
  error_reason STRING,
  error_message STRING,
  query_text STRING,
  -- Raw job labels, kept verbatim for debugging and any future keys.
  labels ARRAY<STRUCT<key STRING, value STRING>>,
  -- Astrato labels flattened into first-class columns (source label key noted).
  astrato_type STRING,    -- labels: type
  astrato_user_id STRING, -- labels: user_id
  role_type STRING,       -- labels: role_type
  object_id STRING,       -- labels: object_id
  sheet_id STRING,        -- labels: sheet_id
  workbook_id STRING,     -- labels: workbook_id
  workbook_state STRING,  -- labels: workbook_state
  -- Dynamic field/dimension label keys (f1, f2, ..., d1, d2, ...) captured as
  -- parallel key/value arrays, so no maximum count needs to be hardcoded.
  field_keys ARRAY<STRING>,       -- keys matching f\d+
  field_values ARRAY<STRING>,     -- corresponding values
  dimension_keys ARRAY<STRING>,   -- keys matching d\d+
  dimension_values ARRAY<STRING>  -- corresponding values
)
PARTITION BY DATE(creation_time)
CLUSTER BY user_email, workbook_id, sheet_id, object_id;
-- 3) Incremental upsert from INFORMATION_SCHEMA (past 8 days by default).
-- Re-running is safe: matched job_ids are refreshed, new ones inserted.
-- NOTE(fixes): the original `LEFT JOIN UNNEST(...)` had no join condition
-- (BigQuery requires `ON TRUE` for an explicit join against UNNEST), and it
-- grouped by the ARRAY column `labels`, which BigQuery rejects. The label
-- flattening below uses correlated subqueries instead, so no GROUP BY is
-- needed (job label keys are unique per job, so the scalar subqueries are safe).
MERGE `ASTRATO.query_history` T
USING (
  -- src: completed query jobs from the last 8 days that carry Astrato labels.
  WITH src AS (
    SELECT
      job_id,
      project_id,
      user_email,
      creation_time,
      start_time,
      end_time,
      state,
      statement_type,
      priority,
      total_bytes_processed,
      total_bytes_billed,
      total_slot_ms,
      cache_hit,
      error_result.reason AS error_reason,
      error_result.message AS error_message,
      query AS query_text,
      labels
    FROM `region-eu`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE job_type = 'QUERY'
      AND state = 'DONE'
      AND statement_type != 'SCRIPT'
      AND creation_time BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 8 DAY) AND CURRENT_TIMESTAMP()
      -- Only keep jobs that have Astrato labels (choose the key that's guaranteed for you)
      AND EXISTS (
        SELECT 1
        FROM UNNEST(labels) l
        WHERE l.key IN ('workbook_id','sheet_id','object_id','user_id','type')
      )
  ),
  -- flattened: lift the well-known Astrato label keys into columns, and
  -- collect the dynamic f#/d# keys into parallel key/value arrays.
  flattened AS (
    SELECT
      s.*,
      -- Core Astrato metadata (one value per key per job, or NULL if absent).
      (SELECT l.value FROM UNNEST(s.labels) AS l WHERE l.key = 'type') AS astrato_type,
      (SELECT l.value FROM UNNEST(s.labels) AS l WHERE l.key = 'user_id') AS astrato_user_id,
      (SELECT l.value FROM UNNEST(s.labels) AS l WHERE l.key = 'role_type') AS role_type,
      (SELECT l.value FROM UNNEST(s.labels) AS l WHERE l.key = 'object_id') AS object_id,
      (SELECT l.value FROM UNNEST(s.labels) AS l WHERE l.key = 'sheet_id') AS sheet_id,
      (SELECT l.value FROM UNNEST(s.labels) AS l WHERE l.key = 'workbook_id') AS workbook_id,
      (SELECT l.value FROM UNNEST(s.labels) AS l WHERE l.key = 'workbook_state') AS workbook_state,
      -- Dynamic "f#" and "d#" keys (e.g. f1, d1, d2); both arrays are ordered
      -- by key so keys[i] always corresponds to values[i].
      ARRAY(SELECT l.key   FROM UNNEST(s.labels) AS l WHERE REGEXP_CONTAINS(l.key, r'^f\d+$') ORDER BY l.key) AS field_keys,
      ARRAY(SELECT l.value FROM UNNEST(s.labels) AS l WHERE REGEXP_CONTAINS(l.key, r'^f\d+$') ORDER BY l.key) AS field_values,
      ARRAY(SELECT l.key   FROM UNNEST(s.labels) AS l WHERE REGEXP_CONTAINS(l.key, r'^d\d+$') ORDER BY l.key) AS dimension_keys,
      ARRAY(SELECT l.value FROM UNNEST(s.labels) AS l WHERE REGEXP_CONTAINS(l.key, r'^d\d+$') ORDER BY l.key) AS dimension_values
    FROM src AS s
  )
  SELECT * FROM flattened
) S
ON T.job_id = S.job_id
WHEN MATCHED THEN UPDATE SET
  project_id = S.project_id,
  user_email = S.user_email,
  creation_time = S.creation_time,
  start_time = S.start_time,
  end_time = S.end_time,
  state = S.state,
  statement_type = S.statement_type,
  priority = S.priority,
  total_bytes_processed = S.total_bytes_processed,
  total_bytes_billed = S.total_bytes_billed,
  total_slot_ms = S.total_slot_ms,
  cache_hit = S.cache_hit,
  error_reason = S.error_reason,
  error_message = S.error_message,
  query_text = S.query_text,
  labels = S.labels,
  astrato_type = S.astrato_type,
  astrato_user_id = S.astrato_user_id,
  role_type = S.role_type,
  object_id = S.object_id,
  sheet_id = S.sheet_id,
  workbook_id = S.workbook_id,
  workbook_state = S.workbook_state,
  field_keys = S.field_keys,
  field_values = S.field_values,
  dimension_keys = S.dimension_keys,
  dimension_values = S.dimension_values
WHEN NOT MATCHED THEN
  INSERT (
    job_id, project_id, user_email, creation_time, start_time, end_time, state,
    statement_type, priority, total_bytes_processed, total_bytes_billed, total_slot_ms,
    cache_hit, error_reason, error_message, query_text,
    labels,
    astrato_type, astrato_user_id, role_type, object_id, sheet_id, workbook_id, workbook_state,
    field_keys, field_values, dimension_keys, dimension_values
  )
  VALUES (
    S.job_id, S.project_id, S.user_email, S.creation_time, S.start_time, S.end_time, S.state,
    S.statement_type, S.priority, S.total_bytes_processed, S.total_bytes_billed, S.total_slot_ms,
    S.cache_hit, S.error_reason, S.error_message, S.query_text,
    S.labels,
    S.astrato_type, S.astrato_user_id, S.role_type, S.object_id, S.sheet_id, S.workbook_id, S.workbook_state,
    S.field_keys, S.field_values, S.dimension_keys, S.dimension_values
  );
Example “business questions” queries
-- a) Most expensive workbooks by bytes processed (last 7 days).
-- NULL workbook_id rows (non-Astrato jobs) are excluded up front.
SELECT
  workbook_id,
  SUM(total_bytes_processed) AS bytes_processed,
  COUNT(*) AS query_count
FROM `ASTRATO.query_history`
WHERE workbook_id IS NOT NULL
  AND creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
GROUP BY workbook_id
ORDER BY bytes_processed DESC;
-- b) Slowest sheets by average elapsed time (end - start), last 7 days.
-- Rows missing either timestamp are skipped so the average stays meaningful.
SELECT
  sheet_id,
  AVG(TIMESTAMP_DIFF(end_time, start_time, MILLISECOND)) AS avg_ms,
  COUNT(*) AS query_count
FROM `ASTRATO.query_history`
WHERE sheet_id IS NOT NULL
  AND start_time IS NOT NULL
  AND end_time IS NOT NULL
  AND creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
GROUP BY sheet_id
ORDER BY avg_ms DESC;
-- c) Queries by Astrato user (last 30 days), heaviest users first.
SELECT
  astrato_user_id,
  COUNT(*) AS queries,
  SUM(total_bytes_processed) AS bytes_processed
FROM `ASTRATO.query_history`
WHERE astrato_user_id IS NOT NULL
  AND creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
GROUP BY astrato_user_id
ORDER BY bytes_processed DESC;

