scan 运算符
基于谓词扫描数据、进行匹配和生成序列。
根据运算符的步骤中定义的谓词确定匹配记录。 谓词取决于前面的步骤生成的状态。 匹配记录的输出取决于运算符的步骤中定义的输入记录和分配。
语法
T | scan
[ with_match_id
=
MatchIdColumnName ] [ declare
(
ColumnDeclarations )
] with
(
StepDefinitions )
ColumnDeclarations 语法
ColumnName :
ColumnType[=
DefaultValue ] [,
... ]
StepDefinition 语法
step
StepName [ output
= all
| last
| none
] :
Condition [ =>
Column =
Assignment [,
... ] ] ;
详细了解语法约定。
参数
客户 | 类型 | 必需 | 说明 |
---|---|---|---|
T | string |
✔️ | 输入表格源。 |
MatchIdColumnName | string |
某个类型为 long 的列的名称,在扫描执行过程中会将其追加到输出中。 指示记录匹配的从 0 开始的索引。 |
|
ColumnDeclarations | string |
声明 T 架构的扩展。这些列是在步骤中分配的值。 如果未分配,则返回 DefaultValue。 除非另有指定,否则 DefaultValue 为 null 。 |
|
StepName | string |
✔️ | 用于在扫描状态下引用条件和分配的值。 步骤名称必须是唯一的。 |
条件 | string |
✔️ | 一个计算结果为 true 或 false 的表达式,定义输入中的哪些记录与步骤匹配。 如果对于步骤的状态或前一个步骤的状态,条件为 true ,说明记录与步骤相匹配。 |
转让 | string |
当某个记录与步骤相匹配时分配到相应列的标量表达式。 | |
output |
string |
控制步骤在重复匹配时的输出逻辑。 all 输出与步骤匹配的所有记录,last 仅输出步骤的一系列重复匹配项中的最后一条记录,而 none 不会输出任何与步骤匹配的记录。 默认为 all 。 |
返回
输入的某个记录与某个步骤的每个匹配项的记录。 输出的架构是在源的架构基础上使用子句中的 declare
列进行了扩展后得到的架构。
扫描逻辑
scan
会按每条记录检查序列化输入数据,将每条记录与每个步骤的条件进行比较,同时考虑每个步骤的当前状态。
状态
可以将 scan
运算符的基础状态视为一个表,其中每个 step
都有一行。 每个步骤都使用列的最新值以及前面所有步骤和当前步骤中声明的变量来维护自己的状态。 如果相关,它还会保留正在进行的序列的匹配 ID。
如果 scan 运算符具有分别名为 s_1,s_2 ... s_n 的 n 个步骤,那么步骤 s_k 在其状态中具有 k 条记录,分别对应于 s_1,s_2 ... s_k。 StepName.ColumnName 格式用于引用状态中的值。 例如,s_2.col1
会引用属于步骤 s_2 的列 col1
,其状态为 s_k。 有关详细示例,请参阅扫描逻辑演练。
状态开始时为空,在扫描的输入记录与步骤匹配时更新。 当前步骤的状态为非空时,步骤称为具有活动序列。
匹配逻辑
每个输入记录针对所有步骤按照从最后一个步骤到第一个步骤的反向顺序进行计算。 当记录 r 针对某个步骤 s_k 进行计算时,会应用以下逻辑:
检查 1:如果上一步 (s_k-1) 的状态为非空,并且 r 满足 s_k 的条件,则会发生匹配。 匹配会导致以下操作:
- 清除 s_k 的状态。
- 将 s_k-1 的状态提升为 s_k 状态。
- 计算 s_k 的分配,并扩展 r。
- 会将扩展的 r 添加到输出中以及 s_k 的状态中。
注意
如果检查 1 导致匹配,则忽略检查 2,并且 r 会继续针对 s_k-1 进行计算。
检查 2:如果 s_k 的状态具有活动序列或 s_k 是第一步,并且 r 满足 s_k 的条件,则会发生匹配。 匹配会导致以下操作:
- 计算 s_k 的分配,并扩展 r。
- 表示处于 s_k 状态的 s_k 的值会被替换为扩展的 r 的值。
- 如果 s_k 定义为
output=all
,则扩展的 r 会添加到输出中。 - 如果 s_k 是第一步,则会开始一个新序列,匹配 ID 会增加
1
。 这只在使用with_match_id
时会影响输出。
完成 s_k 的检查后,r 会继续针对 s_k-1 进行计算。
有关此逻辑的详细示例,请参阅扫描逻辑演练。
示例
累计和
计算某个输入列的累计和。 此示例的结果与使用 row_cumsum() 的结果一样。
range x from 1 to 5 step 1
| scan declare (cumulative_x:long=0) with
(
step s1: true => cumulative_x = x + s1.cumulative_x;
)
输出
x | cumulative_x |
---|---|
1 | 1 |
2 | 3 |
3 | 6 |
4 | 10 |
5 | 15 |
具有重置条件的多个列的累积和
计算两个输入列的累积和,每当累积和达到 10 或更多时,将总和值重置为当前记录值。
range x from 1 to 5 step 1
| extend y = 2 * x
| scan declare (cumulative_x:long=0, cumulative_y:long=0) with
(
step s1: true => cumulative_x = iff(s1.cumulative_x >= 10, x, x + s1.cumulative_x),
cumulative_y = iff(s1.cumulative_y >= 10, y, y + s1.cumulative_y);
)
输出
x | y | cumulative_x | cumulative_y |
---|---|---|---|
1 | 2 | 1 | 2 |
2 | 4 | 3 | 6 |
3 | 6 | 6 | 12 |
4 | 8 | 10 | 8 |
5 | 10 | 5 | 18 |
向前填充一列
向前填充一个字符串列。 为每个空值分配上次出现的非空值。
let Events = datatable (Ts: timespan, Event: string) [
0m, "A",
1m, "",
2m, "B",
3m, "",
4m, "",
6m, "C",
8m, "",
11m, "D",
12m, ""
]
;
Events
| sort by Ts asc
| scan declare (Event_filled: string="") with
(
step s1: true => Event_filled = iff(isempty(Event), s1.Event_filled, Event);
)
输出
Ts | 事件 | Event_filled |
---|---|---|
00:00:00 | A | A |
00:01:00 | A | |
00:02:00 | B | B |
00:03:00 | B | |
00:04:00 | B | |
00:06:00 | C | C |
00:08:00 | C | |
00:11:00 | D | D |
00:12:00 | D |
会话标记
将输入划分为多个会话:会话在会话的第一个事件后 30 分钟时结束,在此之后将启动新会话。 请注意 with_match_id
标志的使用,它会为扫描的每个不同匹配(会话)分配一个唯一值。 另请注意,在本示例中特别使用了两个步骤,inSession
有 true
作为条件,因此它捕获并输出来自输入的所有记录,而 endSession
捕获距离当前匹配的 sessionStart
值 30 分钟以上的记录。 endSession
步骤具有 output=none
,这意味着它不会生成输出记录。 endSession
步骤用于将当前匹配项的状态从 inSession
提升为 endSession
,从而允许从当前记录起开始新的匹配(会话)。
let Events = datatable (Ts: timespan, Event: string) [
0m, "A",
1m, "A",
2m, "B",
3m, "D",
32m, "B",
36m, "C",
38m, "D",
41m, "E",
75m, "A"
]
;
Events
| sort by Ts asc
| scan with_match_id=session_id declare (sessionStart: timespan) with
(
step inSession: true => sessionStart = iff(isnull(inSession.sessionStart), Ts, inSession.sessionStart);
step endSession output=none: Ts - inSession.sessionStart > 30m;
)
输出
Ts | 事件 | sessionStart | session_id |
---|---|---|---|
00:00:00 | A | 00:00:00 | 0 |
00:01:00 | A | 00:00:00 | 0 |
00:02:00 | B | 00:00:00 | 0 |
00:03:00 | D | 00:00:00 | 0 |
00:32:00 | B | 00:32:00 | 1 |
00:36:00 | C | 00:32:00 | 1 |
00:38:00 | D | 00:32:00 | 1 |
00:41:00 | E | 00:32:00 | 1 |
01:15:00 | A | 01:15:00 | 2 |
启动与暂停之间的事件
查找 5 分钟内发生的、事件 Start
与事件 Stop
之间的所有事件序列。 为每个序列分配匹配 ID。
let Events = datatable (Ts: timespan, Event: string) [
0m, "A",
1m, "Start",
2m, "B",
3m, "D",
4m, "Stop",
6m, "C",
8m, "Start",
11m, "E",
12m, "Stop"
]
;
Events
| sort by Ts asc
| scan with_match_id=m_id with
(
step s1: Event == "Start";
step s2: Event != "Start" and Event != "Stop" and Ts - s1.Ts <= 5m;
step s3: Event == "Stop" and Ts - s1.Ts <= 5m;
)
输出
Ts | 事件 | m_id |
---|---|---|
00:01:00 | 开始 | 0 |
00:02:00 | B | 0 |
00:03:00 | D | 0 |
00:04:00 | Stop | 0 |
00:08:00 | 开始 | 1 |
00:11:00 | E | 1 |
00:12:00 | Stop | 1 |
计算事件的自定义漏斗图
根据 State
计算序列 Hail
->Tornado
->Thunderstorm Wind
的漏斗图完成情况,并对事件之间的时间间隔设置自定义阈值(1h
内的 Tornado
和 2h
内的 Thunderstorm Wind
)。 此示例类似于 funnel_sequence_completion plugin,但灵活性更高。
StormEvents
| partition hint.strategy=native by State
(
sort by StartTime asc
| scan with
(
step hail: EventType == "Hail";
step tornado: EventType == "Tornado" and StartTime - hail.StartTime <= 1h;
step thunderstormWind: EventType == "Thunderstorm Wind" and StartTime - tornado.StartTime <= 2h;
)
)
| summarize dcount(State) by EventType
输出
EventType | dcount_State |
---|---|
冰雹 | 50 |
龙卷风 | 34 |
雷雨大风 | 32 |
扫描逻辑演练
本部分使用启动和停止之间的事件示例的分步演练演示了扫描逻辑:
let Events = datatable (Ts: timespan, Event: string) [
0m, "A",
1m, "Start",
2m, "B",
3m, "D",
4m, "Stop",
6m, "C",
8m, "Start",
11m, "E",
12m, "Stop"
]
;
Events
| sort by Ts asc
| scan with_match_id=m_id with
(
step s1: Event == "Start";
step s2: Event != "Start" and Event != "Stop" and Ts - s1.Ts <= 5m;
step s3: Event == "Stop" and Ts - s1.Ts <= 5m;
)
状态
将 scan
运算符的状态视为一个表,其中每个步骤都有一行,每个步骤都有自己的状态。 此状态包含所有前面步骤和当前步骤中列和声明的变量的最新值。 有关详细信息,请参阅状态。
对于此示例,可以使用下表表示状态:
步骤 | m_id | s1.Ts | s1.Event | s2.Ts | s2.Event | s3.Ts | s3.Event |
---|---|---|---|---|---|---|---|
s1 | X | X | X | X | |||
s2 | X | X | |||||
s3 |
“X”表示特定字段与该步骤无关。
匹配逻辑
本部分对 Events
表中的每条记录遵循匹配逻辑,说明每个步骤中状态和输出的转换。
注意
输入记录针对步骤按照从最后一步 (s3
) 到第一步 (s1
) 的反向顺序进行计算。
记录 1
Ts | 事件 |
---|---|
0m | "A" |
每个步骤的记录计算:
s3
:检查 1 未通过,因为s2
的状态为空,检查 2 未通过,因为s3
缺少活动序列。s2
:检查 1 未通过,因为s1
的状态为空,检查 2 未通过,因为s2
缺少活动序列。s1
:检查 1 不相关,因为没有上一步。 检查 2 未通过,因为记录不符合Event == "Start"
的条件。 记录 1 将被丢弃,而不会影响状态或输出。
状态:
step | m_id | s1.Ts | s1.Event | s2.Ts | s2.Event | s3.Ts | s3.Event |
---|---|---|---|---|---|---|---|
s1 | X | X | X | X | |||
s2 | X | X | |||||
s3 |
记录 2
Ts | 事件 |
---|---|
1m | "Start" |
每个步骤的记录计算:
s3
:检查 1 未通过,因为s2
的状态为空,检查 2 未通过,因为s3
缺少活动序列。s2
:检查 1 未通过,因为s1
的状态为空,检查 2 未通过,因为s2
缺少活动序列。s1
:检查 1 不相关,因为没有上一步。 检查 2 已通过,因为记录满足Event == "Start"
的条件。 此匹配会启动一个新序列,并分配m_id
。 记录 2 及其m_id
(0
) 会添加到状态和输出中。
状态:
step | m_id | s1.Ts | s1.Event | s2.Ts | s2.Event | s3.Ts | s3.Event |
---|---|---|---|---|---|---|---|
s1 | 0 | 00:01:00 | "Start" | X | X | X | X |
s2 | X | X | |||||
s3 |
记录 3
Ts | 事件 |
---|---|
2m | "B" |
每个步骤的记录计算:
s3
:检查 1 未通过,因为s2
的状态为空,检查 2 未通过,因为s3
缺少活动序列。s2
:检查 1 已通过,因为s1
的状态为非空,并且记录满足Ts - s1.Ts < 5m
的条件。 此匹配会导致s1
的状态被清除,并将s1
中的序列提升为s2
。 记录 3 及其m_id
(0
) 会添加到状态和输出中。s1
:检查 1 不相关,因为没有上一步,检查 2 未通过,因为记录不符合Event == "Start"
的条件。
状态:
step | m_id | s1.Ts | s1.Event | s2.Ts | s2.Event | s3.Ts | s3.Event |
---|---|---|---|---|---|---|---|
s1 | X | X | X | X | |||
s2 | 0 | 00:01:00 | "Start" | 00:02:00 | "B" | X | X |
s3 |
记录 4
Ts | 事件 |
---|---|
3m | “D” |
每个步骤的记录计算:
s3
:检查 1 未通过,因为记录不符合Event == "Stop"
的条件,检查 2 未通过,因为s3
缺少活动序列。s2
:检查 1 未通过,因为s1
的状态为空。 它通过了检查 2,因为它满足Ts - s1.Ts < 5m
的条件。 记录 4 及其m_id
(0
) 会添加到状态和输出中。 此记录中的值会覆盖s2.Ts
和s2.Event
的先前状态值。s1
:检查 1 不相关,因为没有上一步,检查 2 未通过,因为记录不符合Event == "Start"
的条件。
状态:
step | m_id | s1.Ts | s1.Event | s2.Ts | s2.Event | s3.Ts | s3.Event |
---|---|---|---|---|---|---|---|
s1 | X | X | X | X | |||
s2 | 0 | 00:01:00 | "Start" | 00:03:00 | “D” | X | X |
s3 |
记录 5
Ts | 事件 |
---|---|
4m | "Stop" |
每个步骤的记录计算:
s3
:检查 1 已通过,因为s2
为非空,并且满足Event == "Stop"
的s3
条件。 此匹配会导致s2
的状态被清除,并将s2
中的序列提升为s3
。 记录 5 及其m_id
(0
) 会添加到状态和输出中。s2
:检查 1 未通过,因为s1
的状态为空,检查 2 未通过,因为s2
缺少活动序列。s1
:检查 1 不相关,因为没有上一步。 检查 2 未通过,因为记录不符合Event == "Start"
的条件。
状态:
step | m_id | s1.Ts | s1.Event | s2.Ts | s2.Event | s3.Ts | s3.Event |
---|---|---|---|---|---|---|---|
s1 | X | X | X | X | |||
s2 | X | X | |||||
s3 | 0 | 00:01:00 | "Start" | 00:03:00 | “D” | 00:04:00 | "Stop" |
记录 6
Ts | 事件 |
---|---|
6m | "C" |
每个步骤的记录计算:
s3
:检查 1 未通过,因为s2
的状态为空,检查 2 未通过,因为s3
不满足Event == "Stop"
的s3
条件。s2
:检查 1 未通过,因为s1
的状态为空,检查 2 未通过,因为s2
缺少活动序列。s1
:检查 1 未通过,因为没有上一步,检查 2 未通过,因为它不符合Event == "Start"
的条件。 记录 6 将被丢弃,而不会影响状态或输出。
状态:
step | m_id | s1.Ts | s1.Event | s2.Ts | s2.Event | s3.Ts | s3.Event |
---|---|---|---|---|---|---|---|
s1 | X | X | X | X | |||
s2 | X | X | |||||
s3 | 0 | 00:01:00 | "Start" | 00:03:00 | “D” | 00:04:00 | "Stop" |
记录 7
Ts | 事件 |
---|---|
8m | "Start" |
每个步骤的记录计算:
s3
:检查 1 未通过,因为s2
的状态为空,检查 2 未通过,因为它不满足Event == "Stop"
的条件。s2
:检查 1 未通过,因为s1
的状态为空,检查 2 未通过,因为s2
缺少活动序列。s1
:检查 1 未通过,因为没有上一步。 它通过了检查 2,因为它满足Event == "Start"
的条件。 此匹配使用新的m_id
在s1
中启动了一个新序列。 记录 7 及其m_id
(1
) 会添加到状态和输出中。
状态:
step | m_id | s1.Ts | s1.Event | s2.Ts | s2.Event | s3.Ts | s3.Event |
---|---|---|---|---|---|---|---|
s1 | 1 | 00:08:00 | "Start" | X | X | X | X |
s2 | X | X | |||||
s3 | 0 | 00:01:00 | "Start" | 00:03:00 | “D” | 00:04:00 | "Stop" |
注意
现在有两个处于该状态的活动序列。
记录 8
Ts | 事件 |
---|---|
11m | "E" |
每个步骤的记录计算:
s3
:检查 1 未通过,因为s2
的状态为空,检查 2 未通过,因为它不满足Event == "Stop"
的s3
条件。s2
:检查 1 已通过,因为s1
的状态为非空,并且记录满足Ts - s1.Ts < 5m
的条件。 此匹配会导致s1
的状态被清除,并将s1
中的序列提升为s2
。 记录 8 及其m_id
(1
) 会添加到状态和输出中。s1
:检查 1 不相关,因为没有上一步,检查 2 未通过,因为记录不符合Event == "Start"
的条件。
状态:
step | m_id | s1.Ts | s1.Event | s2.Ts | s2.Event | s3.Ts | s3.Event |
---|---|---|---|---|---|---|---|
s1 | X | X | X | X | |||
s2 | 1 | 00:08:00 | "Start" | 00:11:00 | "E" | X | X |
s3 | 0 | 00:01:00 | "Start" | 00:03:00 | “D” | 00:04:00 | "Stop" |
记录 9
Ts | 事件 |
---|---|
12m | "Stop" |
每个步骤的记录计算:
s3
:检查 1 已通过,因为s2
为非空,并且满足Event == "Stop"
的s3
条件。 此匹配会导致s2
的状态被清除,并将s2
中的序列提升为s3
。 记录 9 及其m_id
(1
) 会添加到状态和输出中。s2
:检查 1 未通过,因为s1
的状态为空,检查 2 未通过,因为s2
缺少活动序列。s1
:检查 1 未通过,因为没有上一步。 它通过了检查 2,因为它满足Event == "Start"
的条件。 此匹配使用新的m_id
在s1
中启动了一个新序列。
状态:
step | m_id | s1.Ts | s1.Event | s2.Ts | s2.Event | s3.Ts | s3.Event |
---|---|---|---|---|---|---|---|
s1 | X | X | X | X | |||
s2 | X | X | |||||
s3 | 1 | 00:08:00 | "Start" | 00:11:00 | "E" | 00:12:00 | "Stop" |
最终输出
Ts | 事件 | m_id |
---|---|---|
00:01:00 | 开始 | 0 |
00:02:00 | B | 0 |
00:03:00 | D | 0 |
00:04:00 | Stop | 0 |
00:08:00 | 开始 | 1 |
00:11:00 | E | 1 |
00:12:00 | Stop | 1 |