In this article, we'll put what we learned in part 1 to use and create a custom OneLake Elasticsearch connector.
We have already uploaded some OneLake documents and indexed them into Elasticsearch for search. However, that only works as a one-time upload. If we want to keep the data synchronized, we need to develop a more complex system.
Luckily, Elastic has a connectors framework available to develop custom connectors to fit our needs:
We'll now make a OneLake connector based on this article: How to create custom connectors for Elasticsearch
Steps
- Connector Bootstrapping
- Implementing BaseDataSource Class
- Authentication
- Running the connector
- Configuring Schedule
Connector Bootstrapping
For context, there are two types of Elastic connectors:
- Elastic managed connector: Fully managed and run in Elastic Cloud
- Self-managed connector: Must be hosted in your own infrastructure
Custom connectors fall into the self-managed category (also referred to as connector clients), so we need to download and deploy the connectors framework.
Let's begin by cloning the connectors repository:
git clone https://github.com/elastic/connectors
Now add the dependencies you will use at the end of the requirements/framework.txt file. In this case:

azure-identity==1.19.0
azure-storage-file-datalake==12.17.0
With this, the repository is ready and we can begin to code.
Implementing BaseDataSource Class
You can find the full working code in this repository.
We will go through the core pieces in the onelake.py file.
After the imports and class declaration, we must define our __init__ method, which will capture the configuration parameters.
"""OneLake connector to retrieve data from datalakes"""
from functools import partial
from azure.identity import ClientSecretCredentialfrom azure.storage.filedatalake import DataLakeServiceClient
from connectors.source import BaseDataSource
ACCOUNT_NAME = "onelake"
class OneLakeDataSource(BaseDataSource): """OneLake"""
name = "OneLake" service_type = "onelake" incremental_sync_enabled = True
# Here we can enter the data that we'll later need to connect our connector to OneLake. def __init__(self, configuration): """Set up the connection to the azure base client
Args: configuration (DataSourceConfiguration): Object of DataSourceConfiguration class. """ super().__init__(configuration=configuration) self.tenant_id = self.configuration["tenant_id"] self.client_id = self.configuration["client_id"] self.client_secret = self.configuration["client_secret"] self.workspace_name = self.configuration["workspace_name"] self.data_path = self.configuration["data_path"]
Then, you can configure the form that the UI will show for filling in those parameters using the get_default_configuration method, which returns a configuration dictionary.
    # Method to generate the Enterprise Search UI fields for the variables we need to connect to OneLake.
    @classmethod
    def get_default_configuration(cls):
        """Get the default configuration for OneLake

        Returns:
            dictionary: Default configuration
        """
        return {
            "tenant_id": {
                "label": "OneLake tenant id",
                "order": 1,
                "type": "str",
            },
            "client_id": {
                "label": "OneLake client id",
                "order": 2,
                "type": "str",
            },
            "client_secret": {
                "label": "OneLake client secret",
                "order": 3,
                "type": "str",
                "sensitive": True,  # To hide sensitive data like passwords or secrets
            },
            "workspace_name": {
                "label": "OneLake workspace name",
                "order": 4,
                "type": "str",
            },
            "data_path": {
                "label": "OneLake data path",
                "tooltip": "Path in format <DataLake>.Lakehouse/files/<Folder path>",
                "order": 5,
                "type": "str",
            },
            "account_name": {
                "tooltip": "In the most cases is 'onelake'",
                "default_value": ACCOUNT_NAME,
                "label": "Account name",
                "order": 6,
                "type": "str",
            },
        }
Then we configure the methods to download and extract the content from the OneLake documents.
    async def download_file(self, file_client):
        """Download file from OneLake

        Args:
            file_client (obj): File client

        Returns:
            generator: File stream
        """
        try:
            download = file_client.download_file()
            stream = download.chunks()
            for chunk in stream:
                yield chunk
        except Exception as e:
            self._logger.error(f"Error while downloading file: {e}")
            raise

    async def get_content(self, file_name, doit=None, timestamp=None):
        """Obtains the file content for the specified file in `file_name`.

        Args:
            file_name (obj): The file name to process to obtain the content.
            timestamp (timestamp, optional): Timestamp of blob last modified. Defaults to None.
            doit (boolean, optional): Boolean value for whether to get content or not. Defaults to None.

        Returns:
            str: Content of the file or None if not applicable.
        """
        if not doit:
            return

        file_client = await self._get_file_client(file_name)
        file_properties = file_client.get_file_properties()
        file_extension = self.get_file_extension(file_name)

        doc = {
            "_id": f"{file_client.file_system_name}_{file_properties.name}",  # workspacename_data_path
            "name": file_properties.name.split("/")[-1],
            "_timestamp": file_properties.last_modified,
            "created_at": file_properties.creation_time,
        }

        can_be_downloaded = self.can_file_be_downloaded(
            file_extension=file_extension,
            filename=file_properties.name,
            file_size=file_properties.size,
        )

        if not can_be_downloaded:
            return doc

        extracted_doc = await self.download_and_extract_file(
            doc=doc,
            source_filename=file_properties.name.split("/")[-1],
            file_extension=file_extension,
            download_func=partial(self.download_file, file_client),
        )

        return extracted_doc if extracted_doc is not None else doc
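The get_content method above relies on a couple of helpers that we haven't shown, most notably _get_file_client, and the framework also calls a get_docs method during a sync to enumerate the documents. The full implementations are in the repository; the following is only a rough sketch of how those pieces tie the Azure SDK together (the OneLake endpoint format and the exact method bodies are assumptions here), continuing the OneLakeDataSource class:

    # Rough sketch only: see the repository for the full implementations.
    def _get_account_url(self):
        # Assumption: OneLake exposes an ADLS Gen2-compatible endpoint at
        # <account_name>.dfs.fabric.microsoft.com
        return f"https://{self.configuration['account_name']}.dfs.fabric.microsoft.com"

    def _get_service_client(self):
        """Build a DataLakeServiceClient authenticated with the service principal."""
        credential = ClientSecretCredential(
            tenant_id=self.tenant_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return DataLakeServiceClient(
            account_url=self._get_account_url(), credential=credential
        )

    async def _get_file_client(self, file_name):
        """Return a DataLakeFileClient for a file under the configured data path."""
        file_system_client = self._get_service_client().get_file_system_client(
            file_system=self.workspace_name
        )
        return file_system_client.get_file_client(f"{self.data_path}/{file_name}")

    async def get_docs(self, filtering=None):
        """Yield (document, lazy_download) tuples for every file under the data path."""
        file_system_client = self._get_service_client().get_file_system_client(
            file_system=self.workspace_name
        )
        for path in file_system_client.get_paths(path=self.data_path):
            if path.is_directory:
                continue
            file_name = path.name.split("/")[-1]
            doc = {
                "_id": f"{self.workspace_name}_{path.name}",
                "name": file_name,
                "_timestamp": path.last_modified,
            }
            yield doc, partial(self.get_content, file_name)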
To make our connector visible to the framework, we need to declare it in the connectors/config.py file. For this, we add the following entry to sources:

"sources": {
    ...
    "onelake": "connectors.sources.onelake:OneLakeDataSource",
    ...
}
Authentication
Before testing the connector, we need to get the client_id, tenant_id, and client_secret that we'll use to access the workspace from the connector.
We will use a service principal as the authentication method.
An Azure service principal is an identity created for use with applications, hosted services, and automated tools to access Azure resources.
The steps are:
- Creating an application and gathering the client_id, tenant_id, and client_secret
- Enabling the service principal in your workspace
- Adding the service principal to your workspace
You can follow this tutorial step by step.
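If you prefer the command line over the portal, the app registration and secret can also be created with the Azure CLI. This is only a sketch (the display name is just an example), and you still need to enable and add the service principal in your Fabric workspace afterwards:

az ad sp create-for-rbac --display-name "onelake-connector"

The command output includes appId (client_id), password (client_secret), and tenant (tenant_id).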
Ready? Now it's time to test the connector!
Running the connector
With the connector ready, we can now connect to our Elasticsearch instance.
Go to: Search > Content > Connectors > New connector and choose Customized connector.
Choose a name for the connector, and then select Create and attach an index to create a new index with the same name as the connector.
You can now run it using Docker or run it from source. In this example, we'll use "Run from source".
Click Generate Configuration and paste the content from the box into the config.yml file at the project's root. The service_type field must match the connector's name in connectors/config.py. In this case, replace changeme with onelake.
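For reference, the generated configuration follows this general shape; the values below are placeholders for the ones generated for your deployment:

elasticsearch:
  host: https://your-deployment.es.region.cloud.es.io:443
  api_key: <generated-api-key>

connectors:
  - connector_id: <generated-connector-id>
    service_type: onelake  # must match the key registered in connectors/config.py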
Now you can run the connector with these commands:
make install
make run
If the connector was correctly initialized, you should see a message like this in the console:
Note: If you get a compatibility error, check your connectors/VERSION file and compare it with your Elasticsearch cluster version (see Version compatibility with Elasticsearch). We recommend keeping the connector version and the Elasticsearch version in sync. For this article, we are using Elasticsearch and connector version 8.15.
If everything went fine, our local connector will communicate with our Elasticsearch cluster and we'll be able to configure it using our OneLake credentials:
We'll now index the documents from OneLake. To do this, run a Full Content Sync by clicking on Sync > Full Content:
Once the sync is over, you should see this in the console:
In the Enterprise Search UI, you can click Documents to see the indexed documents:
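You can also verify from Dev Tools with a quick search. The index name below assumes you named the connector and its attached index onelake, so adjust it to the name you chose:

GET onelake/_search?size=5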
Configuring Schedule
You can schedule recurring content syncs using the UI based on your needs to keep your index updated and in sync with OneLake.
To configure scheduled syncs, go to Search > Content > Connectors and select your connector. Then, click on Scheduling:
As an alternative, you can use the Update connector scheduling API, which accepts CRON expressions.
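For example, a nightly full content sync could be scheduled with a request along these lines; replace <connector_id> with your connector's id and treat this as a sketch, checking the API reference for your version:

PUT _connector/<connector_id>/_scheduling
{
  "scheduling": {
    "full": {
      "enabled": true,
      "interval": "0 0 0 * * ?"
    }
  }
}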
Conclusion
In this second part, we took our configuration one step further by using the Elastic connectors framework and developing our own OneLake connector to easily communicate with our Elastic Cloud instance.