Flink Learning - Day 2 - Flink Function Usage - 1
Map usage
Introduction
Applies a Map transformation on a DataStream.
The transformation calls a MapFunction for each element of the DataStream. Each MapFunction call returns exactly one element.
The user can also extend RichMapFunction to gain access to other features provided by the org.apache.flink.api.common.functions.RichFunction interface.
Using MapFunction
// map: turn each String into a String[] by splitting on spaces
SingleOutputStreamOperator<String[]> mapFunction = streamSource.map(item -> item.split(" "));
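The "exactly one element per call" contract can be shown without a Flink setup. Below is a minimal plain-Java analogy that uses java.util.stream.Stream#map rather than Flink. The class name MapSketch is hypothetical. The lambda is the same one passed to streamSource.map above, and even an empty line still produces one (single-element) array.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogy (no Flink dependency): like DataStream#map,
// Stream#map turns every input element into exactly one output element.
class MapSketch {
    // One String[] per input line, as in streamSource.map(item -> item.split(" ")).
    static List<String[]> splitLines(List<String> lines) {
        return lines.stream()
                .map(line -> line.split(" "))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String[]> out = splitLines(Arrays.asList("to be or", "not", ""));
        // Three inputs give three outputs, including the empty line.
        System.out.println(out.size()); // 3
    }
}
```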
Using RichMapFunction
Rich variant of the MapFunction. As a RichFunction, it gives access to the RuntimeContext and provides setup and teardown methods: RichFunction.open(OpenContext) and RichFunction.close().
Type parameters:
<IN> – Type of the input elements.
<OUT> – Type of the returned elements.
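The setup/teardown contract can be sketched in plain Java. SketchRichMapper, TracingMapper and runInstance below are hypothetical stand-ins, not Flink classes. They only model the order in which a rich function's methods are called on one parallel instance: open once before the first element, map once per element, and close once at the end.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a rich function's lifecycle; NOT Flink's API.
abstract class SketchRichMapper<IN, OUT> {
    void open() {}              // models RichFunction.open(OpenContext): one-time setup
    abstract OUT map(IN value); // called once per element
    void close() {}             // models RichFunction.close(): one-time teardown
}

// Records every lifecycle call so the order can be inspected.
class TracingMapper extends SketchRichMapper<String, Integer> {
    final List<String> events = new ArrayList<>();
    @Override void open() { events.add("open"); }
    @Override Integer map(String value) { events.add("map"); return value.length(); }
    @Override void close() { events.add("close"); }
}

class LifecycleDemo {
    // Hypothetical driver: roughly how one parallel instance is driven.
    static <IN, OUT> List<OUT> runInstance(SketchRichMapper<IN, OUT> fn, List<IN> input) {
        List<OUT> out = new ArrayList<>();
        fn.open();
        try {
            for (IN v : input) {
                out.add(fn.map(v));
            }
        } finally {
            fn.close(); // teardown runs even if map throws
        }
        return out;
    }

    public static void main(String[] args) {
        TracingMapper mapper = new TracingMapper();
        System.out.println(runInstance(mapper, Arrays.asList("a", "bb"))); // [1, 2]
        System.out.println(mapper.events); // [open, map, map, close]
    }
}
```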
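SingleOutputStreamOperator<String[]> mapFunction = streamSource.map(item -> item.split(" "));
mapFunction.map(new RichMapFunction<String[], String[]>() {
    // accumulator; each parallel subtask holds its own instance
    private transient IntCounter wordCount;

    @Override
    public void open(OpenContext openContext) throws Exception {
        // called once per parallel instance, before the first element
        System.out.println("open");
        super.open(openContext);
        // in practice a per-word count only makes sense after keyBy;
        // this counter just counts the elements seen by this subtask
        this.wordCount = getRuntimeContext().getIntCounter("wordCount");
    }

    @Override
    public void close() throws Exception {
        // called once when the instance shuts down
        super.close();
    }

    @Override
    public String[] map(String[] value) throws Exception {
        wordCount.add(1);
        // prints only this subtask's local count, not a job-wide total
        System.out.println(wordCount.getLocalValue());
        return value;
    }
});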
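This explains why wordCount.getLocalValue() prints small, per-instance numbers. An accumulator obtained via getIntCounter exists separately in each parallel subtask, and the final merged total can be read after the job finishes from the JobExecutionResult returned by execute(), via getAccumulatorResult("wordCount"). The plain-Java model below (no Flink) assumes round-robin distribution of elements across subtasks. That choice is for illustration only and is not necessarily how a given job partitions its data. LocalCounterModel is a hypothetical name.

```java
import java.util.Arrays;

// Plain-Java model (no Flink): each subtask owns its own counter,
// so printing the local value shows only that subtask's share.
class LocalCounterModel {
    // Assumption: n elements are spread round-robin over `parallelism` subtasks.
    static int[] localCounts(int n, int parallelism) {
        int[] counts = new int[parallelism];
        for (int i = 0; i < n; i++) {
            counts[i % parallelism]++; // like wordCount.add(1) inside map()
        }
        return counts;
    }

    // Merge step: the job-level result is the sum of all local values.
    static int merged(int[] counts) {
        int total = 0;
        for (int c : counts) {
            total += c;
        }
        return total;
    }

    public static void main(String[] args) {
        int[] local = localCounts(10, 4);
        System.out.println(Arrays.toString(local)); // [3, 3, 2, 2]
        System.out.println(merged(local));          // 10
    }
}
```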
FlatMap usage
Applies a FlatMap transformation on a DataStream. The transformation calls a FlatMapFunction for each element of the DataStream. Each FlatMapFunction call can return any number of elements including none. The user can also extend RichFlatMapFunction to gain access to other features provided by the org.apache.flink.api.common.functions.RichFunction interface.
Params:
flatMapper – The FlatMapFunction that is called for each element of the DataStream
Type parameters:
<R> – output type
Returns:
The transformed DataStream.
// flatMap: emit every word of each String[] as a separate element
mapFunction.flatMap(new FlatMapFunction<String[], String>() {
    @Override
    public void flatMap(String[] value, Collector<String> out) throws Exception {
        for (String s : value) {
            out.collect(s);
        }
    }
});
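Unlike map, a flatMap call may emit zero, one or many records through its Collector. Here is a minimal plain-Java model of that contract. It uses a java.util.function.Consumer as a stand-in for Flink's Collector, and FlatMapSketch is a hypothetical name. The flatten method mirrors the FlatMapFunction<String[], String> above, so an empty array contributes nothing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Plain-Java model of flatMap (no Flink): the function pushes 0..n
// results into a collector, here a Consumer standing in for Collector.
class FlatMapSketch {
    static <IN, OUT> List<OUT> flatMap(List<IN> input, BiConsumer<IN, Consumer<OUT>> fn) {
        List<OUT> out = new ArrayList<>();
        for (IN value : input) {
            fn.accept(value, out::add); // each call may add any number of elements
        }
        return out;
    }

    // Same logic as the FlatMapFunction<String[], String> above.
    static List<String> flatten(List<String[]> arrays) {
        return flatMap(arrays, (String[] value, Consumer<String> collector) -> {
            for (String s : value) {
                collector.accept(s);
            }
        });
    }

    public static void main(String[] args) {
        List<String[]> input = Arrays.asList(new String[]{"a", "b"}, new String[0], new String[]{"c"});
        System.out.println(flatten(input)); // [a, b, c]
    }
}
```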
Fixing the duplicate word output from the previous article
package com.lhstack.normal;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.util.Collector;

import java.util.stream.Stream;

public class GlobalUniqueWordCounter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        String text = "Happiness isn’t just a gift, but a skill that can be learned and cultivated. People always rely on shortcuts like watching television, eating, and shopping to feel happy. But external satisfaction may lead to a sense of emptiness and even depression. Authentic happiness comes from identifying and cultivating people’s strengths like kindness, humor, and generosity";
        // split the text into comma-separated fragments and use them as a bounded source
        String[] arrays = text.split(",");
        DataStreamSource<String> streamSource = executionEnvironment.fromData(arrays);
        streamSource
                // split each fragment into words, dropping blank tokens
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String value, Collector<String> out) throws Exception {
                        String[] strings = value.split(" ");
                        Stream.of(strings).filter(StringUtils::isNotBlank).map(String::trim).forEach(out::collect);
                    }
                })
                // pair each word with 1; returns(...) restores the tuple type erased from the lambda
                .map(item -> Tuple2.of(item, 1))
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                })
                // group by word
                .keyBy(item -> item.f0)
                // a window that fires only when the stream ends, so each word's count
                // is written to the sink once, after all data has been read
                .window(GlobalWindows.createWithEndOfStreamTrigger())
                .sum("f1")
                .printToErr().setParallelism(1).name("Count words");
        executionEnvironment.execute();
    }
}
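The duplicate output this program avoids is, presumably, the behaviour of keyBy(...).sum(...) without a window. A rolling aggregation on a KeyedStream emits an updated result for every incoming element, so a word seen three times is printed three times with growing counts. The end-of-stream global window instead emits one final result per word. Below is a plain-Java model of the two behaviours (no Flink). WordCountModel is a hypothetical name, and the "(word,count)" strings only imitate Tuple2's printed form. In a real Flink job the output order across keys is not guaranteed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Flink) of rolling vs. end-of-stream keyed sums.
class WordCountModel {
    // Rolling keyed sum (keyBy + sum, no window): one updated result per incoming word.
    static List<String> rollingSum(List<String> words) {
        Map<String, Integer> state = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String w : words) {
            int c = state.merge(w, 1, Integer::sum);
            emitted.add("(" + w + "," + c + ")");
        }
        return emitted;
    }

    // End-of-stream window: counts accumulate silently, then each key is emitted once.
    static List<String> endOfStreamSum(List<String> words) {
        Map<String, Integer> state = new LinkedHashMap<>();
        for (String w : words) {
            state.merge(w, 1, Integer::sum);
        }
        List<String> emitted = new ArrayList<>();
        state.forEach((w, c) -> emitted.add("(" + w + "," + c + ")"));
        return emitted;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a");
        System.out.println(rollingSum(words));     // [(a,1), (b,1), (a,2)]
        System.out.println(endOfStreamSum(words)); // [(a,2), (b,1)]
    }
}
```

Note that an end-of-stream trigger only fires for bounded input such as fromData. On an unbounded source the window would never emit.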
https://blog.lhstack.xyz/archives/flink-two-day