Pirobits
  

Cola de mensajes usando MySQL: SELECT * FOR UPDATE

alberto avatar Alberto Sola · 4/29/2024 · 6 min

Escribir un sistema de colas no tiene que implicar utilizar software complejo o escribir cientos de líneas de código. En este post quiero explicarte cómo utilizando una simple tabla de MySQL y una terminal puedes realizar un sistema de colas, que luego puedes implementar en cualquier lenguaje.

Trabajando en mis side-project quiero implementar cierta funcionalidad con la que tengo que ejecutar tareas asíncronas, y teniendo en cuenta que en estos trabajo yo sólo, cuánto menos complejidad tenga la solución mejor, porque puedo iterar más rápido y tengo menos cosas que mantener.

Por eso he estado analizando los diferentes métodos para crear un sistema de colas y quiero experimentar con el propio MySQL. En mi experiencia profesional he trabajado con sistemas de colas que utilizan herramientas como Redis, RabbitMQ o soluciones en la nube como Amazon SQS (Simple-Queue-Service).

Hace ya unos meses descubrí la funcionalidad que ofrece MySQL SELECT FOR UPDATE, que básicamente lo que permite es bloquear las filas de una tabla mientras estás en una transacción, de forma que, en otra transacción puedes ejecutar la sentencia SELECT FOR UPDATE SKIP LOCKED de forma que pueda seleccionar otras filas que no estén bloqueadas.

Y con esto tan sencillo tenemos un sistema de colas que permite múltiples consumidores.

Si únicamente tienes un consumidor, no vas a tener problemas de concurrencia, pero me apetecía experimentar y aprender bien a utilizar esta característica de MySQL, y de paso por qué no crear un vídeo y contártelo.

Primero vamos a ver un ejemplo teórico simple, y después, montaré un laboratorio para probarlo.

¿Qué es una cola de mensajes?

Una cola de mensajes es una forma de comunicación asíncrona entre múltiples procesos que necesitan transmitirse diferentes datos. Es muy común utilizarlo en arquitecturas de servicios o serverless. De esta forma un sistema añade un mensaje a la cola que se persistirá hasta que otro sistema lo consuma y lo procese. Aquí pueden existir diferentes patrones, como garantizar que el mensaje se entrega al menos una vez, puede haber colas que garanticen el FIFO (First-in First-out), otras sistemas pueden entregar un mensajes en múltiples colas...

La principal ventaja de utilizar las colas de mensajes es desacoplar diferentes sistemas, por ejemplo el envío de un mail del proceso de registrar un usuario. Si la plataforma de mails se cae, te aseguro que no querrás tener downtime y que los usuarios no puedan registrarse. También pueden ser de utilidad cuando quieres procesar múltiples operaciones en lotes, de forma que evites que se produzcan picos en la utilización de tu plataforma y se pueda degradar el servicio.

Herramientas para trabajar con colas de mensajes hay muchas: RabbitMQ, Amazon SQS, algunas más complejas para flujos de eventos como Apache Kafka o Amazon Kinesis, pero realmente puedes utilizar incluso un buffer en memoria aprovechando herramientas como Redis, o incluso MySQL como verás a continuación.

Laboratorio

En primer lugar vamos a crear un laboratorio, para ello necesitamos un servidor de MySQL.

En mi caso voy a utilizar Docker para gestionar el servidor de MySQL, pero puedes utilizar un MySQL local o utilizar cualquier proveedor cloud.

docker run --name mysql -e 'MYSQL_ROOT_PASSWORD=12341234' --port 3306:3306 mysql

Ahora utilizamos un cliente de mysql para lanzar algunas queries. Puedes utilizar algún cliente con interfaz o la propia terminal.

En mi caso voy a utilizar una terminal para abrir el cliente de línea de comandos de MySQL para conectarnos a nuestro servidor (me resulta más cómodo realizar este laboratorio con dos terminales):

mysql -h 127.0.0.1 -u root -p

Si no tienes MySQL instalado en tu local, puedes ejecutar un cliente en la terminal desde un contenedor de Docker:

docker run --rm -it --network host mysql /bin/sh

Lo siguiente es crear una tabla "tasks" que contiene la estructura mínima necesaria para gestionar tareas asíncronas a modo de ejemplo. Además aprovecho para añadir algunos datos con los que hacer pruebas:

CREATE TABLE IF NOT EXISTS tasks (
  id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
  status ENUM('pending', 'done') NOT NULL DEFAULT 'pending',
  type VARCHAR(255) NOT NULL,
  payload JSON NOT NULL,
  createdAt DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updatedAt DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);


INSERT INTO tasks (type, payload) VALUES ('email', '{"recipient": "[email protected]", "subject": "Important reminder"}');
INSERT INTO tasks (type, payload, status) VALUES ('counter', '{"count": 10}', 'done');
INSERT INTO tasks (type, payload) VALUES ('log', '{"messages": ["Error occurred", "System restarted"]}');
INSERT INTO tasks (type, status, payload) VALUES ('notification', 'done', '{}');
INSERT INTO tasks (type, payload) VALUES ('user_data', '{"name": "Jane", "age": 30}');

Con esto puedes experimentar las diferentes opciones que puse al inicio del post. En el vídeo lanzo las distintas casuísticas y creo un script para procesarlas en Node.js.

Probando la cola de tareas

Si ejecutamos la siguiente línea en dos terminales obtendremos el mismo resultado, que realmente no es lo que queremos porque un elemento podría procesarse dos o más veces.

SELECT * FROM tasks WHERE status = "pending" LIMIT 1 FOR UPDATE;

Esto es porque esta sentencia necesita ejecutarse dentro de una transacción, por lo que podemos modificar la sentencia por esta otra:

START TRANSACTION;
SELECT * FROM tasks WHERE status = "pending" LIMIT 1 FOR UPDATE;
...

Y ahora en otra terminal, ejecutamos la misma sentencia y voilà, se queda bloqueada porque SELECT FOR UPDATE realiza un bloqueo de los elementos que selecciona.

¿Cuál es la solución? Añadir SKIP LOCKED de forma que selecciona todos los elementos que no estén bloqueados.

START TRANSACTION;
SELECT * FROM tasks WHERE status = "pending" LIMIT 1 FOR UPDATE SKIP LOCKED;
...

Ahora sí, en esta otra terminal podemos ver cómo ha seleccionado nuevos datos. Finalmente recuerda que esto es un ejemplo, y cuando implementes las diferentes casuísticas realizarás lógica de negocio dentro de la transacción para, finalmente, actualizar el estado de la tarea y cerrar la transacción con COMMIT si todo ha ido bien, o con ROLLBACK si algo ha ido mal.

START TRANSACTION;
SELECT * FROM tasks WHERE status = "pending" LIMIT 1 FOR UPDATE SKIP LOCKED;

-- Lógica de nogocio de tu proyecto

UPDATE tasks SET status = "done" WHERE id = ?;
COMMIT;

Si te ha resultado útil este artículo agradecería si te suscribes a mi newsletter. Recibirás contenido exclusivo de calidad y también me ayudarás enormemente. Cada suscripción apoya el trabajo que realizo y me permite conocer mejor los temas que te interesan, de forma que puedo mejorar los conocimientos que comparto contigo.


Posts recientes