数据同步MySQL -> Elasticsearch

news/发布时间2024/5/16 6:12:20

大家好我是苏麟,今天聊聊数据同步 .

数据同步

一般情况下,如果做查询搜索功能,使用 ES 来模糊搜索,但是数据是存放在数据库 MySQL 里的,所以说我们需要把 MySQL 中的数据和 ES 进行同步,保证数据一致(以 MySQL 为主)。

MySQL =>ES(单向)

同步方式

首次安装完 ES,把 MySQL 数据全量同步到 ES 里,写一个单次脚本 4 种方式 , 全量同步(首次)+增量同步(新数据)

1.定时任务 (推荐 : 简单)

定时任务 : 比如1分钟1次,找到 MySQL 中过去几分钟内(至少是定时周期的2 倍)发生改变的数据,然后更新到 ES.

  • 优点:简单易懂、占用资源少、不用引入第三方中间件
  • 缺点:有时间差
  • 应用场景:数据短时间内不同步影响不大、或者数据几乎不发生修改
2.双写 

双写 : 写数据的时候,必须也去写 ES;更新删除数据库同理。(事务:建议先保证 MySQL写成功,如果ES 写失败了,可以通过定时任务 + 日志 +告警进行检测和修复(补偿))

  • 优点 : 不知道
  • 缺点 : 繁琐
3. Logstash

用 Logstash 数据同步管道 (一般要配合 kafka 消息队列 + beats 采集器)

  • 优点 : 用起来方便,插件多
  • 缺点 : 成本更大 : 一般要配合其他组件使用 (比如 kafka) , 维护成本 : 多维护一个组件 , 学习成本 : 学习使用
4.Canal (推荐 : 简单 , 实时性非常强)

Canal 监听 MySQL Binlog,实时同步

  • 优点 : 实时同步 , 实时性非常强
  • 缺点 :  忽略不计   (MySQL8版本可能连接失败)

定时任务

找到 MySQL 中过去几分钟内发生改变的数据,然后更新到 ES.

双向写入

写数据的时候,也去写 ES .

这个不推荐!

Logstash

传输 处理 数据的管道

文章 : Getting Started with Logstash | Logstash Reference [7.17] | Elastic

下载 : https://artifacts.elastic.co/downloads/logstash/logstash-7.17.9-windows-x86_64.zip

快速开始 : Running Logstash on Windows | Logstash Reference [7.17] | Elastic 

这里需要学习成本 , 有兴趣的小伙伴自己了解 , 这里不过多赘述 .

订阅数据库流水的同步方式 Canal

地址 : GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

原理 : 数据库每次修改时,会修改 binlog 文件,只要监听该文件的修改,就能第一时间得到消息并处理canal : 帮你监听 binlog,并解析 binlog 为你可以理解的内容。 

它伪装成了 MySQL 的从节点,获取主节点给的 binlog,如图:

快速开始 : QuickStart · alibaba/canal Wiki · GitHub

windows 系统,找到你本地的 mysql 安装目录,在根目录下新建 my.ini 文件:

Linux 系统,找到你本地的 mysql 安装目录,在根目录下新建 my.cnf 文件:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权 canal 链接 MySQL 账号具有作为 MSQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

这里有个报错 java 找不到,修改 startup.bat 脚本为你自己的 java home

set JAVA_HOME=C:\Users\59278\.jdks\corretto-1.8.0_302 (自己MySQL路径)set PATH=%JAVA_HOME%\bin;%PATH%

Java 中引入依赖

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version>
</dependency>

Demo  

import java.net.InetSocketAddress;
import java.util.List;import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}
}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt; before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; after");printColumn(rowData.getAfterColumnsList());}}}
}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}
}}

在启动之后 修改MySQL中数据 , java控制台就能实时输出 .

这期就到这里 , 下期见 !

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.bcls.cn/inme/3459.shtml

如若内容造成侵权/违法违规/事实不符,请联系编程老四网进行投诉反馈email:xxxxxxxx@qq.com,一经查实,立即删除!

相关文章

CentOS7 安装SSH

说实话&#xff0c;感觉CentOS有点难用。初始配置不是很友好&#xff0c;连自动获取IP地址都是关着的。当然&#xff0c;我这里本地用的是虚拟机。 开启获取IP 首先是获取IP地址&#xff0c;这是一些的起点。 首先使用ip addr 看看网卡地址和名称。我这边是ens33。然后打开自…

C++的vector容器->基本概念、构造函数、赋值操作、容量和大小、插入和删除、数据存取、互换容器、预留空间

#include<iostream> using namespace std; #include <vector> //vector容器构造 void printVector(vector<int>& v) { for (vector<int>::iterator it v.begin(); it ! v.end(); it) { cout << *it << " "…

[职场] 会计学专业学什么 #其他#知识分享#职场发展

会计学专业学什么 会计学专业属于工商管理学科下的一个二级学科&#xff0c;本专业培养具备财务、管理、经济、法律等方面的知识和能力&#xff0c;具有分析和解决财务、金融问题的基本能力&#xff0c;能在企、事业单位及政府部门从事会计实务以及教学、科研方面工作的工商管…

基于python社交网络大数据分析系统的设计与实现

项目&#xff1a;基于python社交网络大数据分析系统的设计与实现 摘 要 社交网络大数据分析系统是一种能自动从网络上收集信息的工具&#xff0c;可根据用户的需求定向采集特定数据信息的工具&#xff0c;本项目通过研究爬取微博网来实现社交网络大数据分析系统功能。对于采集…

2024年数学建模美赛详细总结以及经验分享

前言&#xff1a; 本文记录与二零二四年二月六日&#xff0c;正好今天是数学建模结束&#xff0c;打算写篇文章记录一下整个过程&#xff0c;以及一些感受、还有经验分享。记录这个过程的原因就是我在赛前&#xff0c;在博客上找了很久&#xff0c;也没有像我这么类似记…

抛弃chatgpt,使用微软的Cursor提升coding效率

Whats Cursor? Cursor编辑器是一个基于GPT-4的代码编辑器&#xff0c;它可以根据用户的自然语言指令或者正在编辑的代码上下文为用户提供代码建议&#xff0c;支持多种编程语言&#xff0c;如Python、Java、C/C#、go等。Cursor编辑器还可以帮助用户重构、理解和优化代码&…

有趣的 Streamlit

先看一则新闻&#xff1a;「Snowflake」以8亿美元收购「Streamlit」&#xff0c;以帮助客户构建基于数据的应用程序 Streamlit 是什么&#xff1f;去年过年前好好研究学习了一番&#xff0c;的确是个很有意思的面向数据开发者的工具&#xff0c;让不懂前端&#xff0c;只懂一点…

每日五道java面试题之spring篇(三)

目录&#xff1a; 第一题 ApplicationContext和BeanFactory有什么区别&#xff1f;第二题 Spring中的事务是如何实现的&#xff1f;第三题 Spring中什么时候Transactional会失效&#xff1f;第四题 Spring容器启动流程是怎样的&#xff1f;第五题 Spring Boot、Spring MVC 和 S…

Opencv实战(2)绘图与图像操作

Opencv实战(2)绘图与图像操作 指路前文&#xff1a;Opencv实战(1)读取与像素操作 三、基本绘图 文章目录 Opencv实战(2)绘图与图像操作三、基本绘图(1).line(2).rectangle(3).circle 四、图像处理(1).颜色空间1.意义2.cvtColor()3.inRange()4.适应光线 (2).形态操作1.腐蚀2.膨…

HarmonyOS学习--三方库

文章目录 一、三方库获取二、常用的三方库1. UI库&#xff1a;2. 网络库&#xff1a;3. 动画库&#xff1a; 三、使用开源三方库1. 安装与卸载2. 使用 四、问题解决1. zsh: command not found: ohpm 一、三方库获取 在Gitee网站中获取 搜索OpenHarmony-TPC仓库&#xff0c;在t…

从事游戏开发类职业岗位的任职资格

游戏开发工程师主要是指致力于游戏总体设计&#xff0c;负责游戏开发和维护工具的设计与开发工作。并配合主程序完成游戏架构的设计&#xff0c;与其他技术支持的工作。从事这项工作&#xff0c;需要掌握的知识和技能非常多&#xff0c;下面就跟着小编来一起了解一下吧。 1、任…

在Linux操作系统的ECS实例上安装Hive

目录 1. 完成hadoop安装配置2. 安装配置MySql安装配置 3. 安装Hive4. 配置元数据到MySQL5. hiveserver2服务配置文件测试 1. 完成hadoop安装配置 在Linux操作系统的ECS实例上安装hadoop 以上已安装并配置完jdk、hadoop也搭建了伪分布集群 2. 安装配置MySql 安装 下下一步…

AT7456E集成了EEPROM的显示器芯片

AT7456E 是一款集成了 EEPROM 的单通道、单色随屏显示发生器&#xff0c;集成了视频驱动器、同步分离器、视频分离开关以及 EEPROM&#xff0c;提高了系统的集成度&#xff0c;有效降低了系统成本。 优势 1.采用符合 NTSC 和 PAL 制式的 512 个用户可编程字符&#xff0c;适合…

袁庭新ES系列11节 | Elasticsearch基本查询

前言 查询操作是Elasticsearch最核心的模块之一。Elasticsearch能够达到数据的实时搜索&#xff0c;而且性能非常稳定&#xff0c;能很方便地用于对大量数据进行搜索和分析。这些都体现了Elasticsearch强大的搜索能力&#xff0c;因此关于Elasticsearch的查询知识的相关学习就…

C++多态

文章目录 多态的概念多态的定义及实现多态的构成条件虚函数虚函数重写虚函数重写的两个例外例外一例外二C11 override 和 final 重载、覆盖(重写)、隐藏(重定义)的对比 抽象类概念接口继承和实现继承 多态的原理虚函数表多态的原理动态绑定和静态绑定 单继承和多继承关系中的虚…

【办公类-16-10-02】“2023下学期 6个中班 自主游戏观察记录(python 排班表系列)

背景需求&#xff1a; 已经制作了本学期的中4班自主游戏观察记录表 【办公类-16-10-01】“2023下学期 中4班 自主游戏观察记录&#xff08;python 排班表系列&#xff09;-CSDN博客文章浏览阅读398次&#xff0c;点赞10次&#xff0c;收藏3次。【办公类-16-10-01】“2023下学…

springmvc+ssm+springboot房屋中介服务平台的设计与实现 i174z

本论文拟采用计算机技术设计并开发的房屋中介服务平台&#xff0c;主要是为用户提供服务。使得用户可以在系统上查看房屋出租、房屋出售、房屋求购、房屋求租&#xff0c;管理员对信息进行统一管理&#xff0c;与此同时可以筛选出符合的信息&#xff0c;给笔者提供更符合实际的…

Android全新UI框架之Jetpack Compose入门基础

Jetpack Compose是什么 如果有跨端开发经验的同学&#xff0c;理解和学习compose可能没有那么大的压力。简单地说&#xff0c;compose可以让Android的原生开发也可以使用类似rn的jsx的语法来开发UI界面。以往&#xff0c;我们开发Android原生页面的时候&#xff0c;通常是在xml…

操作系统--多线程的互斥、同步

一、概念 在进程/线程并发执行的过程中&#xff0c;进程/线程之间存在协作的关系&#xff0c;例如有互斥、同步的关系。 1.互斥 由于多线程执行操作共享变量的这段代码可能会导致竞争状态&#xff0c;因此我们将此段代码称为临界区&#xff08;critical section&#xff09;…

Linux(ACT)权限管理

文章目录 一、 ATC简介二、 案例1. 添加测试目录、用户、组&#xff0c;并将用户添加到组2. 修改目录的所有者和所属组3. 设定权限4. 为临时用户分配权限5. 验证acl权限 6. 控制组的acl权限 一、 ATC简介 ACL&#xff08;Access Control List&#xff0c;访问控制列表&#xf…
推荐文章