This article shows you how to ingest JSON formatted data into an Azure Data Explorer database. You'll start with simple examples of raw and mapped JSON, continue to multi-lined JSON, and then tackle more complex JSON schemas containing arrays and dictionaries. The examples detail the process of ingesting JSON formatted data using Kusto Query Language (KQL), C#, or Python.
Note
We don't recommend using .ingest management commands in production scenarios. Instead, use a data connector or programmatically ingest data using one of the Kusto client libraries.
Prerequisites
- A Microsoft account or a Microsoft Entra user identity. An Azure subscription isn't required.
- An Azure Data Explorer cluster and database. Create a cluster and database.
Azure Data Explorer supports two JSON file formats:
json: Line-separated JSON. Each line in the input data has exactly one JSON record. This format supports parsing of comments and single-quoted properties. For more information, see JSON Lines.
multijson: Multi-lined JSON. The parser ignores the line separators and reads a record from the previous position to the end of a valid JSON.
Note
When ingesting using the get data experience, the default format is multijson. This format can handle multiline JSON records and arrays of JSON records. When a parsing error is encountered, the entire file is discarded. To ignore invalid JSON records, select the "Ignore data format errors" option, which switches the format to json (JSON Lines).
If you're using the JSON Lines format (json), lines that don't represent valid JSON records are skipped during parsing.
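For illustration, two of the records used later in this article would look like this in the json (JSON Lines) format, with one record per line (values shortened here):
{"timestamp": "2019-05-02 15:23:50.0000000", "deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71", "temperature": 31.03, "humidity": 62.07}
{"timestamp": "2019-05-02 15:23:51.0000000", "deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71", "temperature": 33.75, "humidity": 75.47}
The same records pretty-printed across multiple lines, or wrapped in a single array, would require the multijson format.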
Ingestion of JSON formatted data requires you to specify the format using an ingestion property. Ingestion of JSON data also requires a mapping, which maps each JSON source entry to its target column. When ingesting data, use the IngestionMapping property and set either its ingestionMappingReference property (to reference a pre-defined mapping) or its IngestionMappings property (to define the mapping inline). This article uses the ingestionMappingReference ingestion property, which references a mapping pre-defined on the table used for ingestion. In the examples below, we'll start by ingesting JSON records as raw data to a single-column table. Then we'll use the mapping to ingest each property to its mapped column.
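For comparison, a mapping can also be supplied inline through the ingestionMapping ingestion property instead of referencing a pre-defined one. The following command is a minimal sketch using the parenthesized form of the with clause; the table name MyRawTable, its Data column, and the storage URL placeholders are hypothetical and used only for illustration:
.ingest into table MyRawTable ('https://<StorageAccountName>.blob.core.chinacloudapi.cn/<Container>/simple.json') with (format = "json", ingestionMapping = '[{"column":"Data","Properties":{"path":"$"}}]')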
Simple JSON example
The following example is a simple JSON record with a flat structure. The data has temperature and humidity information collected by several devices. Each record is marked with an ID and a timestamp.
{
"timestamp": "2019-05-02 15:23:50.0369439",
"deviceId": "2945c8aa-f13e-4c48-4473-b81440bb5ca2",
"messageId": "7f316225-839a-4593-92b5-1812949279b3",
"temperature": 31.0301639051317,
"humidity": 62.0791099602725
}
Ingest raw JSON records
In this example, you ingest JSON records as raw data to a single-column table. Data manipulation, using queries and an update policy, is done after the data is ingested.
Use Kusto Query Language to ingest data in raw JSON format.
Sign in to https://dataexplorer.azure.cn.
Select Add cluster.
In the Add cluster dialog box, enter your cluster URL in the form https://<ClusterName>.<Region>.kusto.chinacloudapi.cn/, then select Add.
Paste in the following command, and select Run to create the table.
.create table RawEvents (Event: dynamic)
This command creates a table with a single Event column of the dynamic data type.
Create the JSON mapping.
.create table RawEvents ingestion json mapping 'RawEventMapping' '[{"column":"Event","Properties":{"path":"$"}}]'
This command creates a mapping that maps the JSON root path $ to the Event column.
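To confirm that the mapping was created as expected, you can list the table's JSON ingestion mappings:
.show table RawEvents ingestion json mappings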
Ingest data into the RawEvents table.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"RawEventMapping"}'
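After the ingestion completes, you can check that the records landed in the single dynamic column, for example:
RawEvents
| take 10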
Use C# to ingest data in raw JSON format.
Create the RawEvents table.
var kustoUri = "https://<clusterName>.<region>.kusto.chinacloudapi.cn/";
var connectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
FederatedSecurity = true,
UserID = userId,
Password = password,
Authority = tenantId,
InitialCatalog = databaseName
};
using var kustoClient = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "RawEvents";
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[] { Tuple.Create("Event", "System.Object") }
);
await kustoClient.ExecuteControlCommandAsync(command);
Create the JSON mapping.
var tableMappingName = "RawEventMapping";
command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Json,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "Events", Properties = new Dictionary<string, string> { { "path", "$" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(command);
This command creates a mapping that maps the JSON root path $ to the Event column.
Ingest data into the RawEvents table.
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.chinacloudapi.cn/";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri)
{
FederatedSecurity = true,
UserID = userId,
Password = password,
Authority = tenantId,
InitialCatalog = databaseName
};
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var blobPath = "https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.json,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Note
Data is aggregated according to batching policy, resulting in a latency of a few minutes.
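If rows don't appear after the batching window, you can check for ingestion errors by running the following command in the Azure Data Explorer web UI and looking for entries that reference the RawEvents table:
.show ingestion failures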
Use Python to ingest data in raw JSON format.
Create the RawEvents table.
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.chinacloudapi.cn/"
KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(KUSTO_URI, AAD_TENANT_ID)
KUSTO_CLIENT = KustoClient(KCSB_DATA)
TABLE = "RawEvents"
CREATE_TABLE_COMMAND = ".create table " + TABLE + " (Events: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Create the JSON mapping.
MAPPING = "RawEventMapping"
CREATE_MAPPING_COMMAND = ".create table " + TABLE + " ingestion json mapping '" + MAPPING + """' '[{"column":"Event","path":"$"}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Ingest data into the RawEvents table.
INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.chinacloudapi.cn/"
KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(INGEST_URI, AAD_TENANT_ID)
INGESTION_CLIENT = KustoIngestClient(KCSB_INGEST)
BLOB_PATH = 'https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/simple.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.JSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
Note
Data is aggregated according to batching policy, resulting in a latency of a few minutes.
Ingest mapped JSON records
In this example, you ingest JSON records. Each JSON property is mapped to a single column in the table.
Create a new table, with a similar schema to the JSON input data. We'll use this table for all the following examples and ingest commands.
.create table Events (Time: datetime, Device: string, MessageId: string, Temperature: double, Humidity: double)
Create the JSON mapping.
.create table Events ingestion json mapping 'FlatEventMapping' '[{"column":"Time","Properties":{"path":"$.timestamp"}},{"column":"Device","Properties":{"path":"$.deviceId"}},{"column":"MessageId","Properties":{"path":"$.messageId"}},{"column":"Temperature","Properties":{"path":"$.temperature"}},{"column":"Humidity","Properties":{"path":"$.humidity"}}]'
In this mapping, as defined by the table schema, the timestamp entries are ingested into the Time column as the datetime data type.
Ingest data into the Events table.
.ingest into table Events ('https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"FlatEventMapping"}'
The file 'simple.json' has a few line-separated JSON records. The format is json, and the mapping used in the ingest command is the FlatEventMapping you created.
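Once the mapped ingestion completes, the typed columns can be queried directly. For example, the following query averages the numeric columns per device:
Events
| summarize AvgTemperature = avg(Temperature), AvgHumidity = avg(Humidity) by Device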
Create a new table, with a similar schema to the JSON input data. We'll use this table for all the following examples and ingest commands.
var tableName = "Events";
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[]
{
Tuple.Create("Time", "System.DateTime"),
Tuple.Create("Device", "System.String"),
Tuple.Create("MessageId", "System.String"),
Tuple.Create("Temperature", "System.Double"),
Tuple.Create("Humidity", "System.Double")
}
);
await kustoClient.ExecuteControlCommandAsync(command);
Create the JSON mapping.
var tableMappingName = "FlatEventMapping";
command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Json,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "Time", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.timestamp" } } },
new() { ColumnName = "Device", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.deviceId" } } },
new() { ColumnName = "MessageId", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.messageId" } } },
new() { ColumnName = "Temperature", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.temperature" } } },
new() { ColumnName = "Humidity", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.humidity" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(command);
In this mapping, as defined by the table schema, the timestamp entries are ingested into the Time column as the datetime data type.
Ingest data into the Events table.
var blobPath = "https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.json,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
The file 'simple.json' has a few line-separated JSON records. The format is json, and the mapping used in the ingest command is the FlatEventMapping you created.
Create a new table, with a similar schema to the JSON input data. We'll use this table for all the following examples and ingest commands.
TABLE = "Events"
CREATE_TABLE_COMMAND = ".create table " + TABLE + " (Time: datetime, Device: string, MessageId: string, Temperature: double, Humidity: double)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Create the JSON mapping.
MAPPING = "FlatEventMapping"
CREATE_MAPPING_COMMAND = ".create table Events ingestion json mapping '" + MAPPING + """' '[{"column":"Time","Properties":{"path":"$.timestamp"}},{"column":"Device","Properties":{"path":"$.deviceId"}},{"column":"MessageId","Properties":{"path":"$.messageId"}},{"column":"Temperature","Properties":{"path":"$.temperature"}},{"column":"Humidity","Properties":{"path":"$.humidity"}}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Ingest data into the Events table.
BLOB_PATH = 'https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/simple.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.JSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
The file 'simple.json' has a few line-separated JSON records. The format is json, and the mapping used in the ingest command is the FlatEventMapping you created.
Ingest multi-lined JSON records
In this example, you ingest multi-lined JSON records. Each JSON property is mapped to a single column in the table. The file 'multilined.json' has a few indented JSON records. The multijson format tells the parser to read records by the JSON structure rather than by line.
Ingest data into the Events table.
.ingest into table Events ('https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/multilined.json') with '{"format":"multijson", "ingestionMappingReference":"FlatEventMapping"}'
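As a quick check, you can look at the most recently ingested rows. This sketch assumes the IngestionTime policy is enabled on the table (it's typically on by default for newly created tables):
Events
| extend IngestedOn = ingestion_time()
| top 5 by IngestedOn desc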
Ingest data into the Events table.
var tableMappingName = "FlatEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/multilined.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
Ingest data into the Events table.
MAPPING = "FlatEventMapping"
BLOB_PATH = 'https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/multilined.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.MULTIJSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
Ingest JSON records containing arrays
Array data types are an ordered collection of values. Ingestion of a JSON array is done by an update policy. The JSON is ingested as-is to an intermediate table. An update policy runs a pre-defined function on the RawEvents table, reingesting the results to the target table. We'll ingest data with the following structure:
{
"records":
[
{
"timestamp": "2019-05-02 15:23:50.0000000",
"deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
"messageId": "7f316225-839a-4593-92b5-1812949279b3",
"temperature": 31.0301639051317,
"humidity": 62.0791099602725
},
{
"timestamp": "2019-05-02 15:23:51.0000000",
"deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
"messageId": "57de2821-7581-40e4-861e-ea3bde102364",
"temperature": 33.7529423105311,
"humidity": 75.4787976739364
}
]
}
Create an update policy function that expands the collection of records so that each value in the collection receives a separate row, using the mv-expand operator. We'll use the RawEvents table as the source table and the Events table as the target table.
.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event.records
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}
The schema received by the function must match the schema of the target table. Use the getschema operator to review the schema.
EventRecordsExpand() | getschema
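To compare, you can retrieve the schema of the target Events table the same way:
Events
| getschema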
Add the update policy to the target table. This policy will automatically run the query on any newly ingested data in the RawEvents intermediate table and ingest the results into the Events table. Define a zero-retention policy to avoid persisting the intermediate table; a sketch follows the command below.
.alter table Events policy update @'[{"Source": "RawEvents", "Query": "EventRecordsExpand()", "IsEnabled": "True"}]'
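To confirm that the policy is attached, run the first command below. To apply the zero-retention guidance so that raw records aren't persisted in the intermediate table, you can set a zero soft-delete retention policy on RawEvents; the second command is a sketch of that approach. Run each command separately:
.show table Events policy update
.alter-merge table RawEvents policy retention softdelete = 0s recoverability = disabled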
Ingest data into the RawEvents table.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/array.json') with '{"format":"multijson", "ingestionMappingReference":"RawEventMapping"}'
Review data in the Events table.
Events
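To confirm that each element of the records array became its own row, you can also aggregate, for example:
Events
| summarize RecordsPerDevice = count() by Device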
Create an update function that expands the collection of records so that each value in the collection receives a separate row, using the mv-expand operator. We'll use the RawEvents table as the source table and the Events table as the target table.
var command = CslCommandGenerator.GenerateCreateFunctionCommand(
"EventRecordsExpand",
"UpdateFunctions",
string.Empty,
null,
@"RawEvents
| mv-expand records = Event.records
| project
Time = todatetime(records['timestamp']),
Device = tostring(records['deviceId']),
MessageId = tostring(records['messageId']),
Temperature = todouble(records['temperature']),
Humidity = todouble(records['humidity'])",
ifNotExists: false
);
await kustoClient.ExecuteControlCommandAsync(command);
Note
The schema received by the function must match the schema of the target table.
Add the update policy to the target table. This policy will automatically run the query on any newly ingested data in the RawEvents intermediate table and ingest its results into the Events table. Define a zero-retention policy to avoid persisting the intermediate table.
command = ".alter table Events policy update @'[{'Source': 'RawEvents', 'Query': 'EventRecordsExpand()', 'IsEnabled': 'True'}]";
await kustoClient.ExecuteControlCommandAsync(command);
Ingest data into the RawEvents table.
var blobPath = "https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/array.json";
var tableName = "RawEvents";
var tableMappingName = "RawEventMapping";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Review data in the Events table.
Create an update function that expands the collection of records so that each value in the collection receives a separate row, using the mv-expand operator. We'll use the RawEvents table as the source table and the Events table as the target table.
CREATE_FUNCTION_COMMAND = '''.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event.records
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}'''
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_FUNCTION_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Note
The schema received by the function has to match the schema of the target table.
Add the update policy to the target table. This policy will automatically run the query on any newly ingested data in the RawEvents intermediate table and ingest its results into the Events table. Define a zero-retention policy to avoid persisting the intermediate table.
CREATE_UPDATE_POLICY_COMMAND = """.alter table Events policy update @'[{"Source": "RawEvents", "Query": "EventRecordsExpand()", "IsEnabled": "True"}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_UPDATE_POLICY_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Ingest data into the RawEvents table.
TABLE = "RawEvents"
MAPPING = "RawEventMapping"
BLOB_PATH = 'https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/array.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.MULTIJSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
Review data in the Events table.
Related content