- 发布于
debezium根据字段发布到不同的topic
2682-–
- 作者
- 姓名
- zhli
最近项目中有这样的需求:需要根据表中的某个字段,发cdc事件发布到不同的Topic,以便每个端独立去消费这些Topic。 Debezium本身带了一个Content-based routing的转换器, 不过启用它需要先安装groovy插件。
- 安装debezium_scripting和groovy
ENV DEBEZIUM_VERSION="2.3.2.Final" \
SCRIPTING_MD5=0726417249e59e375404c1b670e9a237
RUN docker-maven-download debezium-optional scripting "$DEBEZIUM_VERSION" "$SCRIPTING_MD5"
ENV MAVEN_DEP_DESTINATION=$KAFKA_HOME/external_libs/debezium-scripting/
RUN \
docker-maven-download central org/codehaus/groovy groovy 3.0.7 0035721b2c7c7a9a0e4fdf7ee3d615cc && \
docker-maven-download central org/codehaus/groovy groovy-jsr223 3.0.7 e0a6d719027ce212a118f1b84ee015b3 && \
docker-maven-download central org/codehaus/groovy groovy-json 3.0.7 ae6400d80b2bffcbe5438c55845da534
- 启用前需要设置一个环境变量
ENABLE_DEBEZIUM_SCRIPTING=true
debezium:
image: xx
container_name: debezium
restart: unless-stopped
ports:
- 8083:8083
environment:
- BOOTSTRAP_SERVERS=kafka:9004
- GROUP_ID=group_id
- CONFIG_STORAGE_TOPIC=debezium.connect_configs
- OFFSET_STORAGE_TOPIC=debezium.connect_offsets
- STATUS_STORAGE_TOPIC=debezium.connect_status
- ENABLE_DEBEZIUM_SCRIPTING=true
- 修改connector的配置。其中
value.after.xxx
获取的是表的xxx
字段值。
"transforms": "route",
"transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
"transforms.route.language": "jsr223.groovy",
"transforms.route.topic.expression": "'debezium.' + value.after.xxx + '.' + value.source.db + '.' + value.source.table"
- 通过上面的配置,生成的kafka topic会安装表中的
xxx
字段分组。一般需要每个业务表都有这个xxx
字段