Detectando cambios de Update, Insert o Delete por Primary Key sobre una tabla en MySql utilizando un conector de Python.
Los triggers en MySQL son piezas de código que se ejecutan automáticamente en respuesta a ciertos eventos, como operaciones de inserción, actualización o eliminación en una tabla. En el contexto de Change Data Capture (CDC), los triggers son herramientas para rastrear y registrar cambios en una tabla principal. El objetivo es mantener una tabla de seguimiento (tabla custom) actualizada con los cambios realizados en la tabla principal, permitiendo así un seguimiento detallado de las modificaciones.
Existen diferentes formas de implementar CDC pero en este post solo se va a compartir la generacion de Triggers.
* Implementación con Python y Docker:
Para automatizar este proceso, se va a compartir en este post un instructivo y, por ultimo, un repositorio donde se utiliza Python junto con Docker para levantar un servicio de MySQL y ejecutar un script que realice las siguientes tareas:
Para comenzar a construir el entorno, en primer lugar, se configuro el docker compose con la ultima imagen de mysql, las variables de entorno, los puertos del contenedor del entorno local y, por ultimo, los volumenes:
version: '3.8'
services:
mysql:
image: mysql:latest
restart: always
environment:
MYSQL_ROOT_PASSWORD: ${PASS}
ports:
- '3306:3306'
volumes:
- type: "bind"
source: "${WORKING_DIR}/data"
target: "/var/lib/mysql" #volumes del file system de la db
Una vez instanciada la imagen podemos crear un entorno virtual para instalar las librerías necesarias para interactuar con los objetos de la base de datos y manipular los datos. El archivo requirements.txt contiene dichas librerias.
Los pasos completos serian los siguientes:
Crear un archivo.env
en el directorio del proyecto con la variable de entorno WORKING_DIR
Levantar el servicio de MySql utilizando docker compose up -d
(En modo detach para poder seguir utilizando la misma terminal)
Instalar el entorno virtual con las dependencias.
Para correr el programa se utilizo Python 3.9.0. En primer lugar se importan las librerías necesarias para ejecutar el Script que son las siguientes:
La clase principal que se encuentra en el script main.py
es TriggerProcess. Esta clase inicia el constructor con las siguientes variables:
class TriggerProcess():
def __init__(self,
str,
host: str,
user: str,
password: str) -> None:
port:
self.host = host
self.user = user
self.password = password
self.port = port
self.date = self.getDateNow()
self.conn = None
self.cursor = None
self.dbname = 'database_prueba'
self.tablename = 'clientes'
self.tablecustom = self.tablename + '_custom'
self.triggername = self.tablename + '_trigger'
self.triggerinsert = self.triggername + '_insert'
self.triggerupdate = self.triggername + '_update'
self.initialRows = 10 # rows iniciales a insertar
self.insertRows = 30 # 30 rows mas que las rows ya existentes
self.pkupdate = 1
self.pkinsert = 999999
self.faker = Faker()
Ejecucion del Script:
Una vez iniciado el Script se establece la conexión y el cursor en las variables del constructor:
def connect(self):
try:
self.conn = mysql.connector.connect(
=self.user,
user=self.password,
password=self.host,
host=str(self.port)
port
)self.cursor = self.conn.cursor()
except Exception as e:
print(
f"Error: {logger.error(self.bcolors.RED+ str(e) +self.bcolors.RESET)}"
)
Se crea la tabla principal y el schema:
self.connect()
self.createSchema()
self.createTable()
self.infoLogger(message=f"Ok Schema y Table: {self.dbname}.{self.tablename}")
1)
time.sleep(
self.cursor.execute(f"""USE {self.dbname};""")
Se obtienen los datos iniciales para insertar en la tabla principal:
= self.getInitialData(self.initialRows)
data self.infoLogger(message=f"Insertando {str(self.initialRows)} filas iniciales..")
self.insertData(data)
1) time.sleep(
Luego se insertan 30 nuevos registros a partir del Max ID ingresado anteriormente:
= self.getNewData(maxRows=int(self.insertRows ))
newData self.infoLogger(message=f"Total de filas a ingresar : {len(newData)}")
self.insertData(newData)
Luego creamos la tabla custom que se va a actualizar con el Trigger:
def createCustomTable(self) -> None:
self.cursor.execute(f"""DROP TABLE IF EXISTS {self.dbname}.{self.tablecustom};""")
self.cursor.execute(
f"""CREATE TABLE IF NOT EXISTS
{self.dbname}.{self.tablecustom} (
pk INT,
operacion VARCHAR(50),
ultima_actualizacion
DATETIME DEFAULT
CURRENT_TIMESTAMP
);"""
)
self.createCustomTable()
Creamos los Triggers:
def createTriggers(self) -> None:
self.cursor.execute(
f"""DROP TRIGGER IF EXISTS {self.triggerinsert};"""
)self.cursor.execute(
f"""CREATE TRIGGER {self.triggerinsert}
AFTER INSERT ON {self.tablename}
FOR EACH ROW
BEGIN
INSERT INTO {self.tablecustom}
(pk, operacion, ultima_actualizacion)
VALUES
(NEW.id, 'insert', NOW());
END"""
)
self.cursor.execute(
f"""DROP TRIGGER IF EXISTS {self.triggerupdate};"""
)self.cursor.execute(
f"""CREATE TRIGGER {self.triggerupdate}
AFTER UPDATE ON {self.tablename}
FOR EACH ROW
BEGIN
INSERT INTO {self.tablecustom}
(pk, operacion, ultima_actualizacion)
VALUES
(NEW.id, 'update', NOW());
END"""
)
self.createTriggers()
Creamos una row con datos ficticios para ingresar a la tabla principal utilizando el self.pkinsert seteado anteriormente y también una row para actualizar el self.pkupdate.
= self.getData()
name, email, address dict = {
'id' : self.pkinsert,
'name' : name,
'email' : email,
'address' : address
}= [dict]
newData self.infoLogger(message=f"Total de filas a ingresar : {len(newData)}")
self.insertData(newData)
= f"UPDATE {self.dbname}.{self.tablename} SET name = %s WHERE id = %s;"
query = ('newname', self.pkupdate)
valores self.cursor.execute(query, valores)
Luego realizamos un Inner Join entre la tabla principal y la custom para levantar los registros que se actualizaron y/o insertaron:
Se realiza un Gropy By y un MAX en ultima_actualizacion para solo obtener los últimos PK ingresados en la tabla custom. Hay que tener en cuenta que esto es un proceso batch y durante una ventana temporal el mismo PK puede tener un insert, luego un update y, por ultimo, un delete, por lo tanto, necesitamos levantar el ultimo estado del dato que en este caso seria un delete. En el ejemplo del Script solo se están utilizando operaciones insert y delete pero de todas formas este caso hay que tenerlo en cuenta.
= f"""SELECT c.*, cc.operacion, cc.ultima_actualizacion
queryinner FROM {self.dbname}.{self.tablename} c
INNER JOIN (
SELECT
pk, operacion, MAX(ultima_actualizacion) AS ultima_actualizacion
FROM {self.dbname}.{self.tablename}_custom
GROUP BY pk, operacion
) cc ON c.id = cc.pk;"""
self.cursor.execute(queryinner)
= self.cursor.fetchall()
inner_rows for fila in inner_rows:
print(fila)
Probamos la funcionalidad de la agrupación y el MAX date para ver si nos devuelve la ultima actualización:
Actualizamos el PK de antes:
= f"UPDATE {self.dbname}.{self.tablename} SET name = %s WHERE id = %s;"
query = ('newname_2', self.pkupdate)
valores self.cursor.execute(query, valores)
Observamos nuevamente la tabla principal utilizando el Inner Join y podemos ver que la agrupación hecha en la tabla custom (que a esta altura cuenta con tres registros, un insert y dos update para el {self.pkupdate}) nos devuelve solo dos registros que se unen con la tabla principal:
La implementación de triggers en MySQL para el Change Data Capture ofrece una solución automatizada para rastrear cambios en una tabla de una base de datos. Aunque esta estrategia presenta ventajas significativas, como la automatización también conlleva desafíos, como la complejidad y el posible overhead. En última instancia, la elección de utilizar triggers para CDC en MySQL dependerá de la naturaleza y requisitos específicos del proyecto, sopesando cuidadosamente los beneficios y desventajas asociados.
Los Pros y Contras de esta estrategia son los siguientes:
Captura Automática: CDC con triggers automatiza la captura de cambios, eliminando la necesidad de seguimiento manual.
Retrocompatibilidad: Facilita la identificación de cambios históricos y la restauración de datos antiguos.
Complejidad: Implementar y mantener triggers puede resultar complejo y requiere un conocimiento profundo del sistema.
Overhead: Los triggers pueden introducir un cierto overhead en la base de datos, especialmente en operaciones de alta concurrencia.
Posible Desincronización: En casos de fallos o interrupciones, puede ocurrir desincronización entre la tabla principal y la tabla custom.