read_kafka table-valued function
Applies to: Databricks SQL, Databricks Runtime 13.3 LTS and above
Reads data from an Apache Kafka cluster and returns the data in tabular form.
The function can read data from one or more Kafka topics and supports both batch queries and streaming ingestion.
Syntax
read_kafka([option_key => option_value ] [, ...])
Arguments
This function requires named parameter invocation.
option_key
: The name of the option to configure. You must use backticks (`) for options that contain dots (.).
option_value
: A constant expression to set the option. Accepts literals and scalar functions.
Returns
Records read from an Apache Kafka cluster with the following schema:
key BINARY
: The key of the Kafka record.
value BINARY NOT NULL
: The value of the Kafka record.
topic STRING NOT NULL
: The name of the Kafka topic the record is read from.
partition INT NOT NULL
: The ID of the Kafka partition the record is read from.
offset BIGINT NOT NULL
: The offset number of the record in the Kafka TopicPartition.
timestamp TIMESTAMP NOT NULL
: A timestamp value for the record. The timestampType column defines what this timestamp corresponds to.
timestampType INTEGER NOT NULL
: The type of the timestamp specified in the timestamp column.
headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: Header values provided as part of the record (if enabled).
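As a rough illustration of the schema above, the following sketch casts the binary key and value columns to strings and selects the metadata columns alongside them. The server address and topic name are the placeholders used elsewhere on this page, and includeHeaders => true assumes that a boolean literal is accepted for this option. The backticks around partition, offset, and timestamp are only a precaution against clashes with SQL keywords.

```sql
-- Sketch: inspect every column of the returned schema.
-- 'kafka_server:9092' and 'events' are placeholder values.
SELECT
  key::string   AS key,        -- key is BINARY and may be NULL
  value::string AS value,      -- value is BINARY NOT NULL
  topic,
  `partition`,
  `offset`,
  `timestamp`,
  timestampType,
  headers                      -- populated only when headers are enabled
FROM read_kafka(
  bootstrapServers => 'kafka_server:9092',
  subscribe        => 'events',
  includeHeaders   => true     -- assumption: boolean literal is accepted here
) LIMIT 10;
```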
Examples
-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => 'earliest',
    `kafka.security.protocol` => 'SASL_SSL',
    `kafka.sasl.mechanism` => 'PLAIN',
    `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";'
  );

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );
Options
You can find a detailed list of options in the Apache Spark documentation.
Required options
Provide the option below for connecting to your Kafka cluster.
| Option | Type | Description | Default value |
|---|---|---|---|
| bootstrapServers | String | A comma-separated list of host/port pairs pointing to the Kafka cluster. | None |
Provide only one of the options below to configure which Kafka topics to pull data from.
| Option | Type | Description | Default value |
|---|---|---|---|
| assign | String | A JSON string that contains the specific topic-partitions to consume from. For example, '{"topicA":[0,1],"topicB":[2,4]}' consumes partitions 0 and 1 of topicA and partitions 2 and 4 of topicB. | None |
| subscribe | String | A comma-separated list of Kafka topics to read from. | None |
| subscribePattern | String | A regular expression matching topics to subscribe to. | None |
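The two alternatives to subscribe can be sketched as follows. The topic names (events_*, topicA, topicB) and the server address are hypothetical placeholders; the assign value reuses the JSON shown in the table above.

```sql
-- Sketch: subscribe to every topic whose name matches a regular expression.
-- The 'events_' prefix is a hypothetical naming convention.
SELECT topic, count(*) AS records
FROM read_kafka(
  bootstrapServers => 'kafka_server:9092',
  subscribePattern => 'events_.*'
)
GROUP BY topic;

-- Sketch: read only partitions 0 and 1 of topicA and partitions 2 and 4 of topicB.
SELECT topic, `partition`, `offset`
FROM read_kafka(
  bootstrapServers => 'kafka_server:9092',
  assign           => '{"topicA":[0,1],"topicB":[2,4]}'
) LIMIT 10;
```

Only one of assign, subscribe, or subscribePattern should appear in a single call.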
Miscellaneous options
read_kafka can be used in batch queries and in streaming queries. The options below specify which type of query they apply to.
| Option | Type | Query type | Description | Default value |
|---|---|---|---|---|
| endingOffsets | String | batch only | The offsets to read until for a batch query: either "latest" to specify the latest records, or a JSON string specifying an ending offset for each TopicPartition. In the JSON, -1 as an offset refers to latest. -2 (earliest) is not allowed as an offset. | "latest" |
| endingOffsetsByTimestamp | String | batch only | A JSON string specifying an ending timestamp to read until for each TopicPartition. Provide each timestamp as a long value in milliseconds since 1970-01-01 00:00:00 UTC, for example 1686444353000. See the note below for details of behavior with timestamps. endingOffsetsByTimestamp takes precedence over endingOffsets. | None |
| endingTimestamp | String | batch only | A string value of the timestamp in milliseconds since 1970-01-01 00:00:00 UTC, for example "1686444353000". If Kafka doesn't return the matched offset, the offset is set to latest. See the note below for details of behavior with timestamps. Note: endingTimestamp takes precedence over endingOffsetsByTimestamp and endingOffsets. | None |
| includeHeaders | Boolean | streaming and batch | Whether to include the Kafka headers in the row. | false |
| kafka.<consumer_option> | String | streaming and batch | Any Kafka consumer-specific option can be passed in with the "kafka." prefix. These options must be surrounded by backticks when provided; otherwise you get a parser error. You can find the options in the Kafka documentation. Note: Do not set the following options with this function: key.deserializer, value.deserializer, bootstrap.servers, group.id. | None |
| maxOffsetsPerTrigger | Long | streaming only | Rate limit on the maximum number of offsets or rows processed per trigger interval. The specified total number of offsets is split proportionally across TopicPartitions. | None |
| startingOffsets | String | streaming and batch | The start point when a query is started: either "earliest", which starts from the earliest offsets, "latest", which starts from the latest offsets, or a JSON string specifying a starting offset for each TopicPartition. In the JSON, -2 as an offset refers to earliest and -1 to latest. Note: For batch queries, latest (either implicitly or by using -1 in JSON) is not allowed. For streaming queries, this applies only when a new query is started. Restarted streaming queries continue from the offsets defined in the query checkpoint. Partitions newly discovered during a query start at earliest. | "latest" for streaming, "earliest" for batch |
| startingOffsetsByTimestamp | String | streaming and batch | A JSON string specifying a starting timestamp for each TopicPartition. Provide each timestamp as a long value in milliseconds since 1970-01-01 00:00:00 UTC, for example 1686444353000. See the note below for details of behavior with timestamps. If Kafka doesn't return the matched offset, the behavior follows the value of the option startingOffsetsByTimestampStrategy. startingOffsetsByTimestamp takes precedence over startingOffsets. Note: For streaming queries, this applies only when a new query is started. Restarted streaming queries continue from the offsets defined in the query checkpoint. Partitions newly discovered during a query start at earliest. | None |
| startingOffsetsByTimestampStrategy | String | streaming and batch | The strategy used when the specified starting offset by timestamp (either global or per partition) doesn't match the offset Kafka returned. The available strategies are "error", which fails the query, and "latest", which assigns the latest offset for these partitions so that Spark can read newer records from them in later micro-batches. | "error" |
| startingTimestamp | String | streaming and batch | A string value of the timestamp in milliseconds since 1970-01-01 00:00:00 UTC, for example "1686444353000". See the note below for details of behavior with timestamps. If Kafka doesn't return the matched offset, the behavior follows the value of the option startingOffsetsByTimestampStrategy. startingTimestamp takes precedence over startingOffsetsByTimestamp and startingOffsets. Note: For streaming queries, this applies only when a new query is started. Restarted streaming queries continue from the offsets defined in the query checkpoint. Partitions newly discovered during a query start at earliest. | None |
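A minimal sketch of per-partition offsets in a batch query is shown below. It assumes a hypothetical topic named events with exactly two partitions (0 and 1), and it assumes the JSON layout used by the Apache Spark Kafka source, where each topic maps partition IDs to offsets. The concrete offsets 100 and 500 are illustrative only.

```sql
-- Sketch: bound a batch read with per-partition offsets.
-- -2 means earliest (allowed only in startingOffsets);
-- -1 means latest (allowed only in endingOffsets for batch queries).
SELECT `partition`, `offset`, value::string AS value
FROM read_kafka(
  bootstrapServers => 'kafka_server:9092',
  subscribe        => 'events',
  startingOffsets  => '{"events":{"0":-2,"1":100}}',
  endingOffsets    => '{"events":{"0":500,"1":-1}}'
);
```

Here partition 0 is read from its earliest offset up to offset 500, and partition 1 from offset 100 up to its latest offset.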
Note
The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. The behavior varies across options if Kafka doesn't return the matched offset - check the description of each option.
Spark simply passes the timestamp information to KafkaConsumer.offsetsForTimes, and doesn't interpret or reason about the value. For more details on KafkaConsumer.offsetsForTimes, please refer to the documentation. Also, the meaning of timestamp here can vary according to the Kafka configuration (log.message.timestamp.type). For details, see Apache Kafka documentation.
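To make the timestamp behavior concrete, the following sketch reads a one-hour window in a batch query. The start value is the example timestamp used in the options table; the end value is that timestamp plus 3,600,000 milliseconds. The server address and topic name are placeholders.

```sql
-- Sketch: batch read bounded by global timestamps (milliseconds since the epoch, as strings).
-- For each partition, reading starts at the earliest offset whose timestamp is >= startingTimestamp.
SELECT `timestamp`, value::string AS value
FROM read_kafka(
  bootstrapServers  => 'kafka_server:9092',
  subscribe         => 'events',
  startingTimestamp => '1686444353000',
  endingTimestamp   => '1686447953000'   -- one hour after the start value
);
```

If no offset matches the starting timestamp, the default startingOffsetsByTimestampStrategy of "error" fails the query, whereas an unmatched ending timestamp falls back to latest.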