Uso de una tabla custom para Watermarks en MySql con Python y Docker .
En la gestión de grandes volúmenes de datos, garantizar que las operaciones se realicen de manera eficiente y segura es clave.
En este post, exploraremos cómo implementar un proceso ETL que extrae datos de MySQL y los guarda en un file system particionado por fecha, utilizando Python y Docker para levantar el servicio de MySQL.
El enfoque principal será lograr una operación atómica: el watermark que controla la actualización de los datos solo se ajustará si el proceso de escritura en el sistema de archivos es exitoso. Este concepto es fundamental para evitar inconsistencias y asegurar que los datos procesados reflejen correctamente la última extracción completada.
Para comenzar a construir el entorno, en primer lugar, se configuro el docker compose con la ultima imagen de mysql, las variables de entorno, los puertos del contenedor del entorno local y, por ultimo, los volumenes:
version: '3.8'
services:
mysql:
image: mysql:latest
restart: always
environment:
MYSQL_ROOT_PASSWORD: ${PASS}
ports:
- '3306:3306'
volumes:
- type: "bind"
source: "${WORKING_DIR}/data"
target: "/var/lib/mysql" #volumes del file system de la db
Una vez instanciada la imagen podemos crear un entorno virtual para instalar las librerías necesarias para interactuar con los objetos de la base de datos y manipular los datos. El archivo requirements.txt contiene dichas librerías.
Los pasos completos serian los siguientes:
Crear un archivo.env
en el directorio del proyecto con la variable de entorno WORKING_DIR
Levantar el servicio de MySql utilizando docker compose up -d
(En modo detach para poder seguir utilizando la misma terminal)
Instalar el entorno virtual con las dependencias.
Primero, se establece la conexión a la base de datos dentro del método connect. Se utiliza el constructor de la clase para inicializar las variables de conexión, como el user, password, host y port. Una vez que la conexión es exitosa, se asigna un cursor que permitirá ejecutar consultas SQL. Si ocurre algún error, se captura mediante un bloque try-except y se registra el error con un mensaje personalizado.
def connect(self):
try:
self.conn = mysql.connector.connect(
=self.user,
user=self.password,
password=self.host,
host=str(self.port)
port
)self.cursor = self.conn.cursor()
except Exception as e:
print(f"Error: {logger.error(self.bcolors.RED+ str(e) +self.bcolors.RESET)}")
Creamos el schema, la tabla, la tabla watermark y el stored procedure. Para la creacion de estas tablas tenemos que tener en cuenta lo siguiente:
La tabla tiene que tener una columna con datos tipo DATETIME para completar sus valores con la ultima actualizacion del registro. Para este caso vamos a utilizar una columna llamada LastModifiedTime.
Para la tabla de watermark hacemos lo mismo, creamos la tabla con un nombre generico para poder utilizarla con algunas otras tablas mas. Le agregamos dos columnas, TableName y WatermarkValue. En TableName completamos con el nombre de la tabla de hechos y en WatermarkValue vamos a completar una vez que hagamos la migracion inicial de los datos al file system con la fecha maxima de LastModifiedTime.
Por ultimo creamos el stored procedure para que reciba como inputs el nombre de la tabla y el LastModifiedtime.
def create_schema(self):
= self.schema
schema_name self.cursor.execute(f"""DROP SCHEMA IF EXISTS {self.schema};""")
self.infoLogger(message=f"Creating schema: '{schema_name}'.")
self.cursor.execute(f"CREATE DATABASE IF NOT EXISTS {schema_name}")
self.conn.commit()
self.successLogger(message=f"Schema '{schema_name}' created OK.")
def create_table(self):
= self.table
table_name self.cursor.execute(f"""DROP TABLE IF EXISTS {self.schema}.{self.table};""")
self.infoLogger(message=f"Creando table '{table_name}'.")
self.cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {table_name} (
TripID INT,
customerID INT,
LastModifiedTime DATETIME
)
""")
self.conn.commit()
self.successLogger(message=f"Table '{table_name}' created.")
def create_watermark_table(self):
= self.watermark
watermark_name self.cursor.execute(f"""DROP TABLE IF EXISTS {self.schema}.{self.watermark};""")
self.infoLogger(message=f"Creando Watermark '{watermark_name}'.")
self.cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {self.schema}.{watermark_name} (
TableName VARCHAR(100),
WatermarkValue DATETIME
)
""")
self.conn.commit()
self.successLogger(message=f"Watermark '{watermark_name}' created.")
def create_stored_procedure(self):
= self.schema
schema_name = self.procedure
procedure_name = self.watermark
watermark_name self.infoLogger(message=f"Stored Procedure '{procedure_name}'.")
self.cursor.execute(f"DROP PROCEDURE IF EXISTS {self.schema}.{self.procedure}")
self.cursor.execute(f"""
CREATE PROCEDURE {schema_name}.{procedure_name} (
IN LastModifiedtime DATETIME,
IN TableName VARCHAR(100)
)
BEGIN
UPDATE {schema_name}.{watermark_name}
SET WatermarkValue = LastModifiedtime
WHERE TableName = TableName;
END
""")
self.conn.commit()
self.successLogger(message=f"Stored Procedure '{procedure_name}' created.")
Probamos hacer dos insert en la tabla principal:
self.insert_data(
=100,
triptid=200,
customerid= '2024-09-08 12:54:22'
datetime
)
self.insert_data(
=101,
triptid=201,
customerid= '2024-09-08 12:54:22'
datetime )
Obtenemos la fecha máxima de ultima modificación de la tabla principal:
self.cursor.execute(f"SELECT MAX(LastModifiedTime) FROM {self.table}")
= self.cursor.fetchone()[0] max_datetime_first_insert
Ejecutamos la función get_atomic_operation()
La función realiza una operación SQL que puede ser un insert o update según el tipo de watermark (operation_watermark) y guarda los resultados en un archivo CSV. También se actualiza una tabla de marca de agua (watermark) que lleva un seguimiento de las operaciones. Todo el proceso es atómico: si hay un error, la transacción se revierte.
Parámetros:
def get_atomic_operation(
self,
str,
operation_watermark: str,
query:
values: Tuple[Any],
date: datetime, bool=False):
close_conn:
logger.info(self.bcolors.YELLOW+ f'Atomic Operation with {operation_watermark} in WatermarkTable'+self.bcolors.RESET
)self.cursor.execute(query.format(table=self.table), values)
= self.cursor.fetchall()
data = [desc[0] for desc in self.cursor.description]
column_names try:
# Atomic Operation
= pd.DataFrame(data, columns=column_names)
df = self.get_folder(date)
folder =False)
df.to_csv(folder, indexif operation_watermark == 'update':
self.update_watermark(datetime=date, commit=False)
if operation_watermark == 'insert':
self.insert_watermark(datetime=date, commit=False)
except Exception as e:
self.conn.rollback()
print(f'Error: {logger.error(self.bcolors.RED+ str(e) +self.bcolors.RESET)}')
finally:
self.conn.commit()
self.cursor.execute(f'SELECT * FROM {self.watermark}')
= self.cursor.fetchall()
watermark_data = [desc[0] for desc in self.cursor.description]
column_names print(F'Columns: {column_names}')
print('Final row in WatermarkTable:')
for row in watermark_data:
print(row)
if close_conn:
self.conn.close()
self.get_atomic_operation(
='insert',
operation_watermark="""SELECT * FROM {table} WHERE LastModifiedTime <= %s""",
query=(max_datetime_first_insert,),
values=max_datetime_first_insert,
date=False
close_conn )
Explicación del flujo:
Registro del inicio de la operación:
Se registra un mensaje en el logger indicando que se va a realizar una operación atómica utilizando un watermark específico. Ejecución de la consulta:
La consulta SQL pasada como parámetro se ejecuta con los valores especificados. Se obtienen los resultados de la consulta utilizando fetchall() y los nombres de las columnas con description. Creación del DataFrame y guardado como CSV:
Se utiliza un bloque try-except para manejar cualquier error durante la operación. Se construye un DataFrame de pandas con los datos obtenidos. El DataFrame se guarda en un archivo CSV en una carpeta determinada por la función get_folder. Actualización de la tabla de watermark:
Dependiendo del valor de operation_watermark (‘update’ o ‘insert’), se actualiza la tabla de watermark mediante las funciones update_watermark o insert_watermark. Se especifica commit=False para asegurar que estas actualizaciones no se confirmen hasta que toda la operación atómica haya sido completada. Manejo de errores y commit:
Si ocurre un error, se ejecuta rollback() para deshacer cualquier cambio en la transacción. En el bloque finally, se asegura que la transacción sea confirmada con commit() una vez que se haya completado todo sin errores. Visualización de los datos en la tabla de watermark:
Después de confirmar la transacción, se consulta y muestra el contenido actual de la tabla de watermark. Cierre de conexión:
Si el parámetro close_conn es True, la conexión a la base de datos se cierra.
Update Watermark
Una vez que ya tenemos la tabla completamente migrada al file system y el watermark actualizado con la fecha maxima de LastUpdateDate tenemos que armar un proceso de ETL que lleve al file system las filas utilizando un rango de fechas.
En primer lugar insertamos dos nuevos registros en la tabla principal con una fecha posterior a la primera:
self.insert_data(
=102,
triptid=202,
customerid='2024-09-08 18:54:22'
datetime
)self.insert_data(
=103,
triptid=203,
customerid='2024-09-08 18:54:22'
datetime )
Obtenemos la fecha máxima de la tabla principal y de la tabla watermark:
self.cursor.execute(f"SELECT MAX(LastModifiedTime) FROM {self.table}")
= self.cursor.fetchone()[0]
max_datetime_second_insert
self.cursor.execute(f"SELECT WatermarkValue FROM {self.watermark} WHERE TableName = '{self.table}'")
= self.cursor.fetchone()[0] last_wateremarkvalue
Si la fecha de la ultima actualización es mayor que la fecha que tenemos en la tabla watermark significa que hay registros nuevos y/o actualizados en la tabla principal:
La query a realizar es levantar los datos que se encuentran entre las fechas consultadas anteriormente.
if max_datetime_second_insert > last_wateremarkvalue:
print('New rows...')
self.get_atomic_operation(
='update',
operation_watermark="""
query SELECT * FROM {table}
WHERE LastModifiedTime > %s
AND LastModifiedTime <= %s
""",
=(last_wateremarkvalue, max_datetime_second_insert),
values=max_datetime_second_insert,
date=True
close_conn )
Resultado en el file system:
Se usa la función get_folder() que recibe la fecha de procesamiento y crea una carpeta en el file system si es que no existe. El return de la funcion es el path que se utiliza para guardar el csv.
def get_folder(self, date_process):
try:
= date_process.strftime('%H-%M-%S')
datetime_file = date_process.strftime('%Y-%m-%d')
date_file = self.dir + f'/incremental_load/{self.schema}/{self.table}/{self.table}_{date_file}/'
folder print(f'Folder: {folder}')
=True)
os.makedirs(folder, exist_okreturn folder + f'{datetime_file}.csv'
except Exception as e:
print(f"Error: {logger.error(self.bcolors.RED+ str(e) +self.bcolors.RESET)}")
En conclusión, este post detalla cómo implementar un proceso ETL robusto utilizando MySQL, Python y Docker, centrado en asegurar operaciones atómicas. El uso de un sistema de “watermark” garantiza que los datos procesados y almacenados en el file system reflejen correctamente las últimas modificaciones. El enfoque no solo proporciona seguridad y consistencia en el manejo de grandes volúmenes de datos, sino que también optimiza las operaciones incrementales, evitando duplicados y garantizando la integridad de los datos. El código presentado es modular y escalable, lo que facilita su adaptación a otros entornos o proyectos.