logo
发布于

debezium根据字段发布到不同的topic

2682-–
作者
  • avatar
    姓名
    zhli

最近项目中有这样的需求:需要根据表中的某个字段,发cdc事件发布到不同的Topic,以便每个端独立去消费这些Topic。 Debezium本身带了一个Content-based routing的转换器, 不过启用它需要先安装groovy插件。

  1. 安装debezium_scripting和groovy
ENV DEBEZIUM_VERSION="2.3.2.Final" \         
    SCRIPTING_MD5=0726417249e59e375404c1b670e9a237

RUN docker-maven-download debezium-optional scripting "$DEBEZIUM_VERSION" "$SCRIPTING_MD5"

ENV MAVEN_DEP_DESTINATION=$KAFKA_HOME/external_libs/debezium-scripting/

RUN \
    docker-maven-download central org/codehaus/groovy groovy 3.0.7 0035721b2c7c7a9a0e4fdf7ee3d615cc && \
    docker-maven-download central org/codehaus/groovy groovy-jsr223 3.0.7 e0a6d719027ce212a118f1b84ee015b3 && \
    docker-maven-download central org/codehaus/groovy groovy-json 3.0.7 ae6400d80b2bffcbe5438c55845da534
  1. 启用前需要设置一个环境变量ENABLE_DEBEZIUM_SCRIPTING=true
debezium:    
    image: xx
    container_name: debezium
    restart: unless-stopped
    ports:
    - 8083:8083
    environment:
    - BOOTSTRAP_SERVERS=kafka:9004
    - GROUP_ID=group_id
    - CONFIG_STORAGE_TOPIC=debezium.connect_configs
    - OFFSET_STORAGE_TOPIC=debezium.connect_offsets
    - STATUS_STORAGE_TOPIC=debezium.connect_status
    - ENABLE_DEBEZIUM_SCRIPTING=true
  1. 修改connector的配置。其中value.after.xxx获取的是表的xxx字段值。
"transforms": "route",
"transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
"transforms.route.language": "jsr223.groovy",
"transforms.route.topic.expression": "'debezium.' + value.after.xxx + '.' + value.source.db + '.' + value.source.table"
  1. 通过上面的配置,生成的kafka topic会安装表中的xxx字段分组。一般需要每个业务表都有这个xxx字段