shuffle 查询
shuffle
查询是一种保留语义的转换,可以结合支持 shuffle
策略的一组运算符使用。 根据涉及的数据,使用 shuffle
策略进行查询可以提高性能。 如果 shuffle
键(join
键、summarize
键、make-series
键或 partition
键)的基数较高,并且常规运算符查询达到查询限制,则使用 shuffle 查询策略效果更好。
可以在 shuffle 命令中使用以下运算符:
若要使用 shuffle
查询策略,请添加表达式 hint.strategy = shuffle
或 hint.shufflekey = <key>
。 使用 hint.strategy=shuffle
时,所有键会导致运算符数据随机排布。 当复合键唯一,但每个键不够唯一时,请使用此表达式,以便使用 shuffle 运算符的所有键来随机排布数据。
使用 shuffle 策略对数据进行分区时,数据负载将在所有群集节点之间分担。 每个节点处理数据的一个分区。 默认分区数等于群集节点数。
可以使用 hint.num_partitions = total_partitions
语法来重写分区数,该语法将控制分区数。 如果群集具有少量的群集节点且默认分区数较小,而查询失败或需要较长的执行时间,则此语法非常有用。
注意
使用大量分区可能会消耗更多的群集资源并降低性能。 从 hint.strategy = shuffle
着手,谨慎选择分区数,然后逐渐开始增加分区。
在某些情况下,会忽略 hint.strategy = shuffle
,且不会在 shuffle
策略中运行查询。 下列情况下测试输入无效:
join
运算符在左侧或右侧有另一个与shuffle
兼容的运算符(join
、summarize
、make-series
或partition
)。summarize
运算符出现在查询中另一个与shuffle
兼容的运算符(join
、summarize
、make-series
或partition
)之后。
语法
With hint.strategy
= shuffle
T |
DataExpression |
join
hint.strategy
= shuffle
(
DataExpression )
T |
summarize
hint.strategy
= shuffle
DataExpression
T |
Query |
partition hint.strategy
= shuffle
(
SubQuery )
使用 hint.shufflekey
= hint.shufflekey
T |
DataExpression |
join
hint.shufflekey
= key (
DataExpression )
T |
summarize
hint.shufflekey
= key DataExpression
T |
make-series
hint.shufflekey
= key DataExpression
T |
Query |
partition hint.shufflekey
= key (
SubQuery )
详细了解语法约定。
参数
客户 | 类型 | 必需 | 说明 |
---|---|---|---|
T | string |
✔️ | 要由运算符处理其数据的表格源。 |
DataExpression | string |
隐式或显式表格转换表达式。 | |
查询 | string |
对 T 记录运行的转换表达式。 | |
键 | string |
使用 join 键、summarize 键、make-series 键或 partition 键。 |
|
SubQuery | string |
转换表达式。 |
注意
必须根据所选语法指定 DataExpression 或 Query。
示例
将 summarize 与 shuffle 一起使用
使用 summarize
运算符的 shuffle
策略查询会在所有群集节点上分担负载,其中的每个节点会处理一个数据分区。
StormEvents
| summarize hint.strategy = shuffle count(), avg(InjuriesIndirect) by State
| count
输出
Count |
---|
67 |
将 join 与 shuffle 一起使用
StormEvents
| where State has "West"
| where EventType has "Flood"
| join hint.strategy=shuffle
(
StormEvents
| where EventType has "Hail"
| project EpisodeId, State, DamageProperty
)
on State
| count
输出
Count |
---|
103 |
将 make-series 与 shuffle 一起使用
StormEvents
| where State has "North"
| make-series hint.shufflekey = State sum(DamageProperty) default = 0 on StartTime in range(datetime(2007-01-01 00:00:00.0000000), datetime(2007-01-31 23:59:00.0000000), 15d) by State
输出
状态 | sum_DamageProperty | StartTime |
---|---|---|
NORTH DAKOTA | [60000,0,0] | ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.0000000Z"] |
NORTH CAROLINA | [20000,0,1000] | ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.0000000Z"] |
ATLANTIC NORTH | [0,0,0] | ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.0000000Z"] |
将 partition 与 shuffle 一起使用
StormEvents
| partition hint.strategy=shuffle by EpisodeId
(
top 3 by DamageProperty
| project EpisodeId, State, DamageProperty
)
| count
输出
Count |
---|
22345 |
Compare hint.strategy=shuffle and hint.shufflekey=key
使用 hint.strategy=shuffle
时,所有键会导致随机运算符随机排布。 在以下示例中,查询使用 EpisodeId
和 EventId
作为键将数据随机排布:
StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| join kind = inner hint.strategy=shuffle (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId
| count
输出
Count |
---|
14 |
下面的查询使用 hint.shufflekey = key
。 以上查询相当于此查询。
StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| join kind = inner hint.shufflekey = EpisodeId hint.shufflekey = EventId (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId
输出
Count |
---|
14 |
使用多个键随机排布数据
在某些情况下,将忽略 hint.strategy=shuffle
,且不会在无序策略中运行查询。 例如,在以下示例中,join 的左侧是 summarize,因此使用 hint.strategy=shuffle
不会将 shuffle 策略应用于查询:
StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| summarize count() by EpisodeId, EventId
| join kind = inner hint.strategy=shuffle (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId
输出
EpisodeId | EventId | ... | EpisodeId1 | EventId1 | ... |
---|---|---|---|---|---|
1030 | 4407 | ... | 1030 | 4407 | ... |
1030 | 13721 | ... | 1030 | 13721 | ... |
2477 | 12530 | ... | 2477 | 12530 | ... |
2103 | 10237 | ... | 2103 | 10237 | ... |
2103 | 10239 | ... | 2103 | 10239 | ... |
... | ... | ... | ... | ... | ... |
若要克服此问题并在 shuffle 策略中运行,请选择 summarize
和 join
运算通用的键。 在本例中,此键为 EpisodeId
。 使用提示 hint.shufflekey
将 join
的随机排布键指定为 hint.shufflekey = EpisodeId
:
StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| summarize count() by EpisodeId, EventId
| join kind = inner hint.shufflekey=EpisodeId (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId
输出
EpisodeId | EventId | ... | EpisodeId1 | EventId1 | ... |
---|---|---|---|---|---|
1030 | 4407 | ... | 1030 | 4407 | ... |
1030 | 13721 | ... | 1030 | 13721 | ... |
2477 | 12530 | ... | 2477 | 12530 | ... |
2103 | 10237 | ... | 2103 | 10237 | ... |
2103 | 10239 | ... | 2103 | 10239 | ... |
... | ... | ... | ... | ... | ... |
将 summarize 与 shuffle 一起使用以提高性能
此示例中,将 summarize
运算符与 shuffle
策略一起使用可以提高性能。 源表有 150M 记录,分组依据键的基数是 10M,它分布在 10 个群集节点上。
如果使用 summarize
运算符而不使用 shuffle
策略,查询将在 1 分 8 秒之后结束,内存使用量峰值大约为 3 GB:
orders
| summarize arg_max(o_orderdate, o_totalprice) by o_custkey
| where o_totalprice < 1000
| count
输出
计数 |
---|
1086 |
将 shuffle
策略与 summarize
一起使用时,查询将在大约 7 秒之后结束,内存使用量峰值为 0.43 GB:
orders
| summarize hint.strategy = shuffle arg_max(o_orderdate, o_totalprice) by o_custkey
| where o_totalprice < 1000
| count
输出
计数 |
---|
1086 |
以下示例演示了具有两个群集节点的群集上的性能,其中某个表包含 6000 万条记录,group by 键的基数为 200 万。
在不使用 hint.num_partitions
的情况下运行查询将只使用两个分区(作为群集节点数),以下查询将需要大约 1 分 10 秒的时间:
lineitem
| summarize hint.strategy = shuffle dcount(l_comment), dcount(l_shipdate) by l_partkey
| consume
如果将分区数设置为 10,则查询将在 23 秒后结束:
lineitem
| summarize hint.strategy = shuffle hint.num_partitions = 10 dcount(l_comment), dcount(l_shipdate) by l_partkey
| consume
将 join 与 shuffle 一起使用以提高性能
以下示例演示如何将 shuffle
策略与 join
运算符一起使用以提高性能。
示例是在包含 10 个节点的群集上采样的,数据分散在所有这些节点上。
查询的左侧源表包含 1500 万条记录,join
键的基数大约为 1400 万。 查询的右侧源包含 1.5 亿条记录,join
键的基数为 1000 万。 查询在大约 28 秒之后结束,内存使用量峰值为 1.43 GB:
customer
| join
orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkey
将 shuffle
策略与 join
运算符一起使用时,查询将在大约 4 秒之后结束,内存使用量峰值为 0.3 GB:
customer
| join
hint.strategy = shuffle orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkey
在另一个示例中,我们尝试对符合以下条件的较大数据集运行相同的查询:
join
的左侧源为 1.5 亿,键的基数为 1.48 亿。join
的右侧源为 15 亿,键的基数大约为 1 亿。
仅使用 join
运算符的查询在 4 分钟之后达到限制并超时。 但是,将 shuffle
策略与 join
运算符一起使用时,查询将在大约 34 秒之后结束,内存使用量峰值为 1.23 GB。
以下示例演示了对具有两个群集节点的群集的改进,表中包含 6000 万条记录,join
键的基数为 200 万。
在不使用 hint.num_partitions
的情况下运行查询将只使用两个分区(作为群集节点数),以下查询将需要大约 1 分 10 秒的时间:
lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
hint.shufflekey = l_partkey part
on $left.l_partkey == $right.p_partkey
| consume
如果将分区数设置为 10,则查询将在 23 秒之后结束:
lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
hint.shufflekey = l_partkey hint.num_partitions = 10 part
on $left.l_partkey == $right.p_partkey
| consume