Indexing OneLake data into Elasticsearch - Part II

Second part of a two-part article to index and search OneLake data into Elastic using a Custom connector.

In this article, we'll put to use what we learned in part 1 to create a OneLake custom Elasticsearch connector.

We have already uploaded some OneLake documents and indexed them into Elasticsearch for search. However, this only works with a one time upload. If we want to have synchronized data, then we need to develop a more complex system.

Luckily, Elastic has a connectors framework available to develop custom connectors to fit our needs:

We'll make now make a OneLake connector based on this article: How to create custom connectors for Elasticsearch

Steps

  1. Connector Bootstrapping
  2. Implementing BaseDataSource Class
  3. Authentication
  4. Running the connector
  5. Configuring Schedule

Connector Bootstrapping

For context, there are two types of Elastic connectors:

Custom connectors fall into the “Connector Client” category, so we need to download and deploy the connectors framework.

Let's begin by cloning the connectors repository:

git clone https://github.com/elastic/connectors

Now add the dependencies you will use at the end of the requirements/framework.txt file. In this case:

azure-identity==1.19.0
azure-storage-file-datalake==12.17.0

With this, the repository is done and we can begin to code.

Implementing BaseDataSource Class

You can find the full working code on this repository.

We will go through the core pieces in the onelake.py file.

After the imports and class declaration, we must define our __init__ method which will capture the configuration parameters.

"""OneLake connector to retrieve data from datalakes"""
from functools import partial
from azure.identity import ClientSecretCredential
from azure.storage.filedatalake import DataLakeServiceClient
from connectors.source import BaseDataSource
ACCOUNT_NAME = "onelake"
class OneLakeDataSource(BaseDataSource):
"""OneLake"""
name = "OneLake"
service_type = "onelake"
incremental_sync_enabled = True
# Here we can enter the data that we'll later need to connect our connector to OneLake.
def __init__(self, configuration):
"""Set up the connection to the azure base client
Args:
configuration (DataSourceConfiguration): Object of DataSourceConfiguration class.
"""
super().__init__(configuration=configuration)
self.tenant_id = self.configuration["tenant_id"]
self.client_id = self.configuration["client_id"]
self.client_secret = self.configuration["client_secret"]
self.workspace_name = self.configuration["workspace_name"]
self.data_path = self.configuration["data_path"]

Then, you can configure the form the UI will show to fill those parameters using the get_default_configuration method which returns a configuration dictionary.

# Method to generate the Enterprise Search UI fields for the variables we need to connect to OneLake.
@classmethod
def get_default_configuration(cls):
"""Get the default configuration for OneLake
Returns:
dictionary: Default configuration
"""
return {
"tenant_id": {
"label": "OneLake tenant id",
"order": 1,
"type": "str",
},
"client_id": {
"label": "OneLake client id",
"order": 2,
"type": "str",
},
"client_secret": {
"label": "OneLake client secret",
"order": 3,
"type": "str",
"sensitive": True, # To hide sensitive data like passwords or secrets
},
"workspace_name": {
"label": "OneLake workspace name",
"order": 4,
"type": "str",
},
"data_path": {
"label": "OneLake data path",
"tooltip": "Path in format <DataLake>.Lakehouse/files/<Folder path>",
"order": 5,
"type": "str",
},
"account_name": {
"tooltip": "In the most cases is 'onelake'",
"default_value": ACCOUNT_NAME,
"label": "Account name",
"order": 6,
"type": "str",
},
}

Then we configure the methods to download, and extract the content from the OneLake documents.

async def download_file(self, file_client):
"""Download file from OneLake
Args:
file_client (obj): File client
Returns:
generator: File stream
"""
try:
download = file_client.download_file()
stream = download.chunks()
for chunk in stream:
yield chunk
except Exception as e:
self._logger.error(f"Error while downloading file: {e}")
raise
async def get_content(self, file_name, doit=None, timestamp=None):
"""Obtains the file content for the specified file in `file_name`.
Args:
file_name (obj): The file name to process to obtain the content.
timestamp (timestamp, optional): Timestamp of blob last modified. Defaults to None.
doit (boolean, optional): Boolean value for whether to get content or not. Defaults to None.
Returns:
str: Content of the file or None if not applicable.
"""
if not doit:
return
file_client = await self._get_file_client(file_name)
file_properties = file_client.get_file_properties()
file_extension = self.get_file_extension(file_name)
doc = {
"_id": f"{file_client.file_system_name}_{file_properties.name}", # workspacename_data_path
"name": file_properties.name.split("/")[-1],
"_timestamp": file_properties.last_modified,
"created_at": file_properties.creation_time,
}
can_be_downloaded = self.can_file_be_downloaded(
file_extension=file_extension,
filename=file_properties.name,
file_size=file_properties.size,
)
if not can_be_downloaded:
return doc
extracted_doc = await self.download_and_extract_file(
doc=doc,
source_filename=file_properties.name.split("/")[-1],
file_extension=file_extension,
download_func=partial(self.download_file, file_client),
)
return extracted_doc if extracted_doc is not None else doc

To make our connector visible to the framework, we need to declare it in the connectors/config.py file. For this, we add the following code to sources:

"sources": {
...
"onelake": "connectors.sources.onelake:OneLakeDataSource",
...
}

Authentication

Before testing the connector, we need to get the client_id, tenant_id, and client_secret that we'll use to access the Workspace from the connector.

We will use service principals as authentication method.

An Azure service principal is an identity created for use with applications, hosted services, and automated tools to access Azure resources.

The steps are:

  1. Creating an application, and gathering client_id, tenant_id, and client_secret
  2. Enabling service principal in your workspace
  3. Adding the service principal to your workspace

You can follow this tutorial step by step.

Ready? Now it's time to test the connector!

Running the conector

With the connector ready, we can now connect to our Elasticsearch instance.

Go to : Search > Content > Connectors > New connector and choose Customized Connector

Choose a name to create, and then select Create and attach an index to create a new index with the same name as the connector.

You can now run it using Docker or run it from source. In this example, we'll use "Run from source".

Click on Generate Configuration and paste the content from the box on the file config.yml file at the project's root. On the field service_type you must match the connector's name in connectors/config.py. In this case, replace changeme with onelake.

Now you can run the connector with these commands:

make install
make run

If the connector was correctly initialized, you should see a message like this in the console:

Note: If you get a compatibility error,check your connectors/VERSION file and compare with your Elasticsearch cluster version: Version compatibility with Elasticsearch We recommend keeping the connector version and Elasticsearch version in sync. For this article we are using Elasticsearch and connector version 8.15.

If everything went fine, our local connector will communicate with our Elasticsearch cluster and we'll be able to configure it using our OneLake credentials:

We'll now index the documents from OneLake. To do this, run a Full Content Sync by clicking on Sync > Full Content:

Once the sync is over, you should see this in the console:

At the Enterprise Search UI, you can click Documents to see the indexed documents:

Configure Schedule

You can schedule recurring content syncs using the UI based on your needs to keep your index updated and in sync with OneLake.

To configure scheduled syncs go to Search > Content > Connectors and select your connector. Then click on scheduling:

As an alternative, you can use the Update connector scheduling API which allows CRON expressions.

Conclusion

In this second part, we took our configuration one step further by using the Elastic connectors framework and developing our own OneLake connector to easily communicate with our Elastic Cloud instance.

Want to get Elastic certified? Find out when the next Elasticsearch Engineer training is running!

Elasticsearch is packed with new features to help you build the best search solutions for your use case. Dive into our sample notebooks to learn more, start a free cloud trial, or try Elastic on your local machine now.

Ready to build state of the art search experiences?

Sufficiently advanced search isn’t achieved with the efforts of one. Elasticsearch is powered by data scientists, ML ops, engineers, and many more who are just as passionate about search as your are. Let’s connect and work together to build the magical search experience that will get you the results you want.

Try it yourself