Transform data in the cloud by using Spark activity in Azure Data Factory

APPLIES TO: Azure Data Factory Azure Synapse Analytics

In this tutorial, you use Azure PowerShell to create a Data Factory pipeline that transforms data using Spark Activity and an on-demand HDInsight linked service. You perform the following steps in this tutorial:

  • Create a data factory.
  • Author and deploy linked services.
  • Author and deploy a pipeline.
  • Start a pipeline run.
  • Monitor the pipeline run.

If you don't have an Azure subscription, create a trial account before you begin.

Prerequisites

Note

We recommend that you use the Azure Az PowerShell module to interact with Azure. See Install Azure PowerShell to get started. To learn how to migrate to the Az PowerShell module, see Migrate Azure PowerShell from AzureRM to Az.

  • Azure Storage account. You create a Python script and an input file, and upload them to the Azure storage. The output from the spark program is stored in this storage account. The on-demand Spark cluster uses the same storage account as its primary storage.
  • Azure PowerShell. Follow the instructions in How to install and configure Azure PowerShell.

Upload Python script to your Blob Storage account

  1. Create a Python file named WordCount_Spark.py with the following content:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.chinacloudapi.cn/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.chinacloudapi.cn/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. Replace <storageAccountName> with the name of your Azure Storage account. Then, save the file.

  3. In your Azure Blob Storage, create a container named adftutorial if it does not exist.

  4. Create a folder named spark.

  5. Create a subfolder named script under spark folder.

  6. Upload the WordCount_Spark.py file to the script subfolder.

Upload the input file

  1. Create a file named minecraftstory.txt with some text. The spark program counts the number of words in this text.
  2. Create a subfolder named inputfiles in the spark folder.
  3. Upload the minecraftstory.txt to the inputfiles subfolder.

Author linked services

You author two Linked Services in this section:

  • An Azure Storage Linked Service that links an Azure Storage account to the data factory. This storage is used by the on-demand HDInsight cluster. It also contains the Spark script to be executed.
  • An On-Demand HDInsight Linked Service. Azure Data Factory automatically creates a HDInsight cluster, run the Spark program, and then deletes the HDInsight cluster after it's idle for a pre-configured time.

Azure Storage linked service

Create a JSON file using your preferred editor, copy the following JSON definition of an Azure Storage linked service, and then save the file as MyStorageLinkedService.json.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>;EndpointSuffix=core.chinacloudapi.cn"
      }
    }
}

Update the <storageAccountName> and <storageAccountKey> with the name and key of your Azure Storage account.

On-demand HDInsight linked service

Create a JSON file using your preferred editor, copy the following JSON definition of an Azure HDInsight linked service, and save the file as MyOnDemandSparkLinkedService.json.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

Update values for the following properties in the linked service definition:

  • hostSubscriptionId. Replace <subscriptionID> with the ID of your Azure subscription. The on-demand HDInsight cluster is created in this subscription.
  • tenant. Replace <tenantID> with ID of your Azure tenant.
  • servicePrincipalId, servicePrincipalKey. Replace <servicePrincipalID> and <servicePrincipalKey> with ID and key of your service principal in the Microsoft Entra ID. This service principal needs to be a member of the Contributor role of the subscription or the resource Group in which the cluster is created. See create Microsoft Entra application and service principal for details. The Service principal id is equivalent to the Application ID and a Service principal key is equivalent to the value for a Client secret.
  • clusterResourceGroup. Replace <resourceGroupOfHDICluster> with the name of the resource group in which the HDInsight cluster needs to be created.

Note

Azure HDInsight has limitation on the total number of cores you can use in each Azure region it supports. For On-Demand HDInsight Linked Service, the HDInsight cluster will be created in the same location of the Azure Storage used as its primary storage. Ensure that you have enough core quotas for the cluster to be created successfully. For more information, see Set up clusters in HDInsight with Hadoop, Spark, Kafka, and more.

Author a pipeline

In this step, you create a new pipeline with a Spark activity. The activity uses the word count sample. Download the contents from this location if you haven't already done so.

Create a JSON file in your preferred editor, copy the following JSON definition of a pipeline definition, and save it as MySparkOnDemandPipeline.json.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

Note the following points:

  • rootPath points to the spark folder of the adftutorial container.
  • entryFilePath points to the WordCount_Spark.py file in the script sub folder of the spark folder.

Create a data factory

You have authored linked service and pipeline definitions in JSON files. Now, let's create a data factory, and deploy the linked Service and pipeline JSON files by using PowerShell cmdlets. Run the following PowerShell commands one by one:

  1. Set variables one by one.

    Resource Group Name

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    Data Factory Name. Must be globally unique

    $dataFactoryName = "MyDataFactory09102017"
    

    Pipeline name

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. Launch PowerShell. Keep Azure PowerShell open until the end of this quickstart. If you close and reopen, you need to run the commands again. For a list of Azure regions in which Data Factory is currently available, select the regions that interest you on the following page, and then expand Analytics to locate Data Factory: Products available by region. The data stores (Azure Storage, Azure SQL Database, etc.) and computes (HDInsight, etc.) used by data factory can be in other regions.

    Run the following command, and enter the user name and password that you use to sign in to the Azure portal:

    Connect-AzAccount -Environment AzureChinaCloud
    

    Run the following command to view all the subscriptions for this account:

    Get-AzSubscription
    

    Run the following command to select the subscription that you want to work with. Replace SubscriptionId with the ID of your Azure subscription:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. Create the resource group: ADFTutorialResourceGroup.

    New-AzResourceGroup -Name $resourceGroupName -Location "China East 2" 
    
  4. Create the data factory.

     $df = Set-AzDataFactoryV2 -Location ChinaEast 2 -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Execute the following command to see the output:

    $df
    
  5. Switch to the folder where you created JSON files, and run the following command to deploy an Azure Storage linked service:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. Run the following command to deploy an on-demand Spark linked service:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. Run the following command to deploy a pipeline:

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

Start and monitor a pipeline run

  1. Start a pipeline run. It also captures the pipeline run ID for future monitoring.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. Run the following script to continuously check the pipeline run status until it finishes.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. Here is the output of the sample run:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.cn/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (China East 2)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. Confirm that a folder named outputfiles is created in the spark folder of adftutorial container with the output from the spark program.

The pipeline in this sample copies data from one location to another location in an Azure blob storage. You learned how to:

  • Create a data factory.
  • Author and deploy linked services.
  • Author and deploy a pipeline.
  • Start a pipeline run.
  • Monitor the pipeline run.

Advance to the next tutorial to learn how to transform data by running Hive script on an Azure HDInsight cluster that is in a virtual network.