1. 开启binlog
- 修改mysql配置文件my.cnf,添加如下内容:
[mysqld]
# (other existing settings come before this point)
# Settings added to enable the binary log for CDC
server-id = 12345
log-bin = mysql-bin
# Must be ROW
binlog_format = ROW
# Must be FULL. The variable does not exist on old MySQL releases
# (it was introduced in 5.6.2, so any 5.7+ server has it).
binlog_row_image = FULL
# Number of days after which binary log files are purged automatically
expire_logs_days = 10
- 重启mysql
systemctl restart mysqld
- 检查binlog
show variables like '%log_bin%'; 运行后应该显示 log_bin 是 ON
show variables like 'binlog_format'; 运行后应该显示 binlog_format 是 ROW
2. 依赖导入
<!-- Shared version properties. flink.version is referenced by the Flink
     dependencies; java.version is not referenced elsewhere in this snippet. -->
<properties>
<java.version>11</java.version>
<flink.version>1.13.6</flink.version>
</properties>
<dependencies>
    <!-- Flink core, streaming and client APIs (Scala 2.12 builds) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.25</version>
    </dependency>
    <!-- Blink planner, used by the Flink SQL example -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink CDC connector for MySQL -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
    </dependency>
    <!-- 1.2.83 is the first 1.x release that fixes the autoType
         deserialization vulnerability (CVE-2022-25845) present in 1.2.75 -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
</dependencies>
<build>
<plugins>
<!-- Compiles the sources for Java 11 using UTF-8 source encoding -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>11</source>
<target>11</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<!-- Packages the dependencies into the jar (jar-with-dependencies) -->
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<!-- Bind the assembly "single" goal to the package phase -->
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
3. DataStream方式
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * DataStream API example: subscribes to one MySQL table through Flink CDC and
 * prints every change record as a JSON string.
 */
public class DataStreamExample {

    /**
     * Builds the CDC source function for table
     * {@code db_inventory_cdc.tb_products_cdc}.
     *
     * @return a source function emitting each change event as a JSON string
     */
    private static DebeziumSourceFunction<String> createMySqlSource() {
        return MySqlSource.<String>builder()
                .hostname("10.0.0.12")
                .port(3306)
                .username("root")
                .password("123456")
                // Database to subscribe to
                .databaseList("db_inventory_cdc")
                // Table to subscribe to, qualified with its database name
                .tableList("db_inventory_cdc.tb_products_cdc")
                // Deserialize change events into JSON strings
                .deserializer(new JsonDebeziumDeserializationSchema())
                // Startup modes:
                //   initial  - read the existing (historical) data on startup
                //   earliest - read from the beginning of the binlog
                //   latest   - read from the end of the binlog
                .startupOptions(StartupOptions.initial())
                .build();
    }

    /**
     * Entry point: wires the CDC source to a print sink and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Obtain the Flink execution environment; a single parallel task is used
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        // Build the Flink CDC source function
        DebeziumSourceFunction<String> mysqlCdcSource = createMySqlSource();

        // Print every change record
        environment.addSource(mysqlCdcSource).print();

        // Launch the job
        environment.execute();
    }
}
- 启动:
./bin/flink run -m hadoop102:8081 -c DataStreamExample ./flink-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar
4. FlinkSQL方式
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Flink SQL example: declares a {@code mysql-cdc} source table, queries it and
 * prints the resulting retract stream.
 */
public class FlinkSQLExample {

    /**
     * Entry point: registers the CDC table, runs {@code select *} on it and
     * prints each change as a (flag, row) pair.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment and the table environment on top of it
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Create the Flink MySQL CDC source table.
        //    Flink does not own the data, so it only accepts primary keys declared
        //    as NOT ENFORCED; a plain "primary key" is rejected at validation time.
        tableEnv.executeSql("CREATE TABLE flink_sql (" +
                " id STRING," +
                " name STRING," +
                " sex STRING," +
                " PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'scan.startup.mode' = 'latest-offset'," +
                " 'hostname' = '10.0.0.12'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '123456'," +
                " 'database-name' = 'db_inventory_cdc'," +
                " 'table-name' = 'flink_sql'" +
                ")");

        // 3. Query the table and convert the result into a stream for printing
        Table table = tableEnv.sqlQuery("select * from flink_sql");
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        // 4. Launch the job
        env.execute();
    }
}
5. 两种方式对比
DataStream 在 Flink1.12 和 1.13 都可以用,而 FlinkSQL 只能在 Flink1.13 使用。
DataStream 可以同时监控多库多表,而 FlinkSQL 只能监控单表。
参考网址:
Flink-CDC实践(含实操步骤与截图)
Flink CDC2.0快速上手demo示例(Jar对接,非SQL)
Flink示例——Flink-CDC
Flink-CDC 2.0学习
文章来源:https://uudwc.com/A/yeq6