前言
Flink实时数据开发中,遇到一个问题需求,主体数据来源是来自kafka的数据,但是实际的需求中,会需要别的数据库中的数据。
- 例如:在kafka的数据源中需要一个name字段,name字段的数据只有mysql数据表中有对应的数据,需要其name字段的数据从数据源中获取加入到kafka的datastream中一起进行数据处理
这里使用的是Datastream-API,并没有使用Table-API,也可以使用table-api进行解决
可以使用table-api获取mysql的数据,然后将其转换成Datastream,再跟主Datastream进行数据连接
提示:以下是本篇文章正文内容,下面案例可供参考
一、怎么在flink中获取到mysql的数据作为数据源?
在Flink官网的文档中,发现没有关于mysql作为数据源的介绍,连接mysql的jdbc也只是当作sink使用
二、如何在Flink开发中以mysql作为DataSource呢?
1.使用Flink的JDBC连接器
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.14.0</version>
</dependency>
- 仔细查看Flink-JDBC的连接器中,发现有一个JdbcInputFormat的api
- 可以使用这个api连接mysql数据库,并且还可以只取当中你需要的个别字段作为一个数据源使用
代码如下:
val columnTypes: Array[TypeInformation[_]] = Array(Types.LONG, Types.STRING)
val columnNames = Array("user_id", "user_name")
val customer_info = JdbcInputFormat.buildJdbcInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/ds_db01?charterEncoding=utf-8&useSSL=false")
.setUsername("root")
.setPassword("123456")
.setQuery("select user_id,user_name from user_info")
.setRowTypeInfo(new RowTypeInfo(columnTypes, columnNames))
.finish()
val user_map = streamEnv.createInput(customer_info)
.map(data => {
val user_id = data.getField(0).asInstanceOf[Long]
val user_name = data.getField(1).asInstanceOf[String]
(user_id, user_name)
})
.executeAndCollect()
.toList
.map(user => user.user_id -> user.user_name)
.toMap
此代码是将从数据中获取到的数据,将其存到内存中,转成map,然后在主流当中获取这个map数据
2.使用RichMapFunction函数
- Flink提供一个可以自定义的richmapfunction函数,通过自定义这个函数,从mysql中获取数据,预加载维表,最后跟主表进行关联。
代码如下(示例):
class myMapFunction extends RichMapFunction[String, String] {
private val url = "jdbc:mysql://localhost:3306/ds_db01?characterEncoding=UTF-8&useSSL=false"
private val driver = "com.mysql.jdbc.Driver"
private val user_name = "root"
private val user_password = "123456"
private var conn: Connection = Connection
private var stmt:PreparedStatement = PreparedStatement
private val cache = new util.HashMap[Long, String]()
override def open(parameters: Configuration): Unit = {
super.open(parameters)
Class.forName(driver)
conn = DriverManager.getConnection(url, user_name, user_password)
val sql: String = "select user_id,user_name from user_info"
stmt = conn.prepareStatement(sql)
val resultSet = stmt.executeQuery
while (resultSet.next) {
val user_id = resultSet.getLong("user_id")
val user_name = resultSet.getString("user_name")
cache.put(user_id, user_name)
}
conn.close()
}
override def map(value: String): (Long,String) = {
val jSONObject = JSON.parseObject(value)
val user_id = jSONObject.getLong("user_id")
val user_name = cache.get(user_id)
(user_id,user_name)
}
}
- 在主流里面直接在map里面new一个myMapFunction对象,就可以获取到对应的mysql数据库中的数据了
总结
以上就是这次的内容,写的不好的,有问题的,不懂的私信交流沟通。