scan 运算符

适用于:✅Azure 数据资源管理器Azure MonitorMicrosoft Sentinel

基于谓词扫描数据、进行匹配和生成序列。

根据运算符的步骤中定义的谓词确定匹配记录。 谓词取决于前面的步骤生成的状态。 匹配记录的输出取决于运算符的步骤中定义的输入记录和分配。

语法

T | scan [ with_match_id = MatchIdColumnName ] [ declare ( ColumnDeclarations ) ] with ( StepDefinitions )

ColumnDeclarations 语法

ColumnName : ColumnType[= DefaultValue ] [, ... ]

StepDefinition 语法

step StepName [ output = all | last | none] : Condition [ => Column = Assignment [, ... ] ] ;

详细了解语法约定

参数

客户 类型​​ 必需 说明
T string ✔️ 输入表格源。
MatchIdColumnName string 某个类型为 long 的列的名称,在扫描执行过程中会将其追加到输出中。 指示记录匹配的从 0 开始的索引。
ColumnDeclarations string 声明 T 架构的扩展。这些列是在步骤中分配的值。 如果未分配,则返回 DefaultValue。 除非另有指定,否则 DefaultValuenull
StepName string ✔️ 用于在扫描状态下引用条件和分配的值。 步骤名称必须是唯一的。
条件 string ✔️ 一个计算结果为 truefalse 的表达式,定义输入中的哪些记录与步骤匹配。 如果对于步骤的状态或前一个步骤的状态,条件为 true,说明记录与步骤相匹配。
转让 string 当某个记录与步骤相匹配时分配到相应列的标量表达式。
output string 控制步骤在重复匹配时的输出逻辑。 all 输出与步骤匹配的所有记录,last 仅输出步骤的一系列重复匹配项中的最后一条记录,而 none 不会输出任何与步骤匹配的记录。 默认为 all

返回

输入的某个记录与某个步骤的每个匹配项的记录。 输出的架构是在源的架构基础上使用子句中的 declare 列进行了扩展后得到的架构。

扫描逻辑

scan 会按每条记录检查序列化输入数据,将每条记录与每个步骤的条件进行比较,同时考虑每个步骤的当前状态。

状态

可以将 scan 运算符的基础状态视为一个表,其中每个 step 都有一行。 每个步骤都使用列的最新值以及前面所有步骤和当前步骤中声明的变量来维护自己的状态。 如果相关,它还会保留正在进行的序列的匹配 ID。

如果 scan 运算符具有分别名为 s_1s_2 ... s_nn 个步骤,那么步骤 s_k 在其状态中具有 k 条记录,分别对应于 s_1s_2 ... s_kStepName.ColumnName 格式用于引用状态中的值。 例如,s_2.col1 会引用属于步骤 s_2 的列 col1,其状态为 s_k。 有关详细示例,请参阅扫描逻辑演练

状态开始时为空,在扫描的输入记录与步骤匹配时更新。 当前步骤的状态为非空时,步骤称为具有活动序列

匹配逻辑

每个输入记录针对所有步骤按照从最后一个步骤到第一个步骤的反向顺序进行计算。 当记录 r 针对某个步骤 s_k 进行计算时,会应用以下逻辑:

  • 检查 1:如果上一步 (s_k-1) 的状态为非空,并且 r 满足 s_k条件,则会发生匹配。 匹配会导致以下操作:

    1. 清除 s_k 的状态。
    2. s_k-1 的状态提升为 s_k 状态。
    3. 计算 s_k 的分配,并扩展 r
    4. 会将扩展的 r 添加到输出中以及 s_k 的状态中

    注意

    如果检查 1 导致匹配,则忽略检查 2,并且 r 会继续针对 s_k-1 进行计算。

  • 检查 2:如果 s_k 的状态具有活动序列或 s_k 是第一步,并且 r 满足 s_k条件,则会发生匹配。 匹配会导致以下操作:

    1. 计算 s_k 的分配,并扩展 r
    2. 表示处于 s_k 状态的 s_k 的值会被替换为扩展的 r 的值。
    3. 如果 s_k 定义为 output=all,则扩展的 r 会添加到输出中。
    4. 如果 s_k 是第一步,则会开始一个新序列,匹配 ID 会增加 1。 这只在使用 with_match_id 时会影响输出。

完成 s_k 的检查后,r 会继续针对 s_k-1 进行计算。

有关此逻辑的详细示例,请参阅扫描逻辑演练

示例

累计和

计算某个输入列的累计和。 此示例的结果与使用 row_cumsum() 的结果一样。

range x from 1 to 5 step 1 
| scan declare (cumulative_x:long=0) with 
(
    step s1: true => cumulative_x = x + s1.cumulative_x;
)

输出

x cumulative_x
1 1
2 3
3 6
4 10
5 15

具有重置条件的多个列的累积和

计算两个输入列的累积和,每当累积和达到 10 或更多时,将总和值重置为当前记录值。

range x from 1 to 5 step 1
| extend y = 2 * x
| scan declare (cumulative_x:long=0, cumulative_y:long=0) with 
(
    step s1: true => cumulative_x = iff(s1.cumulative_x >= 10, x, x + s1.cumulative_x), 
                     cumulative_y = iff(s1.cumulative_y >= 10, y, y + s1.cumulative_y);
)

输出

x y cumulative_x cumulative_y
1 2 1 2
2 4 3 6
3 6 6 12
4 8 10 8
5 10 5 18

向前填充一列

向前填充一个字符串列。 为每个空值分配上次出现的非空值。

let Events = datatable (Ts: timespan, Event: string) [
    0m, "A",
    1m, "",
    2m, "B",
    3m, "",
    4m, "",
    6m, "C",
    8m, "",
    11m, "D",
    12m, ""
]
;
Events
| sort by Ts asc
| scan declare (Event_filled: string="") with 
(
    step s1: true => Event_filled = iff(isempty(Event), s1.Event_filled, Event);
)

输出

Ts 事件 Event_filled
00:00:00 A A
00:01:00 A
00:02:00 B B
00:03:00 B
00:04:00 B
00:06:00 C C
00:08:00 C
00:11:00 D D
00:12:00 D

会话标记

将输入划分为多个会话:会话在会话的第一个事件后 30 分钟时结束,在此之后将启动新会话。 请注意 with_match_id 标志的使用,它会为扫描的每个不同匹配(会话)分配一个唯一值。 另请注意,在本示例中特别使用了两个步骤,inSessiontrue 作为条件,因此它捕获并输出来自输入的所有记录,而 endSession 捕获距离当前匹配的 sessionStart 值 30 分钟以上的记录。 endSession 步骤具有 output=none,这意味着它不会生成输出记录。 endSession 步骤用于将当前匹配项的状态从 inSession 提升为 endSession,从而允许从当前记录起开始新的匹配(会话)。

let Events = datatable (Ts: timespan, Event: string) [
    0m, "A",
    1m, "A",
    2m, "B",
    3m, "D",
    32m, "B",
    36m, "C",
    38m, "D",
    41m, "E",
    75m, "A"
]
;
Events
| sort by Ts asc
| scan with_match_id=session_id declare (sessionStart: timespan) with 
(
    step inSession: true => sessionStart = iff(isnull(inSession.sessionStart), Ts, inSession.sessionStart);
    step endSession output=none: Ts - inSession.sessionStart > 30m;
)

输出

Ts 事件 sessionStart session_id
00:00:00 A 00:00:00 0
00:01:00 A 00:00:00 0
00:02:00 B 00:00:00 0
00:03:00 D 00:00:00 0
00:32:00 B 00:32:00 1
00:36:00 C 00:32:00 1
00:38:00 D 00:32:00 1
00:41:00 E 00:32:00 1
01:15:00 A 01:15:00 2

启动与暂停之间的事件

查找 5 分钟内发生的、事件 Start 与事件 Stop 之间的所有事件序列。 为每个序列分配匹配 ID。

let Events = datatable (Ts: timespan, Event: string) [
    0m, "A",
    1m, "Start",
    2m, "B",
    3m, "D",
    4m, "Stop",
    6m, "C",
    8m, "Start",
    11m, "E",
    12m, "Stop"
]
;
Events
| sort by Ts asc
| scan with_match_id=m_id with 
(
    step s1: Event == "Start";
    step s2: Event != "Start" and Event != "Stop" and Ts - s1.Ts <= 5m;
    step s3: Event == "Stop" and Ts - s1.Ts <= 5m;
)

输出

Ts 事件 m_id
00:01:00 开始 0
00:02:00 B 0
00:03:00 D 0
00:04:00 Stop 0
00:08:00 开始 1
00:11:00 E 1
00:12:00 Stop 1

计算事件的自定义漏斗图

根据 State 计算序列 Hail ->Tornado ->Thunderstorm Wind 的漏斗图完成情况,并对事件之间的时间间隔设置自定义阈值(1h 内的 Tornado2h 内的 Thunderstorm Wind)。 此示例类似于 funnel_sequence_completion plugin,但灵活性更高。

StormEvents
| partition hint.strategy=native by State 
    (
    sort by StartTime asc
    | scan with 
    (
        step hail: EventType == "Hail";
        step tornado: EventType == "Tornado" and StartTime - hail.StartTime <= 1h;
        step thunderstormWind: EventType == "Thunderstorm Wind" and StartTime - tornado.StartTime <= 2h;
    )
    )
| summarize dcount(State) by EventType

输出

EventType dcount_State
冰雹 50
龙卷风 34
雷雨大风 32

扫描逻辑演练

本部分使用启动和停止之间的事件示例的分步演练演示了扫描逻辑

let Events = datatable (Ts: timespan, Event: string) [
    0m, "A",
    1m, "Start",
    2m, "B",
    3m, "D",
    4m, "Stop",
    6m, "C",
    8m, "Start",
    11m, "E",
    12m, "Stop"
]
;
Events
| sort by Ts asc
| scan with_match_id=m_id with 
(
    step s1: Event == "Start";
    step s2: Event != "Start" and Event != "Stop" and Ts - s1.Ts <= 5m;
    step s3: Event == "Stop" and Ts - s1.Ts <= 5m;
)

状态

scan 运算符的状态视为一个表,其中每个步骤都有一行,每个步骤都有自己的状态。 此状态包含所有前面步骤和当前步骤中列和声明的变量的最新值。 有关详细信息,请参阅状态

对于此示例,可以使用下表表示状态:

步骤 m_id s1.Ts s1.Event s2.Ts s2.Event s3.Ts s3.Event
s1 X X X X
s2 X X
s3

“X”表示特定字段与该步骤无关。

匹配逻辑

本部分对 Events 表中的每条记录遵循匹配逻辑,说明每个步骤中状态和输出的转换。

注意

输入记录针对步骤按照从最后一步 (s3) 到第一步 (s1) 的反向顺序进行计算。

记录 1

Ts 事件
0m "A"

每个步骤的记录计算:

  • s3检查 1 未通过,因为 s2 的状态为空,检查 2 未通过,因为 s3 缺少活动序列。
  • s2检查 1 未通过,因为 s1 的状态为空,检查 2 未通过,因为 s2 缺少活动序列。
  • s1检查 1 不相关,因为没有上一步。 检查 2 未通过,因为记录不符合 Event == "Start" 的条件。 记录 1 将被丢弃,而不会影响状态或输出。

状态:

step m_id s1.Ts s1.Event s2.Ts s2.Event s3.Ts s3.Event
s1 X X X X
s2 X X
s3

记录 2

Ts 事件
1m "Start"

每个步骤的记录计算:

  • s3检查 1 未通过,因为 s2 的状态为空,检查 2 未通过,因为 s3 缺少活动序列。
  • s2检查 1 未通过,因为 s1 的状态为空,检查 2 未通过,因为 s2 缺少活动序列。
  • s1检查 1 不相关,因为没有上一步。 检查 2 已通过,因为记录满足 Event == "Start" 的条件。 此匹配会启动一个新序列,并分配 m_id记录 2 及其 m_id (0) 会添加到状态和输出中。

状态:

step m_id s1.Ts s1.Event s2.Ts s2.Event s3.Ts s3.Event
s1 0 00:01:00 "Start" X X X X
s2 X X
s3

记录 3

Ts 事件
2m "B"

每个步骤的记录计算:

  • s3检查 1 未通过,因为 s2 的状态为空,检查 2 未通过,因为 s3 缺少活动序列。
  • s2检查 1 已通过,因为 s1 的状态为非空,并且记录满足 Ts - s1.Ts < 5m 的条件。 此匹配会导致 s1 的状态被清除,并将 s1 中的序列提升为 s2记录 3 及其 m_id (0) 会添加到状态和输出中。
  • s1检查 1 不相关,因为没有上一步,检查 2 未通过,因为记录不符合 Event == "Start" 的条件。

状态:

step m_id s1.Ts s1.Event s2.Ts s2.Event s3.Ts s3.Event
s1 X X X X
s2 0 00:01:00 "Start" 00:02:00 "B" X X
s3

记录 4

Ts 事件
3m “D”

每个步骤的记录计算:

  • s3检查 1 未通过,因为记录不符合 Event == "Stop" 的条件,检查 2 未通过,因为 s3 缺少活动序列。
  • s2检查 1 未通过,因为 s1 的状态为空。 它通过了检查 2,因为它满足 Ts - s1.Ts < 5m 的条件。 记录 4 及其 m_id (0) 会添加到状态和输出中。 此记录中的值会覆盖 s2.Tss2.Event 的先前状态值。
  • s1检查 1 不相关,因为没有上一步,检查 2 未通过,因为记录不符合 Event == "Start" 的条件。

状态:

step m_id s1.Ts s1.Event s2.Ts s2.Event s3.Ts s3.Event
s1 X X X X
s2 0 00:01:00 "Start" 00:03:00 “D” X X
s3

记录 5

Ts 事件
4m "Stop"

每个步骤的记录计算:

  • s3检查 1 已通过,因为 s2 为非空,并且满足 Event == "Stop"s3 条件。 此匹配会导致 s2 的状态被清除,并将 s2 中的序列提升为 s3记录 5 及其 m_id (0) 会添加到状态和输出中。
  • s2检查 1 未通过,因为 s1 的状态为空,检查 2 未通过,因为 s2 缺少活动序列。
  • s1检查 1 不相关,因为没有上一步。 检查 2 未通过,因为记录不符合 Event == "Start" 的条件。

状态:

step m_id s1.Ts s1.Event s2.Ts s2.Event s3.Ts s3.Event
s1 X X X X
s2 X X
s3 0 00:01:00 "Start" 00:03:00 “D” 00:04:00 "Stop"

记录 6

Ts 事件
6m "C"

每个步骤的记录计算:

  • s3检查 1 未通过,因为 s2 的状态为空,检查 2 未通过,因为 s3 不满足 Event == "Stop"s3 条件。
  • s2检查 1 未通过,因为 s1 的状态为空,检查 2 未通过,因为 s2 缺少活动序列。
  • s1检查 1 未通过,因为没有上一步,检查 2 未通过,因为它不符合 Event == "Start" 的条件。 记录 6 将被丢弃,而不会影响状态或输出。

状态:

step m_id s1.Ts s1.Event s2.Ts s2.Event s3.Ts s3.Event
s1 X X X X
s2 X X
s3 0 00:01:00 "Start" 00:03:00 “D” 00:04:00 "Stop"

记录 7

Ts 事件
8m "Start"

每个步骤的记录计算:

  • s3检查 1 未通过,因为 s2 的状态为空,检查 2 未通过,因为它不满足 Event == "Stop" 的条件。
  • s2检查 1 未通过,因为 s1 的状态为空,检查 2 未通过,因为 s2 缺少活动序列。
  • s1检查 1 未通过,因为没有上一步。 它通过了检查 2,因为它满足 Event == "Start" 的条件。 此匹配使用新的 m_ids1 中启动了一个新序列。 记录 7 及其 m_id (1) 会添加到状态和输出中。

状态:

step m_id s1.Ts s1.Event s2.Ts s2.Event s3.Ts s3.Event
s1 1 00:08:00 "Start" X X X X
s2 X X
s3 0 00:01:00 "Start" 00:03:00 “D” 00:04:00 "Stop"

注意

现在有两个处于该状态的活动序列。

记录 8

Ts 事件
11m "E"

每个步骤的记录计算:

  • s3检查 1 未通过,因为 s2 的状态为空,检查 2 未通过,因为它不满足 Event == "Stop"s3 条件。
  • s2检查 1 已通过,因为 s1 的状态为非空,并且记录满足 Ts - s1.Ts < 5m 的条件。 此匹配会导致 s1 的状态被清除,并将 s1 中的序列提升为 s2记录 8 及其 m_id (1) 会添加到状态和输出中。
  • s1检查 1 不相关,因为没有上一步,检查 2 未通过,因为记录不符合 Event == "Start" 的条件。

状态:

step m_id s1.Ts s1.Event s2.Ts s2.Event s3.Ts s3.Event
s1 X X X X
s2 1 00:08:00 "Start" 00:11:00 "E" X X
s3 0 00:01:00 "Start" 00:03:00 “D” 00:04:00 "Stop"

记录 9

Ts 事件
12m "Stop"

每个步骤的记录计算:

  • s3检查 1 已通过,因为 s2 为非空,并且满足 Event == "Stop"s3 条件。 此匹配会导致 s2 的状态被清除,并将 s2 中的序列提升为 s3记录 9 及其 m_id (1) 会添加到状态和输出中。
  • s2检查 1 未通过,因为 s1 的状态为空,检查 2 未通过,因为 s2 缺少活动序列。
  • s1检查 1 未通过,因为没有上一步。 它通过了检查 2,因为它满足 Event == "Start" 的条件。 此匹配使用新的 m_ids1 中启动了一个新序列。

状态:

step m_id s1.Ts s1.Event s2.Ts s2.Event s3.Ts s3.Event
s1 X X X X
s2 X X
s3 1 00:08:00 "Start" 00:11:00 "E" 00:12:00 "Stop"

最终输出

Ts 事件 m_id
00:01:00 开始 0
00:02:00 B 0
00:03:00 D 0
00:04:00 Stop 0
00:08:00 开始 1
00:11:00 E 1
00:12:00 Stop 1