Azure Functions 的 RedisStreamTrigger
RedisStreamTrigger
从流中读取新条目并将这些元素呈现给函数。
层 | Basic | 标准、高级 | Enterprise、Enterprise Flash |
---|---|---|---|
流 | 是 | 是 | 是 |
重要
消耗计划中运行的函数当前不支持 Redis 触发器。
重要
Azure Cache for Redis 缓存扩展尚不支持适用于 Functions 的 Node.js v4 模型。 有关 v4 模型工作原理的更多详细信息,请参阅 Azure Functions Node.js 开发人员指南。 要详细了解 v3 和 v4 之间的差异,请参阅迁移指南。
重要
Azure Cache for Redis 缓存扩展尚不支持适用于 Functions 的 Python v2 模型。 有关 v2 模型工作原理的更多详细信息,请参阅 Azure Functions Python 开发人员指南。
示例
重要
对于 .NET 函数,建议在进程内模型中使用独立辅助角色模型。 有关进程内和隔离辅助角色模型的比较,请参阅 Azure Functions 上 .NET 的独立辅助角色模型与进程内模型之间的差异。
执行模型 | 说明 |
---|---|
进程内 | 函数代码与 Functions 宿主进程在同一进程中运行。 请在开始之前,查看 .NET 支持的版本。 若要了解详细信息,请参阅使用 Azure Functions 开发 C# 类库函数。 |
隔离进程 | 函数代码在单独的 .NET 工作进程中运行。 请在开始之前,查看 .NET 支持的版本。 有关详细信息,请参阅在 C# 中开发隔离进程函数。 |
using Microsoft.Extensions.Logging;
namespace Microsoft.Azure.Functions.Worker.Extensions.Redis.Samples.RedisStreamTrigger
{
internal class SimpleStreamTrigger
{
private readonly ILogger<SimpleStreamTrigger> logger;
public SimpleStreamTrigger(ILogger<SimpleStreamTrigger> logger)
{
this.logger = logger;
}
[Function(nameof(SimpleStreamTrigger))]
public void Run(
[RedisStreamTrigger(Common.connectionStringSetting, "streamKey")] string entry)
{
logger.LogInformation(entry);
}
}
}
package com.function.RedisStreamTrigger;
import com.microsoft.azure.functions.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.redis.annotation.*;
public class SimpleStreamTrigger {
@FunctionName("SimpleStreamTrigger")
public void run(
@RedisStreamTrigger(
name = "req",
connection = "redisConnectionString",
key = "streamTest",
pollingIntervalInMs = 1000,
maxBatchSize = 1)
String message,
final ExecutionContext context) {
context.getLogger().info(message);
}
}
此示例使用相同的 index.js
文件,function.json
文件中包含绑定数据。
index.js
文件如下所示:
module.exports = async function (context, entry) {
context.log(entry);
}
在function.json
中,下面是绑定数据:
{
"bindings": [
{
"type": "redisStreamTrigger",
"connection": "redisConnectionString",
"key": "streamTest",
"pollingIntervalInMs": 1000,
"maxBatchSize": 16,
"name": "entry",
"direction": "in"
}
],
"scriptFile": "index.js"
}
此示例使用相同的 run.ps1
文件,function.json
文件中包含绑定数据。
run.ps1
文件如下所示:
param($entry, $TriggerMetadata)
Write-Host ($entry | ConvertTo-Json)
在function.json
中,下面是绑定数据:
{
"bindings": [
{
"type": "redisStreamTrigger",
"connection": "redisConnectionString",
"key": "streamTest",
"pollingIntervalInMs": 1000,
"maxBatchSize": 16,
"name": "entry",
"direction": "in"
}
],
"scriptFile": "run.ps1"
}
Python v1 编程模型要求在函数文件夹中的单独 function.json 文件中定义绑定。 有关详细信息,请参阅 Python 开发人员指南。
此示例使用相同的 __init__.py
文件,function.json
文件中包含绑定数据。
__init__.py
文件如下所示:
import logging
def main(entry: str):
logging.info(entry)
在function.json
中,下面是绑定数据:
{
"bindings": [
{
"type": "redisStreamTrigger",
"connection": "redisConnectionString",
"key": "streamTest",
"pollingIntervalInMs": 1000,
"maxBatchSize": 16,
"name": "entry",
"direction": "in"
}
],
"scriptFile": "__init__.py"
}
特性
参数 | 说明 | 需要 | 默认 |
---|---|---|---|
Connection |
包含缓存连接字符串的应用程序设置的名称,例如:<cacheName>.redis.cache.chinacloudapi.cn:6380,password... |
是 | |
Key |
要从中读取的键。 | 是 | |
PollingIntervalInMs |
轮询 Redis 服务器的频率(以毫秒为单位)。 | 可选 | 1000 |
MessagesPerWorker |
每个函数辅助角色应处理的消息数。 用于确定函数应缩放到的辅助角色数。 | 可选 | 100 |
Count |
一次从 Redis 中拉取的元素数。 | 可选 | 10 |
DeleteAfterProcess |
指示函数是否在处理后删除流条目。 | 可选 | false |
批注
参数 | 步骤 | 需要 | 默认 |
---|---|---|---|
name |
entry |
是 | |
connection |
包含缓存连接字符串的应用程序设置的名称,例如:<cacheName>.redis.cache.chinacloudapi.cn:6380,password... |
是 | |
key |
要从中读取的键。 | 是 | |
pollingIntervalInMs |
轮询 Redis 的频率(以毫秒为单位)。 | 可选 | 1000 |
messagesPerWorker |
每个函数辅助角色应处理的消息数。 用于确定应将函数范围扩展到的辅助角色数。 | 可选 | 100 |
count |
一次从 Redis 中读取的条目数。 系统会并行处理条目。 | 可选 | 10 |
deleteAfterProcess |
是否在函数运行后删除流条目。 | 可选 | false |
配置
下表解释了在 function.json 文件中设置的绑定配置属性。
function.json 属性 | 说明 | 需要 | 默认 |
---|---|---|---|
type |
是 | ||
deleteAfterProcess |
可选 | false |
|
connection |
包含缓存连接字符串的应用程序设置的名称,例如:<cacheName>.redis.cache.chinacloudapi.cn:6380,password... |
是 | |
key |
要从中读取的键。 | 是 | |
pollingIntervalInMs |
轮询 Redis 的频率(以毫秒为单位)。 | 可选 | 1000 |
messagesPerWorker |
(可选)每个函数辅助角色应处理的消息数。 用于确定函数应缩放到的辅助角色数 | 可选 | 100 |
count |
一次从 Redis 中读取的条目数。 这些是并行处理的。 | 可选 | 10 |
name |
是 | ||
direction |
是 |
有关完整示例,请参阅示例部分。
使用情况
RedisStreamTrigger
Azure 函数从流中读取新条目,并将这些条目呈现到函数中。
触发器以可配置的固定间隔轮询 Redis,并使用 XREADGROUP
从流中读取元素。
函数的所有实例的使用者组是函数的名称,对于 StreamTrigger 示例,为 SimpleStreamTrigger
。
每个函数实例会使用 WEBSITE_INSTANCE_ID
或创建一个随机 GUID,用作其在组中的使用者名称,以确保函数的横向扩展实例不会从流中读取相同的消息。
类型 | 描述 |
---|---|
byte[] |
来自通道的消息。 |
string |
来自通道的消息。 |
Custom |
触发器使用 Json.NET 序列化将通道中的消息从 string 映射到自定义类型。 |