canal认识

风杀开发工具约 545 字大约 2 分钟

https://github.com/alibaba/canalopen in new window

mysql配置修改

vi /home/mysql/mysql_01/my.cnf

[mysql]
log-bin=mysql-bin #添加这一行就 ok   
binlog-format=ROW #选择 row 模式    
server_id=1       #配置 mysql replaction 需要定义,不能和canal的slaveId重复

安装canal

参考https://github.com/alibaba/canal/wiki/AdminGuideopen in new window

下载canal安装包https://github.com/alibaba/canal/releases下载最新的编译过的open in new window

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gzopen in new window

解压文件

tar xf canal.deployer-1.1.5.tar.gz

复制文件夹

cp -r canal /home/canal/canal_01

修改配置文件

vi /home/canal/canal_01/conf/example/instance.properties

#mysql地址
canal.instance.master.address=192.168.56.77:3306
#用户名
canal.instance.dbUsername=copy
#密码
canal.instance.dbPassword=copy888
#指定编码方式
canal.instance.connectionCharset=UTF-8

启动canal

/home/canal/canal_01/bin/startup.sh

安装canal.adapter

参考地址https://github.com/alibaba/canal/wiki/ClientAdapteropen in new window

下载canal安装包https://github.com/alibaba/canal/releases下载最新的编译过的open in new window

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gzopen in new window

解压文件

tar xf canal.adapter-1.1.5.tar.gz

修改adapter配置文件

vim /home/canal/canal_adapter/conf/application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 192.168.56.77:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS: 
          url: jdbc:mysql://192.168.56.77:3306/canal_demo?useUnicode=true
          username: copy
          password: copy888
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es7
        hosts: 192.168.56.77:9200 # 127.0.0.1:9200 for rest mode
        key: es7key
        properties:
          mode: rest # or rest
          #          # security.auth: test:123456 #  only used for rest mode
          cluster.name: my-application
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address

在/home/canal/canal_adapter/conf/es7/目录下新建一个配置文件user_info.yml

vi /home/canal/canal_adapter/conf/es7/user.yml

dataSourceKey: defaultDS
outerAdapterKey: es7key
destination: example
groupId: g1
esMapping:
  _index: index_canal3
  _id: _id
  _type: _doc
  sql: "select id as _id,id,name,title from user"
  commitBatch: 3000

启动canal_adapter进行同步

/home/canal/canal_adapter/bin/startup.sh

curl http://localhost:8081/etl/es7/es7key/user.ymlopen in new window -X POST

curl http://localhost:8081/etl/es7/user.ymlopen in new window -X POST

上次编辑于:
贡献者: 风杀