Watermarks en MySql

MySql Python

Uso de una tabla custom para Watermarks en MySql con Python y Docker .

Maxi Galoto
2024-09-12

react

Inicio



Introduccion

Volver al Inicio



En la gestión de grandes volúmenes de datos, garantizar que las operaciones se realicen de manera eficiente y segura es clave.

En este post, exploraremos cómo implementar un proceso ETL que extrae datos de MySQL y los guarda en un file system particionado por fecha, utilizando Python y Docker para levantar el servicio de MySQL.

El enfoque principal será lograr una operación atómica: el watermark que controla la actualización de los datos solo se ajustará si el proceso de escritura en el sistema de archivos es exitoso. Este concepto es fundamental para evitar inconsistencias y asegurar que los datos procesados reflejen correctamente la última extracción completada.



Entorno

Volver al Inicio



Para comenzar a construir el entorno, en primer lugar, se configuro el docker compose con la ultima imagen de mysql, las variables de entorno, los puertos del contenedor del entorno local y, por ultimo, los volumenes:


version: '3.8'
services:
  mysql:
    image: mysql:latest
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: ${PASS}
    ports:
      - '3306:3306'
    volumes:
      - type: "bind"
        source: "${WORKING_DIR}/data"
        target: "/var/lib/mysql" #volumes del file system de la db

Una vez instanciada la imagen podemos crear un entorno virtual para instalar las librerías necesarias para interactuar con los objetos de la base de datos y manipular los datos. El archivo requirements.txt contiene dichas librerías.

Los pasos completos serian los siguientes:



Script

Volver al Inicio



Primero, se establece la conexión a la base de datos dentro del método connect. Se utiliza el constructor de la clase para inicializar las variables de conexión, como el user, password, host y port. Una vez que la conexión es exitosa, se asigna un cursor que permitirá ejecutar consultas SQL. Si ocurre algún error, se captura mediante un bloque try-except y se registra el error con un mensaje personalizado.

  def connect(self):
      try:
          self.conn = mysql.connector.connect(
                  user=self.user, 
                  password=self.password, 
                  host=self.host, 
                  port=str(self.port)
              )
          self.cursor = self.conn.cursor()
      except Exception  as e:
          print(f"Error: {logger.error(self.bcolors.RED+ str(e)  +self.bcolors.RESET)}")

Creamos el schema, la tabla, la tabla watermark y el stored procedure. Para la creacion de estas tablas tenemos que tener en cuenta lo siguiente:


def create_schema(self):
    schema_name = self.schema
    self.cursor.execute(f"""DROP SCHEMA IF EXISTS {self.schema};""")
    self.infoLogger(message=f"Creating schema: '{schema_name}'.")
    self.cursor.execute(f"CREATE DATABASE IF NOT EXISTS {schema_name}")
    self.conn.commit()
    self.successLogger(message=f"Schema '{schema_name}' created OK.")


def create_table(self):
    table_name = self.table
    self.cursor.execute(f"""DROP TABLE IF EXISTS {self.schema}.{self.table};""")
    self.infoLogger(message=f"Creando table '{table_name}'.")
    self.cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            TripID INT,
            customerID INT,
            LastModifiedTime DATETIME
        )
    """)
    self.conn.commit()
    self.successLogger(message=f"Table '{table_name}' created.")
    


def create_watermark_table(self):
    watermark_name = self.watermark
    self.cursor.execute(f"""DROP TABLE IF EXISTS {self.schema}.{self.watermark};""")
    self.infoLogger(message=f"Creando Watermark '{watermark_name}'.")
    self.cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS {self.schema}.{watermark_name} (
            TableName VARCHAR(100),
            WatermarkValue DATETIME
        )
    """)
    self.conn.commit()
    self.successLogger(message=f"Watermark '{watermark_name}' created.")



def create_stored_procedure(self):
    schema_name = self.schema
    procedure_name = self.procedure
    watermark_name = self.watermark
    self.infoLogger(message=f"Stored Procedure '{procedure_name}'.")
    self.cursor.execute(f"DROP PROCEDURE IF EXISTS {self.schema}.{self.procedure}")
    self.cursor.execute(f"""
        CREATE PROCEDURE {schema_name}.{procedure_name} (
            IN LastModifiedtime DATETIME, 
            IN TableName VARCHAR(100)
        )
        BEGIN
            UPDATE {schema_name}.{watermark_name} 
            SET WatermarkValue = LastModifiedtime 
            WHERE TableName = TableName;
        END
    """)
    self.conn.commit()
    self.successLogger(message=f"Stored Procedure '{procedure_name}' created.")

Probamos hacer dos insert en la tabla principal:


self.insert_data(
            triptid=100,
            customerid=200,
            datetime = '2024-09-08 12:54:22'
)

self.insert_data(
    triptid=101,
    customerid=201,
    datetime = '2024-09-08 12:54:22'
)

Obtenemos la fecha máxima de ultima modificación de la tabla principal:

self.cursor.execute(f"SELECT MAX(LastModifiedTime) FROM {self.table}")
max_datetime_first_insert = self.cursor.fetchone()[0]

Ejecutamos la función get_atomic_operation()

La función realiza una operación SQL que puede ser un insert o update según el tipo de watermark (operation_watermark) y guarda los resultados en un archivo CSV. También se actualiza una tabla de marca de agua (watermark) que lleva un seguimiento de las operaciones. Todo el proceso es atómico: si hay un error, la transacción se revierte.

Parámetros:


def get_atomic_operation(
      self, 
      operation_watermark: str, 
      query: str, 
      values: Tuple[Any], 
      date: datetime, 
      close_conn:bool=False):
      
        logger.info(
          self.bcolors.YELLOW+ f'Atomic Operation with {operation_watermark} in WatermarkTable'+self.bcolors.RESET
          )
        self.cursor.execute(query.format(table=self.table), values)
        data = self.cursor.fetchall()
        column_names = [desc[0] for desc in self.cursor.description]
        try:
            # Atomic Operation
            df = pd.DataFrame(data, columns=column_names)
            folder = self.get_folder(date)
            df.to_csv(folder, index=False)
            if operation_watermark == 'update':
                self.update_watermark(datetime=date, commit=False)
            if operation_watermark == 'insert':
                self.insert_watermark(datetime=date, commit=False)
        except Exception as e:
            self.conn.rollback()
            print(f'Error: {logger.error(self.bcolors.RED+ str(e)  +self.bcolors.RESET)}')
        finally:
            self.conn.commit()
            self.cursor.execute(f'SELECT * FROM {self.watermark}')
            watermark_data = self.cursor.fetchall()
            column_names = [desc[0] for desc in self.cursor.description]
            print(F'Columns: {column_names}')
            print('Final row in WatermarkTable:')
            for row in watermark_data:
                print(row)
            if close_conn:
                self.conn.close()
                



self.get_atomic_operation(
            operation_watermark='insert', 
            query="""SELECT * FROM {table} WHERE LastModifiedTime <= %s""", 
            values=(max_datetime_first_insert,), 
            date=max_datetime_first_insert,
            close_conn=False
        )

Explicación del flujo:

Registro del inicio de la operación:

Se registra un mensaje en el logger indicando que se va a realizar una operación atómica utilizando un watermark específico. Ejecución de la consulta:

La consulta SQL pasada como parámetro se ejecuta con los valores especificados. Se obtienen los resultados de la consulta utilizando fetchall() y los nombres de las columnas con description. Creación del DataFrame y guardado como CSV:

Se utiliza un bloque try-except para manejar cualquier error durante la operación. Se construye un DataFrame de pandas con los datos obtenidos. El DataFrame se guarda en un archivo CSV en una carpeta determinada por la función get_folder. Actualización de la tabla de watermark:

Dependiendo del valor de operation_watermark (‘update’ o ‘insert’), se actualiza la tabla de watermark mediante las funciones update_watermark o insert_watermark. Se especifica commit=False para asegurar que estas actualizaciones no se confirmen hasta que toda la operación atómica haya sido completada. Manejo de errores y commit:

Si ocurre un error, se ejecuta rollback() para deshacer cualquier cambio en la transacción. En el bloque finally, se asegura que la transacción sea confirmada con commit() una vez que se haya completado todo sin errores. Visualización de los datos en la tabla de watermark:

Después de confirmar la transacción, se consulta y muestra el contenido actual de la tabla de watermark. Cierre de conexión:

Si el parámetro close_conn es True, la conexión a la base de datos se cierra.

Update Watermark

Una vez que ya tenemos la tabla completamente migrada al file system y el watermark actualizado con la fecha maxima de LastUpdateDate tenemos que armar un proceso de ETL que lleve al file system las filas utilizando un rango de fechas.

En primer lugar insertamos dos nuevos registros en la tabla principal con una fecha posterior a la primera:


self.insert_data(
            triptid=102,
            customerid=202,
            datetime='2024-09-08 18:54:22'
        )
self.insert_data(
    triptid=103,
    customerid=203,
    datetime='2024-09-08 18:54:22'
)

Obtenemos la fecha máxima de la tabla principal y de la tabla watermark:


self.cursor.execute(f"SELECT MAX(LastModifiedTime) FROM {self.table}")
max_datetime_second_insert = self.cursor.fetchone()[0]


self.cursor.execute(f"SELECT WatermarkValue FROM {self.watermark} WHERE TableName = '{self.table}'")
last_wateremarkvalue = self.cursor.fetchone()[0]

Si la fecha de la ultima actualización es mayor que la fecha que tenemos en la tabla watermark significa que hay registros nuevos y/o actualizados en la tabla principal:

La query a realizar es levantar los datos que se encuentran entre las fechas consultadas anteriormente.


if max_datetime_second_insert > last_wateremarkvalue:
            print('New rows...')
            self.get_atomic_operation(
                operation_watermark='update', 
                query="""
                    SELECT * FROM {table}
                    WHERE LastModifiedTime > %s
                    AND LastModifiedTime <= %s
                """, 
                values=(last_wateremarkvalue, max_datetime_second_insert), 
                date=max_datetime_second_insert,
                close_conn=True
            )

Resultado en el file system:

Se usa la función get_folder() que recibe la fecha de procesamiento y crea una carpeta en el file system si es que no existe. El return de la funcion es el path que se utiliza para guardar el csv.


def get_folder(self, date_process):
        try:
            datetime_file = date_process.strftime('%H-%M-%S')
            date_file = date_process.strftime('%Y-%m-%d')
            folder = self.dir + f'/incremental_load/{self.schema}/{self.table}/{self.table}_{date_file}/'
            print(f'Folder: {folder}')
            os.makedirs(folder, exist_ok=True)
            return folder + f'{datetime_file}.csv'

        except Exception  as e:
            print(f"Error: {logger.error(self.bcolors.RED+ str(e)  +self.bcolors.RESET)}")



Conclusion

Volver al Inicio



En conclusión, este post detalla cómo implementar un proceso ETL robusto utilizando MySQL, Python y Docker, centrado en asegurar operaciones atómicas. El uso de un sistema de “watermark” garantiza que los datos procesados y almacenados en el file system reflejen correctamente las últimas modificaciones. El enfoque no solo proporciona seguridad y consistencia en el manejo de grandes volúmenes de datos, sino que también optimiza las operaciones incrementales, evitando duplicados y garantizando la integridad de los datos. El código presentado es modular y escalable, lo que facilita su adaptación a otros entornos o proyectos.



Repositorio

Volver al Inicio