IT俱乐部 Java 如何利用Java实现MySQL的数据变化监听

如何利用Java实现MySQL的数据变化监听

在高并发和大数据环境下,实时获取 MySQL 数据库的增量变化对数据同步、数据分析、缓存更新等场景至关重要。MySQL 的 binlog(Binary Log) 记录了数据库的所有变更,可以用来实现 增量数据监听。本文将介绍如何利用 binlog 监听 MySQL 数据增量,并提供基于 Java 的 Canal 实现示例。

1.binlog 简介

1.1 什么是 binlog

binlog(Binary Log) 是 MySQL 记录 DDL(数据定义语言,如 CREATEALTER)和 DML(数据操作语言,如 INSERTUPDATEDELETE)的日志文件,它用于:

  • 主从复制:MySQL 主库将 binlog 传输到从库,实现数据同步。
  • 数据恢复:通过 mysqlbinlog 工具解析 binlog 恢复数据。
  • 数据同步:第三方工具(如 Canal)解析 binlog,进行数据同步。

1.2 binlog 的三种格式

binlog 格式 说明
STATEMENT 记录 SQL 语句本身
ROW 记录行数据变更(推荐)
MIXED 结合前两者,MySQL 自动判断

由于 ROW 格式能提供精确的行级别变更信息,因此推荐使用它。

2. 开启 binlog 并配置 MySQL

2.1 检查 binlog 是否开启

1
SHOW VARIABLES LIKE 'log_bin';

如果 log_bin 值为 OFF,说明 binlog 未开启。

2.2 修改 MySQL 配置文件(my.cnf 或 my.ini)

[mysqld] 部分添加以下内容:

1
2
3
4
5
server-id=1
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL
expire_logs_days=7

重启 MySQL:

1
2
systemctl restart mysql  # Linux
net stop mysql && net start mysql  # Windows

2.3 验证 binlog 配置

执行:

1
SHOW BINARY LOGS;

如果有 binlog 文件,如 mysql-bin.000001,说明已开启。

3. 使用 Java 监听 binlog

3.1 选择工具:Canal

阿里巴巴开源的 Canal 可以模拟 MySQL 从库协议,解析 binlog 并实时推送增量数据。

3.2 Java 代码监听 binlog

引入 Maven 依赖

1
com.alibaba.ottercanal.client1.1.6

编写 Java 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
 
import java.net.InetSocketAddress;
import java.util.List;
 
public class BinlogListener {
    public static void main(String[] args) {
        // 连接 Canal
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111),
                "example", "canal", "canal");
         
 
        try {
            connector.connect();
            connector.subscribe(".*\..*"); // 监听所有库表
            connector.rollback();
     
            while (true) {
                Message message = connector.getWithoutAck(100); // 获取数据
                long batchId = message.getId();
                List entries = message.getEntries();
     
                if (batchId != -1 && !entries.isEmpty()) {
                    for (CanalEntry.Entry entry : entries) {
                        if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                            processEntry(entry);
                        }
                    }
                }
                connector.ack(batchId); // 确认消息
            }
        } finally {
            connector.disconnect();
        }
    }
     
    private static void processEntry(CanalEntry.Entry entry) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            CanalEntry.EventType eventType = rowChange.getEventType();
     
            System.out.println("变更表:" + entry.getHeader().getTableName());
            System.out.println("变更类型:" + eventType);
     
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    System.out.println("删除数据:" + rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    System.out.println("新增数据:" + rowData.getAfterColumnsList());
                } else {
                    System.out.println("更新前数据:" + rowData.getBeforeColumnsList());
                    System.out.println("更新后数据:" + rowData.getAfterColumnsList());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
}

4. 代码解析

1.创建 Canal 连接

1
2
3
CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress("127.0.0.1", 11111),
    "example", "canal", "canal");
  • 127.0.0.1:Canal 服务器地址
  • 11111:Canal 端口
  • example:Canal 实例
  • canal/canal:默认账号密码

2.获取 binlog 变更数据

1
Message message = connector.getWithoutAck(100);

getWithoutAck(100):拉取 100 条 binlog 事件。

3.解析 binlog

1
2
3
4
5
for (CanalEntry.Entry entry : entries) {
    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
        processEntry(entry);
    }
}

仅处理 ROWDATA 类型的变更,忽略事务等其他信息。

4.分类处理 INSERT、UPDATE、DELETE

1
2
3
4
5
6
7
8
if (eventType == CanalEntry.EventType.DELETE) {
    System.out.println("删除数据:" + rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
    System.out.println("新增数据:" + rowData.getAfterColumnsList());
} else {
    System.out.println("更新前数据:" + rowData.getBeforeColumnsList());
    System.out.println("更新后数据:" + rowData.getAfterColumnsList());
}

总结

  • MySQL binlog 记录数据库变更,可用于监听增量数据。
  • Canal 作为 MySQL 从库解析 binlog,实现数据同步。
  • Java 代码示例 展示如何用 Canal 监听 INSERTUPDATEDELETE 操作,并解析变更数据。

这种方案适用于 分布式数据同步缓存一致性数据变更通知,是实时数据处理的重要手段。

到此这篇关于如何利用Java实现MySQL的数据变化监听的文章就介绍到这了,更多相关Java监听MySQL数据变化内容请搜索IT俱乐部以前的文章或继续浏览下面的相关文章希望大家以后多多支持IT俱乐部!

本文收集自网络,不代表IT俱乐部立场,转载请注明出处。https://www.2it.club/code/java/14874.html
上一篇
下一篇
联系我们

联系我们

在线咨询: QQ交谈

邮箱: 1120393934@qq.com

工作时间:周一至周五,9:00-17:30,节假日休息

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

返回顶部