最新电子产品网站模板/今天株洲最新消息
需求:高密网和低密网之间的mysql文件落地同步。 分析:解决不同网段之间的数据通讯可以采用光闸或者网闸。
解决mysql之间的同步则可以采用canal。利用canal生成数据库变化的sql落地成相应文件,让其被交换至相对网段的某个目录下。canal客户端读取该目录下的文件,执行目录下文件的sql完成数据同步。
1. canal简介:
canal是阿里巴巴的基于数据库增量日志解析,提供增量数据订阅&消费的一个开源项目。目前主要支持mysql、oracle数据库。
canal的工作原理:
原理相对比较简单:
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议。
mysql master收到dump请求,开始推送binary log给slave(也就是canal)。
canal解析binary log对象(原始为byte流
2.canal的配置 canal分为canal客户端和服务端,canal服务端由阿里提供;
1.下载canal服务端 https://github.com/alibaba/canal/releases。 2.解压
3.配置canal-server。
4.开启mysql的binlog写入功能,建议配置binlog模式为row,修改数据库配置文件my.ini,Linux下为my.cnf。
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
skip-name-resolve
expire_logs_days = 10
解释:
log-bin=mysql-bin #开启binlog
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
skip-name-resolve #默认安装的MySql开启了DNS的反向解析。如果禁用的话就不能在MySQL的授权表中使用主机名了而只能用ip格式,防止用127.0.0.1登录,mysql对ip反向解析后用localhost登录出现权限不足,拒绝登录的错误。
expire_logs_days = x #二进制日志自动删除的天数。默认值为0,表示“没有自动删除”
修改完成后重启MySql服务,执行以下语句查看是否开启binlog写入功能。
SHOW VARIABLES LIKE "log_%";
如下图所示,若log_bin为ON,则binlog写入功能已打开。
5.为数据库添加canal用户,开启做为mysql slave的相关权限,执行以下语句。
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION
CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
执行完成后,执行SELECT * FROM mysql.user;查询是否添加成功,如下图所示。
6.修改配置文件instance.properties,位于canal-server/conf/example/instance.properties,红色字体的部分需要修改。
说明:canal.instance.connectionCharset代表数据库的编码方式对应到java中的编码类型,比如UTF-8,GBK
, ISO-8859-1。
7.启动canal服务器端
执行canal-server\bin下的启动脚本,windows下为startup.bat,Linux为startup.sh。windows下正常启动页面如下:
8.启动后查看日志, canal-server\logs\canal\canal.log
具体instance的日志canal-server\logs\example \example.log
以上表示正常启动。
3.canal-client 参考https://github.com/alibaba/canal/wiki/ClientExample创建canal客户端:
package com.shu.hamal.canal.instance;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
import com.shu.hamal.canal.common.CanalUtility;
import com.shu.hamal.canal.common.ConfigEntity;
import com.shu.hamal.canal.common.FileIndexUtil;
import com.shu.hamal.canal.common.InitConfig;
/**
*
* 生成SQL文件线程
*
* @author shu.xiaobai
*/
public class CanalClient implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(CanalClient.class);
// private Vector obj;
public CanalClient(Vector v) {
// this.obj = v;
}
public void run() {
// synchronized (obj) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CanalUtility.INET_SOCKET_ADDRESS,
CanalUtility.INET_SOCKET_PORT), CanalUtility.CANAL_DESTINATION, CanalUtility.CANAL_USERNAME, CanalUtility.CANAL_PASSWORD);
int batchSize = 1000;
try {
long now = System.currentTimeMillis();
while (true) {
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
LOGGER.info("连接canal服务端成功...");
break;
} catch (Exception e) {
LOGGER.error("连接canal服务端失败...10s后尝试下一次连接...");
Thread.sleep(10000);
}
if ((System.currentTimeMillis() - now) > 1000 * 60 * 10) {
LOGGER.error("10min中内连接canal服务端失败...程序退出...请启动canal服务端后重启客户端...");
System.exit(-1);
}
}
LOGGER.info("canal client is running...");
while (true) {
// if (obj.size() != 0) {
// obj.wait();
// }
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
LOGGER.debug("canal client is running...");
// obj.add(new String("canal"));
// obj.notify();
Thread.sleep(1000);
} else {
try {
printEntry(message.getEntries());
} catch (Exception e) {
LOGGER.error("PrintEntry Exception" + e);
}
}
connector.ack(batchId);
}
} catch (Exception e) {
LOGGER.error("线程休眠或唤醒异常:" + e);
} finally {
connector.disconnect();
}
// }
}
private void printEntry(List entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
for (RowData rowData : rowChage.getRowDatasList()) {
String sql = CanalUtility.constructSql(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), entry.getHeader()
.getTableName(), rowChage.getEventType());
String canalSql = CanalUtility.constructCanalSql(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), entry.getHeader()
.getTableName(), rowChage.getEventType());
// 将语句拼接写入文件
exportSql2File(canalSql, sql, rowChage.getEventType(), entry.getHeader().getTableName(), entry.getHeader().getSchemaName());
}
}
}
/**
* 导出sql到文件
*
* @param sql
*/
private void exportSql2File(String canalSql, String sql, EventType eventType, String tableName, String databaseName) {
ConfigEntity config = InitConfig.configMap.get(databaseName);
if (config != null) {
StringBuffer sb = new StringBuffer();
sb.append(databaseName);
sb.append(CanalUtility.minus);
sb.append(tableName);
sb.append(CanalUtility.minus);
sb.append(eventType);
sb.append(CanalUtility.minus);
sb.append(canalSql);
sb.append(CanalUtility.minus);
sb.append(sql);
if (InitConfig.listMap.get(databaseName).contains(sql)) {
InitConfig.listMap.get(databaseName).remove(sql);
return;
}
FileOutputStream fos = null;
File file = new File(config.getCanalSqlDirectory().replace("/", File.separator) + File.separator + databaseName
+ CanalUtility.CANAL_FILENAME_PREFIX + FileIndexUtil.readClientIndex(config.getClientFile()) + CanalUtility.CANAL_FILENAME_SUFFIX);
try {
if (file.exists()) {
file.delete();
} else {
file.getParentFile().mkdirs();
file.createNewFile();
file.setReadable(false);
}
fos = new FileOutputStream(file);
LOGGER.info("export [" + sb.toString() + "] to " + file.getCanonicalPath());
fos.write(sb.toString().getBytes("UTF-8"));
fos.flush();
file.setReadable(true);
LOGGER.info("SQL语句 [" + sb.toString() + "] 成功写至 " + file.getCanonicalPath());
} catch (FileNotFoundException e) {
LOGGER.error(file.getName() + "不存在:" + e);
} catch (UnsupportedEncodingException e) {
LOGGER.error("字符串转字节数组异常:" + e);
} catch (IOException e) {
LOGGER.error("字节写入文件" + file.getName() + "异常:" + e);
} finally {
if (null != fos)
try {
fos.close();
} catch (IOException e) {
LOGGER.error("流关闭异常:" + e);
}
}
}
}
}
我的完整代码:https://github.com/shuxiaoabi/xiaobaiRepositiry。
注:我的canal客户端构建场景为:canal分为两个线程一个线程用于连接监听canal服务端解析binlog日志生成对应的sql语句落地成文件,该文件会被数据交换交换至跨网的另外一端某个目录下,还有一个线程用于解析交换过来的sql文件并执行该文件使其数据库完成同步(跨网域之间的中间器可采用光闸或者网闸)。
运行
1,运行canal服务端startup.bat / startup.sh
2,运行客户端程序。
这是小白写的第一次分享,如文中有错误欢迎大家指出,大家相互学习共同成长,谢谢。