Flink学习-第二天-Flink函数用法-1

Map用法

介绍

Applies a Map transformation on a DataStream. 
The transformation calls a MapFunction for each element of the DataStream. Each MapFunction call returns exactly one element. 
The user can also extend RichMapFunction to gain access to other features provided by the org.apache.flink.api.common.functions.RichFunction interface.
对DataStream应用映射转换。
该转换会对DataStream中的每个元素调用MapFunction,每次MapFunction调用都会精确返回一个元素。
用户还可以扩展RichMapFunction,以获取org.apache.flink.api.common.functions.RichFunction接口提供的其他功能特性。

使用MapFunction

//map,将string转换成string[]
SingleOutputStreamOperator<String[]> mapFunction = streamSource.map(item -> item.split(" "));

使用RichMapFunction

Rich variant of the MapFunction. As a RichFunction, it gives access to the RuntimeContext and provides setup and teardown methods: RichFunction.open(OpenContext) and RichFunction.close().
Type parameters:
<IN> – Type of the input elements.
<OUT> – Type of the returned elements.
MapFunction的富函数版本。作为RichFunction,它可以访问RuntimeContext,并提供了初始化与终止方法:RichFunction.open(OpenContext) 和 RichFunction.close()。
类型参数:
<IN> - 输入元素的类型
<OUT> - 返回元素的类型
SingleOutputStreamOperator<String[]> mapFunction = streamSource.map(item -> item.split(" "));
        mapFunction.map(new RichMapFunction<String[], String[]>() {

            private transient IntCounter wordCount;

            @Override
            public void open(OpenContext openContext) throws Exception {
                System.out.println("open");
                super.open(openContext);
                //实际上keyBy才有意义
                this.wordCount = getRuntimeContext().getIntCounter("wordCount");
            }


            @Override
            public void close() throws Exception {
                super.close();
            }

            @Override
            public String[] map(String[] value) throws Exception {
                wordCount.add(1);
                System.out.println(wordCount.getLocalValue());
                return value;
            }
        });

FlatMap用法

Applies a FlatMap transformation on a DataStream. The transformation calls a FlatMapFunction for each element of the DataStream. Each FlatMapFunction call can return any number of elements including none. The user can also extend RichFlatMapFunction to gain access to other features provided by the org.apache.flink.api.common.functions.RichFunction interface.
Params:
flatMapper – The FlatMapFunction that is called for each element of the DataStream
Type parameters:
<R> – output type
Returns:
The transformed DataStream.
对DataStream应用扁平映射(FlatMap)转换。
该转换会为DataStream中的每个元素调用FlatMapFunction,每次FlatMapFunction调用可以返回任意数量的元素(包括零个元素)。
用户还可以扩展RichFlatMapFunction,以获取org.apache.flink.api.common.functions.RichFunction接口提供的其他功能特性。

参数:
flatMapper – 用于处理DataStream中每个元素的FlatMapFunction

类型参数:
<R> – 输出类型
mapFunction.flatMap(new FlatMapFunction<String[], String>() {
            @Override
            public void flatMap(String[] value, Collector<String> out) throws Exception {
                for (String s : value) {
                    out.collect(s);
                }
            }
        });

解决上一篇文章单词重复输出问题

package com.lhstack.normal;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.util.Collector;

import java.util.stream.Stream;

public class GlobalUniqueWordCounter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        String text = "Happiness isn’t just a gift, but a skill that can be learned and cultivated. People always rely on shortcuts like watching television, eating, and shopping to feel happy. But external satisfaction may lead to a sense of emptiness and even depression. Authentic happiness comes from identifying and cultivating people’s strengths like kindness, humor, and generosity";
        String[] arrays = text.split(",");
        DataStreamSource<String> streamSource = executionEnvironment.fromData(arrays);
        streamSource
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String value, Collector<String> out) throws Exception {
                        String[] strings = value.split(" ");
                        Stream.of(strings).filter(StringUtils::isNotBlank).map(String::trim).forEach(out::collect);
                    }
                })
                .map(item -> Tuple2.of(item,1))
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                })
                .keyBy(item -> item.f0)
                //创建一个流结束才触发的窗口,保证数据完全读取完毕,才写入到sink
                .window(GlobalWindows.createWithEndOfStreamTrigger())
                .sum("f1")
                .printToErr().setParallelism(1).name("统计单词数量");
        executionEnvironment.execute();
    }
}

image-ywni.png


Flink学习-第二天-Flink函数用法-1
https://blog.lhstack.xyz/archives/flink-two-day
作者
lhstack
发布于
2025年08月22日
许可协议