《多易教育|码龙 Flink教程》

配套文档part1

1 快速认识flink

Apache Flink是一个分布式实时流式计算引擎(框架);用于无界流和有界数据流(批)的有状态计算。

Flink被设计成可以在所有常见的集群环境中运行 , 基于内存的分布式计算引擎 ,适用于任何数据规模的(实时)计算场景。

流式计算的特点:由于数据是实时持续流入没有终结,只能每逢数据到达就进行计算,并输出”当时”的计算结果(因而计算结果也不会是一次性的最终结果,而是源源不断的无界的结果流);

2 Flink编程基础

2.1 添加依赖

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.16.1</flink.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    
      <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!--flink本地运行时 提供的web功能-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-compress</artifactId>
                <version>1.21</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- flinksql本地运行需要的依赖 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!--状态后端管理器-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-statebackend-rocksdb</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.26</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-csv</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-parquet</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-avro</artifactId>
                <version>1.11.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-avro</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-json</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-files</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-hbase-2.2</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.16</artifactId>
                <version>1.3.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>com.ververica</groupId>
                <artifactId>flink-connector-mysql-cdc</artifactId>
                <version>2.3.0</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.30</version>
            </dependency>
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>4.3.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-hive_2.12</artifactId>
                <version>1.16.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-exec</artifactId>
                <version>3.1.0</version>
                <exclusions>
                    <exclusion>
                        <artifactId>hadoop-hdfs</artifactId>
                        <groupId>org.apache.hadoop</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!--hadoop -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>3.1.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>3.1.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>3.1.0</version>
            </dependency>
    
            <!-- 打印日志的jar包 -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.30</version>
            </dependency>
    
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.16</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-cep</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-sql-gateway-api</artifactId>
                <version>${flink.version}</version>
            </dependency>
        </dependencies>

    2.2 flink程序开发基本模板

    无论简单与复杂,flink程序都由如下几个部分组成

    • 获取一个编程、执行入口环境env
    • 通过数据源组件,加载、创建datastream
    • 对datastream调用各种处理算子表达计算逻辑
    • 通过sink算子指定计算结果的输出方式
    • 在env.execute()上触发程序提交运行

    2.3 入门示例程序

    加载网络数据流 , 累计统计单词出现的次数

    package com.doit.day01;
    
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    /**
     * @Date: 2023/6/23
     * @Tips: 学大数据 ,到多易教育
     * @DOC: https://www.malong.top
     * @Description:
     * flink可以流式处理数据
     *    源源不断地接收网络流数据  , 统计数据中每个单词出现的次数
     *    实时打印结果在控制台
     */
    public class StreamWordCount {
        public static void main(String[] args) throws Exception {
            // 1 获取flink的编程环境
            //  可以做到  流批一体的处理方案
            StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
            // 2 获取网络数据流    DataStream  (Source)
            // 使用 socket网络流 客户端发送数据   接收数据   在linux上使用  nc -lk 端口号
            // socketTextStream 可以接收指定机器 指定端口发过来的数据
            // 源源不断地接收到数据  一行一行
            DataStreamSource<String> ds = see.socketTextStream("doitedu01", 8899);
            // 3 处理数据   统计单词出现的次数   (Transformation)
            // 3.1  将数据组装成 单词,1  二元组  Tuple2
            SingleOutputStreamOperator<Tuple2<String, Integer>> wrodOne = ds.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                // 参数1   一行数据  参数2  收集结果
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] words = line.split("\\s+");
                    for (String word : words) {
                        // 以后 在使用某个对象的时候 先不new 优先调用一下有没有静态方法
                        Tuple2<String, Integer> tp = Tuple2.of(word, 1);
                        out.collect(tp);
                    }
                } // 泛型1   输入的数据类型  泛型2  输出的结果
            });
            // 3.2 按照单词分组
            KeyedStream<Tuple2<String, Integer>, String> keyed = wrodOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> value) throws Exception {
                    return value.f0;
                }
            });
            // 3.3 统计单词个数
            SingleOutputStreamOperator<Tuple2<String, Integer>> res = keyed.sum("f1");
            // 4 将结果打印在控制台    (sink)
            res.print() ;
            // 5 出发程序执行
            see.execute("流式单词统计") ;
    
        }
    }

    3 flink常用source算子

      3.1 source算子概述

      source是用来获取外部数据的算子,按照获取数据的方式,可以分为:

      • 基于集合的Source
      • 基于Socket网络端口的Source
      • 基于文件的Source
      • 第三方Connector Source
      • 自定义Source五种。

      从并行度的角度,source又可以分为非并行的source和并行的source。

      • 非并行source:并行度只能为1,即只有一个运行时实例,在读取大量数据时效率比较低,通常是用来做一些实验或测试,例如Socket Source;
      • 并行Source:并行度可以是1到多个,在计算资源足够的前提下,并行度越大,效率越高。例如Kafka Source;

      3.2 基于socket的单并行Source(单并行)

      非并行的Source,通过socket通信来获取数据得到数据流;

      该方法还有多个重载的方法,如:

      socketTextStream(String hostname, int port, String delimiter, long maxRetry)

      可以指定行分隔符和最大重新连接次数。

      package com.doit.day02;
      
      
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      
      /**
       * @Date: 2023/6/24
       * @Tips: 学大数据 ,到多易教育|码龙
       * @DOC: https://www.malong.top
       * @Description:
       *        加载数据流(source)
       */
      public class E03Source_Socket {
          public static void main(String[] args) throws Exception {
              // 获取编程环境
              Configuration conf = new Configuration();
              conf.setInteger("rest.port" , 8888);
              StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
              /**
               * 获取数据流 : 获取数据流以后  一定要处理
               *   1 网络流  socket  测试使用
               *        -  单并行的数据流  但并行source
               */
              DataStreamSource<String> ds1 = see.socketTextStream("doitedu01", 8899);
              int parallelism = ds1.getParallelism();
              System.out.println("网络数据流的并行度是 :  "+  parallelism);
              // 多并行输出数据到控制台 可以通过 方法设置并行度
              ds1.print().setParallelism(12) ; // 每获取一条数据 打印一次
              see.execute("source") ;
          }
      }

      提示:socketSource是一个非并行source,如果使用socketTextStream读取数据,在启动Flink程序之前,必须先启动一个Socket服务,为了方便,Mac或Linux用户可以在命令行终端输入nc -lk 8888启动一个Socket服务并在命令行中向该Socket服务发送数据。

      3.3 基于元素或集合的Source

      fromElements(单并行)

      非并行的Source,可以将一到多个数据作为可变参数传入到该方法中,返回DataStreamSource。

      // 获取编程环境
      Configuration conf = new Configuration();
      conf.setInteger("rest.port" , 8888);
      StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
      /**
       * 加载元素  测试
       *   单并行
       */
      DataStreamSource<Integer> ds1 = env.fromElements(1, 2, 3, 4, 5, 6);
      DataStreamSource<String> ds2 = env.fromElements("java" , "scala" , "sql" , "hive");
      DataStreamSource<LogBean> ds3 = env.fromElements(
              LogBean.of(1L, "s001", "e001", 10000L),
              LogBean.of(2L, "s002", "e001", 20000L),
              LogBean.of(3L, "s003", "e001", 30000L)
      );

      fromCollection(单并行)

      非并行的Source,可以将一个Collection作为参数传入到该方法中,返回一个DataStreamSource。

      /**
       * 集合转换成   DataStream
       *  单并行的  不能修改并行度
       */
      List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
      List<LogBean> logs = Arrays.asList( LogBean.of(1L, "s001", "e001", 10000L),
              LogBean.of(2L, "s002", "e001", 20000L),
              LogBean.of(3L, "s003", "e001", 30000L));
      DataStreamSource<Integer> ds4 = see.fromCollection(numbers);
      DataStreamSource<LogBean> ds5 = see.fromCollection(logs);

      fromParallelCollection(多并行)

      fromParallelCollection(SplittableIterator, Class) 方法是一个并行的Source(并行度可以使用env的setParallelism来设置),该方法需要传入两个参数,第一个是继承SplittableIterator的实现类的迭代器,第二个是迭代器中数据的类型。

      /**
       * 集合转换成   并行的  DataStream
       *   1   fromParallelCollection
       *   2   fromSequence
       *   默认所有的可用资源
       */
      DataStreamSource<Long> ds6 = see.fromParallelCollection(new NumberSequenceIterator(1, 10), Long.class);
      
      DataStreamSource<Long> ds7 = see.fromSequence(1, 8);
      System.out.println(ds7.getParallelism());
      ds7.print() ;

      并行的Source(并行度也可以通过调用该方法后,再调用setParallelism来设置)通过指定的起始值和结束值来生成数据序列流;

      非并行的source后续的不能使用 setParallelism(N)修改并行度

      3.4 基于文件的Source

      public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration();
          conf.setInteger("rest.port", 8888);
          StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
          /**
           * 读取文件
           */
         /* DataStreamSource<String> ds = see.readTextFile("data/logs/");
          System.out.println(ds.getParallelism());
          ds.print() ;*/
      
          Path path = new Path("data/logs");
          TextLineInputFormat inputFormat = new TextLineInputFormat();
          FileSource<String> fileSource = FileSource.forRecordStreamFormat(inputFormat, path).build();
          /**
           * 参数一   数据源对象
           * 参数二  水位线  处理事件时间数据时出现乱序数据后的时间推进标准
           * 参数三  名字
           */
          DataStreamSource<String> ds = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source");
          System.out.println(ds.getParallelism());
          ds.print() ;
          see.execute("file  source") ;
      }

      3.5 基于Kafka的Source

      在实际生产环境中,为了保证flink可以高效地读取数据源中的数据,通常是跟一些分布式消息中件结合使用,例如Apache Kafka。Kafka的特点是分布式、多副本、高可用、高吞吐、可以记录偏移量等。Flink和Kafka整合可以高效的读取数据,并且可以保证Exactly Once(精确一次性语义)。

      添加依赖 : 在项目搭建的时候已经添加过 ,不需要再次添加

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka</artifactId>
          <version>${flink.version}</version>
      </dependency>

      示例代码:

      public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration();
          conf.setInteger("rest.port", 8888);
          StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
      
          KafkaSource<String> source = KafkaSource
                  .<String>builder()
                  .setBootstrapServers("doitedu01:9092 , doitedu02:9092,doitedu03:9092")
                  .setTopics("log-01")
                  .setGroupId("g1")
                  .setStartingOffsets(OffsetsInitializer.earliest())
                  .setValueOnlyDeserializer(new SimpleStringSchema())
                  .build();
          DataStreamSource<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka");
          ds.print();
          see.execute("source 测试");
      }

      3.6 自定义Source

      Flink的DataStream API可以让开发者根据实际需要,灵活的自定义Source,本质上就是定义一个类,实现SourceFunction或继承RichParallelSourceFunction,实现run方法和cancel方法。

      可以实现 SourceFunction 或者 RichSourceFunction , 这两者都是非并行的source算子

      也可继承 ParallelSourceFunction 或者 RichParallelSourceFunction , 这两者都是可并行的source算子

      带 Rich的,都拥有 open() ,close() ,getRuntimeContext() 方法

      带 Parallel的,都可多实例并行执行source

      /**
       * 单并行
       */
      static class MySource implements SourceFunction<LogBean> {
          boolean flag = true;
      
          @Override
          public void run(SourceContext<LogBean> ctx) throws Exception {
              while (flag) {
                  ctx.collect(LogBean.of("1001", "seesion01", "click", "ts01"));
                  Thread.sleep(1000);
              }
      
          }
      
          @Override
          public void cancel() {
              flag = false;
          }
      }
      
      /**
       * 多并行
       * 且有生命周期方法
       */
      static class MySourceParallel extends RichParallelSourceFunction<LogBean> {
          @Override
          public void open(Configuration parameters) throws Exception {
              super.open(parameters);
          }
      
          @Override
          public void close() throws Exception {
              super.close();
          }
      
          @Override
          public void run(SourceContext<LogBean> ctx) throws Exception {
      
          }
      
          @Override
          public void cancel() {
      
          }
      }

      3.7 cdc source

      cdc : capture data change (变更数据捕获)

      3.7.1 mysq-cdc-connector 基本原理

      cdc连接器,是通过伪装成一个mysql的slave节点,来获取master上的数据操作日志:binlog;

      从而实现实时持续获取mysql中的数据变更:增删改查;

      当然,mysql的数据操作日志中,是包含了操作的目标数据行内容及其他元数据信息的;

      暂时无法在飞书文档外展示此内容

      3.7.2 api使用示例