Maxwell - 增量数据同步工具

news/发布时间2024/5/15 12:43:19

前言

        今天来学习一个新的大数据小工具 Maxwell ,它和 Sqoop 很像。Sqoop主要用于在 Hadoop (比如 HDFS、Hive、HBase 等)和关系型数据库之间进行数据的批量导入和导出,而 Maxwell 则主要用于监控数据库的变化(通过监控 binlog ),并将变化的数据以JSON格式发布到消息队列(一般是 Kafka)或 Redis 中。

        Sqoop 一般都是在特定时间(比如用 Azkaban 调度在每天0点进行作业),可以用于离线数仓的数据同步问题;但是对于实时数据分析,Sqoop 并不方便。所以这里就引出了 Maxwell 这个工具,它的特点包括轻量级、能够捕获完整的历史数据(通过bootstrap功能)以及支持断点还原(即在错误解决后可以从上次的位置继续读取数据)。然而,Maxwell只支持 json 格式,并且不能直接支持HA(高可用性)。

        这里说起 Sqoop 我又想到了 DataX ,它也是一款全量数据采集工具,和 Sqoop 很相似,但是还是有所区别:

  • Sqoop是基于Hadoop生态系统的,充分利用了MapReduce计算框架进行数据的导入导出。而DataX仅仅在运行DataX的单台机器上进行数据的抽取和加载。
  • Sqoop采用MapReduce框架,具有良好的并发性和容错性,可以在多个节点同时进行import或export操作。而DataX则是单进程的,没有并发性和容错性。
  • Sqoop主要支持在关系型数据库和Hadoop组件(如HDFS、Hive、HBase)之间进行数据迁移,但不支持在Hadoop组件之间或关系型数据库之间进行数据迁移。相比之下,DataX支持更广泛的数据迁移场景,包括关系型数据库、Hadoop组件以及其他数据源之间的数据迁移。
  • Sqoop在导入导出数据时,会生成一个MapReduce作业在Hadoop集群中运行,可以处理大规模的数据,但不具备统计和校验能力。而DataX在传输过程中可以进行数据过滤,并且可以统计传输数据的信息,对于业务场景复杂、表结构变更的情况更为适用。
  • Sqoop采用命令行的方式调用,容易与现有的调度监控方案相结合。而DataX采用XML配置文件的方式,开发运维上相对不太方便。

接下来就来学习 Maxwell ,内容不多,过两天再把 DataX 搞定。

1、MaxWell 简介

1.1 Maxwell概述

        Maxwell 是由美国 Zendesk 公司开源,用 Java 编写的 MySQL 变更数据抓取软件。它会实时监控Mysql数据库的数据变更操作(包括insert、update、delete),并将变更数据以 JSON 格式发送给 Kafka、Kinesi、RabbitMQ、Redis 或者其它平台的应用程序。

        官网地址:Maxwell's Daemon

1.2 Maxwell 输出数据格式

我们知道 Maxwell 是以 JSON 格式传输数据的,上面显示了不同数据库数据操作对应的 json 格式,这里是这些 json 字段的说明:

字段

解释

database

变更数据所属的数据库

table

表更数据所属的表

type

数据变更类型

ts

数据变更发生的时间

xid

事务id

commit

事务提交标志,可用于重新组装事务

data

对于insert类型,表示插入的数据;对于update类型,标识修改之后的数据;对于delete类型,表示删除的数据

old

对于update类型,表示修改之前的数据,只包含变更字段

2、Maxwell 原理

        Maxwell的工作原理是实时读取MySQL数据库的二进制日志(Binlog),从中获取变更数据,再将变更数据以JSON格式发送至Kafka等流处理平台。

2.1 MySQL二进制日志

2.1.1、什么是 binlog

        二进制日志(Binlog)是MySQL服务端非常重要的一种日志,它记录了所有的 DDL 和 DML (除了查询语句)语句,以事件的形式记录,还包含所执行的消耗的时间,MySQL 的二进制日志是事务安全类型的。Maxwell的工作原理和主从复制密切相关。

        一般开启二进制日志会有 1% 的性能损耗。二进制日志有两个最重要的场景:

  • MySQL Replication 在 Master 端开启 binlog,Master 把它的二进制日志传递给其它 salves 来达到 master-salve 数据一致的目的。
  • 通过使用 musqlbinlog 工具来进行数据恢复。

        MySQL二进制日志的存储机制很像我们之前学的Kafka 文件存储机制,它也包含两类文件:二进制索引文件(存储二进制日志文件索引,后缀为 .index)和二进制日志文件(用于记录所有 DDL 和 DML 语句,后缀为 .00000*)。

2.1.2、binlog 的开启

  • 找到 MySQL 配置文件的位置
  • linux:/etc/my.cnf (可以通过 locate my.cnf) 查找位置
  • window:\my.ini
  • 修改配置:在 [mysqlId] 区块,设置添加 log-bin=mysql-bin (这个表示 binlog 的前缀是 mysql-bin,也可以换成别的,也就是说以后生成的日志文件就是 mysq-bin.000001 格式的文件,每次 mysql 重启或者达到单个文件大小的阈值时,生成一个新的日志文件、按顺序编号)

2.1.3、binlog 的分类设置

mysql binlog 的格式共有三种:STATEMENT,MINED,ROW。

在配置文件中可以选择配置:binllog_format = statment|mixed|row

  • statement:语句级,它会记录每一次执行写操作的雨具。相比较 row 模式节省空间,但是可能产生不一致,比如 update student enroll_time = now(); 这种 SQL 如果进行数据恢复那一定是有问题的。
    • 优点:节省空间
    • 缺点:有可能造成数据不一致
  • row:行级,binlog 记录每次操作后每行记录的变化,相当于它记录的是我们每次操作后表格的内容。
    • 优点:保持数据的绝对一致性,不管什么sql,它只记录执行后的结果
    • 缺点:占用磁盘较大
  • mixed:混合级别,statement 的升级版,一定程度上解决了 statement 模式因为一些情况而造成的数据不一致问题。比如对于 SQL 中包含随机数或者时间的语句,它保存的是 SQL 执行的结果;对于 SQL 不包含随机数或者时间的语句,它保存的是 SQL 语句本身。
    • 优点:节省空间
    • 缺点:极个别情况下仍然存在不一致

综上,Maxwell 做监控分析,选择 row 模式比较合适(必须选择 row),因为我们的下游(比如 Kafka 是无法执行 sql 语句的)

2.1.4、Maxwell 和 Canal 对比

对比CanalMaxwell
语言

java

java
数据格式自由json
采集模式增量全量/增量
数据落地定制支持 kafka 等多种平台
HA支持支持

Canal 的原理和 Maxwell 都是通过监控 binlog 实现的。 

2.2 MySQL主从复制

        MySQL的主从复制,就是用来建立一个和主数据库完全一样的数据库环境,这个数据库称为从数据库。

1)主从复制的应用场景如下:

(1)做数据库的热备:主数据库服务器故障后,可切换到从数据库继续工作。

(2)读写分离:主数据库只负责业务数据的写入操作,而多个从数据库只负责业务数据的查询工作,在读多写少场景下,可以提高数据库工作效率。

2)主从复制的工作原理如下:

(1)Master主库将数据变更记录,写到二进制日志(binary log)中

(2)Slave从库向mysql master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log)

(3)Slave从库读取并执行中继日志中的事件,将改变的数据同步到自己的数据库。

2.3 Maxwell原理

        Maxwell 的原理很简单,就是将自己伪装成slave,并遵循MySQL主从复制的协议,从master同步数据。

3、Maxwell 部署

3.1、启用 MySQL binlog

MySQL服务器的Binlog默认是未开启的,如需进行同步,需要先进行开启。

1)修改MySQL配置文件/etc/my.cnf

 sudo vim /etc/my.cnf

2)增加如下配置

[mysqld]#数据库id
server-id = 1
#启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
#binlog类型,maxwell要求为row类型
binlog_format=row
#启用binlog的数据库,需根据实际情况作出修改
binlog-do-db=gmall
#如果需要监控多个数据库 不要加逗号 直接新开一行
binlog-do-db=test

如果我们要监控除了个别数据库之外的其它所有数据库,我们可以使用 binlog-ignore-db 属性。

3)重启MySQL服务

sudo systemctl restart mysqld

4)查看是否修改完成

show variables like '%binlog%';

我们看到 binlog_format 的 Value 现在是 ROW ,说明 binlog 配置成功。

5)查看 binlog 文件

进入 /var/lib/mysql 目录,查看 MySQL 生成的 binlog 文件:

cd /var/lib/mysql
sudo ls -l | grep mysql-bin

 注:MySQL 生成的 binlog 文件初始大小一定是 514 字节。

3.2、初始化 maxwell 元数据库

(1)创建元数据库 maxwell 用于存储 maxwell 的元数据

create database maxwell;

 (2)设置 mysql 用户密码安全级别

set global validate_password_policy=0;
set global validate_password_length=4;

如果上面的命令报错,去 /etc/my.cnf 的 [mysqld] 下添加下面两行:

plugin-load-add=validate_password.so
validate-password=FORCE_PLUS_PERMANENT

然后重启 mysqld 服务

sudo systemctl restart mysqld

(3)分配一个用户账号可以操作该元数据库

GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123456';

(4)分配这个账号可以监控其他数据库的权限

GRANT SELECT ,REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO maxwell@'%';

(5)刷新 mysql 表权限

flush privileges;

3.3、Maxwell 进程启动

3.3.1、使用命令行参数启动

bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout
  • user:连接 mysql 的用户(上面我们已经创建了)
  • password:连接 mysql 的用户密码
  • host: mysql 安装的主机名
  • producer:生产者模式(stdout:控制台,kafka:kafka 集群)

查看 maxwell 输出: 

 我们可以看到,我们的 insert 语句被 maxwell 转为了 json 被输出到了控制台。

3.3.2、定制化配置文件启动

 编辑配置文件 config.properties:

log_level=infoproducer=stdout
kafka.bootstrap.servers=localhost:9092# mysql login info
host=hadoop102
user=maxwell
password=123456

 启动 Maxwell:

4、Maxwell 使用

4.1、监控 MySQL 数据并在控制台打印

        上面我们已经演示过了,也就是首先在 mysql 的配置文件中指定需要打印 binlog 的数据库,然后通过命令或者配置文件来开启 Maxwell 进程:

bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout

接着我们对被监听的数据库的操作信息就会以 json 的格式打印出来: 

4.2、监控 MySQL 数据并输出到 Kafka

        监控 MySQL 数据并输出到 Kafka,这种需求是我们实际工作用的最多的,这也是我们离线数仓和实时数仓所必须的。

4.2.1、实例

1)启动 Zookeeper 和 Kafka

2)启动 Maxwell 监控 binlog

bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=kafka --kafka.bootstrap.servers=hadoop102:9092 --kafka_topic=maxwell

注:这里我们制定了 Kafka 集群地址为 hadoop102:9092 ,主题为 maxwell,虽然主题并不存在,但是 kafka 会自动创建我们指定的主题

3)在 hadoop103 消费 maxwell 主题

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell

4)在 MySQL 中增加一条数据再删除: 

 可以看到我们的数据成功被 Kafka 消费,说明业务数据日志成功传输到 Kafka 。

4.2.2、Kafak 主题数据的分区控制

        上面的案例中,我们不论 MySQL 的那个数据库,哪张表,都被保存到了 Kafka 的 maxwell 主题,这样显然很乱。所以这里我们需要解决一下:
        在生产环境中,我们一般都会用 maxwell 监控多个 mysql 库的数据,然后将这些数据发往 Kafka 的一个主题,并且这个主题肯定是多分区的,为了提高并发度。那么如何控制这些数据的分区问题,那就变得至关重要,实现步骤如下:
(1)修改 maxwell 的配置文件,定制化启动 maxwell 进程

log_level=infoproducer=kafka
kafka.bootstrap.servers=hadoop102:9092# mysql login info
host=hadoop102
user=maxwell
password=123456# kafka 主题
kafka_topic=maxwell3# 默认配好的
kafka.compression.type=snappy
kafka.retries=0
kafka.acks=1#producer_partition_by=database # [database, table, primary_key, transaction_id, thread_id, column]
# 常用的按照库分区、按照表分区,这里我们测试按照库分区
producer_partition_by=database

(2)手动创建 Kafka 多分区主题

kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --create --replication-factor 2 --partitions 3 --topic maxwell3

 查看主题信息:

这样,我们不同数据库的操作记录就会保存到 Kafka 不同的分区当中, 分区可以使得集群负载均衡,还可以提高提高读写效率。

4.3、监控 MySQL 指定表输出控制台

(1)运行 maxwell 来监控 mysql 指定表数据更新

监控 test 库下的 staff 表

bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout --filter 'exclude: *.*,include:test.staff'

(2)向 staff 表中插入数据,查看 maxwell 控制台输出

(3)向 ws2 表中插入一条数据,查看

注:表 ws2 并不是我们指定监控的表

结果:maxwell 控制台并没有任何输出 

注:还可以设置 include.test.* 来表示监控 test 库下的所有表

4.4、监控 MySQL 指定表全量输出到控制台

        Maxwell 进程默认只能监控 mysql 的 binlog 日志的新增以及变化的,但是 Maxwell 是支持数据初始化的,可以通过修改 Maxwell 的元数据来对 MySQL 中的某张表进行数据初始化,也就是我们常说的全量同步。具体操作如下:

        需求:将 test 库下的 staff 表中的数据全量导入到 maxwell 控制台打印。

我们可以在数据库 maxwell (这个数据库是我们初始化 maxwell 元数据库时创建的)中查看元数据表 positions 的内容: 

这里的 server_id 是我们在 mysql 的配置文件 /etc/my.cnf 中配置的;这里的 binlog_file 代表的是最新的 binlog 文件 mysql-bin.000003 ;这里的 binlog_position 的值为 2802 代表目前已经读到了第 2802 个字节的位置,意思就说我们现在是从这个偏移量之后开始监听的。

注:也可以通过 SQL:show master status; 来查看当前的 binlog 状态。

1. 向元数据表 maxwell.bootstrap 中插入数据

insert into maxwell.bootstrap(database_name,table_name) values('test','staff');

这就相当于告诉 maxwell,下一次启动 maxwell 作业时,就会调用一个初始化作业,把我们指定的这个 test 数据库下的 staff 表进行一次增量同步。

我们可以看到,通过 bootstrap 表插入数据来完成初始化,这样插入的数据是 json 中 type 字段的值是 "bootstrap-insert" 而不是 "insert"。

再次查看 bootstrap 表: 

我们可以发现,is_complete 字段的值从默认值 0 变成了 1,inserted_rows 从 0 变成了 6,后面的 started_at 和 completed_at 字段的值也不再为空了。

2. 使用 maxwell-bootstrap 脚本工具

除了上面的直接向 maxwell 元数据库中的 bootstrap 表插入数据,也可以使用 maxwell 提供的 maxwell-bootstrap 脚本,本质其实是一样的,具体看你觉得哪个省事。

bin/maxwell-bootstrap --database gmall --table user_info --config ./config.properties

这样,这个初始化作业就算完成了,当我们下次打开 maxwell 进程时,它并不会再次执行这个初始化任务,因为它已经完成过了。

总结

        至此,Maxwell 的学习也已经结束了,待会吧离线数仓项目里的 maxwell 部分完成。总体来说,这个工具还是很好用且简单的,希望有一天自己也可以用屎山堆砌出这么一个自己的框架!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.bcls.cn/UDpP/1322.shtml

如若内容造成侵权/违法违规/事实不符,请联系编程老四网进行投诉反馈email:xxxxxxxx@qq.com,一经查实,立即删除!

相关文章

开发一款招聘小程序需要具备哪些功能?

随着时代的发展,找工作的方式也在不断变得简单,去劳务市场、人才市场的方式早就已经过时了,现在大多数年轻人都是直接通过手机来找工作。图片 找工作类的平台不但能扩大企业的招聘渠道,还能节省招聘的成本,方便求职者进…

Springboot项目的run debug都是灰色解决方法

IDEA下新建SpringBoot项目后,问题显示如下: 解决方法如下: 这个问题是由于缺少Configuration构建器的原因,因此: 1.点击Add Configuration 添加Spring Boot构建器,启动类选择好,点击确认即可&a…

Runtime Error while Saving a PyTorch bin Model: “File /** Cannot Be Opened“

原始保存代码为: quant_path "baichuan2_awq" torch.save(model.state_dict(), quant_path)报错如下: RuntimeError: File baichuan2_awq cannot be opened解决方法:需要保存的文件名字为"pytorch_model.bin" import…

Word docx文件重命名为zip文件,解压后直接查看和编辑

一个不知道算不算冷的知识[doge]: docx格式的文件本质上是一个ZIP文件 当把一个.docx文件重命名为.zip文件并解压后,你会发现里面包含了一些XML文件和媒体文件,它们共同构成了Word文档的内容和格式。 例如,word/document.xml文件…

Rust Vs Go:从头构建一个web服务

Go 和 Rust 之间的许多比较都强调它们在语法和初始学习曲线上的差异。然而,最终的决定性因素是重要项目的易用性。 “Rust 与 Go”争论 Rust vs Go 是一个不断出现的话题,并且已经有很多关于它的文章。部分原因是开发人员正在寻找信息来帮助他们决定下…

箱形理论在交易策略中的实战应用与优化

箱形理论,简单来说,就是将价格波动分成一段一段的方框,研究这些方框的高点和低点,来推测价格的趋势。 在上升行情中,价格每突破新高价后,由于群众惧高心理,可能会回跌一段,然后再上升…

HAL库 CubeMX STM32采用SDIO实现对SD卡和NAND Flash的读写

目录 完整项目源代码下载地址:HAL库CubeMX STM32采用SDIO实现对SD卡和NAND Flash的读写资源-CSDN文库 一、选择合适的存储芯片。 可以去雷龙官网白嫖,白嫖链接:免费样品 二、SD卡/SD NAND底层原理 三、硬件设计 1、SD NAND引脚图 2、芯片外观…

【4.3计算机网络】网络规划与设计

目录 1.网络规划2.逻辑网络设计3.物理网络设计 1.网络规划 需求分析->通信规范分析->逻辑网络设计->物理网络设计->实施阶段 2.逻辑网络设计 3.物理网络设计 例题1: 解析:选A。 例题2: 解析:选A。 例题3. 解析&am…

【数据库】哪些操作会导致索引失效

🍎个人博客:个人主页 🏆个人专栏:数据库 ⛳️ 功不唐捐,玉汝于成 目录 前言 正文 结语 我的其他博客 前言 在数据库管理中,索引的有效性对于查询性能至关重要。然而,索引可能会因为各种操…

14. UE5 RPG使用GameplayTag

GameplayTag本来是应用在GAS游戏技能系统里面的,后来UE直接将其抽离出来,作为一个模块,现在可以不在GAS里也可以使用这个模块。比如,我需要判断一个射线拾取的物体,首先我需要判断这个actor是否存在,然后判…

Sora了解资料

一、基本介绍 1.1sora 在 2024 年 2 月 16 日,Open AI 宣布推出全新的生成式人工智能模型“Sora”。据了解,通过文本指令,Sora 可以直接输出长达 60 秒的视频,并且包含高度细致的背景、复杂的多角度镜头,以及富有情感…

Spring中常见的注解

1 spring注解介绍 Spring Framework是一个开源的Java平台,广泛用于创建高性能的企业应用程序。随着Spring的发展,注解(Annotation)已经成为了配置Spring应用程序的主要方式,逐渐取代了传统的XML配置。使用注解可以使代…

9、使用 ChatGPT 的 GPT 制作自己的 GPT!

使用 ChatGPT 的 GPT 制作自己的 GPT! 想用自己的 GPT 超越 GPT ChatGPT 吗?那么让我们 GPT GPT 吧! 山姆 奥特曼利用这个机会在推特上宣传 GPTs 的同时还猛烈抨击了埃隆的格罗克。 GPTs概览 他们来了! 在上周刚刚宣布之后,OpenAI 现在推出了其雄心勃勃的新 ChatGPT…

开源数据可视化应用程序JSON Crack

什么是 JSON Crack ? JSON Crack 是一款免费的开源数据可视化应用程序,能够将 JSON、YAML、XML、CSV 等数据格式可视化为交互式图表。凭借其直观且用户友好的界面,JSON Crack 可以轻松探索、分析和理解即使是最复杂的数据结构。无论您是从事大…

数据结构--排序

数据结构--排序 1. 各类排序算法的性质2. 插入排序2.1 直接插入排序2.2 折半插入排序 3. 希尔排序4. 交换排序4.2 冒泡排序 5. 快速排序6. 选择排序6.1 简单选择排序6.2 堆排序 7. 归并排序8. 基数排序 1. 各类排序算法的性质 2. 插入排序 2.1 直接插入排序 2.2 折半插入排序 …

openai公司的chatgpt-3.5参数库内还未增加sora的语料信息

openai公司的chatgpt-3.5参数库内还未增加sora的语料信息!我想通过openai公司的chatgpt3.5来了解一下关于sora的技术信息,结果呢,它竟然回答不知道sora是什么。看来,sora的语料库信息还未来得及加入chatgpt3.5的训练模型中。 如图…

vue的十大面试题详情

1 v-show与v-if区别 v-if与v-show可以根据条件的结果,来决定是否显示指定内容&#xff1a; v-if: 条件不满足时, 元素不会存在. v-show: 条件不满足时, 元素不会显示(但仍然存在). <div id"app"><button click"show !show">点我</but…

计算机视觉基础【OpenCV轻松入门】:获取图像的ROI

OpenCV的基础是处理图像&#xff0c;而图像的基础是矩阵。 因此&#xff0c;如何使用好矩阵是非常关键的。 下面我们通过一个具体的实例来展示如何通过Python和OpenCV对矩阵进行操作&#xff0c;从而更好地实现对图像的处理。 ROI&#xff08;Region of Interest&#xff09;是…

【Spring】循环依赖

目录标题 什么是循环依赖循环依赖场景Java SE 演示Spring 容器演示三级缓存核心知识三级缓存四大方法三级缓存中的迁移 三级缓存源码分析源码思维导图 源码图例课前问题推荐阅读 循环依赖是什么&#xff1f;循环依赖的场景有哪一些&#xff1f;三级缓存分别是什么&#xff1f;三…

HarmonyOS4.0系统性深入开发34栅格布局(GridRow/GridCol)

栅格布局&#xff08;GridRow/GridCol&#xff09; 概述 栅格布局是一种通用的辅助定位工具&#xff0c;对移动设备的界面设计有较好的借鉴作用。主要优势包括&#xff1a; 提供可循的规律&#xff1a;栅格布局可以为布局提供规律性的结构&#xff0c;解决多尺寸多设备的动态…
推荐文章