Pirobits
  

Message queue using MySQL: SELECT * FOR UPDATE

Alberto Sola · 4/29/2024 · 6 min

Writing a queuing system does not have to involve complex software or hundreds of lines of code. In this post I explain how, with a simple MySQL table and a terminal, you can build a queuing system that you can then implement in any language.

In my side projects I often need to run asynchronous tasks. Since I work on them alone, the less complex the solution the better: I can iterate faster and I have fewer things to maintain.

That's why I have been looking at different ways to build a queuing system, and I want to experiment with MySQL itself. In my professional experience I have worked with queuing systems built on tools like Redis and RabbitMQ, or cloud solutions like Amazon SQS (Simple Queue Service).

A few months ago I discovered MySQL's SELECT ... FOR UPDATE, which locks the rows it selects for as long as the transaction is open. Another transaction can then run SELECT ... FOR UPDATE SKIP LOCKED (available since MySQL 8.0) to select rows that are not locked, instead of waiting for the locked ones.

And with something this simple we have a queuing system that supports multiple consumers.
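To picture the difference between the two statements before touching a database, here is a minimal sketch in TypeScript. It is a toy in-memory model, not MySQL: the TaskRow shape, the lockedBy field and the selectForUpdate function are names made up for illustration, and real InnoDB locking has more details than this.

```typescript
type TaskStatus = "pending" | "done";

interface TaskRow {
  id: number;
  status: TaskStatus;
  lockedBy: string | null; // name of the transaction holding the row lock, if any
}

type Outcome =
  | { kind: "row"; id: number } // a row was selected and locked
  | { kind: "waits" }           // the statement would block behind another transaction
  | { kind: "empty" };          // no pending, unlocked row is available

// Toy model of:
//   SELECT ... WHERE status = 'pending' LIMIT 1 FOR UPDATE [SKIP LOCKED]
function selectForUpdate(rows: TaskRow[], tx: string, skipLocked: boolean): Outcome {
  for (const row of rows) {
    if (row.status !== "pending") continue;
    if (row.lockedBy !== null && row.lockedBy !== tx) {
      if (skipLocked) continue; // SKIP LOCKED: ignore the row and keep scanning
      return { kind: "waits" }; // plain FOR UPDATE: wait for the lock
    }
    row.lockedBy = tx; // this transaction now owns the row
    return { kind: "row", id: row.id };
  }
  return { kind: "empty" };
}

const rows: TaskRow[] = [
  { id: 1, status: "pending", lockedBy: null },
  { id: 2, status: "done", lockedBy: null },
  { id: 3, status: "pending", lockedBy: null },
];

const first = selectForUpdate(rows, "A", false);  // A locks row 1
const second = selectForUpdate(rows, "B", false); // B would wait on row 1
const third = selectForUpdate(rows, "B", true);   // B skips row 1 and locks row 3
console.log(first, second, third);
```

Transaction A gets row 1, a plain FOR UPDATE from B would wait behind A, and with SKIP LOCKED B moves on to row 3. That last behaviour is what lets several consumers share one table.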

If you only have one consumer you will not have concurrency problems, but I wanted to experiment and learn how to use this MySQL feature, and why not make a video and tell you about it.

First we are going to see a simple theoretical example, and then I will set up a lab to test it.

What is a message queue?

A message queue is a form of asynchronous communication between processes that need to pass data to each other. It is commonly used in service-based or serverless architectures. One system adds a message to the queue, where it persists until another system consumes and processes it. Different patterns exist: some queues guarantee that a message is delivered at least once, some guarantee FIFO (first in, first out) ordering, and some systems can deliver one message to multiple queues.

The main advantage of message queues is that they decouple systems, for example the sending of an email from the process of registering a user. If the mailing platform goes down, you don't want downtime that prevents users from registering. Queues are also useful when you want to process operations in batches, so that you avoid usage peaks that could degrade the service.

There are many tools for working with message queues: RabbitMQ, Amazon SQS, and more complex ones for event streams such as Apache Kafka or Amazon Kinesis. But you can also use an in-memory store such as Redis, or even MySQL, as you will see below.

Lab

First of all we are going to create a lab. For this we need a MySQL server.

In my case I will use Docker to run the MySQL server, but you can use a local MySQL or any cloud provider.

docker run --name mysql -e 'MYSQL_ROOT_PASSWORD=12341234' --publish 3306:3306 mysql

Now we use a MySQL client to run some queries. You can use a graphical client or the terminal itself.

In my case I am going to use a terminal to open the MySQL command line client to connect to our server (I find it more comfortable to do this lab with two terminals):

mysql -h 127.0.0.1 -u root -p

If you don't have a MySQL client installed locally, you can open a shell in a Docker container and run the mysql command above from there:

docker run --rm -it --network host mysql /bin/sh

The next thing is to create a tasks table that contains the minimum structure needed to manage asynchronous tasks as an example. I also take the opportunity to add some data to test with:

CREATE TABLE IF NOT EXISTS tasks (
  id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
  status ENUM('pending', 'done') NOT NULL DEFAULT 'pending',
  type VARCHAR(255) NOT NULL,
  payload JSON NOT NULL,
  createdAt DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updatedAt DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);


INSERT INTO tasks (type, payload) VALUES ('email', '{"recipient": "[email protected]", "subject": "Important reminder"}');
INSERT INTO tasks (type, payload, status) VALUES ('counter', '{"count": 10}', 'done');
INSERT INTO tasks (type, payload) VALUES ('log', '{"messages": ["Error occurred", "System restarted"]}');
INSERT INTO tasks (type, status, payload) VALUES ('notification', 'done', '{}');
INSERT INTO tasks (type, payload) VALUES ('user_data', '{"name": "Jane", "age": 30}');
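From application code, adding a task is a single parameterized INSERT. The following TypeScript sketch assumes a hypothetical SqlConnection interface with an execute method. It is not the API of any particular driver, so adapt it to the one you use.

```typescript
// Hypothetical minimal connection shape, assumed for this sketch only.
interface SqlConnection {
  execute(sql: string, params: unknown[]): Promise<void>;
}

// Adds one task to the queue. The status column is omitted on purpose so that
// the table default ('pending') applies.
async function enqueueTask(
  conn: SqlConnection,
  type: string,
  payload: Record<string, unknown>
): Promise<void> {
  await conn.execute(
    "INSERT INTO tasks (type, payload) VALUES (?, ?)",
    [type, JSON.stringify(payload)] // the JSON column receives a serialized string
  );
}
```

Using placeholders instead of string concatenation keeps the payload from being interpreted as SQL.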

With this you can try out the different options described at the beginning of the post. In the video I run through the different cases and create a script to process them in Node.js.

Testing the task queue

If we execute the following line in two terminals, both get the same row, which is not what we want because a task could be processed two or more times.

SELECT * FROM tasks WHERE status = 'pending' LIMIT 1 FOR UPDATE;

This happens because, outside a transaction, each statement runs in autocommit mode and its lock is released as soon as the statement finishes. The statement needs to run inside a transaction, so we change it to this:

START TRANSACTION;
SELECT * FROM tasks WHERE status = 'pending' LIMIT 1 FOR UPDATE;
...

And now, in another terminal, we execute the same statements and, voilà, the query hangs: SELECT ... FOR UPDATE locks the rows it selects, so the second transaction waits until the first one ends (or until the lock wait timeout expires).

What is the solution? Add SKIP LOCKED, so that the query ignores locked rows and selects only from the ones that are not locked.

START TRANSACTION;
SELECT * FROM tasks WHERE status = 'pending' LIMIT 1 FOR UPDATE SKIP LOCKED;
...

Now, in this other terminal, we can see that a different row has been selected. Finally, remember that this is just an example. In a real implementation you run your business logic inside the transaction, then update the status of the task and close the transaction with COMMIT if everything went well, or with ROLLBACK if something went wrong.

START TRANSACTION;
SELECT * FROM tasks WHERE status = 'pending' LIMIT 1 FOR UPDATE SKIP LOCKED;

-- Business logic of your project

UPDATE tasks SET status = 'done' WHERE id = ?; -- id of the row selected above
COMMIT;
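
Putting the statements above together, a consumer could look roughly like this TypeScript sketch. The SqlConnection interface, processNextTask and TaskHandler are hypothetical names for illustration, not a real driver API, and all statements are assumed to run on the same connection so that they share one transaction.

```typescript
interface TaskRow {
  id: number;
  type: string;
  payload: unknown;
}

// Hypothetical minimal connection shape, assumed for this sketch only.
interface SqlConnection {
  query(sql: string, params?: unknown[]): Promise<TaskRow[]>;
}

type TaskHandler = (task: TaskRow) => Promise<void>;

// Claims one pending task, runs the handler and marks the task as done.
// Resolves to true if a task was processed, false if none was available.
async function processNextTask(conn: SqlConnection, handle: TaskHandler): Promise<boolean> {
  await conn.query("START TRANSACTION");
  try {
    const rows = await conn.query(
      "SELECT * FROM tasks WHERE status = 'pending' LIMIT 1 FOR UPDATE SKIP LOCKED"
    );
    const [task] = rows;
    if (task === undefined) {
      await conn.query("COMMIT"); // no pending, unlocked task: end the empty transaction
      return false;
    }
    await handle(task); // business logic of your project
    await conn.query("UPDATE tasks SET status = 'done' WHERE id = ?", [task.id]);
    await conn.query("COMMIT");
    return true;
  } catch (err) {
    await conn.query("ROLLBACK"); // releases the row lock; the task stays pending
    throw err;
  }
}
```

A worker would call processNextTask in a loop and sleep for a moment when it returns false. If the handler throws, the ROLLBACK releases the lock and the task stays pending, so another consumer can pick it up again. This means a handler may run more than once for the same task.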
