OneLake is a tool that lets you connect to different Microsoft data sources like Power BI, Data Activator, and Data Factory, among others. It centralizes data in data lakes: large-volume repositories built for storing, analyzing, and processing data at scale.
In this article, we'll learn how to configure OneLake, consume its data using Python, and index documents in Elasticsearch so we can run semantic searches on them.
Sometimes you need to search across both structured and unstructured data coming from different sources and software providers, and create visualizations with Kibana. For this kind of task, indexing the documents into Elasticsearch as a central repository becomes extremely helpful.
For this example, we’ll use a fake company called Shoestic, an online shoe store. We have the list of products in a structured file (CSV) while some of the products’ datasheets are in an unstructured format (DOCX). The files are stored in OneLake.
You can find a Notebook with the complete example (including test documents) here.
Steps
OneLake initial configuration
OneLake architecture can be summarized like this:
To use OneLake and Microsoft Fabric, we’ll need an Office 365 account. If you don’t have one, you can create a trial account here.
Log into Microsoft Fabric using your account. Then, create a workspace called "ShoesticWorkspace". Once inside the newly created workspace, create a Lakehouse and name it "ShoesticDatalake". The last step will be creating a new folder inside “Files”. Click on “new subfolder” and name it "ProductsData".
Done! We're ready to begin ingesting our data.
Connect to OneLake using Python
With our OneLake configured, we can now prepare the Python scripts. Azure has libraries to handle credentials and communicate with OneLake.
Installing dependencies
Run the following command in the terminal to install the dependencies:
pip install azure-identity elasticsearch==8.14 azure-storage-file-datalake azure-cli python-docx chardet
The "azure-identity" and "azure-storage-file-datalake" libraries let us interact with OneLake, while "azure-cli" handles access credentials and permissions. To read the files' content and later index it into Elasticsearch, we use "python-docx" for the DOCX datasheets and "chardet" to detect the CSV encoding.
Saving Microsoft Credentials in our local environment
We’ll use "az login" to enter our Microsoft account and run:
az login --allow-no-subscriptions
The flag " --allow-no-subscriptions" allows us to authenticate to Microsoft Azure without an active subscription.
This command will open a browser window where you’ll need to log into your account and then select your account’s subscription number.
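Under the hood, the DefaultAzureCredential used in the script below will automatically pick up this CLI session. If you prefer to be explicit about where the token comes from, azure-identity also provides AzureCliCredential, which you could swap in; a minimal sketch, not required for this tutorial:

# Optional: use the Azure CLI session explicitly instead of DefaultAzureCredential
from azure.identity import AzureCliCredential

token_credential = AzureCliCredential()  # reads the session created by "az login"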
We’re now ready to start writing the code!
Create a file called onelake.py and add the following:
_onelake.py_
# Importing dependencies
import chardet
from azure.identity import DefaultAzureCredential
from docx import Document
from azure.storage.filedatalake import DataLakeServiceClient

# Initializing the OneLake client
ONELAKE_ACCOUNT_NAME = "onelake"
ONELAKE_WORKSPACE_NAME = "ShoesticWorkspace"
# Path in format <DataLake>.Lakehouse/files/<Folder path>
ONELAKE_DATA_PATH = "shoesticDatalake.Lakehouse/Files/ProductsData"

# Microsoft token
token_credential = DefaultAzureCredential()

# OneLake services
service_client = DataLakeServiceClient(
    account_url=f"https://{ONELAKE_ACCOUNT_NAME}.dfs.fabric.microsoft.com",
    credential=token_credential,
)
file_system_client = service_client.get_file_system_client(ONELAKE_WORKSPACE_NAME)
directory_client = file_system_client.get_directory_client(ONELAKE_DATA_PATH)

# OneLake functions


# Upload a file to a LakeHouse directory
def upload_file_to_directory(directory_client, local_path, file_name):
    file_client = directory_client.get_file_client(file_name)
    with open(local_path, mode="rb") as data:
        file_client.upload_data(data, overwrite=True)
    print(f"File: {file_name} uploaded to the data lake.")


# Get directory contents from your lake folder
def list_directory_contents(file_system_client, directory_name):
    paths = file_system_client.get_paths(path=directory_name)
    for path in paths:
        print(path.name + "\n")


# Get a file by name from your lake folder
def get_file_by_name(file_name, directory_client):
    return directory_client.get_file_client(file_name)


# Decode docx
def get_docx_content(file_client):
    download = file_client.download_file()
    file_content = download.readall()
    temp_file_path = "temp.docx"
    with open(temp_file_path, "wb") as temp_file:
        temp_file.write(file_content)
    doc = Document(temp_file_path)
    text = []
    for paragraph in doc.paragraphs:
        text.append(paragraph.text)
    return "\n".join(text)


# Decode csv
def get_csv_content(file_client):
    download = file_client.download_file()
    file_content = download.readall()
    result = chardet.detect(file_content)
    encoding = result["encoding"]
    return file_content.decode(encoding)
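If you want to verify the connection before uploading anything, a minimal sketch like the following (a hypothetical check_connection.py placed next to onelake.py) lists whatever is currently in the ProductsData folder; an empty result simply means nothing has been uploaded yet:

# check_connection.py (hypothetical helper): verify we can reach the Lakehouse folder
from onelake import ONELAKE_DATA_PATH, file_system_client, list_directory_contents

# Prints the path of every file currently stored under /ProductsData
list_directory_contents(file_system_client, ONELAKE_DATA_PATH)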
Uploading files to OneLake
In this example, we’ll use a CSV file and some .docx files with info about our shoe store products. Though you can upload them using the UI, we’ll do it with Python. Download the files here.
We’ll place the files in a /data folder next to a new Python script called upload_files.py:
# upload_files.py

# Importing dependencies
from onelake import (
    ONELAKE_DATA_PATH,
    directory_client,
    file_system_client,
    list_directory_contents,
    upload_file_to_directory,
)

csv_file_name = "products.csv"
csv_local_path = f"./data/{csv_file_name}"

docx_files = ["beach-flip-flops.docx", "classic-loafers.docx", "sport-sneakers.docx"]
docx_local_paths = [f"./data/{file_name}" for file_name in docx_files]

# Upload files to Lakehouse
upload_file_to_directory(directory_client, csv_local_path, csv_file_name)

for docx_local_path in docx_local_paths:
    docx_file_name = docx_local_path.split("/")[-1]
    upload_file_to_directory(directory_client, docx_local_path, docx_file_name)

# To check that the files have been uploaded, run "list_directory_contents"
# to show the contents of the /ProductsData folder in our Datalake:
print("Upload finished, Listing files: ")
list_directory_contents(file_system_client, ONELAKE_DATA_PATH)
Run the upload script:
python upload_files.py
The result should be:
Upload finished, Listing files: 
shoesticDatalake.Lakehouse/Files/ProductsData/beach-flip-flops.docx
shoesticDatalake.Lakehouse/Files/ProductsData/classic-loafers.docx
shoesticDatalake.Lakehouse/Files/ProductsData/products.csv
shoesticDatalake.Lakehouse/Files/ProductsData/sport-sneakers.docx
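You can also read one of the files back from OneLake to confirm the upload worked end to end. A quick sketch, reusing the helpers from onelake.py:

# Read products.csv straight from OneLake and print the first few lines
from onelake import directory_client, get_csv_content, get_file_by_name

csv_file_client = get_file_by_name("products.csv", directory_client)
csv_content = get_csv_content(csv_file_client)
print("\n".join(csv_content.splitlines()[:3]))  # header plus the first two products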
Now that we have the files ready, let’s start analyzing and searching our data with Elasticsearch!
Indexing documents
We’ll be using ELSER as the embedding provider for our vector database so we can run semantic queries.
We chose ELSER because it is optimized for Elasticsearch and outperforms most of the competition in out-of-domain retrieval, that is, when the model is used as-is, without fine-tuning it on your own data.
Configuring ELSER
Start by creating the inference endpoint:
PUT _inference/sparse_embedding/onelake-inference-endpoint
{
  "service": "elser",
  "service_settings": {
    "num_allocations": 1,
    "num_threads": 1
  }
}
If you haven’t used ELSER before, you may get a 502 Bad Gateway error while the model loads in the background. In Kibana, you can check the model’s status under Machine Learning > Trained Models. Wait until the model is deployed before proceeding to the next steps.
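If you prefer to check from Python instead of the Kibana UI, a sketch along these lines should work; it assumes ELSER v2’s default model id, ".elser_model_2", and the same client credentials we use later:

# Check whether the ELSER model backing the inference endpoint is deployed yet
from elasticsearch import Elasticsearch

es_client = Elasticsearch(cloud_id="your-cloud-id", api_key="your-api-key")

stats = es_client.ml.get_trained_models_stats(model_id=".elser_model_2")
for model in stats["trained_model_stats"]:
    deployment = model.get("deployment_stats", {})
    print(model["model_id"], "->", deployment.get("state", "not deployed yet"))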
Index data
Since we have both structured and unstructured data, we’ll create two indices with different mappings using the Kibana DevTools Console.
For our structured product data, let’s create the following index:
PUT shoestic-products
{
  "mappings": {
    "properties": {
      "product_id": { "type": "keyword" },
      "product_name": { "type": "text" },
      "amount": { "type": "float" },
      "tags": { "type": "keyword" }
    }
  }
}
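To make the mapping concrete, each row of products.csv will end up as a document shaped roughly like this; the values are illustrative, borrowed from the sample data:

# A sample document that fits the shoestic-products mapping (illustrative values)
sample_product = {
    "product_id": "P-118",
    "product_name": "Casual Sandals",
    "amount": 128.22,
    "tags": ["casual", "summer"],
}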
And to index our unstructured data (product datasheets) we'll use:
PUT shoestic-products-descriptions
{
  "mappings": {
    "properties": {
      "title": { "type": "text", "analyzer": "english" },
      "super_body": {
        "type": "semantic_text",
        "inference_id": "onelake-inference-endpoint"
      },
      "body": { "type": "text", "copy_to": "super_body" }
    }
  }
}
Note: Using copy_to is important because it lets us run full-text searches, and not just semantic searches, on the body field.
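For example, a plain full-text query on body (as opposed to a semantic query on super_body) could look like this sketch, using the Python client with your own credentials; "water-resistant" is a phrase taken from one of the sample datasheets:

# Full-text (lexical) search on the body field, which stays a regular text field
from elasticsearch import Elasticsearch

es_client = Elasticsearch(cloud_id="your-cloud-id", api_key="your-api-key")

response = es_client.search(
    index="shoestic-products-descriptions",
    query={"match": {"body": "water-resistant"}},
)
print(response["hits"]["hits"])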
Reading OneLake files
Before we begin, we need to initialize our Elasticsearch client in the script (using your own Cloud ID and API key).
Create a Python script called indexing.py and add the following lines:
# indexing.py

# Importing dependencies
import csv

from elasticsearch import Elasticsearch, helpers
from onelake import (
    directory_client,
    get_csv_content,
    get_docx_content,
    get_file_by_name,
)

ELASTIC_CLUSTER_ID = "your-cloud-id"
ELASTIC_API_KEY = "your-api-key"

# Elasticsearch client
es_client = Elasticsearch(
    cloud_id=ELASTIC_CLUSTER_ID,
    api_key=ELASTIC_API_KEY,
)

docx_files = ["beach-flip-flops.docx", "classic-loafers.docx", "sport-sneakers.docx"]

# Get file clients for the files stored in OneLake
csv_file_client = get_file_by_name("products.csv", directory_client)

docx_files_clients = []
for docx_file_name in docx_files:
    docx_files_clients.append(get_file_by_name(docx_file_name, directory_client))

# We use these functions to extract data from the files:
csv_content = get_csv_content(csv_file_client)

docx_contents = []
for docx_file_client in docx_files_clients:
    docx_contents.append(get_docx_content(docx_file_client))

print("CSV FILE CONTENT: ", csv_content)
print("DOCX FILE CONTENT: ", docx_contents)

# The CSV tags are separated by commas (,). We'll turn these tags into an array:
rows = csv_content.splitlines()
reader = csv.DictReader(rows)
modified_rows = []

for row in reader:
    row["tags"] = row["tags"].replace('"', "").split(",")
    modified_rows.append(row)
    print(row["tags"])

# We can now index the files into Elasticsearch
csv_actions = [{"_index": "shoestic-products", "_source": row} for row in modified_rows]

docx_actions = [
    {
        "_index": "shoestic-products-descriptions",
        "_source": {"title": docx_file_name, "body": docx},
    }
    for docx_file_name, docx in zip(docx_files, docx_contents)
]

helpers.bulk(es_client, csv_actions)
print("CSV data indexed successfully.")

helpers.bulk(es_client, docx_actions)
print("DOCX data indexed successfully.")
Now, run the script:
python indexing.py
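To confirm everything made it in, a quick sanity check that counts the documents in both indices might help; a sketch, again using your own credentials:

# Quick sanity check: count the documents in both indices
from elasticsearch import Elasticsearch

es_client = Elasticsearch(cloud_id="your-cloud-id", api_key="your-api-key")

print(es_client.count(index="shoestic-products")["count"])               # number of CSV rows
print(es_client.count(index="shoestic-products-descriptions")["count"])  # should be 3, one per datasheet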
Queries
Once the documents have been indexed in Elasticsearch, we can test the semantic queries. In this case, we’ll search for a term that only appears in some of the products (a tag). We’ll run a keyword search against the structured data and a semantic search against the unstructured data.
1. Keyword search
GET shoestic-products/_search
{
  "query": {
    "term": {
      "tags": "summer"
    }
  }
}
Result:
"_source": { "product_id": "P-118", "product_name": "Casual Sandals", "amount": "128.22", "tags": [ "casual", "summer" ] }
2. Semantic search:
GET shoestic-products-descriptions/_search
{
  "_source": {
    "excludes": [
      "*embeddings",
      "*chunks"
    ]
  },
  "query": {
    "semantic": {
      "field": "super_body",
      "query": "summer"
    }
  }
}
*We excluded embeddings and chunks just for readability.
Result:
"hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 4.3853106, "hits": [ { "_index": "shoestic-products-descriptions", "_id": "P2Hj6JIBF7lnCNFTDQEA", "_score": 4.3853106, "_source": { "super_body": { "inference": { "inference_id": "onelake-inference-endpoint", "model_settings": { "task_type": "sparse_embedding" } } }, "title": "beach-flip-flops.docx", "body": "Ideal for warm, sunny days by the water, these lightweight essentials are water-resistant and come in bright colors, bringing a laid-back vibe to any outing in the sun." } } ] }
As you can see, the keyword search returned an exact match on one of the tags, while the semantic search returned a result that matches the meaning of the description, without needing an exact match.
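If you’d rather run these two queries from Python instead of the Dev Tools console, the equivalent calls with the Python client would look roughly like this sketch:

# The same keyword and semantic searches, run through the Python client
from elasticsearch import Elasticsearch

es_client = Elasticsearch(cloud_id="your-cloud-id", api_key="your-api-key")

# Keyword search on the structured index
keyword_hits = es_client.search(
    index="shoestic-products",
    query={"term": {"tags": "summer"}},
)

# Semantic search on the unstructured index (embeddings/chunks excluded for readability)
semantic_hits = es_client.search(
    index="shoestic-products-descriptions",
    source_excludes=["*embeddings", "*chunks"],
    query={"semantic": {"field": "super_body", "query": "summer"}},
)

for hit in keyword_hits["hits"]["hits"] + semantic_hits["hits"]["hits"]:
    print(hit["_index"], hit["_source"])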
Conclusion
OneLake makes it easier to consume data from different Microsoft sources, and indexing these documents into Elasticsearch lets us apply advanced search tools to them. In this first part, we learned how to connect to OneLake and index documents in Elasticsearch. In part two, we’ll build a more robust solution using the Elastic connector framework.