Connect to Apache Kafka on HDInsight through an Azure Virtual Network

Learn how to directly connect to Apache Kafka on HDInsight through an Azure Virtual Network. This document provides information on connecting to Kafka using the following configurations:

  • From resources in an on-premises network. This connection is established by using a VPN device (software or hardware) on your local network.
  • From a development environment using a VPN software client.

Note

We recommend that you use the Azure Az PowerShell module to interact with Azure. To get started, see Install Azure PowerShell. To learn how to migrate to the Az PowerShell module, see Migrate Azure PowerShell from AzureRM to Az.

Architecture and planning

HDInsight does not allow direct connection to Kafka over the public internet. Instead, Kafka clients (producers and consumers) must use one of the following connection methods:

  • Run the client in the same virtual network as Kafka on HDInsight. This configuration is used in the Start with Apache Kafka on HDInsight document. The client runs directly on the HDInsight cluster nodes or on another virtual machine in the same network.

  • Connect a private network, such as your on-premises network, to the virtual network. This configuration allows clients in your on-premises network to directly work with Kafka. To enable this configuration, perform the following tasks:

    1. Create a virtual network.

    2. Create a VPN gateway that uses a site-to-site configuration. The configuration used in this document connects to a VPN gateway device in your on-premises network.

    3. Create a DNS server in the virtual network.

    4. Configure forwarding between the DNS server in each network.

    5. Create a Kafka on HDInsight cluster in the virtual network.

      For more information, see the Connect to Apache Kafka from an on-premises network section.

  • Connect individual machines to the virtual network using a VPN gateway and VPN client. To enable this configuration, perform the following tasks:

    1. Create a virtual network.

    2. Create a VPN gateway that uses a point-to-site configuration. This configuration can be used with both Windows and macOS clients.

    3. Create a Kafka on HDInsight cluster in the virtual network.

    4. Configure Kafka for IP advertising. This configuration allows the client to connect using broker IP addresses instead of domain names.

    5. Download and use the VPN client on the development system.

      For more information, see the Connect to Apache Kafka with a VPN client section.

      Warning

      This configuration is only recommended for development purposes because of the following limitations:

      • Each client must connect using a VPN software client.
      • The VPN client does not pass name resolution requests to the virtual network, so you must use IP addressing to communicate with Kafka. IP communication requires additional configuration on the Kafka cluster.

For more information on using HDInsight in a virtual network, see Plan a virtual network for Azure HDInsight clusters.

Connect to Apache Kafka from an on-premises network

To create a Kafka cluster that communicates with your on-premises network, follow the steps in the Connect HDInsight to your on-premises network document.

Important

When creating the HDInsight cluster, select the Kafka cluster type.

These steps create the following configuration:

  • Azure Virtual Network
  • Site-to-site VPN gateway
  • Azure Storage account (used by HDInsight)
  • Kafka on HDInsight

To verify that a Kafka client can connect to the cluster from on-premises, use the steps in the Example: Python client section.

Connect to Apache Kafka with a VPN client

Use the steps in this section to create the following configuration:

  • Azure Virtual Network
  • Point-to-site VPN gateway
  • Azure Storage Account (used by HDInsight)
  • Kafka on HDInsight
  1. Follow the steps in the Working with self-signed certificates for Point-to-site connections document. This document creates the certificates needed for the gateway.

  2. Open a PowerShell prompt and use the following code to sign in to your Azure subscription:

    Connect-AzAccount -Environment AzureChinaCloud
    # If you have multiple subscriptions, uncomment to set the subscription
    #Select-AzSubscription -SubscriptionName "name of your subscription"
    
  3. Use the following code to create variables that contain configuration information:

    # Prompt for generic information
    $resourceGroupName = Read-Host "What is the resource group name?"
    $baseName = Read-Host "What is the base name? It is used to create names for resources, such as 'net-basename' and 'kafka-basename':"
    $location = Read-Host "What Azure Region do you want to create the resources in?"
    $rootCert = Read-Host "What is the file path to the root certificate? It is used to secure the VPN gateway."
    
    # Prompt for HDInsight credentials
    $adminCreds = Get-Credential -Message "Enter the HTTPS user name and password for the HDInsight cluster" -UserName "admin"
    $sshCreds = Get-Credential -Message "Enter the SSH user name and password for the HDInsight cluster" -UserName "sshuser"
    
    # Names for Azure resources
    $networkName = "net-$baseName"
    $clusterName = "kafka-$baseName"
    $storageName = "store$baseName" # Can't use dashes in storage names
    $defaultContainerName = $clusterName
    $defaultSubnetName = "default"
    $gatewaySubnetName = "GatewaySubnet"
    $gatewayPublicIpName = "GatewayIp"
    $gatewayIpConfigName = "GatewayConfig"
    $vpnRootCertName = "rootcert"
    $vpnName = "VPNGateway"
    
    # Network settings
    $networkAddressPrefix = "10.0.0.0/16"
    $defaultSubnetPrefix = "10.0.0.0/24"
    $gatewaySubnetPrefix = "10.0.1.0/24"
    $vpnClientAddressPool = "172.16.201.0/24"
    
    # HDInsight settings
    $hdiWorkerNodes = 4
    $hdiVersion = "3.6"
    $hdiType = "Kafka"
    
  4. Use the following code to create the Azure resource group and virtual network:

    # Create the resource group that contains everything
    New-AzResourceGroup -Name $resourceGroupName -Location $location
    
    # Create the subnet configuration
    $defaultSubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName `
        -AddressPrefix $defaultSubnetPrefix
    $gatewaySubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName `
        -AddressPrefix $gatewaySubnetPrefix
    
    # Create the subnet
    New-AzVirtualNetwork -Name $networkName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -AddressPrefix $networkAddressPrefix `
        -Subnet $defaultSubnetConfig, $gatewaySubnetConfig
    
    # Get the network & subnet that were created
    $network = Get-AzVirtualNetwork -Name $networkName `
        -ResourceGroupName $resourceGroupName
    $gatewaySubnet = Get-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName `
        -VirtualNetwork $network
    $defaultSubnet = Get-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName `
        -VirtualNetwork $network
    
    # Set a dynamic public IP address for the gateway subnet
    $gatewayPublicIp = New-AzPublicIpAddress -Name $gatewayPublicIpName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -AllocationMethod Dynamic
    $gatewayIpConfig = New-AzVirtualNetworkGatewayIpConfig -Name $gatewayIpConfigName `
        -Subnet $gatewaySubnet `
        -PublicIpAddress $gatewayPublicIp
    
    # Get the certificate info
    # Get the full path in case a relative path was passed
    $rootCertFile = Get-ChildItem $rootCert
    $cert = New-Object System.Security.Cryptography.X509Certificates.X509Certificate2($rootCertFile)
    $certBase64 = [System.Convert]::ToBase64String($cert.RawData)
    $p2sRootCert = New-AzVpnClientRootCertificate -Name $vpnRootCertName `
        -PublicCertData $certBase64
    
    # Create the VPN gateway
    New-AzVirtualNetworkGateway -Name $vpnName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -IpConfigurations $gatewayIpConfig `
        -GatewayType Vpn `
        -VpnType RouteBased `
        -EnableBgp $false `
        -GatewaySku Standard `
        -VpnClientAddressPool $vpnClientAddressPool `
        -VpnClientRootCertificates $p2sRootCert
    

    Warning

    It can take several minutes for this process to complete.

  5. Use the following code to create the Azure Storage Account and blob container:

    # Create the storage account
    New-AzStorageAccount `
        -ResourceGroupName $resourceGroupName `
        -Name $storageName `
        -SkuName Standard_GRS `
        -Location $location `
        -Kind StorageV2 `
        -EnableHttpsTrafficOnly 1
    
    # Get the storage account keys and create a context
    $defaultStorageKey = (Get-AzStorageAccountKey -ResourceGroupName $resourceGroupName `
        -Name $storageName)[0].Value
    $storageContext = New-AzStorageContext -StorageAccountName $storageName `
        -StorageAccountKey $defaultStorageKey
    
    # Create the default storage container
    New-AzStorageContainer -Name $defaultContainerName `
        -Context $storageContext
    
  6. Use the following code to create the HDInsight cluster:

    # Create the HDInsight cluster
    New-AzHDInsightCluster `
        -ResourceGroupName $resourceGroupName `
        -ClusterName $clusterName `
        -Location $location `
        -ClusterSizeInNodes $hdiWorkerNodes `
        -ClusterType $hdiType `
        -OSType Linux `
        -Version $hdiVersion `
        -HttpCredential $adminCreds `
        -SshCredential $sshCreds `
        -DefaultStorageAccountName "$storageName.blob.core.chinacloudapi.cn" `
        -DefaultStorageAccountKey $defaultStorageKey `
        -DefaultStorageContainer $defaultContainerName `
        -DisksPerWorkerNode 2 `
        -VirtualNetworkId $network.Id `
        -SubnetName $defaultSubnet.Id
    

    Warning

    This process takes around 15 minutes to complete.

Configure Kafka for IP advertising

By default, Apache Zookeeper returns the domain name of the Kafka brokers to clients. This configuration does not work with the VPN software client, as it cannot use name resolution for entities in the virtual network. For this configuration, use the following steps to configure Kafka to advertise IP addresses instead of domain names:

  1. Using a web browser, go to https://CLUSTERNAME.azurehdinsight.cn. Replace CLUSTERNAME with the name of the Kafka on HDInsight cluster.

    When prompted, use the HTTPS user name and password for the cluster. The Ambari Web UI for the cluster is displayed.

  2. To view information on Kafka, select Kafka from the list on the left.

    Service list with Kafka highlighted.

  3. To view Kafka configuration, select Configs from the top middle.

    Apache Ambari services configuration.

  4. To find the kafka-env configuration, enter kafka-env in the Filter field on the upper right.

    Kafka configuration, for kafka-env.

  5. To configure Kafka to advertise IP addresses, add the following text to the bottom of the kafka-env-template field:

    # Configure Kafka to advertise IP addresses instead of FQDN
    IP_ADDRESS=$(hostname -i)
    echo advertised.listeners=$IP_ADDRESS
    sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties
    echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
    
  6. To configure the interface that Kafka listens on, enter listeners in the Filter field on the upper right.

  7. To configure Kafka to listen on all network interfaces, change the value in the listeners field to PLAINTEXT://0.0.0.0:9092.

  8. To save the configuration changes, use the Save button. Enter a text message describing the changes. Select OK once the changes have been saved.

    Apache Ambari save configuration.

  9. To prevent errors when restarting Kafka, use the Service Actions button and select Turn On Maintenance Mode. Select OK to complete this operation.

    Service actions, with turn on maintenance highlighted.

  10. To restart Kafka, use the Restart button and select Restart All Affected. Confirm the restart, and then use the OK button after the operation has completed.

    Restart button with restart all affected highlighted.

  11. To disable maintenance mode, use the Service Actions button and select Turn Off Maintenance Mode. Select OK to complete this operation.

Connect to the VPN gateway

To connect to the VPN gateway, use the Connect to Azure section of the Configure a Point-to-Site connection document.

Example: Python client

To validate connectivity to Kafka, use the following steps to create and run a Python producer and consumer:

  1. Use one of the following methods to retrieve the fully qualified domain name (FQDN) and IP addresses of the nodes in the Kafka cluster:

    $resourceGroupName = "The resource group that contains the virtual network used with HDInsight"
    
    $clusterNICs = Get-AzNetworkInterface -ResourceGroupName $resourceGroupName | where-object {$_.Name -like "*node*"}
    
    $nodes = @()
    foreach($nic in $clusterNICs) {
        $node = new-object System.Object
        $node | add-member -MemberType NoteProperty -name "Type" -value $nic.Name.Split('-')[1]
        $node | add-member -MemberType NoteProperty -name "InternalIP" -value $nic.IpConfigurations.PrivateIpAddress
        $node | add-member -MemberType NoteProperty -name "InternalFQDN" -value $nic.DnsSettings.InternalFqdn
        $nodes += $node
    }
    $nodes | sort-object Type
    
    az network nic list --resource-group <resourcegroupname> --output table --query "[?contains(name,'node')].{NICname:name,InternalIP:ipConfigurations[0].privateIpAddress,InternalFQDN:dnsSettings.internalFqdn}"
    

    This script assumes that $resourceGroupName is the name of the Azure resource group that contains the virtual network.

    Save the returned information for use in the next steps.

  2. Use the following to install the kafka-python client:

    pip install kafka-python
    
  3. To send data to Kafka, use the following Python code:

    from kafka import KafkaProducer
    # Replace the `ip_address` entries with the IP address of your worker nodes
    # NOTE: you don't need the full list of worker nodes, just one or two.
    producer = KafkaProducer(bootstrap_servers=['kafka_broker_1','kafka_broker_2'])
    for _ in range(50):
       producer.send('testtopic', b'test message')
    

    Replace the 'kafka_broker' entries with the addresses returned from step 1 in this section:

    • If you are using a Software VPN client, replace the kafka_broker entries with the IP address of your worker nodes.

    • If you have enabled name resolution through a custom DNS server, replace the kafka_broker entries with the FQDN of the worker nodes.

      Note

      This code sends the string test message to the topic testtopic. The default configuration of Kafka on HDInsight is not to create the topic if it does not exist. See How to configure Apache Kafka on HDInsight to automatically create topics. Alternatively, you can create topics manually before producing messages.

  4. To retrieve the messages from Kafka, use the following Python code:

    from kafka import KafkaConsumer
    # Replace the `ip_address` entries with the IP address of your worker nodes
    # Again, you only need one or two, not the full list.
    # Note: auto_offset_reset='earliest' resets the starting offset to the beginning
    # of the topic
    consumer = KafkaConsumer(bootstrap_servers=['kafka_broker_1','kafka_broker_2'],auto_offset_reset='earliest')
    consumer.subscribe(['testtopic'])
    for msg in consumer:
      print (msg)
    

    Replace the 'kafka_broker' entries with the addresses returned from step 1 in this section:

    • If you are using a Software VPN client, replace the kafka_broker entries with the IP address of your worker nodes.

    • If you have enabled name resolution through a custom DNS server, replace the kafka_broker entries with the FQDN of the worker nodes.

Next steps

For more information on using HDInsight with a virtual network, see the Plan a virtual network deployment for Azure HDInsight clusters document.

For more information on creating an Azure Virtual Network with Point-to-Site VPN gateway, see the following documents:

For more information on working with Apache Kafka on HDInsight, see the following documents: