Breve descripción sobre el uso de operaciones Data Manipulation Language en Databricks utilizando parámetros desde Data Factory.
En este post vamos a ver un ejemplo de como realizar operaciones DML sobre una tabla delta existente en nuestro Data Lake mediante el uso de metadatos recibidos de Azure Dat Factory.
Primero comenzamos viendo lo que es una Delta Table:
En el contexto de Azure Databricks, las “Delta Tables” se refieren a tablas gestionadas por Delta Lake, un sistema de almacenamiento de datos para big data que proporciona transacciones ACID (Atomicidad, Consistencia, Aislamiento y Durabilidad) en los datos a nivel de tabla en entornos de big data.
Algunas características clave de las Delta Tables son las siguientes:
Soporte para operaciones merge: Delta Lake permite realizar operaciones de merge (mezcla) para actualizar, insertar o eliminar datos en función de ciertas condiciones, lo que facilita la administración de cambios en los conjuntos de datos.
En el entorno de Data Factory podemos ejecutar una notebook de databricks utilizando un computo previamente configurado desde la plataforma de databricks. Una vez creada la actividad de Notebook en Azure Data Factory se pueden configurar los parámetros para que el script los reciba:
Podemos recibir en la notebook todos estos parámetros utilizando un diccionario (contextValues) con las variables del contexto.
Luego, al instanciar la clase principal, le enviamos como parametros estos datos.
= {
contextValues "dataBase" : dbutils.widgets.get("db"),
"table" : dbutils.widgets.get("table"),
"schema" : dbutils.widgets.get("schema"),
"env" : dbutils.widgets.get("env"),
"pk" : dbutils.widgets.get("pk")
}
Si contamos con la capa bronze y silver dentro del mismo container con la estructura detallada podemos utilizar el código que comparto en este post para realizar operaciones dml sobre la tabla principal a actualizar en nuestro delta lake.
Para los datos incrementales de la capa bronze:
//<container>@<datalakename>dfs.core.windows.net/bronze/incremental/<db>/<schema>/<table>
abfss:
Para los datos delta de la capa silver:
//<container>@<datalakename>dfs.core.windows.net/silver/tablas/<db>/<schema>/<table> abfss:
La clase principal es la siguiente:
from delta.tables import *
import os
class UpdateTable():
def _init_(self, dataBase, table, schema, env, pk ):
self.dataBase = dataBase
self.table = table
self.schema = schema
self.env = env
self.pk = self.getListFromString(pk)
self.dataLakeName = "<nombre del datalake>"
self.scopeName = "<nombre del scope>"
self.container = "<nombre del container>" + self.env
self.mainPath = "abfss://" + "<nombre del container>" + "@"
+ "<nombre del datalake>" + ".dfs.core.windows.net/"
def sparkConf(self):
"""
Returns:
Conexion mediante Access Key a Azure Data Lake Gen 2.
(Es recomendable utilizar un Service Principal o una Managed Identity)
"""
set(
spark.conf."fs.azure.account.key." + "<nombre del datalake>" + ".dfs.core.windows.net",
= "<nombre del scope>" , key = "<nombre del key>")
dbutils.secrets.get(scope
)
def getListFromString(string):
"""
Args:
string (str)
Returns:
Arma una lista a partir de un string.
"""
if ',' in string:
= string.split(',')
string_split = [string.strip() for string in string_split]
string_clean return string_clean
else:
= string.strip()
string_strip return [string_strip]
def run(self):
try:
self.sparkConf()
= "data"
aliasDataName = "new_data"
aliasNewDataName = " AND ".join(
mergeQueryUpdateAndInsert f"{aliasDataName}.{pk} = {aliasNewDataName}.{pk_new}"
for pk, pk_new in zip(self.pk, self.pk)
)
= self.mainPath + "silver/tablas/" + self.dataBase +"/"
pathDeltaTable + self.schema + "/" + self.table + "/"
= DeltaTable.forPath(spark, pathDeltaTable)
deltaTable
= self.mainPath + "bronze/incremental/" + self.dataBase +"/"
pathInsertAndUpdate + self.schema + "/" + self.table + "/"
= spark.read.parquet(pathInsertAndUpdate)
insertAndUpdateData
"""
deltaTable: Contiene los datos de la tabla principal en la capa silver a actualizar.
insertAndUpdateData: Contiene los nuevos datos a ingresar o actualizar.
mergeQueryUpdateAndInsert: Query para realizar el merge.
"""
(deltaTable.alias(aliasDataName)
.merge(insertAndUpdateData.alias(aliasNewDataName), mergeQueryUpdateAndInsert)"""Si hacen matching reemplaza los datos anteriores por los nuevos"""
.whenMatchedUpdateAll() """Si no hacen matching realiza una operacion de insert"""
.whenNotMatchedInsertAll() """Ejecuta la operacion"""
.execute()
)
"""Elimina los datos parquet que se utilizaron para actualizar la delta table principal"""
+ self.table +'.parquet', True)
dbutils.fs.rm(pathInsertAndUpdate
except Exception as e:
print(f"Error: {e}")
if _name_ == "_main_":
= UpdateTable(**contextValues)
obj
obj.run()
Funciones del Script:
Con ´sparkConf´ configuramos algunas propiedades de Spark para trabajar con Azure Data Lake Storage (ADLS) Gen2, utilizando el servicio de secretos de Databricks (dbutils.secrets). La función utiliza spark.conf.set para configurar una propiedad en la configuración de Spark. En este caso, la propiedad se refiere a la clave de cuenta de almacenamiento de Azure asociada a un Data Lake Storage Gen2. La configuración de esta clave es esencial para que Spark pueda acceder y trabajar con datos almacenados en ADLS Gen2.
´checkIfExistData´ retorna True si el directorio especificado por pathInput contiene al menos un archivo o subdirectorio. Esta funcion nos sirve para ejecutar el comando de actualizacion de datos en la tabla principal solo si existen datos.
´getListFromString´ nos sirve para generar una lista a partir del nombre de las columnas que se reciben desde Data Factory para identificar los registros unicos en la tabla. Este resultado luego se utiliza para la query que realiza el merge.
´run´ ejecuta la función principal de la clase y en primer lugar, luego de setear la configuración de sparkConf, se generan los alias para la tabla principal y para los datos nuevos. Por ultimo la query para el mergeData. Esta query necesita las columnas que identifican de forma única a cada uno de los registros
= "data"
aliasDataName = "new_data"
aliasNewDataName = " AND ".join(
mergeQueryUpdateAndInsert f"{aliasDataName}.{pk} = {aliasNewDataName}.{pk_new}"
for pk, pk_new in zip(self.pk, self.pk)
)
La actualización de los datos funciona con el siguiente comando:
(deltaTable.alias(aliasDataName)
.merge(insertAndUpdateData.alias(aliasNewDataName), mergeQueryUpdateAndInsert)
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute() )
El script presentado utiliza una clase llamada UpdateTable, que configura propiedades de Spark para trabajar con Azure Data Lake Storage Gen2 y verifica la existencia de datos en el directorio correspondiente en la capa Bronze. Luego, utiliza la funcionalidad de Delta Lake para realizar operaciones de merge, actualizando la tabla principal en la capa Silver. El código también incluye manejo de excepciones y limpieza de datos temporales después de la operación de actualización.
Este enfoque demuestra una estrategia para mantener la consistencia y la integridad de los datos al realizar operaciones de ingestión e actualización incremental en un entorno de big data utilizando herramientas como Databricks y Azure Data Factory.
Para información mas detallada de la Api de Python para Delta Lake se puede acceder aqui. Para ver actualizaciones y mantenerse al día sobre Delta Lake click aqui para acceder al blog.