当前位置: 首页 > news >正文

最新电子产品网站模板/今天株洲最新消息

最新电子产品网站模板,今天株洲最新消息,网站访问过程,深圳福田专业网站建设需求:高密网和低密网之间的mysql文件落地同步。 分析:解决不同网段之间的数据通讯可以采用光闸或者网闸。解决mysql之间的同步则可以采用canal。利用canal生成数据库变化的sql落地成相应文件,让其被交换至相对网段的某个目录下。can…

需求:高密网和低密网之间的mysql文件落地同步。          分析:解决不同网段之间的数据通讯可以采用光闸或者网闸。

解决mysql之间的同步则可以采用canal。利用canal生成数据库变化的sql落地成相应文件,让其被交换至相对网段的某个目录下。canal客户端读取该目录下的文件,执行目录下文件的sql完成数据同步。

1. canal简介:

canal是阿里巴巴的基于数据库增量日志解析,提供增量数据订阅&消费的一个开源项目。目前主要支持mysql、oracle数据库。

canal的工作原理:

原理相对比较简单:

canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议。

mysql master收到dump请求,开始推送binary log给slave(也就是canal)。

canal解析binary log对象(原始为byte流

2.canal的配置  canal分为canal客户端和服务端,canal服务端由阿里提供;

1.下载canal服务端 https://github.com/alibaba/canal/releases。   2.解压

3.配置canal-server。

4.开启mysql的binlog写入功能,建议配置binlog模式为row,修改数据库配置文件my.ini,Linux下为my.cnf。

[mysqld]

log-bin=mysql-bin

binlog-format=ROW

server_id=1

skip-name-resolve

expire_logs_days = 10

解释:

log-bin=mysql-bin  #开启binlog

binlog-format=ROW  #选择row模式

server_id=1  #配置mysql replaction需要定义,不能和canal的slaveId重复

skip-name-resolve  #默认安装的MySql开启了DNS的反向解析。如果禁用的话就不能在MySQL的授权表中使用主机名了而只能用ip格式,防止用127.0.0.1登录,mysql对ip反向解析后用localhost登录出现权限不足,拒绝登录的错误。

expire_logs_days = x  #二进制日志自动删除的天数。默认值为0,表示“没有自动删除”

修改完成后重启MySql服务,执行以下语句查看是否开启binlog写入功能。

SHOW VARIABLES LIKE "log_%";

如下图所示,若log_bin为ON,则binlog写入功能已打开。

5.为数据库添加canal用户,开启做为mysql slave的相关权限,执行以下语句。

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION

CLIENT ON *.* TO 'canal'@'%';

-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

FLUSH PRIVILEGES;

执行完成后,执行SELECT * FROM mysql.user;查询是否添加成功,如下图所示。

6.修改配置文件instance.properties,位于canal-server/conf/example/instance.properties,红色字体的部分需要修改。

说明:canal.instance.connectionCharset代表数据库的编码方式对应到java中的编码类型,比如UTF-8,GBK

, ISO-8859-1。

7.启动canal服务器端

执行canal-server\bin下的启动脚本,windows下为startup.bat,Linux为startup.sh。windows下正常启动页面如下:

8.启动后查看日志, canal-server\logs\canal\canal.log

具体instance的日志canal-server\logs\example \example.log

以上表示正常启动。

3.canal-client  参考https://github.com/alibaba/canal/wiki/ClientExample创建canal客户端:

package com.shu.hamal.canal.instance;

import java.io.File;

import java.io.FileNotFoundException;

import java.io.FileOutputStream;

import java.io.IOException;

import java.io.UnsupportedEncodingException;

import java.net.InetSocketAddress;

import java.util.List;

import java.util.Vector;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry.Entry;

import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;

import com.alibaba.otter.canal.protocol.CanalEntry.EventType;

import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;

import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

import com.alibaba.otter.canal.protocol.Message;

import com.shu.hamal.canal.common.CanalUtility;

import com.shu.hamal.canal.common.ConfigEntity;

import com.shu.hamal.canal.common.FileIndexUtil;

import com.shu.hamal.canal.common.InitConfig;

/**

*

* 生成SQL文件线程

*

* @author shu.xiaobai

*/

public class CanalClient implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(CanalClient.class);

// private Vector obj;

public CanalClient(Vector v) {

// this.obj = v;

}

public void run() {

// synchronized (obj) {

// 创建链接

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CanalUtility.INET_SOCKET_ADDRESS,

CanalUtility.INET_SOCKET_PORT), CanalUtility.CANAL_DESTINATION, CanalUtility.CANAL_USERNAME, CanalUtility.CANAL_PASSWORD);

int batchSize = 1000;

try {

long now = System.currentTimeMillis();

while (true) {

try {

connector.connect();

connector.subscribe(".*\\..*");

connector.rollback();

LOGGER.info("连接canal服务端成功...");

break;

} catch (Exception e) {

LOGGER.error("连接canal服务端失败...10s后尝试下一次连接...");

Thread.sleep(10000);

}

if ((System.currentTimeMillis() - now) > 1000 * 60 * 10) {

LOGGER.error("10min中内连接canal服务端失败...程序退出...请启动canal服务端后重启客户端...");

System.exit(-1);

}

}

LOGGER.info("canal client is running...");

while (true) {

// if (obj.size() != 0) {

// obj.wait();

// }

Message message = connector.getWithoutAck(batchSize);

long batchId = message.getId();

int size = message.getEntries().size();

if (batchId == -1 || size == 0) {

LOGGER.debug("canal client is running...");

// obj.add(new String("canal"));

// obj.notify();

Thread.sleep(1000);

} else {

try {

printEntry(message.getEntries());

} catch (Exception e) {

LOGGER.error("PrintEntry Exception" + e);

}

}

connector.ack(batchId);

}

} catch (Exception e) {

LOGGER.error("线程休眠或唤醒异常:" + e);

} finally {

connector.disconnect();

}

// }

}

private void printEntry(List entrys) {

for (Entry entry : entrys) {

if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {

continue;

}

RowChange rowChage = null;

try {

rowChage = RowChange.parseFrom(entry.getStoreValue());

} catch (Exception e) {

throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);

}

for (RowData rowData : rowChage.getRowDatasList()) {

String sql = CanalUtility.constructSql(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), entry.getHeader()

.getTableName(), rowChage.getEventType());

String canalSql = CanalUtility.constructCanalSql(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), entry.getHeader()

.getTableName(), rowChage.getEventType());

// 将语句拼接写入文件

exportSql2File(canalSql, sql, rowChage.getEventType(), entry.getHeader().getTableName(), entry.getHeader().getSchemaName());

}

}

}

/**

* 导出sql到文件

*

* @param sql

*/

private void exportSql2File(String canalSql, String sql, EventType eventType, String tableName, String databaseName) {

ConfigEntity config = InitConfig.configMap.get(databaseName);

if (config != null) {

StringBuffer sb = new StringBuffer();

sb.append(databaseName);

sb.append(CanalUtility.minus);

sb.append(tableName);

sb.append(CanalUtility.minus);

sb.append(eventType);

sb.append(CanalUtility.minus);

sb.append(canalSql);

sb.append(CanalUtility.minus);

sb.append(sql);

if (InitConfig.listMap.get(databaseName).contains(sql)) {

InitConfig.listMap.get(databaseName).remove(sql);

return;

}

FileOutputStream fos = null;

File file = new File(config.getCanalSqlDirectory().replace("/", File.separator) + File.separator + databaseName

+ CanalUtility.CANAL_FILENAME_PREFIX + FileIndexUtil.readClientIndex(config.getClientFile()) + CanalUtility.CANAL_FILENAME_SUFFIX);

try {

if (file.exists()) {

file.delete();

} else {

file.getParentFile().mkdirs();

file.createNewFile();

file.setReadable(false);

}

fos = new FileOutputStream(file);

LOGGER.info("export [" + sb.toString() + "] to " + file.getCanonicalPath());

fos.write(sb.toString().getBytes("UTF-8"));

fos.flush();

file.setReadable(true);

LOGGER.info("SQL语句 [" + sb.toString() + "] 成功写至 " + file.getCanonicalPath());

} catch (FileNotFoundException e) {

LOGGER.error(file.getName() + "不存在:" + e);

} catch (UnsupportedEncodingException e) {

LOGGER.error("字符串转字节数组异常:" + e);

} catch (IOException e) {

LOGGER.error("字节写入文件" + file.getName() + "异常:" + e);

} finally {

if (null != fos)

try {

fos.close();

} catch (IOException e) {

LOGGER.error("流关闭异常:" + e);

}

}

}

}

}

我的完整代码:https://github.com/shuxiaoabi/xiaobaiRepositiry。

注:我的canal客户端构建场景为:canal分为两个线程一个线程用于连接监听canal服务端解析binlog日志生成对应的sql语句落地成文件,该文件会被数据交换交换至跨网的另外一端某个目录下,还有一个线程用于解析交换过来的sql文件并执行该文件使其数据库完成同步(跨网域之间的中间器可采用光闸或者网闸)。

运行

1,运行canal服务端startup.bat / startup.sh

2,运行客户端程序。

这是小白写的第一次分享,如文中有错误欢迎大家指出,大家相互学习共同成长,谢谢。

http://www.jmfq.cn/news/5120389.html

相关文章:

  • 做复刻衣服买网站/站长工具网址查询
  • 深圳做分销网站/seo企业推广案例
  • 做任务的正规网站/北京学校线上教学
  • 网站建设销售培训语/账号权重查询
  • 建设网站和ipv4和ipv6什么关系/长沙百度百科
  • 网站测试的主要内容是/网上营销方式和方法
  • 曼朗策划响应式网站建设/seo软件
  • 广州企业网站设计制作/12345微信公众号
  • led灯网站策划书/青岛seo杭州厂商
  • 电子商务网站建设课件/网络广告名词解释
  • 帮忙做网站的协议/北京营销推广公司
  • 直播网站建设项目策划书/天琥设计培训学校官网
  • 程序员做外包网站/今日新闻10条简短
  • 山海关网站制作/seo服务的内容
  • 网络销售怎么做自己的网站/seo每日一帖
  • 做设计有哪些好用的素材网站有哪些/seo咨询师
  • excel可以做网站吗/百度指数是干嘛的
  • 凯里市经济开发区建设局网站/关键词竞价广告
  • 广州正规网站建设公司/荆州百度推广
  • 外包开发一个app多少钱/谷歌seo是做什么的
  • 如何设计一个完整的网站/网络推广一般都干啥
  • 手表网站官网/b2b平台免费推广网站
  • 达内/seo分析案例
  • php可以做网站app吗/关键词推广优化外包
  • 长沙企业推广/企业优化推广
  • wordpress配置字体/衡水seo优化
  • 专门做中文音译歌曲的网站/广州网站推广
  • 接单做网页的网站/html网页设计模板
  • 做公司网站的好处以及优势/发软文的网站
  • 做编程网站/搜索引擎优化趋势