《多易教育|码龙 Flink教程》
配套文档part1
1 快速认识flink
Apache Flink是一个分布式实时流式计算引擎(框架);用于无界流和有界数据流(批)的有状态计算。
Flink被设计成可以在所有常见的集群环境中运行 , 基于内存的分布式计算引擎 ,适用于任何数据规模的(实时)计算场景。
流式计算的特点:由于数据是实时持续流入没有终结,只能每逢数据到达就进行计算,并输出”当时”的计算结果(因而计算结果也不会是一次性的最终结果,而是源源不断的无界的结果流);
2 Flink编程基础
2.1 添加依赖
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.16.1</flink.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!--flink本地运行时 提供的web功能-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.21</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flinksql本地运行需要的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!--状态后端管理器-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-2.2</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.16</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>1.16.1</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.0</version>
<exclusions>
<exclusion>
<artifactId>hadoop-hdfs</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
</exclusions>
</dependency>
<!--hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.0</version>
</dependency>
<!-- 打印日志的jar包 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-gateway-api</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
2.2 flink程序开发基本模板
无论简单与复杂,flink程序都由如下几个部分组成
- 获取一个编程、执行入口环境env
- 通过数据源组件,加载、创建datastream
- 对datastream调用各种处理算子表达计算逻辑
- 通过sink算子指定计算结果的输出方式
- 在env.execute()上触发程序提交运行
2.3 入门示例程序
加载网络数据流 , 累计统计单词出现的次数
package com.doit.day01;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @Date: 2023/6/23
* @Tips: 学大数据 ,到多易教育
* @DOC: https://www.malong.top
* @Description:
* flink可以流式处理数据
* 源源不断地接收网络流数据 , 统计数据中每个单词出现的次数
* 实时打印结果在控制台
*/
public class StreamWordCount {
public static void main(String[] args) throws Exception {
// 1 获取flink的编程环境
// 可以做到 流批一体的处理方案
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
// 2 获取网络数据流 DataStream (Source)
// 使用 socket网络流 客户端发送数据 接收数据 在linux上使用 nc -lk 端口号
// socketTextStream 可以接收指定机器 指定端口发过来的数据
// 源源不断地接收到数据 一行一行
DataStreamSource<String> ds = see.socketTextStream("doitedu01", 8899);
// 3 处理数据 统计单词出现的次数 (Transformation)
// 3.1 将数据组装成 单词,1 二元组 Tuple2
SingleOutputStreamOperator<Tuple2<String, Integer>> wrodOne = ds.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
// 参数1 一行数据 参数2 收集结果
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = line.split("\\s+");
for (String word : words) {
// 以后 在使用某个对象的时候 先不new 优先调用一下有没有静态方法
Tuple2<String, Integer> tp = Tuple2.of(word, 1);
out.collect(tp);
}
} // 泛型1 输入的数据类型 泛型2 输出的结果
});
// 3.2 按照单词分组
KeyedStream<Tuple2<String, Integer>, String> keyed = wrodOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
// 3.3 统计单词个数
SingleOutputStreamOperator<Tuple2<String, Integer>> res = keyed.sum("f1");
// 4 将结果打印在控制台 (sink)
res.print() ;
// 5 出发程序执行
see.execute("流式单词统计") ;
}
}
3 flink常用source算子
3.1 source算子概述
source是用来获取外部数据的算子,按照获取数据的方式,可以分为:
- 基于集合的Source
- 基于Socket网络端口的Source
- 基于文件的Source
- 第三方Connector Source
- 自定义Source五种。
从并行度的角度,source又可以分为非并行的source和并行的source。
- 非并行source:并行度只能为1,即只有一个运行时实例,在读取大量数据时效率比较低,通常是用来做一些实验或测试,例如Socket Source;
- 并行Source:并行度可以是1到多个,在计算资源足够的前提下,并行度越大,效率越高。例如Kafka Source;
3.2 基于socket的单并行Source(单并行)
非并行的Source,通过socket通信来获取数据得到数据流;
该方法还有多个重载的方法,如:
socketTextStream(String hostname, int port, String delimiter, long maxRetry)
可以指定行分隔符和最大重新连接次数。
package com.doit.day02;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @Date: 2023/6/24
* @Tips: 学大数据 ,到多易教育|码龙
* @DOC: https://www.malong.top
* @Description:
* 加载数据流(source)
*/
public class E03Source_Socket {
public static void main(String[] args) throws Exception {
// 获取编程环境
Configuration conf = new Configuration();
conf.setInteger("rest.port" , 8888);
StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
/**
* 获取数据流 : 获取数据流以后 一定要处理
* 1 网络流 socket 测试使用
* - 单并行的数据流 但并行source
*/
DataStreamSource<String> ds1 = see.socketTextStream("doitedu01", 8899);
int parallelism = ds1.getParallelism();
System.out.println("网络数据流的并行度是 : "+ parallelism);
// 多并行输出数据到控制台 可以通过 方法设置并行度
ds1.print().setParallelism(12) ; // 每获取一条数据 打印一次
see.execute("source") ;
}
}
提示:socketSource是一个非并行source,如果使用socketTextStream读取数据,在启动Flink程序之前,必须先启动一个Socket服务,为了方便,Mac或Linux用户可以在命令行终端输入nc -lk 8888启动一个Socket服务并在命令行中向该Socket服务发送数据。
3.3 基于元素或集合的Source
fromElements(单并行)
非并行的Source,可以将一到多个数据作为可变参数传入到该方法中,返回DataStreamSource。
// 获取编程环境
Configuration conf = new Configuration();
conf.setInteger("rest.port" , 8888);
StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
/**
* 加载元素 测试
* 单并行
*/
DataStreamSource<Integer> ds1 = env.fromElements(1, 2, 3, 4, 5, 6);
DataStreamSource<String> ds2 = env.fromElements("java" , "scala" , "sql" , "hive");
DataStreamSource<LogBean> ds3 = env.fromElements(
LogBean.of(1L, "s001", "e001", 10000L),
LogBean.of(2L, "s002", "e001", 20000L),
LogBean.of(3L, "s003", "e001", 30000L)
);
fromCollection(单并行)
非并行的Source,可以将一个Collection作为参数传入到该方法中,返回一个DataStreamSource。
/**
* 集合转换成 DataStream
* 单并行的 不能修改并行度
*/
List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
List<LogBean> logs = Arrays.asList( LogBean.of(1L, "s001", "e001", 10000L),
LogBean.of(2L, "s002", "e001", 20000L),
LogBean.of(3L, "s003", "e001", 30000L));
DataStreamSource<Integer> ds4 = see.fromCollection(numbers);
DataStreamSource<LogBean> ds5 = see.fromCollection(logs);
fromParallelCollection(多并行)
fromParallelCollection(SplittableIterator, Class) 方法是一个并行的Source(并行度可以使用env的setParallelism来设置),该方法需要传入两个参数,第一个是继承SplittableIterator的实现类的迭代器,第二个是迭代器中数据的类型。
/**
* 集合转换成 并行的 DataStream
* 1 fromParallelCollection
* 2 fromSequence
* 默认所有的可用资源
*/
DataStreamSource<Long> ds6 = see.fromParallelCollection(new NumberSequenceIterator(1, 10), Long.class);
DataStreamSource<Long> ds7 = see.fromSequence(1, 8);
System.out.println(ds7.getParallelism());
ds7.print() ;
并行的Source(并行度也可以通过调用该方法后,再调用setParallelism来设置)通过指定的起始值和结束值来生成数据序列流;
非并行的source后续的不能使用 setParallelism(N)修改并行度
3.4 基于文件的Source
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 8888);
StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
/**
* 读取文件
*/
/* DataStreamSource<String> ds = see.readTextFile("data/logs/");
System.out.println(ds.getParallelism());
ds.print() ;*/
Path path = new Path("data/logs");
TextLineInputFormat inputFormat = new TextLineInputFormat();
FileSource<String> fileSource = FileSource.forRecordStreamFormat(inputFormat, path).build();
/**
* 参数一 数据源对象
* 参数二 水位线 处理事件时间数据时出现乱序数据后的时间推进标准
* 参数三 名字
*/
DataStreamSource<String> ds = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source");
System.out.println(ds.getParallelism());
ds.print() ;
see.execute("file source") ;
}
3.5 基于Kafka的Source
在实际生产环境中,为了保证flink可以高效地读取数据源中的数据,通常是跟一些分布式消息中件结合使用,例如Apache Kafka。Kafka的特点是分布式、多副本、高可用、高吞吐、可以记录偏移量等。Flink和Kafka整合可以高效的读取数据,并且可以保证Exactly Once(精确一次性语义)。
添加依赖 : 在项目搭建的时候已经添加过 ,不需要再次添加
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
示例代码:
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 8888);
StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
KafkaSource<String> source = KafkaSource
.<String>builder()
.setBootstrapServers("doitedu01:9092 , doitedu02:9092,doitedu03:9092")
.setTopics("log-01")
.setGroupId("g1")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStreamSource<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka");
ds.print();
see.execute("source 测试");
}
3.6 自定义Source
Flink的DataStream API可以让开发者根据实际需要,灵活的自定义Source,本质上就是定义一个类,实现SourceFunction或继承RichParallelSourceFunction,实现run方法和cancel方法。
可以实现 SourceFunction 或者 RichSourceFunction , 这两者都是非并行的source算子
也可继承 ParallelSourceFunction 或者 RichParallelSourceFunction , 这两者都是可并行的source算子
带 Rich的,都拥有 open() ,close() ,getRuntimeContext() 方法
带 Parallel的,都可多实例并行执行source
/**
* 单并行
*/
static class MySource implements SourceFunction<LogBean> {
boolean flag = true;
@Override
public void run(SourceContext<LogBean> ctx) throws Exception {
while (flag) {
ctx.collect(LogBean.of("1001", "seesion01", "click", "ts01"));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
flag = false;
}
}
/**
* 多并行
* 且有生命周期方法
*/
static class MySourceParallel extends RichParallelSourceFunction<LogBean> {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
@Override
public void close() throws Exception {
super.close();
}
@Override
public void run(SourceContext<LogBean> ctx) throws Exception {
}
@Override
public void cancel() {
}
}
3.7 cdc source
cdc : capture data change (变更数据捕获)
3.7.1 mysq-cdc-connector 基本原理
cdc连接器,是通过伪装成一个mysql的slave节点,来获取master上的数据操作日志:binlog;
从而实现实时持续获取mysql中的数据变更:增删改查;
当然,mysql的数据操作日志中,是包含了操作的目标数据行内容及其他元数据信息的;
暂时无法在飞书文档外展示此内容
3.7.2 api使用示例