Hello everybody
Has anybody already used IMPORT from Azure Blob Storage?
We have the following use case/scenario:
We needed to integrate data from Microsoft Dynamics CRM into our data warehouse (40 tables). About four years ago we established the following pipeline:
Dynamics CRM → Azure Blob Storage → Azure SQL DB → Exasol
This worked fine, but the volume of data in CRM grew over time, and loading it from Azure SQL DB into Exasol eventually took 6 hours per day.
In addition, we now store the data in three different places:
- Azure Blob Storage
- Azure SQL DB
- Exasol
Finally, we have to maintain several export/import processes:
- From CRM to Azure Blob Storage
- From Azure Blob Storage to Azure SQL DB
- From Azure Blob Storage to Exasol
The latest Exasol version adds the option to import directly from Azure Blob Storage, which lets us reduce both complexity and storage.
Solution
In theory it looks very simple; see "Load data from Azure Blob Storage | Exasol Documentation".
However, in our environment the biggest challenge was security. Just getting the firewalls properly configured took us some time.
The other challenge was that our Azure team decided to use one directory per table, each containing one or more files. Unfortunately, the IMPORT statement does not support wildcards; however, we found a way around this (see Implementation).
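The core of the workaround is deciding which blob names sit directly in a table's directory, ignoring anything in sub-folders. A minimal sketch of that filter in plain Python, assuming a flat list of blob names as returned by a prefix listing; the function name `direct_children` is hypothetical and not part of the Azure SDK:

```python
def direct_children(blob_names, prefix):
    """Return the file names located directly under `prefix` (no recursion).

    Blobs in sub-folders such as 'contact/snapshot/...' are skipped,
    as is an entry equal to the prefix itself (empty remainder).
    """
    files = []
    for name in blob_names:
        if not name.startswith(prefix):
            continue
        rest = name[len(prefix):]
        # Keep only non-empty names that contain no further '/' separator.
        if rest and "/" not in rest:
            files.append(rest)
    return files


if __name__ == "__main__":
    names = [
        "contact/2015-12.csv",
        "contact/2016-01.csv",
        "contact/snapshot/3245325.csv",
        "account/2016-01.csv",
    ]
    print(direct_children(names, "contact/"))  # ['2015-12.csv', '2016-01.csv']
```

The UDF in the Implementation section additionally skips an entry named exactly 'Snapshot'; that check could be added to the condition if your storage account lists folder markers that way.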
We were able to speed up the extraction dramatically. For example, loading the table contact (6 million records) from Azure SQL DB took 1 hour 45 minutes; reading it directly from Azure Blob Storage now takes 4 minutes.
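Put into numbers, using only the figures quoted above for the table contact, the direct import is roughly 26 times faster:

```python
# Figures quoted above for the table contact.
rows = 6_000_000
before_min = 1 * 60 + 45  # 1 hour 45 minutes via Azure SQL DB
after_min = 4             # direct import from Azure Blob Storage

speedup = before_min / after_min
rows_per_second = rows / (after_min * 60)

print(f"speedup: {speedup:.2f}x")                     # speedup: 26.25x
print(f"throughput: {rows_per_second:,.0f} rows/s")   # throughput: 25,000 rows/s
```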
Implementation
UDF to get all file names in a directory
/**
* UDF that reads all files from a directory. The function does not traverse the directory recursively;
* it only returns the names of the files located directly in the directory.
* Example:
* contact/
* 2015-12.csv
* 2016-01.csv
* ...
* snapshot/
* 3245325.csv
* ==> the files under snapshot are ignored; only the files directly under contact are returned
*
* Parameters
* httpsConObject: name of the connection to use, e.g. 'CON_IG_HDL_HTS_DYN_BLOB_POC_DEV'
* container_name: name of the container to use
* folderName: name of the table directory (not traversed recursively), e.g. 'contact/'
* httpProxy: proxy used for HTTP requests
* httpsProxy: proxy used for HTTPS requests
*
* Return
* One row is emitted per file
*/
--/
CREATE OR REPLACE PYTHON3 SCALAR SCRIPT udf_extractFileAzureBlobFileNames(
    httpsConObject VARCHAR(100),
    container_name VARCHAR(250),
    folderName     VARCHAR(100),
    httpProxy      VARCHAR(256),
    httpsProxy     VARCHAR(256))
EMITS (file_name VARCHAR(20000)) AS
from datetime import datetime, timedelta
import requests
from azure.core.pipeline.transport import RequestsTransport
from azure.storage.blob import BlobServiceClient, generate_container_sas, ContainerSasPermissions
def run(ctx):
    # Account name, key and endpoint suffix come from the Exasol connection object
    conObj = exa.get_connection(ctx.httpsConObject)
    account_name = conObj.user
    account_key = conObj.password
    account_url = 'https://' + account_name + '.blob.' + conObj.address.split("EndpointSuffix=", 1)[1]
    prefix = ctx.folderName
    # SAS token with read and list permission on the container, valid for one day
    start_time = datetime.utcnow()
    expiry_time = start_time + timedelta(days=1)
    sas_token = generate_container_sas(
        account_name=account_name,
        container_name=ctx.container_name,
        account_key=account_key,
        permission=ContainerSasPermissions(read=True, list=True),
        expiry=expiry_time,
        start=start_time
    )
    # Route all requests through the proxies; note that TLS certificate verification is disabled
    session = requests.Session()
    session.proxies.update({'http': ctx.httpProxy, 'https': ctx.httpsProxy})
    session.verify = False
    session.headers.update({"Connection": "close"})
    transport = RequestsTransport(
        session=session,
        connection_verify=False,
        connection_timeout=30,
        read_timeout=30
    )
    blob_service_client = BlobServiceClient(account_url, credential=sas_token, transport=transport)
    container_client = blob_service_client.get_container_client(container=ctx.container_name)
    # list_blobs returns a flat listing, so blobs in sub-folders appear as well
    for blob in container_client.list_blobs(name_starts_with=prefix):
        # Every returned name starts with the prefix, so slicing removes exactly that part
        fileName = blob.name[len(prefix):]
        # Emit only non-empty names directly in the folder (no further '/'), skipping 'Snapshot'
        if fileName and '/' not in fileName and fileName != 'Snapshot':
            ctx.emit(ctx.container_name + '/' + prefix + fileName)
/
;
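The UDF derives the account URL by splitting the connection address at `EndpointSuffix=`, which only works as long as nothing follows the suffix. A slightly more defensive sketch, assuming the connection address holds an Azure-style connection string such as `DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net`; the helper names `parse_connection_string` and `build_account_url` are hypothetical:

```python
def parse_connection_string(conn_str):
    """Split 'Key1=Value1;Key2=Value2' into a dict; values may themselves contain '='."""
    parts = {}
    for item in conn_str.split(";"):
        item = item.strip()
        if not item:
            continue
        # partition splits at the first '=', so e.g. base64 padding stays in the value
        key, _, value = item.partition("=")
        parts[key] = value
    return parts


def build_account_url(account_name, conn_str):
    """Build the blob service URL from the account name and the connection string."""
    parts = parse_connection_string(conn_str)
    protocol = parts.get("DefaultEndpointsProtocol", "https")
    suffix = parts["EndpointSuffix"]  # raises KeyError if the suffix is missing
    return f"{protocol}://{account_name}.blob.{suffix}"


if __name__ == "__main__":
    address = "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net"
    print(build_account_url("myaccount", address))  # https://myaccount.blob.core.windows.net
```

Inside `run()`, the `account_url` assignment could then call `build_account_url(account_name, conObj.address)` instead of splitting the string by hand.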
We then generate one import script per table, which looks as follows:
--/
CREATE OR REPLACE LUA SCRIPT script_lod_dyn_contact() RETURNS ROWCOUNT AS
/* statement that empties the load table */
sql_preprocess = [[
truncate table lod_dyn_contact
]]
/* determine the file name(s) */
local sql_filename = [[SELECT udf_extractFileAzureBlobFileNames('CON_IG_HDL_HTS_DYN_BLOB_POC_DEV',
    'TBD_Container_name', 'contact/',
    'TBD-http-proxy', 'TBD-https-proxy') AS filename FROM dual;]]
local filenameRS = query(sql_filename)
/* IMPORT does not support wildcards, so build one FILE clause per file */
local files = ''
for i = 1, #filenameRS do
    files = files .. "FILE '" .. filenameRS[i].FILENAME .. "'\n"
end
if files == '' then
    error("No files found for table lod_dyn_contact")
end
sql_import = [[
import into LOD_DYN_contact from
csv at cloud azure blobstorage CON_IG_HDL_HTS_DYN_BLOB_POC_DEV
]] .. files .. [[
ROW SEPARATOR = 'CRLF'
]]
local suc_preprocess, res_preprocess = pquery(sql_preprocess)
if suc_preprocess then
    local suc_import, res_import = pquery(sql_import)
    if suc_import then
        output("1")
        output("Load successful: Inserted=" .. res_import.rows_inserted
            .. ", Deleted=" .. res_preprocess.rows_deleted .. ". ")
        query([[commit]])
    else
        output("-2")
        query([[rollback]])
        error("Error import: " .. res_import.error_message .. ". StatementEnd=" .. string.sub(res_import.statement_text, -1850))
    end
else
    output("-2")
    query([[rollback]])
    error("Error preprocess: " .. res_preprocess.error_message .. ". StatementEnd=" .. string.sub(res_preprocess.statement_text, -150))
end
/
;
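If the per-table scripts are generated outside the database, the string assembly done in the Lua script can be sketched in Python as well. This assumes the file paths are the ones emitted by the UDF (container/folder/file); `build_import` is a hypothetical helper. It also doubles embedded single quotes and rejects an empty file list, since an IMPORT without any FILE clause would not be a valid statement:

```python
def build_import(table, connection, files, row_separator="CRLF"):
    """Return an IMPORT statement with one FILE clause per blob path."""
    if not files:
        # Without any FILE clause the statement would be invalid, so fail early.
        raise ValueError(f"no files found for table {table}")
    # Double embedded single quotes so each path stays a valid SQL string literal.
    file_clauses = "\n".join("FILE '" + path.replace("'", "''") + "'" for path in files)
    return (
        f"IMPORT INTO {table} FROM CSV AT CLOUD AZURE BLOBSTORAGE {connection}\n"
        f"{file_clauses}\n"
        f"ROW SEPARATOR = '{row_separator}'"
    )


if __name__ == "__main__":
    print(build_import(
        "LOD_DYN_CONTACT",
        "CON_IG_HDL_HTS_DYN_BLOB_POC_DEV",
        ["TBD_Container_name/contact/2015-12.csv", "TBD_Container_name/contact/2016-01.csv"],
    ))
```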
Conclusion
In theory it is quite simple. With a bit of trial and error we also found a way around the missing wildcard support in the IMPORT statement.
As always, the network security settings are the real challenge.
In the end we were able to reduce complexity, storage, and processing time. Thanks to the Exasol team for continually enhancing the product with such nice new features.
Has anybody else used IMPORT from Azure Blob Storage so far? What are your experiences?
Next steps
Our next use case will be reading Parquet data directly from AWS S3. We expect this to work much the same way: we will probably write a UDF to list the file names in a directory, but otherwise it should be the same as importing from Azure Blob Storage or from an SFTP server.
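For the planned S3 variant, the directory listing could look roughly like this. It is a sketch assuming boto3 is available in the UDF's script language container; `list_direct_keys` is a hypothetical helper, and the client is passed in so the function itself needs no credentials. With `Delimiter="/"`, S3 reports deeper keys only as CommonPrefixes, so the non-recursive filtering comes for free:

```python
def list_direct_keys(s3_client, bucket, prefix):
    """Return the keys of objects stored directly under `prefix` (no recursion).

    With Delimiter="/", keys in sub-folders are grouped into CommonPrefixes,
    so "Contents" holds only the objects at this level.
    """
    keys = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/"):
        for obj in page.get("Contents", []):
            # Skip a zero-byte "folder" placeholder whose key equals the prefix.
            if obj["Key"] != prefix:
                keys.append(obj["Key"])
    return keys
```

Usage would be along the lines of `list_direct_keys(boto3.client("s3"), "my-bucket", "contact/")`, where the bucket name is a placeholder; the paginator takes care of listings with more than 1,000 objects.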