Triggers en MySql utilizando Python

MySql

Detectando cambios de Update, Insert o Delete por Primary Key sobre una tabla en MySql utilizando un conector de Python.

Maxi Galoto
2024-03-03

react

Inicio



Introduccion

Volver al Inicio



Los triggers en MySQL son piezas de código que se ejecutan automáticamente en respuesta a ciertos eventos, como operaciones de inserción, actualización o eliminación en una tabla. En el contexto de Change Data Capture (CDC), los triggers son herramientas para rastrear y registrar cambios en una tabla principal. El objetivo es mantener una tabla de seguimiento (tabla custom) actualizada con los cambios realizados en la tabla principal, permitiendo así un seguimiento detallado de las modificaciones.

Existen diferentes formas de implementar CDC pero en este post solo se va a compartir la generacion de Triggers.

* Implementación con Python y Docker:

Para automatizar este proceso, se va a compartir en este post un instructivo y, por ultimo, un repositorio donde se utiliza Python junto con Docker para levantar un servicio de MySQL y ejecutar un script que realice las siguientes tareas:



Entorno

Volver al Inicio



Para comenzar a construir el entorno, en primer lugar, se configuro el docker compose con la ultima imagen de mysql, las variables de entorno, los puertos del contenedor del entorno local y, por ultimo, los volumenes:


version: '3.8'
services:
  mysql:
    image: mysql:latest
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: ${PASS}
    ports:
      - '3306:3306'
    volumes:
      - type: "bind"
        source: "${WORKING_DIR}/data"
        target: "/var/lib/mysql" #volumes del file system de la db

Una vez instanciada la imagen podemos crear un entorno virtual para instalar las librerías necesarias para interactuar con los objetos de la base de datos y manipular los datos. El archivo requirements.txt contiene dichas librerias.

Los pasos completos serian los siguientes:



Script

Volver al Inicio



Para correr el programa se utilizo Python 3.9.0. En primer lugar se importan las librerías necesarias para ejecutar el Script que son las siguientes:

La clase principal que se encuentra en el script main.py es TriggerProcess. Esta clase inicia el constructor con las siguientes variables:

class TriggerProcess():

    def __init__(self,
                 host: str, 
                 user: str, 
                 password: str, 
                 port: str) -> None:
                 
        self.host = host
        self.user = user
        self.password = password
        self.port = port
        self.date = self.getDateNow()
        self.conn = None
        self.cursor = None
        self.dbname = 'database_prueba'
        self.tablename = 'clientes'
        self.tablecustom = self.tablename + '_custom'
        self.triggername = self.tablename + '_trigger'
        self.triggerinsert = self.triggername + '_insert'
        self.triggerupdate = self.triggername + '_update'
        self.initialRows = 10 # rows iniciales a insertar
        self.insertRows = 30 # 30 rows mas que las rows ya existentes
        self.pkupdate = 1
        self.pkinsert = 999999
        self.faker = Faker()

Ejecucion del Script:

Una vez iniciado el Script se establece la conexión y el cursor en las variables del constructor:

def connect(self):
    try:
        self.conn = mysql.connector.connect(
                user=self.user, 
                password=self.password, 
                host=self.host, 
                port=str(self.port)
            )
        self.cursor = self.conn.cursor()
    except Exception  as e:
        print(
          f"Error: {logger.error(self.bcolors.RED+ str(e)  +self.bcolors.RESET)}"
          )

Se crea la tabla principal y el schema:

self.connect()
self.createSchema()
self.createTable()
self.infoLogger(message=f"Ok Schema y Table: {self.dbname}.{self.tablename}")
time.sleep(1)

self.cursor.execute(f"""USE {self.dbname};""")

Se obtienen los datos iniciales para insertar en la tabla principal:

data = self.getInitialData(self.initialRows)
self.infoLogger(message=f"Insertando {str(self.initialRows)} filas iniciales..")
self.insertData(data)
time.sleep(1)

Luego se insertan 30 nuevos registros a partir del Max ID ingresado anteriormente:

newData = self.getNewData(maxRows=int(self.insertRows ))
self.infoLogger(message=f"Total de filas a ingresar : {len(newData)}")
self.insertData(newData)

Luego creamos la tabla custom que se va a actualizar con el Trigger:


def createCustomTable(self) -> None:
        self.cursor.execute(f"""DROP TABLE IF EXISTS {self.dbname}.{self.tablecustom};""")
        self.cursor.execute(
            f"""CREATE TABLE IF NOT EXISTS 
                {self.dbname}.{self.tablecustom} (
                pk INT,
                operacion VARCHAR(50),
                ultima_actualizacion 
                DATETIME DEFAULT 
                CURRENT_TIMESTAMP
            );"""
        )
        
        
self.createCustomTable()

Creamos los Triggers:



def createTriggers(self) -> None:
        self.cursor.execute(
          f"""DROP TRIGGER IF EXISTS {self.triggerinsert};"""
          )
        self.cursor.execute(
            f"""CREATE TRIGGER {self.triggerinsert}
                AFTER INSERT ON {self.tablename}
                FOR EACH ROW
                BEGIN
                    INSERT INTO {self.tablecustom} 
                    (pk, operacion, ultima_actualizacion)
                    VALUES 
                    (NEW.id, 'insert', NOW());
                END"""
        )

        self.cursor.execute(
          f"""DROP TRIGGER IF EXISTS {self.triggerupdate};"""
          )
        self.cursor.execute(
            f"""CREATE TRIGGER {self.triggerupdate}
                AFTER UPDATE ON {self.tablename}
                FOR EACH ROW
                BEGIN
                    INSERT INTO {self.tablecustom} 
                    (pk, operacion, ultima_actualizacion)
                    VALUES 
                    (NEW.id, 'update', NOW());
                END"""
        )
        
        
self.createTriggers()

Creamos una row con datos ficticios para ingresar a la tabla principal utilizando el self.pkinsert seteado anteriormente y también una row para actualizar el self.pkupdate.

name, email, address = self.getData()
dict = {
    'id' : self.pkinsert,
    'name' : name,
    'email' : email,
    'address' : address
}
newData = [dict]
self.infoLogger(message=f"Total de filas a ingresar : {len(newData)}")
self.insertData(newData)
query = f"UPDATE {self.dbname}.{self.tablename} SET name = %s WHERE id = %s;"
valores = ('newname', self.pkupdate)
self.cursor.execute(query, valores)

Luego realizamos un Inner Join entre la tabla principal y la custom para levantar los registros que se actualizaron y/o insertaron:

Se realiza un Gropy By y un MAX en ultima_actualizacion para solo obtener los últimos PK ingresados en la tabla custom. Hay que tener en cuenta que esto es un proceso batch y durante una ventana temporal el mismo PK puede tener un insert, luego un update y, por ultimo, un delete, por lo tanto, necesitamos levantar el ultimo estado del dato que en este caso seria un delete. En el ejemplo del Script solo se están utilizando operaciones insert y delete pero de todas formas este caso hay que tenerlo en cuenta.


queryinner = f"""SELECT c.*, cc.operacion, cc.ultima_actualizacion
                  FROM {self.dbname}.{self.tablename} c
                  INNER JOIN (
                      SELECT 
                      pk, operacion, MAX(ultima_actualizacion) AS ultima_actualizacion
                      FROM {self.dbname}.{self.tablename}_custom
                      GROUP BY pk, operacion
                  ) cc ON c.id = cc.pk;"""

self.cursor.execute(queryinner)
inner_rows = self.cursor.fetchall()
for fila in inner_rows:
    print(fila)

Probamos la funcionalidad de la agrupación y el MAX date para ver si nos devuelve la ultima actualización:

Actualizamos el PK de antes:

query = f"UPDATE {self.dbname}.{self.tablename} SET name = %s WHERE id = %s;"
valores = ('newname_2', self.pkupdate)
self.cursor.execute(query, valores)

Observamos nuevamente la tabla principal utilizando el Inner Join y podemos ver que la agrupación hecha en la tabla custom (que a esta altura cuenta con tres registros, un insert y dos update para el {self.pkupdate}) nos devuelve solo dos registros que se unen con la tabla principal:



Conclusion

Volver al Inicio



La implementación de triggers en MySQL para el Change Data Capture ofrece una solución automatizada para rastrear cambios en una tabla de una base de datos. Aunque esta estrategia presenta ventajas significativas, como la automatización también conlleva desafíos, como la complejidad y el posible overhead. En última instancia, la elección de utilizar triggers para CDC en MySQL dependerá de la naturaleza y requisitos específicos del proyecto, sopesando cuidadosamente los beneficios y desventajas asociados.

Los Pros y Contras de esta estrategia son los siguientes:

Captura Automática: CDC con triggers automatiza la captura de cambios, eliminando la necesidad de seguimiento manual.

Retrocompatibilidad: Facilita la identificación de cambios históricos y la restauración de datos antiguos.

Complejidad: Implementar y mantener triggers puede resultar complejo y requiere un conocimiento profundo del sistema.

Overhead: Los triggers pueden introducir un cierto overhead en la base de datos, especialmente en operaciones de alta concurrencia.

Posible Desincronización: En casos de fallos o interrupciones, puede ocurrir desincronización entre la tabla principal y la tabla custom.



Repositorio

Volver al Inicio



  • Github Repository: aqui