Flink CDC

1. 开启binlog

  • 修改mysql配置文件my.cnf,添加如下内容:
[mysqld]
# 前面还有其他配置
# 添加的部分
server-id = 12345
log-bin = mysql-bin
# 必须为ROW
binlog_format = ROW
# 必须为FULL,MySQL-5.7后才有该参数
binlog_row_image  = FULL
expire_logs_days  = 10
  • 重启mysql
systemctl restart mysqld
  • 检查binlog
show variables like '%log_bin%';		运行后应该显示 log_bin 是 ON	
show variables like 'binlog_format';	运行后应该显示 binlog_format 是 ROW

2. 依赖导入

    <properties>
        <java.version>11</java.version>
        <flink.version>1.13.6</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <!-- 可以将依赖打到jar包中 -->
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

3. DataStream方式

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamExample {
    public static void main(String[] args) throws Exception {
        // 1. Obtain the Flink stream execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Build the CDC SourceFunction via the Flink CDC MySqlSource builder.
        //    Startup modes:
        //      initial  - snapshot the existing table data first, then continue from the binlog
        //      earliest - read from the beginning of the binlog
        //      latest   - read only new changes from the end of the binlog
        DebeziumSourceFunction<String> mysqlCdcSource = MySqlSource.<String>builder()
                .hostname("10.0.0.12")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("db_inventory_cdc")                   // subscribed database(s)
                .tableList("db_inventory_cdc.tb_products_cdc")      // subscribed table(s), fully qualified
                .deserializer(new JsonDebeziumDeserializationSchema())    // emit change records as JSON strings
                .startupOptions(StartupOptions.initial())
                .build();

        // 3. Print every change record to stdout.
        env.addSource(mysqlCdcSource).print();

        // 4. Launch the job.
        env.execute();
    }
}
  • 启动:
./bin/flink run -m hadoop102:8081 -c com.tianyi.FlinkCDC ./flink-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar

4. FlinkSQL方式


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSQLExample {
    public static void main(String[] args) throws Exception {
        // 1. Create the stream execution environment and its table environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Register a Flink-MySQL-CDC source table.
        //    FIX: Flink SQL requires primary keys on external connector tables to be
        //    declared NOT ENFORCED (Flink does not own the data and cannot enforce the
        //    constraint). The original inline "id STRING primary key" fails DDL
        //    validation with a ValidationException.
        tableEnv.executeSql("CREATE TABLE flink_sql (" +
                " id STRING," +
                " name STRING," +
                " sex STRING," +
                " PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'scan.startup.mode' = 'latest-offset'," +
                " 'hostname' = '10.0.0.12'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '123456'," +
                " 'database-name' = 'db_inventory_cdc'," +
                " 'table-name' = 'flink_sql'" +
                ")");

        // 3. Query the table and convert its changelog to a retract stream.
        //    Each element is (flag, row): flag == true means insert/add,
        //    flag == false means retract/delete.
        Table table = tableEnv.sqlQuery("select * from flink_sql");
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        // 4. Launch the job.
        env.execute();
    }
}

5. 两种方式对比

DataStream 在 Flink1.12 和 1.13 都可以用,而 FlinkSQL 只能在 Flink1.13 使用。

DataStream 可以同时监控多库多表,而 FlinkSQL 只能监控单表。

参考网址:

​ Flink-CDC实践(含实操步骤与截图)

​ Flink CDC2.0快速上手demo示例(Jar对接,非SQL)

​ Flink示例——Flink-CDC

​ Flink-CDC 2.0学习文章来源地址https://uudwc.com/A/yeq6

原文地址:https://blog.csdn.net/McGrady_Durant/article/details/131191758

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请联系站长进行投诉反馈,一经查实,立即删除!

上一篇 2023年06月17日 18:23
Day01 项目简介&分布式基础概念 -谷粒商城
下一篇 2023年06月17日 18:23