要在Flink中讀取MySQL數據,可以使用Flink的JDBC連接器。以下是一個示例代碼,演示如何讀取MySQL數據:
```java
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;
public class ReadMySQLData {
public static void main(String[] args) {
String driverClassName = "com.mysql.jdbc.Driver";
String dbURL = "jdbc:mysql://localhost:3306/your_database";
String query = "SELECT * FROM your_table";
RowTypeInfo rowTypeInfo = new RowTypeInfo(/* define your row types here */);
JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(driverClassName)
.setDBUrl(dbURL)
.setUsername("your_username")
.setPassword("your_password")
.setQuery(query)
.setRowTypeInfo(rowTypeInfo)
.finish();
DataStream
jdbcDataStream.print();
env.execute("Read MySQL Data");
}
}
```
請注意,上述代碼中的`driverClassName`、`dbURL`、`query`、`username`和`password`需要根據你的實際情況進行修改。同時,還需要定義`RowTypeInfo`來描述從MySQL中讀取的數據的類型。