目录
- 1、原始需求
- 2、解决方案
- 3、canal介绍、安装
- canal的工作原理
- 架构
- 安装
- 4、验证
1、原始需求
既要同步原始全量数据,也要实时同步mysql特定库的特定表增量数据,同时对应的修改、删除也要对应。
数据同步不能有侵入性:不能更改业务程序,并且不能对业务侧有太大性能压力。
应用场景:数据etl同步、降低业务服务器压力。
2、解决方案
3、canal介绍、安装
canal是阿里巴巴旗下的一款开源项目,纯java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql(也支持mariadb)。
工作原理:mysql主备复制实现
从上层来看,复制分成三步:
- master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
- slave将master的binary log events拷贝到它的中继日志(relay log);
- slave重做中继日志中的事件,将改变反映它自己的数据。
canal的工作原理
原理相对比较简单:
- canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
- mysql master收到dump请求,开始推送binary log给slave(也就是canal)
- canal解析binary log对象(原始为byte流)
架构
说明:
- server代表一个canal运行实例,对应于一个jvm
- instance对应于一个数据队列 (1个server对应1..n个instance)
instance模块:
- eventparser (数据源接入,模拟slave协议和master进行交互,协议解析)
- eventsink (parser和store链接器,进行数据过滤,加工,分发的工作)
- eventstore (数据存储)
- metamanager (增量订阅&消费信息管理器)
安装
1、mysql、kafka环境准备
2、canal下载:wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
3、解压:tar -zxvf canal.deployer-1.1.3.tar.gz
4、对目录conf里文件参数配置
对canal.properties配置:
进入conf/example里,对instance.properties配置:
5、启动:bin/startup.sh
6、日志查看:
4、验证
1、开发对应的kafka消费者
package org.kafka;
import java.util.arrays;
import java.util.properties;
import org.apache.kafka.clients.consumer.consumerrecord;
import org.apache.kafka.clients.consumer.consumerrecords;
import org.apache.kafka.clients.consumer.kafkaconsumer;
import org.apache.kafka.common.serialization.stringdeserializer;
/**
*
* title: kafkaconsumertest
* description:
* kafka消费者 demo
* version:1.0.0
* @author pancm
* @date 2018年1月26日
*/
public class kafkaconsumertest implements runnable {
private final kafkaconsumer<string, string> consumer;
private consumerrecords<string, string> msglist;
private final string topic;
private static final string groupid = "groupa";
public kafkaconsumertest(string topicname) {
properties props = new properties();
props.put("bootstrap.servers", "192.168.7.193:9092");
props.put("group.id", groupid);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", stringdeserializer.class.getname());
props.put("value.deserializer", stringdeserializer.class.getname());
this.consumer = new kafkaconsumer<string, string>(props);
this.topic = topicname;
this.consumer.subscribe(arrays.aslist(topic));
}
@override
public void run() {
int messageno = 1;
system.out.println("---------开始消费---------");
try {
for (; ; ) {
msglist = consumer.poll(1000);
if (null != msglist && msglist.count() > 0) {
for (consumerrecord<string, string> record : msglist) {
//消费100条就打印 ,但打印的数据不一定是这个规律的
system.out.println(messageno + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());
// string v = decodeunicode(record.value());
// system.out.println(v);
//当消费了1000条就退出
if (messageno % 1000 == 0) {
break;
}
messageno++;
}
} else {
thread.sleep(11);
}
}
} catch (interruptedexception e) {
e.printstacktrace();
} finally {
consumer.close();
}
}
public static void main(string args[]) {
kafkaconsumertest test1 = new kafkaconsumertest("sample-data");
thread thread1 = new thread(test1);
thread1.start();
}
/*
* 中文转unicode编码
*/
public static string gbencoding(final string gbstring) {
char[] utfbytes = gbstring.tochararray();
string unicodebytes = "";
for (int i = 0; i < utfbytes.length; i++) {
string hexb = integer.tohexstring(utfbytes[i]);
if (hexb.length() <= 2) {
hexb = "00" + hexb;
}
unicodebytes = unicodebytes + "\\u" + hexb;
}
return unicodebytes;
}
/*
* unicode编码转中文
*/
public static string decodeunicode(final string datastr) {
int start = 0;
int end = 0;
final stringbuffer buffer = new stringbuffer();
while (start > -1) {
end = datastr.indexof("\\u", start + 2);
string charstr = "";
if (end == -1) {
charstr = datastr.substring(start + 2, datastr.length());
} else {
charstr = datastr.substring(start + 2, end);
}
char letter = (char) integer.parseint(charstr, 16); // 16进制parse整形字符串。
buffer.append(new character(letter).tostring());
start = end;
}
return buffer.tostring();
}
}
2、对表bak1进行增加数据
create table `bak1` ( `vin` varchar(20) not null, `p1` double default null, `p2` double default null, `p3` double default null, `p4` double default null, `p5` double default null, `p6` double default null, `p7` double default null, `p8` double default null, `p9` double default null, `p0` double default null ) engine=innodb default charset=utf8mb4 show create table bak1; insert into bak1 select '李雷abcv', `p1` , `p2` , `p3` , `p4` , `p5` , `p6` , `p7` , `p8` , `p9` , `p0` from moci limit 10
3、查看输出结果:
到此这篇关于mysql特定表全量、增量数据同步到消息队列-解决方案的文章就介绍到这了,更多相关mysql特定表数据同步内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!