-- 002_daily_summary.sql -- 状态时长定时快照表 + 聚合函数 + pg_cron 调度说明 -- ============================================================ -- 1. 快照存储表 -- ============================================================ CREATE TABLE IF NOT EXISTS state_summaries ( id BIGSERIAL PRIMARY KEY, user_id VARCHAR(128) NOT NULL, state VARCHAR(64) NOT NULL, duration_secs BIGINT NOT NULL DEFAULT 0, period_start BIGINT NOT NULL, period_end BIGINT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), UNIQUE (user_id, state, period_start, period_end) ); CREATE INDEX IF NOT EXISTS idx_summaries_user_period ON state_summaries (user_id, period_start DESC); -- ============================================================ -- 2. 聚合函数:滑动窗口计算指定时间范围内各状态时长 -- - 调度器只需传入 period_start / period_end -- - 内部用 LEAD() 窗口函数求相邻检查点差值 -- - ON CONFLICT 保证幂等(重复执行不产生重复行) -- ============================================================ CREATE OR REPLACE FUNCTION aggregate_checkpoint_durations( p_start BIGINT, p_end BIGINT ) RETURNS BIGINT AS $$ DECLARE affected BIGINT; BEGIN INSERT INTO state_summaries (user_id, state, duration_secs, period_start, period_end) SELECT user_id, state, SUM(duration) AS duration_secs, p_start, p_end FROM ( SELECT user_id, state, LEAD(timestamp) OVER (PARTITION BY user_id ORDER BY timestamp) - timestamp AS duration FROM checkpoints WHERE timestamp >= p_start AND timestamp <= p_end ) sub WHERE duration > 0 GROUP BY user_id, state ON CONFLICT (user_id, state, period_start, period_end) DO UPDATE SET duration_secs = EXCLUDED.duration_secs; GET DIAGNOSTICS affected = ROW_COUNT; RETURN affected; END; $$ LANGUAGE plpgsql; -- ============================================================ -- 3. pg_cron 调度(需要超级用户手动执行一次) -- ============================================================ -- -- -- 安装扩展(仅需一次) -- CREATE EXTENSION IF NOT EXISTS pg_cron; -- -- -- 每 5 分钟执行一次聚合 -- SELECT cron.schedule( -- 'aggregate-5min', -- '*/5 * * * *', -- $$ SELECT aggregate_checkpoint_durations( -- FLOOR(EXTRACT(EPOCH FROM now() - INTERVAL '5 minutes'))::BIGINT, -- FLOOR(EXTRACT(EPOCH FROM now()))::BIGINT -- ); $$ -- ); -- -- -- 每小时整点执行一次(日报表用) -- SELECT cron.schedule( -- 'aggregate-hourly', -- '0 * * * *', -- $$ SELECT aggregate_checkpoint_durations( -- FLOOR(EXTRACT(EPOCH FROM now() - INTERVAL '1 hour'))::BIGINT, -- FLOOR(EXTRACT(EPOCH FROM now()))::BIGINT -- ); $$ -- ); -- -- -- 查看 cron 任务状态 -- SELECT * FROM cron.job; -- -- -- 取消(如需) -- SELECT cron.unschedule('aggregate-5min');