您好,欢迎来到刀刀网。
搜索
您的当前位置:首页【在Flink开发中,读取外部数据源MySQL数据的方法(scala编写)】

【在Flink开发中,读取外部数据源MySQL数据的方法(scala编写)】

来源:刀刀网


前言

Flink实时数据开发中,遇到一个问题需求,主体数据来源是来自kafka的数据,但是实际的需求中,会需要别的数据库中的数据。

  • 例如:在kafka的数据源中需要一个name字段,name字段的数据只有mysql数据表中有对应的数据,需要其name字段的数据从数据源中获取加入到kafka的datastream中一起进行数据处理

这里使用的是Datastream-API,并没有使用Table-API,也可以使用table-api进行解决
可以使用table-api获取mysql的数据,然后将其转换成Datastream,再跟主Datastream进行数据连接


提示:以下是本篇文章正文内容,下面案例可供参考

一、怎么在flink中获取到mysql的数据作为数据源?

在Flink官网的文档中,发现没有关于mysql作为数据源的介绍,连接mysql的jdbc也只是当作sink使用

二、如何在Flink开发中以mysql作为DataSource呢?

1.使用Flink的JDBC连接器

  • Maven导入Flink-JDBC连接器的依赖
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>1.14.0</version>
        </dependency>
  • 仔细查看Flink-JDBC的连接器中,发现有一个JdbcInputFormat的api
  • 可以使用这个api连接mysql数据库,并且还可以只取当中你需要的个别字段作为一个数据源使用

代码如下:

// 定义列的类型,列的类型是数组类型,数组里面存放TypeInformation类型的数据
val columnTypes: Array[TypeInformation[_]] = Array(Types.LONG, Types.STRING)

// 定义列的名称,数组里面存放的是String类型
val columnNames = Array("user_id", "user_name")

// 连接mysql数据库,获取数据
val customer_info = JdbcInputFormat.buildJdbcInputFormat()
      .setDrivername("com.mysql.jdbc.Driver")
      .setDBUrl("jdbc:mysql://localhost:3306/ds_db01?charterEncoding=utf-8&useSSL=false")
      .setUsername("root")
      .setPassword("123456")
      .setQuery("select user_id,user_name from user_info") //定义需要查询的sql语句
      .setRowTypeInfo(new RowTypeInfo(columnTypes, columnNames)) //这个需要一个rowtypeinfo对象
      .finish()

val user_map = streamEnv.createInput(customer_info)
      .map(data => {
        val user_id = data.getField(0).asInstanceOf[Long]
        val user_name = data.getField(1).asInstanceOf[String]
        (user_id, user_name)
      })
      .executeAndCollect()
      .toList
      .map(user => user.user_id -> user.user_name)
      .toMap

此代码是将从数据中获取到的数据,将其存到内存中,转成map,然后在主流当中获取这个map数据

2.使用RichMapFunction函数

  • Flink提供一个可以自定义的richmapfunction函数,通过自定义这个函数,从mysql中获取数据,预加载维表,最后跟主表进行关联。

代码如下(示例):

  class myMapFunction extends RichMapFunction[String, String] {
    // 定义mysql数据库的url、driver、账号、密码
    private val url = "jdbc:mysql://localhost:3306/ds_db01?characterEncoding=UTF-8&useSSL=false"
    private val driver = "com.mysql.jdbc.Driver"
    private val user_name = "root"
    private val user_password = "123456"

    private var conn: Connection = Connection
    private var stmt:PreparedStatement = PreparedStatement

    private val cache = new util.HashMap[Long, String]()

    override def open(parameters: Configuration): Unit = {
      super.open(parameters)

      Class.forName(driver) // 加载驱动程序

      conn = DriverManager.getConnection(url, user_name, user_password) // 连接数据库

      // 执行SQL语句
      val sql: String = "select user_id,user_name from user_info"
      stmt = conn.prepareStatement(sql)

      // 执行查询
      val resultSet = stmt.executeQuery

      //全量更新维度数据到内存
      while (resultSet.next) {
        val user_id = resultSet.getLong("user_id")
        val user_name = resultSet.getString("user_name")
        cache.put(user_id, user_name)
      }

      // 关闭连接
      conn.close()
    }

    override def map(value: String): (Long,String) = {
      val jSONObject = JSON.parseObject(value)
      val user_id = jSONObject.getLong("user_id")
      val user_name = cache.get(user_id)
      (user_id,user_name)
    }
  }
  • 在主流里面直接在map里面new一个myMapFunction对象,就可以获取到对应的mysql数据库中的数据了

总结

以上就是这次的内容,写的不好的,有问题的,不懂的私信交流沟通。

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- gamedaodao.com 版权所有 湘ICP备2022005869号-6

违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务