Indexing OneLake data into Elasticsearch - Part 1

Learn how to configure OneLake, consume data using Python, and index documents in Elasticsearch to run semantic searches.

OneLake is a tool that allows you to connect to different Microsoft data sources like Power BI, Data Activator, and Data Factory, among others. It centralizes data in data lakes: large-volume repositories that support comprehensive data storage, analysis, and processing.

In this article, we’ll learn how to configure OneLake, consume data using Python, and index documents in Elasticsearch so we can then run semantic searches.

Sometimes you want to run searches across both structured and unstructured data from different sources and software providers, and create visualizations with Kibana. For this kind of task, indexing the documents into Elasticsearch as a central repository becomes extremely helpful.

For this example, we’ll use a fake company called Shoestic, an online shoe store. We have the list of products in a structured file (CSV) while some of the products’ datasheets are in an unstructured format (DOCX). The files are stored in OneLake.

You can find a Notebook with the complete example (including test documents) here.

Steps

  1. OneLake initial configuration
  2. Connect to OneLake using Python
  3. Indexing documents
  4. Queries

OneLake initial configuration

OneLake architecture can be summarized like this:

To use OneLake and Microsoft Fabric, we’ll need an Office 365 account. If you don’t have one, you can create a trial account here.

Log into Microsoft Fabric using your account. Then, create a workspace called "ShoesticWorkspace". Once inside the newly created workspace, create a Lakehouse and name it "ShoesticDatalake". The last step will be creating a new folder inside “Files”. Click on “new subfolder” and name it "ProductsData".

Done! We're ready to begin ingesting our data.

Connect to OneLake using Python

With our OneLake configured, we can now prepare the Python scripts. Azure has libraries to handle credentials and communicate with OneLake.

Installing dependencies

Run the following command in the terminal to install the dependencies:

pip install azure-identity elasticsearch==8.14 azure-storage-file-datalake azure-cli python-docx chardet

The azure-identity and azure-storage-file-datalake libraries let us interact with OneLake, while azure-cli handles accessing credentials and granting permissions. To read the files’ content and later index it into Elasticsearch, we use python-docx for the DOCX files and chardet to detect the CSV files’ encoding.

Saving Microsoft Credentials in our local environment

We’ll use az login to sign in to our Microsoft account. Run:

az login --allow-no-subscriptions

The --allow-no-subscriptions flag allows us to authenticate to Microsoft Azure without an active subscription.

This command will open a browser window where you’ll need to sign in to your account and then select the subscription number.
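
If you want to confirm that DefaultAzureCredential can pick up the cached az login credentials before writing the rest of the code, a quick optional check like this minimal sketch (assuming the standard Azure Storage scope that OneLake uses) should print a token expiration without errors:

# check_credentials.py (optional, hypothetical helper)
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# OneLake uses the Azure Storage scope; this fails fast if the login didn't work
token = credential.get_token("https://storage.azure.com/.default")
print("Token acquired, expires at:", token.expires_on)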

We’re now ready to start writing the code!

Create a file called onelake.py and add the following:

_onelake.py_

# Importing dependencies
import chardet
from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeServiceClient
from docx import Document

# Initializing the OneLake client
ONELAKE_ACCOUNT_NAME = "onelake"
ONELAKE_WORKSPACE_NAME = "ShoesticWorkspace"
# Path in format <DataLake>.Lakehouse/Files/<Folder path>
ONELAKE_DATA_PATH = "shoesticDatalake.Lakehouse/Files/ProductsData"

# Microsoft token
token_credential = DefaultAzureCredential()

# OneLake services
service_client = DataLakeServiceClient(
    account_url=f"https://{ONELAKE_ACCOUNT_NAME}.dfs.fabric.microsoft.com",
    credential=token_credential,
)
file_system_client = service_client.get_file_system_client(ONELAKE_WORKSPACE_NAME)
directory_client = file_system_client.get_directory_client(ONELAKE_DATA_PATH)


# OneLake functions
# Upload a file to a Lakehouse directory
def upload_file_to_directory(directory_client, local_path, file_name):
    file_client = directory_client.get_file_client(file_name)
    with open(local_path, mode="rb") as data:
        file_client.upload_data(data, overwrite=True)
    print(f"File: {file_name} uploaded to the data lake.")


# Get directory contents from your lake folder
def list_directory_contents(file_system_client, directory_name):
    paths = file_system_client.get_paths(path=directory_name)
    for path in paths:
        print(path.name + "\n")


# Get a file by name from your lake folder
def get_file_by_name(file_name, directory_client):
    return directory_client.get_file_client(file_name)


# Decode docx
def get_docx_content(file_client):
    download = file_client.download_file()
    file_content = download.readall()
    temp_file_path = "temp.docx"
    with open(temp_file_path, "wb") as temp_file:
        temp_file.write(file_content)
    doc = Document(temp_file_path)
    text = []
    for paragraph in doc.paragraphs:
        text.append(paragraph.text)
    return "\n".join(text)


# Decode csv
def get_csv_content(file_client):
    download = file_client.download_file()
    file_content = download.readall()
    result = chardet.detect(file_content)
    encoding = result["encoding"]
    return file_content.decode(encoding)
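
As an optional sanity check, you can import the clients and helpers from onelake.py and list the folder contents. This is just a sketch; it will print nothing if the folder is still empty:

# sanity_check.py (optional, hypothetical)
from onelake import ONELAKE_DATA_PATH, file_system_client, list_directory_contents

# Should complete without errors; an empty output means the folder has no files yet
list_directory_contents(file_system_client, ONELAKE_DATA_PATH)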

Uploading files to OneLake

In this example, we’ll use a CSV file and some .docx files with info about our shoe store products. Though you can upload them using the UI, we’ll do it with Python. Download the files here.

We’ll place the files in a /data folder next to a new Python script called upload_files.py:

# upload_files.py
# Importing dependencies
from onelake import (
    ONELAKE_DATA_PATH,
    directory_client,
    file_system_client,
    list_directory_contents,
    upload_file_to_directory,
)

csv_file_name = "products.csv"
csv_local_path = f"./data/{csv_file_name}"
docx_files = ["beach-flip-flops.docx", "classic-loafers.docx", "sport-sneakers.docx"]
docx_local_paths = [f"./data/{file_name}" for file_name in docx_files]

# Upload files to the Lakehouse
upload_file_to_directory(directory_client, csv_local_path, csv_file_name)

for docx_local_path in docx_local_paths:
    docx_file_name = docx_local_path.split("/")[-1]
    upload_file_to_directory(directory_client, docx_local_path, docx_file_name)

# To check that the files have been uploaded, run "list_directory_contents"
# to show the contents of the /ProductsData folder in our data lake:
print("Upload finished, Listing files: ")
list_directory_contents(file_system_client, ONELAKE_DATA_PATH)

Run the upload script:

python upload_files.py

The result should be:

Upload finished, Listing files:
shoesticDatalake.Lakehouse/Files/ProductsData/beach-flip-flops.docx
shoesticDatalake.Lakehouse/Files/ProductsData/classic-loafers.docx
shoesticDatalake.Lakehouse/Files/ProductsData/products.csv
shoesticDatalake.Lakehouse/Files/ProductsData/sport-sneakers.docx

Now that we have the files ready, let’s start analyzing and searching our data with Elasticsearch!

Indexing documents

We’ll be using ELSER as the embedding provider for our vector database so we can run semantic queries.

We chose ELSER because it is optimized for Elasticsearch and outperforms most of the competition in out-of-domain retrieval, that is, using the model as-is, without fine-tuning it on your own data.

Configuring ELSER

Start by creating the inference endpoint:

PUT _inference/sparse_embedding/onelake-inference-endpoint
{
  "service": "elser",
  "service_settings": {
    "num_allocations": 1,
    "num_threads": 1
  }
}

While the model loads in the background, you might get a 502 Bad Gateway error if you haven’t used ELSER before. In Kibana, you can check the model status under Machine Learning > Trained Models. Wait until the model is deployed before proceeding to the next steps.
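
If you’d rather check the deployment from Python instead of the Kibana UI, a small sketch like this should work (it assumes the default ELSER v2 model ID .elser_model_2 and the same Cloud ID and API key placeholders used later in indexing.py):

# check_elser.py (optional, hypothetical)
from elasticsearch import Elasticsearch

es_client = Elasticsearch(cloud_id="your-cloud-id", api_key="your-api-key")

# Prints the deployment state of the ELSER model, e.g. "started" once it's ready
stats = es_client.ml.get_trained_models_stats(model_id=".elser_model_2")
for model_stats in stats["trained_model_stats"]:
    print(model_stats["model_id"], model_stats.get("deployment_stats", {}).get("state"))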

Index data

Now, since we have both structured and unstructured data, we’ll create two indices with different mappings using the Kibana DevTools Console.

For our structured product data, let’s create the following index:

PUT shoestic-products
{
  "mappings": {
    "properties": {
      "product_id": {
        "type": "keyword"
      },
      "product_name": {
        "type": "text"
      },
      "amount": {
        "type": "float"
      },
      "tags": {
        "type": "keyword"
      }
    }
  }
}

And to index our unstructured data (product datasheets) we'll use:

PUT shoestic-products-descriptions
{
  "mappings": {
    "properties": {
      "title": {
        "type": "text",
        "analyzer": "english"
      },
      "super_body": {
        "type": "semantic_text",
        "inference_id": "onelake-inference-endpoint"
      },
      "body": {
        "type": "text",
        "copy_to": "super_body"
      }
    }
  }
}

Note: Using copy_to is important here: the body field stays a regular text field for full-text searches, while its content is also copied into super_body for semantic searches.
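
If you prefer creating the indices from Python instead of the DevTools Console, a sketch like the following (reusing the same mappings and the Cloud ID and API key placeholders from the next section) should be equivalent:

# create_indices.py (optional, hypothetical)
from elasticsearch import Elasticsearch

es_client = Elasticsearch(cloud_id="your-cloud-id", api_key="your-api-key")

# Structured product list
es_client.indices.create(
    index="shoestic-products",
    mappings={
        "properties": {
            "product_id": {"type": "keyword"},
            "product_name": {"type": "text"},
            "amount": {"type": "float"},
            "tags": {"type": "keyword"},
        }
    },
)

# Unstructured product datasheets with a semantic_text field
es_client.indices.create(
    index="shoestic-products-descriptions",
    mappings={
        "properties": {
            "title": {"type": "text", "analyzer": "english"},
            "super_body": {
                "type": "semantic_text",
                "inference_id": "onelake-inference-endpoint",
            },
            "body": {"type": "text", "copy_to": "super_body"},
        }
    },
)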

Reading OneLake files

Before we begin, we need to initialize the Elasticsearch client in the script below (using your own Cloud ID and API key).

Create a Python script called indexing.py and add the following lines:

# indexing.py
# Importing dependencies
import csv

from elasticsearch import Elasticsearch, helpers
from onelake import (
    directory_client,
    get_csv_content,
    get_docx_content,
    get_file_by_name,
)

ELASTIC_CLUSTER_ID = "your-cloud-id"
ELASTIC_API_KEY = "your-api-key"

# Elasticsearch client
es_client = Elasticsearch(
    cloud_id=ELASTIC_CLUSTER_ID,
    api_key=ELASTIC_API_KEY,
)

docx_files = ["beach-flip-flops.docx", "classic-loafers.docx", "sport-sneakers.docx"]

# Get file clients for the files stored in OneLake
csv_file_client = get_file_by_name("products.csv", directory_client)
docx_files_clients = []
for docx_file_name in docx_files:
    docx_files_clients.append(get_file_by_name(docx_file_name, directory_client))

# We use these functions to extract data from the files:
csv_content = get_csv_content(csv_file_client)
docx_contents = []
for docx_file_client in docx_files_clients:
    docx_contents.append(get_docx_content(docx_file_client))

print("CSV FILE CONTENT: ", csv_content)
print("DOCX FILE CONTENT: ", docx_contents)

# The CSV tags are separated by commas (,). We'll turn these tags into an array:
rows = csv_content.splitlines()
reader = csv.DictReader(rows)
modified_rows = []
for row in reader:
    row["tags"] = row["tags"].replace('"', "").split(",")
    modified_rows.append(row)
    print(row["tags"])

# We can now index the files into Elasticsearch
csv_actions = [{"_index": "shoestic-products", "_source": row} for row in modified_rows]
docx_actions = [
    {
        "_index": "shoestic-products-descriptions",
        "_source": {"title": docx_file_name, "body": docx},
    }
    for docx_file_name, docx in zip(docx_files, docx_contents)
]

helpers.bulk(es_client, csv_actions)
print("CSV data indexed successfully.")
helpers.bulk(es_client, docx_actions)
print("DOCX data indexed successfully.")

Now, run the script:

python indexing.py
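
To quickly confirm that the documents actually landed in both indices, an optional check like this minimal sketch (using the same client credentials) can help:

# verify_counts.py (optional, hypothetical)
from elasticsearch import Elasticsearch

es_client = Elasticsearch(cloud_id="your-cloud-id", api_key="your-api-key")

# Should report one document per CSV row and one per DOCX file
print("Products:", es_client.count(index="shoestic-products")["count"])
print("Descriptions:", es_client.count(index="shoestic-products-descriptions")["count"])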

Queries

Once the documents have been indexed in Elasticsearch, we can test our queries. In this case, we’ll search for a term that only appears in some of the products’ tags. We’ll run a keyword search against the structured data, and a semantic search against the unstructured data.

1. Keyword search

GET shoestic-products/_search
{
  "query": {
    "term": {
      "tags": "summer"
    }
  }
}

Result:

"_source": {
  "product_id": "P-118",
  "product_name": "Casual Sandals",
  "amount": "128.22",
  "tags": [
    "casual",
    "summer"
  ]
}

2. Semantic search:

GET shoestic-products-descriptions/_search
{
  "_source": {
    "excludes": [
      "*embeddings",
      "*chunks"
    ]
  },
  "query": {
    "semantic": {
      "field": "super_body",
      "query": "summer"
    }
  }
}

*We excluded embeddings and chunks just for readability.

Result:

"hits": {
  "total": {
    "value": 3,
    "relation": "eq"
  },
  "max_score": 4.3853106,
  "hits": [
    {
      "_index": "shoestic-products-descriptions",
      "_id": "P2Hj6JIBF7lnCNFTDQEA",
      "_score": 4.3853106,
      "_source": {
        "super_body": {
          "inference": {
            "inference_id": "onelake-inference-endpoint",
            "model_settings": {
              "task_type": "sparse_embedding"
            }
          }
        },
        "title": "beach-flip-flops.docx",
        "body": "Ideal for warm, sunny days by the water, these lightweight essentials are water-resistant and come in bright colors, bringing a laid-back vibe to any outing in the sun."
      }
    }
  ]
}


As you can see, the keyword search returned an exact match on one of the tags, while the semantic search returned a result that matches the meaning of the description, without needing an exact match.
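
You can also run searches from Python with the same client. Here’s a sketch (the index names and fields come from the mappings above; the query text is just an example) that repeats the keyword search and adds a plain full-text match on the body field, which stays available thanks to the copy_to mapping:

# search_examples.py (optional, hypothetical)
from elasticsearch import Elasticsearch

es_client = Elasticsearch(cloud_id="your-cloud-id", api_key="your-api-key")

# Keyword search on the structured index
keyword_hits = es_client.search(
    index="shoestic-products",
    query={"term": {"tags": "summer"}},
)
for hit in keyword_hits["hits"]["hits"]:
    print(hit["_source"]["product_name"], hit["_source"]["tags"])

# Lexical full-text search on the unstructured index (body is still a regular text field)
text_hits = es_client.search(
    index="shoestic-products-descriptions",
    query={"match": {"body": "sunny days"}},
)
for hit in text_hits["hits"]["hits"]:
    print(hit["_source"]["title"], hit["_score"])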

Conclusion

OneLake makes it easier to consume data from different Microsoft sources, and indexing those documents into Elasticsearch then lets us use advanced search tools on top of them. In this first part, we learned how to connect to OneLake and index documents in Elasticsearch. In part two, we’ll build a more robust solution using the Elastic connector framework.

Want to get Elastic certified? Find out when the next Elasticsearch Engineer training is running!

Elasticsearch is packed with new features to help you build the best search solutions for your use case. Dive into our sample notebooks to learn more, start a free cloud trial, or try Elastic on your local machine now.

Ready to build state of the art search experiences?

Sufficiently advanced search isn’t achieved with the efforts of one. Elasticsearch is powered by data scientists, ML ops, engineers, and many more who are just as passionate about search as you are. Let’s connect and work together to build the magical search experience that will get you the results you want.
