Azure Text Analytics is an AI service that uncovers insights such as sentiment, entities, relations and key phrases in unstructured text. There is an excellent documentation providing a step by step guide of how to get started with the Text Analytics client library and REST API. However, I've recently started concerning myself with the issue of parellizing the execution of these code snippets. Thus, in this article we are going to see how to call the text Analytics API in parallel. Up we go!
Prerequisites
Azure subscription - Create one for free
Python 3.x
Once you have your Azure subscription, create a Text Analytics resource in the Azure portal to get your key and endpoint. After it deploys, click Go to resource.
You will need the key and endpoint from the resource you create to connect your application to the Text Analytics API. You'll paste your key and endpoint into the code below later in the quickstart.
You can use the free pricing tier (F0) to try the service, and upgrade later to a paid tier for production.
To use the Analyze feature, you will need a Text Analytics resource with the standard (S) pricing tier.
For further pricing details please refer to the official reference.
Important: while creating the ressource you may have noticed that you can send up to 1k calls per minute.
Great, but how to send 1k calls per minute? That's why we also need to create a databrcisk resource. Follow the steps listed in this reference, and switcht to the next step.
Cluster Configuration
There's nothing special with this step, just configure the cluster according to the parallelization level you would like to have. Here's my cluster configuration:
As you see, pretty straightforward. You may augment the number of workers, and improve each worker, but do not forget about the usage quota.
Enough talking, let's code
Imports
import torch
import random
import pandas as pd
from datetime import datetime
from azure.ai.textanalytics import TextAnalyticsClient
from azure.core.credentials import AzureKeyCredential
from pyspark.sql.types import *
from pyspark.sql.functions import col
We've imported pyspark.sql types and functions, in order to parallelize our code.
Define your constants
KEY = "<your-key>"
ENDPOINT = '<your-endpoints>'
Now let's read the data and convert it to spark dataframe (I've previously uploaded my csv file to the databricks workspace, of cource).
proverbs = pd.read_csv('proverbs.tsv', sep='\t')
proverbs_spark = spark.createDataFrame(proverbs)
Here I've used a relatvily large dataset of 50k french proverbs, that we're going to process with the Text Analytics API.
Now create the helper functions.
def authenticate_client():
ta_credential = AzureKeyCredential(KEY)
text_analytics_client = TextAnalyticsClient(
endpoint=ENDPOINT,
credential=ta_credential)
return text_analytics_client
def text_to_document(text):
return [{'id':1, 'language':'fr', 'text':text}]
As you have noticed the helper functions are exactly the same as in the official reference.
Now let us create an NLP function that calls the text analytics api and get the named entities from each row of our spark dataframe
@udf(returnType=StringType())
def get_named_entities_udf(text):
output = []
try:
documents = text_to_document(text)
result = client.recognize_entities(documents = documents)[0]
for entity in result.entities:
categorized_entity = {
"text":entity.text,
"category":entity.category,
"subcategory":entity.subcategory,
"confidence_score":entity.confidence_score
}
output.append(categorized_entity)
except:
pass
return str(output)
You may have noticed that we'v defined our function as UDF. This is needed to apply this function to all the rows in parallel.
Now you can authenticate the client.
client = authenticate_client()
And set the start time
start = datetime.now()
Now we can run the code in parallel
proverbs_spark = proverbs_spark.withColumn("NamedEntities", get_named_entities_udf(col("text"))).repartition(36)
Important: I've set the repartition level to 36 as I had 8 workers with 4 cores, and one driver which is the same as the worker. 4*9=36.
This will launch the 'lazy' evaluation of your code, so add some command to actually execute the code and estimate the elapsed time.
proverbs_spark.show(10)
end = datetime.now()
print ('Elapsed time {} min'.format((end - start).total_seconds() / 60.0))
And don't forget to save your data as a managed dataset.
proverbs_spark.write.mode("overwrite").saveAsTable("proverbs_ner")
This is it!
Discussions
The dataset with 50k lines took me about 40 mins to execute, comparing to the classical (serial) run, which took the whole night. Pretty efficient, right? However, text analytics api allows running the code in batch (10 rows per http request), so theoretically it may run x10 times faster, but it is still unclear how to convert this function to UDF. Rookie will continue his research and will keep you updated.
Hope this was uselful.
Comments