Flink is built for stateful stream processing: data taken in on the Source side is transformed and then written out on the Sink side. This article walks through that path stage by stage, from the execution environment through Sources, Sinks, windows, and watermarks. First, the Maven setup used throughout:
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.12.2</flink.version>
    <target.java.version>1.8</target.java.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.8.3</version>
</dependency>
<!-- This dependency is provided, because it should not be packaged into the JAR file. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- All Flink artifacts must share one Scala binary version and one Flink version. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.2</version>
</dependency>
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// topicName and properties hold the Kafka connection settings; see the sketch below
DataStream<String> transactions = env
        .addSource(new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), properties));
transactions.print();
env.execute();
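The topicName and properties used above are assumed to be defined elsewhere; a minimal sketch, in which the broker address, consumer group, and topic name are placeholders:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
properties.setProperty("group.id", "flink-demo");              // placeholder consumer group
String topicName = "transactions";                             // placeholder topic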
Every Flink program starts here: only once the execution environment has been created can the rest of the job be written. The environment is obtained as follows:
Creates an execution environment that represents the context in which the program currently executes
If the program is invoked standalone, this method returns a local execution environment
If the program is invoked from a command-line client for submission to a cluster, this method returns that cluster's execution environment
In other words, getExecutionEnvironment decides which kind of environment to return based on how the program is run
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
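For completeness, an environment can also be requested explicitly instead of letting Flink decide; a minimal sketch, where the host, port, and JAR path are placeholders:
// Always run inside the current JVM, regardless of how the program is invoked
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
// Submit to a specific remote cluster; host, port, and the job JAR path are placeholders
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host", 8081, "target/my-job.jar");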
A Source is Flink's data input and a Sink its data output. Flink connects to external storage systems through Flink Streaming Connectors and exchanges data in four main ways:
Flink's predefined Sources and Sinks
Bundled Connectors shipped with Flink
Connectors from the third-party Apache Bahir project
Async I/O
The following focuses on the predefined sources/sinks and the bundled connectors; for the rest, refer to the official Flink documentation.
First, the built-in sources Flink provides; these methods all live on the StreamExecutionEnvironment class.
The built-in sinks, in turn, all live on the DataStream class.
env.readTextFile(path)
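Besides readTextFile, other predefined sources read from collections and sockets; a short sketch, assuming the usual imports:
// From a fixed set of elements (handy for tests)
DataStream<String> fromElements = env.fromElements("a", "b", "c");
// From a Java collection
DataStream<String> fromCollection = env.fromCollection(Arrays.asList("x", "y"));
// From a TCP socket; host and port are placeholders
DataStream<String> fromSocket = env.socketTextStream("localhost", 9999);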
Write results out as text or CSV files:
dataStream.writeAsText(path);
dataStream.writeAsCsv(path);
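Note that writeAsText and writeAsCsv are mainly intended for debugging and do not take part in Flink's fault-tolerance guarantees; for production file output, Flink 1.12 recommends the StreamingFileSink. A minimal sketch, where the output directory is a placeholder:
// Row-encoded file sink writing each record as a UTF-8 line
StreamingFileSink<String> fileSink = StreamingFileSink
        .forRowFormat(new Path("/tmp/flink-out"), new SimpleStringEncoder<String>("UTF-8"))
        .build();
dataStream.addSink(fileSink);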
The official documentation lists the following connectors:
Apache Kafka (source/sink)
JDBC (sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
FileSystem (sink)
RabbitMQ (source/sink)
Google PubSub (source/sink)
Hybrid Source (source)
Apache NiFi (source/sink)
Apache Pulsar (source)
Twitter Streaming API (source)
When submitting a job, make sure the job JAR actually bundles the classes of the connectors it uses; otherwise submission fails with class-not-found or class-initialization errors.
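As an example of using a bundled connector as a sink, the transactions stream read from Kafka earlier can be written back out with the same connector; topic name and producer properties are placeholders here:
// Write each record to a Kafka topic; producer properties (bootstrap.servers, ...) are assumed configured
transactions.addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties));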
Beyond the sources and sinks above, Flink also supports custom sources and sinks. To build a custom source:
Implement the SourceFunction interface
Override its run and cancel methods
Register it in the main method via addSource (shown after the class below)
public class MySource implements SourceFunction<String> {
    // Running flag; volatile because cancel() is called from a different thread than run()
    private volatile boolean flag = true;
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (flag) {
            sourceContext.collect("Current time: " + System.currentTimeMillis());
            Thread.sleep(100);
        }
    }
    @Override
    public void cancel() {
        flag = false;
    }
}
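Step 3, registering the source via addSource in the main method:
DataStream<String> customStream = env.addSource(new MySource());
customStream.print();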
To build a custom sink:
Extend RichSinkFunction (or implement the plain SinkFunction interface)
Override the invoke method
A custom JDBC sink is shown below as a reference:
public class MyJdbcSink extends RichSinkFunction<String> {
    // JDBC connection, reused across invocations
    private Connection conn;
    // Open the connection once when the sink is initialized
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
    }
    // Close the connection when the sink shuts down
    @Override
    public void close() throws Exception {
        super.close();
        conn.close();
    }
    // Each incoming record is a complete SQL statement; execute it over the connection
    @Override
    public void invoke(String value, Context context) throws Exception {
        PreparedStatement preparedStatement = conn.prepareStatement(value);
        preparedStatement.execute();
        preparedStatement.close();
    }
}
dataStream.addSink(new MyJdbcSink()); // attach the sink to the stream to be written, not to the environment
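Beyond hand-written sinks, the flink-connector-jdbc dependency declared earlier ships a ready-made JdbcSink, so writing to a JDBC database usually does not require a custom class. The following example from this job writes RongHeLog records to MySQL with it: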
rongHeStream.addSink(JdbcSink.sink(
        "INSERT INTO ronghe_log VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        (preparedStatement, rongHeLog) -> {
            preparedStatement.setObject(1, rongHeLog.getId());
            preparedStatement.setObject(2, rongHeLog.getDeviceNum());
            preparedStatement.setObject(3, rongHeLog.getSrcIp());
            preparedStatement.setObject(4, rongHeLog.getSrcPort());
            preparedStatement.setObject(5, rongHeLog.getDstIp());
            preparedStatement.setObject(6, rongHeLog.getDstPort());
            preparedStatement.setObject(7, rongHeLog.getProtocol());
            preparedStatement.setObject(8, new Timestamp(rongHeLog.getLastOccurTime()));
            preparedStatement.setObject(9, rongHeLog.getCount());
            try {
                // objectMapper is a Jackson ObjectMapper created elsewhere in the job
                String idListJson = objectMapper.writeValueAsString(rongHeLog.getSourceLogIds());
                preparedStatement.setObject(10, idListJson);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        },
        JdbcExecutionOptions.builder()
                .withBatchSize(10)
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUrl("jdbc:mysql://81.70.199.213:3306/flink21?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&useSSL=false")
                .withUsername("root")
                .withPassword("lJPWRbm06NbToDL03Ecj")
                .build()
));
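With withBatchSize(10), the sink buffers rows and flushes them to the database in batches of ten; the JdbcExecutionOptions builder also offers withBatchIntervalMs and withMaxRetries for time-based flushing and retry control.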
// 2. Event-time tumbling window on the watermarked stream (watermark assignment is shown further below)
KeyedStream<SwitchPacket, String> keyedStream = watermarksStream.keyBy(SwitchPacket::getKey);
// In production, aggregation windows of roughly one minute are typical; ten seconds is used here for the demo
WindowedStream<SwitchPacket, String, TimeWindow> timeWindowedStream = keyedStream.timeWindow(Time.seconds(10));
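timeWindow is deprecated as of Flink 1.12; the explicit equivalent passes a window assigner, roughly:
// Same semantics as timeWindow(Time.seconds(10)) under event time
WindowedStream<SwitchPacket, String, TimeWindow> windowed =
        keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(10)));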
To inspect a locally running job in the Flink Web UI, add the flink-runtime-web dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    // Bind the Web UI to port 8082 (the default is 8081)
    //configuration.setString(RestOptions.BIND_PORT, "8081");
    configuration.setInteger("rest.port", 8082);
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
    // A simple endless source that emits the current timestamp once per second
    DataStreamSource<String> streamSource = environment.addSource(new SourceFunction<String>() {
        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            while (true) {
                String str = "" + System.currentTimeMillis();
                sourceContext.collect(str);
                TimeUnit.SECONDS.sleep(1);
            }
        }
        @Override
        public void cancel() {
        }
    });
    streamSource.print();
    environment.execute();
}
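Run the main method and open http://localhost:8082 in a browser to see the running job in the Web UI.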
// 0. Use event-time semantics (the default since Flink 1.12, so this call is deprecated but harmless)
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 1. Assign timestamps and watermarks, here with zero tolerated out-of-orderness
SingleOutputStreamOperator<SwitchPacket> watermarksStream = processed.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<SwitchPacket>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(SwitchPacket element) {
                return element.getCreateTime();
            }
        });
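BoundedOutOfOrdernessTimestampExtractor is likewise deprecated in Flink 1.12; the equivalent assignment with the newer WatermarkStrategy API (assuming imports of WatermarkStrategy and java.time.Duration) would look roughly like this:
// Same semantics: event-time timestamps from createTime, zero tolerated out-of-orderness
SingleOutputStreamOperator<SwitchPacket> withWatermarks = processed.assignTimestampsAndWatermarks(
        WatermarkStrategy.<SwitchPacket>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((packet, recordTimestamp) -> packet.getCreateTime()));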