[MariaDB] 테이블 변경 내용 전송(RabbitMQ)
테이블, 트리거 생성
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL
);
CREATE TABLE changes_log (
id INT AUTO_INCREMENT PRIMARY KEY,
table_name VARCHAR(255) NOT NULL,
column_name VARCHAR(255) NOT NULL,
new_value TEXT,
change_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed TINYINT(1) DEFAULT 0
);
CREATE TRIGGER after_user_name_update
AFTER UPDATE ON users
FOR EACH ROW
BEGIN
IF OLD.name <> NEW.name THEN
INSERT INTO changes_log (table_name, column_name, new_value)
VALUES ('users', 'name', NEW.name);
END IF;
END;
.NET 콘솔 앱 메시지 전송 예제
using MySql.Data.MySqlClient;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace RabbitMQConsole
{
internal class Program
{
static void Main(string[] args)
{
MainAsync(args).GetAwaiter().GetResult(); // .NET Framework에서 비동기 메인 메서드를 호출하는 방법
}
static async Task MainAsync(string[] args)
{
var cs = "server=localhost;userid=root;password=0000;database=db"; // DB 정보
using (var con = new MySqlConnection(cs))
{
await con.OpenAsync();
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "dataChanges",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
Console.WriteLine("Monitoring database changes...");
while (true)
{
var processedIds = new List<int>();
using (var cmd = new MySqlCommand("SELECT * FROM changes_log WHERE processed = 0", con))
using (var rdr = await cmd.ExecuteReaderAsync())
{
while (await rdr.ReadAsync())
{
var message = $"Table: {rdr["table_name"]}, Column: {rdr["column_name"]}, New Value: {rdr["new_value"]}";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "dataChanges",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
// DBNull 처리
var idValue = rdr["id"];
if (!(idValue is DBNull))
{
processedIds.Add(Convert.ToInt32(idValue));
}
//processedIds.Add(Convert.ToInt32(rdr["id"]));
}
}
if (processedIds.Count > 0)
{
foreach (var id in processedIds)
{
using (var updateCmd = new MySqlCommand("UPDATE changes_log SET processed = 1 WHERE id = @Id", con))
{
updateCmd.Parameters.AddWithValue("@Id", id);
await updateCmd.ExecuteNonQueryAsync();
}
}
}
await Task.Delay(10000); // Wait for 10 seconds before checking again.
}
}
}
}
}
}
.NET 콘솔 앱 메시지 수신 예제
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace RabbitMQReceive
{
internal class Program
{
static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "dataChanges",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "dataChanges",
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
users 테이블의 name이 수정되었을 때 변경사항이 클라이언트에 전송됨
직접 데이터베이스 폴링
- 데이터베이스 부담 : 애플리케이션에서 데이터베이스를 직접 폴링할 경우, 각 폴링 요청은 데이터베이스 서버에 쿼리를 실행하고 만든다. 이는 데이터베이스의 CPU 사용률, 메모리 사용량, I/O 부하를 증가시킬 수 있으며, 특히 폴링 주기가 짧고 데이터베이스가 큰 경우 부담이 가중될 수 있다.
- 즉시성 대비 부담 : 실시간성을 높이기 위해 폴링 주기를 짧게 설정하면 설정할수록 데이터베이스 부담이 커진다. 특히 변경사항이 적은 시간에도 불필요한 폴리응로 인해 리소스가 낭비될 수 있다.
RabbitMQ 사용 시 폴링
- 비동기 메시징 처리 : RabbitMQ를 사용하는 경우, 데이터베이스 변경 사항이 발생하면 해당 변경 사항만 메시지 큐에 게시된다. 그리고 별도의 프로세스나 서비스가 이 메시지 큐를 구독하여 필요한 작업을 처리한다. 이 과정에서 데이터베이스 자체에 대한 폴링 부담은 발생하지 않으며, RabbitMQ와 같은 메시징 시스템은 이러한 메시지 기반 통신을 위해 최적화 되어 있다.
- 중앙 집중식 메시징 처리 : RabbitMQ는 메시지를 중앙에서 관리하므로, 여러 서비스나 애플리케이션이 동일한 메시지를 필요에 따라 처리할 수 있다. 이는 시스템의 분산 처리를 용이하게 하며, 데이터베이스 서버로의 직접적인 부담을 줄인다.
RabbitMQ 사용 시 데이터베이스 자체에 대한 폴링 부담이 발생하지 않는 이유
데이터베이스 변경 감지
- 데이터베이스 변경 사항을 RabbitMQ에 전송하는 통상적인 방식은 직접적인 데이터베이스 풀링이 아니다. 대신, 변경 사항을 감지하고 메시지를 RabbitMQ로 전송하는 로직이 다음 두 가지 방식 중 하나로 구현된다.
1. 트리거와 중간 스토리지 사용 : 데이터베이스 트리거를 사용하여 변경 사항이 발생할 때마다 그 정보를 중간 스토리지인(ex) 특정 로그 테이블)에 기록한다. 그 후, 별도의 애플리케이션 또는 스크립트가 이 중간 스토리지를 주기적으로 확인(폴링)하여 새로운 로그가 있을 때 RabbitMQ에 메시지를 전송한다.
이 경우, 폴링이 발생하지만 이는 데이터베이스 자체를 폴링하는 것이 아니라, 변경 로그를 담고 있는 중간 스토리지를 폴링하기 때문에 데이터베이스의 부담은 트리거에 의한 로깅 작업에 국한되며, 실제 폴링 과정에서 추가적인 부담이 발생하지는 않는다.
2. 응용 프로그램 레벨에서의 이벤트 감지 : 애플리케이션 로직 내에서 데이터 변경이 발생하면, 그 즉시 RabbitMQ에 이벤트를 전송한다. 이 방식은 데이터 변경 작업을 수행하는 동일한 애플리케이션 내에서 처리되므로, 별도의 데이터베이스 폴링이 필요하지 않다.
변경 사항을 RabbitMQ로 전송하는 과정은 비동기적입니다. 즉, 메시지는 데이터베이스의 변경 사항이 감지될 때 RabbitMQ의 큐에 즉시 게시됩니다. 이 메시지는 나중에 소비자(Consumer)에 의해 비동기적으로 처리될 수 있으며, 이 과정에서 데이터베이스 자체에 추가적인 폴링 작업이나 쿼리 부하가 발생하지 않습니다.