阿里云Flink实时计算DEMO

统计网站实时充值数据情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
-- Log source: raw events read through the 'sls' connector.
-- All connection options below are empty and must be filled in before running.
CREATE TEMPORARY TABLE sls_table_temp (
    __event__       STRING,
    __event__app__  STRING,
    __event__time__ BIGINT,
    -- Rowtime column computed from __event__time__
    -- (presumably epoch seconds, since it is passed straight to FROM_UNIXTIME -- confirm).
    ts AS TO_TIMESTAMP(FROM_UNIXTIME(__event__time__)),
    -- Watermark for the rowtime column: trails ts by 2 seconds.
    WATERMARK FOR ts AS ts - INTERVAL '2' SECOND
) WITH (
    'connector' = 'sls',
    'endpoint'  = '',
    'accessId'  = '',
    'accessKey' = '',
    'project'   = '',
    'logstore'  = '',
    'startTime' = ''
);

-- Order view: events with app = 'matrix_app' and event name 'order',
-- with the order attributes pulled out of the event JSON.
-- Columns produced by JSON_VALUE are strings; order_time is the formatted
-- orderTime (presumably epoch milliseconds, given the division by 1000 -- confirm).
CREATE TEMPORARY VIEW order_table_temp AS
SELECT
    e.event_name,
    JSON_VALUE(e.event_str, '$.et.ai.si')         AS site_id,
    JSON_VALUE(e.event_str, '$.et.ai.li')         AS link_id,
    JSON_VALUE(e.event_str, '$.et.kv.orderNum')   AS order_num,
    JSON_VALUE(e.event_str, '$.et.kv.amount')     AS amount,
    JSON_VALUE(e.event_str, '$.et.kv.chapterId')  AS chapter_id,
    JSON_VALUE(e.event_str, '$.et.kv.activityId') AS activity_id,
    JSON_VALUE(e.event_str, '$.et.kv.pushId')     AS push_id,
    JSON_VALUE(e.event_str, '$.et.kv.page')       AS page,
    JSON_VALUE(e.event_str, '$.et.kv.type')       AS order_type,
    JSON_VALUE(e.event_str, '$.et.kv.bookId')     AS book_id,
    FROM_UNIXTIME(
        CAST(JSON_VALUE(e.event_str, '$.et.kv.orderTime') AS BIGINT) / 1000
    ) AS order_time,
    e.app
FROM (
    -- Extract the two routing fields once so the outer filter can use them by name.
    SELECT
        __event__                        AS event_str,
        JSON_VALUE(__event__, '$.app')   AS app,
        JSON_VALUE(__event__, '$.et.en') AS event_name
    FROM sls_table_temp
) AS e
WHERE e.event_name = 'order'
  AND e.app = 'matrix_app';

-- Recharge view: events with app = 'matrix_app' and event name 'recharge',
-- with the recharge attributes pulled out of the event JSON.
-- Columns produced by JSON_VALUE are strings; recharge_time is the formatted
-- rechargeTime (presumably epoch milliseconds, given the division by 1000 -- confirm).
CREATE TEMPORARY VIEW recharge_table_temp AS
SELECT
    e.event_name,
    JSON_VALUE(e.event_str, '$.et.kv.isFirst')  AS is_first,
    JSON_VALUE(e.event_str, '$.et.kv.uid')      AS user_id,
    JSON_VALUE(e.event_str, '$.et.kv.orderNum') AS order_num,
    JSON_VALUE(e.event_str, '$.et.kv.status')   AS recharge_status,
    FROM_UNIXTIME(
        CAST(JSON_VALUE(e.event_str, '$.et.kv.rechargeTime') AS BIGINT) / 1000
    ) AS recharge_time,
    e.app
FROM (
    -- Extract the two routing fields once so the outer filter can use them by name.
    SELECT
        __event__                        AS event_str,
        JSON_VALUE(__event__, '$.app')   AS app,
        JSON_VALUE(__event__, '$.et.en') AS event_name
    FROM sls_table_temp
) AS e
WHERE e.event_name = 'recharge'
  AND e.app = 'matrix_app';

-- Orders joined with their recharge records on order_num.
-- LEFT JOIN keeps orders that have no matching recharge event; for those rows
-- every recharge-side column is NULL.
-- Columns are listed explicitly because both inputs expose event_name,
-- order_num and app, which SELECT * would emit twice under clashing names.
-- Order-side columns keep their original names; the recharge-side duplicates
-- are given a recharge_ prefix.
-- NOTE(review): this is a regular (non-windowed) streaming join, so join state
-- is presumably retained until a state TTL expires it -- confirm the job's TTL setting.
CREATE TEMPORARY VIEW user_order_recharge_temp AS
SELECT
    o.event_name,
    o.site_id,
    o.link_id,
    o.order_num,
    o.amount,
    o.chapter_id,
    o.activity_id,
    o.push_id,
    o.page,
    o.order_type,
    o.book_id,
    o.order_time,
    o.app,
    r.event_name AS recharge_event_name,
    r.is_first,
    r.user_id,
    r.order_num  AS recharge_order_num,
    r.recharge_status,
    r.recharge_time,
    r.app        AS recharge_app
FROM order_table_temp AS o
LEFT JOIN recharge_table_temp AS r
    ON o.order_num = r.order_num;

-- Per-site recharge statistics, split by order_type ('normal' vs 'monthly').
-- recharge_status and amount are extracted with JSON_VALUE and are therefore
-- strings: the status is compared with the string literal '1', and amount is
-- cast explicitly before summing.
-- NOTE(review): DECIMAL(18, 2) assumes amounts carry at most two decimal
-- places -- confirm the unit and scale of $.et.kv.amount.
-- Rows with recharge_status IS NULL are orders that have no recharge record
-- (the unmatched side of the upstream LEFT JOIN).
SELECT
    site_id,
    -- Paid amount of normal orders.
    SUM(
        CASE
            WHEN recharge_status = '1' AND order_type = 'normal'
                THEN CAST(amount AS DECIMAL(18, 2))
            ELSE 0
        END
    ) AS normal_recharge_amount,
    -- Distinct paying users of normal orders.
    COUNT(DISTINCT user_id) FILTER (
        WHERE recharge_status = '1' AND order_type = 'normal'
    ) AS normal_recharge_users,
    -- Number of paid normal orders.
    SUM(
        CASE
            WHEN recharge_status = '1' AND order_type = 'normal' THEN 1
            ELSE 0
        END
    ) AS normal_orders,
    -- Number of normal orders without any recharge record.
    SUM(
        CASE
            WHEN recharge_status IS NULL AND order_type = 'normal' THEN 1
            ELSE 0
        END
    ) AS normal_orders_not_pay,
    -- Paid amount of monthly (VIP) orders.
    SUM(
        CASE
            WHEN recharge_status = '1' AND order_type = 'monthly'
                THEN CAST(amount AS DECIMAL(18, 2))
            ELSE 0
        END
    ) AS vip_recharge_amount,
    -- Distinct paying users of monthly (VIP) orders.
    COUNT(DISTINCT user_id) FILTER (
        WHERE recharge_status = '1' AND order_type = 'monthly'
    ) AS vip_recharge_users,
    -- Number of paid monthly (VIP) orders.
    SUM(
        CASE
            WHEN recharge_status = '1' AND order_type = 'monthly' THEN 1
            ELSE 0
        END
    ) AS vip_orders,
    -- Number of monthly (VIP) orders without any recharge record.
    SUM(
        CASE
            WHEN recharge_status IS NULL AND order_type = 'monthly' THEN 1
            ELSE 0
        END
    ) AS vip_orders_not_pay
FROM user_order_recharge_temp
GROUP BY site_id;
作者

Zachary Darius

发布于

2021-06-21

更新于

2021-06-21

许可协议

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×