Flink - DataStream
Published: 2019-06-28


Let's start with an example:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<Long, Long>> stream = env.addSource(...);

stream
    .keyBy(0)
    .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
    .reduce(new SummingReducer())
    .addSink(new SinkFunction<Tuple2<Long, Long>>() {...});

env.execute();

As you can see, the biggest difference from the batch API is that we are dealing with a DataStream rather than a DataSet.
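The post never shows SummingReducer. Purely as an illustration, assuming the elements are Tuple2<Long, Long> (which is what the sink's type parameter suggests), a minimal reducer that keeps the key and sums the values might look like this:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical stand-in for the SummingReducer used in the example:
// keep the key (f0) and add up the values (f1) of two records with the same key.
public class SummingReducer implements ReduceFunction<Tuple2<Long, Long>> {
    @Override
    public Tuple2<Long, Long> reduce(Tuple2<Long, Long> v1, Tuple2<Long, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
    }
}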

/**
 * A DataStream represents a stream of elements of the same type. A DataStream
 * can be transformed into another DataStream by applying a transformation as
 * for example:
 * <ul>
 *   <li>{@link DataStream#map},</li>
 *   <li>{@link DataStream#filter}, or</li>
 * </ul>
 *
 * @param <T> The type of the elements in this Stream
 */
public class DataStream<T> {

    protected final StreamExecutionEnvironment environment;

    protected final StreamTransformation<T> transformation;

    /**
     * Create a new {@link DataStream} in the given execution environment with
     * partitioning set to forward by default.
     *
     * @param environment The StreamExecutionEnvironment
     */
    public DataStream(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
        this.environment = Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
        this.transformation = Preconditions.checkNotNull(transformation, "Stream Transformation must not be null.");
    }

    // The various operations on DataStream follow...
    // map, reduce, keyBy, ...
}

The core of a DataStream is its

StreamTransformation<T> transformation;

i.e. the description of how the data stream is produced.

 

StreamTransformation

A StreamTransformation represents an operation that creates a DataStream.

It does not necessarily correspond to an actual physical operation; it may be only a logical concept, as the following javadoc explains.

/**
 * A {@code StreamTransformation} represents the operation that creates a
 * {@link org.apache.flink.streaming.api.datastream.DataStream}. Every
 * {@link org.apache.flink.streaming.api.datastream.DataStream} has an underlying
 * {@code StreamTransformation} that is the origin of said DataStream.
 *
 * <p>
 * API operations such as {@link org.apache.flink.streaming.api.datastream.DataStream#map} create
 * a tree of {@code StreamTransformation}s underneath. When the stream program is to be executed this
 * graph is translated to a {@link StreamGraph} using
 * {@link org.apache.flink.streaming.api.graph.StreamGraphGenerator}.
 *
 * <p>
 * A {@code StreamTransformation} does not necessarily correspond to a physical operation
 * at runtime. Some operations are only logical concepts. Examples of this are union,
 * split/select data stream, partitioning.
 *
 * <p>
 * The following graph of {@code StreamTransformations}:
 *
 * <pre>{@code
 *   Source              Source
 *      +                   +
 *      |                   |
 *      v                   v
 *  Rebalance          HashPartition
 *      +                   +
 *      |                   |
 *      |                   |
 *      +------>Union<------+
 *                +
 *                |
 *                v
 *              Split
 *                +
 *                |
 *                v
 *              Select
 *                +
 *                v
 *               Map
 *                +
 *                |
 *                v
 *              Sink
 * }</pre>
 *
 * Would result in this graph of operations at runtime:
 *
 * <pre>{@code
 *   Source              Source
 *      +                   +
 *      |                   |
 *      |                   |
 *      +------->Map<-------+
 *                +
 *                |
 *                v
 *              Sink
 * }</pre>
 *
 * The information about partitioning, union, split/select end up being encoded in the edges
 * that connect the sources to the map operation.
 *
 * @param <T> The type of the elements that result from this {@code StreamTransformation}
 */
public abstract class StreamTransformation<T>

StreamTransformation itself only defines the output, i.e. the result stream produced by the transformation.

It is an abstract class and cannot be used directly; the logic that actually produces the stream is encapsulated in a concrete operator.

The following examples give a feel for the difference between a transformation and an operator; the design here is a bit convoluted.

 

OneInputTransformation adds an input on top of StreamTransformation:

/**
 * This Transformation represents the application of a
 * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} to one input
 * {@link org.apache.flink.streaming.api.transformations.StreamTransformation}.
 *
 * @param <IN> The type of the elements in the input {@code StreamTransformation}
 * @param <OUT> The type of the elements that result from this {@code OneInputTransformation}
 */
public class OneInputTransformation<IN, OUT> extends StreamTransformation<OUT> {

    private final StreamTransformation<IN> input;

    private final OneInputStreamOperator<IN, OUT> operator;

    private KeySelector<IN, ?> stateKeySelector;

    private TypeInformation<?> stateKeyType;
}

So it contains:

the StreamTransformation<IN> input that produces the input stream,
and the OneInputStreamOperator<IN, OUT> operator that turns that input into the output.

For comparison, take a look at TwoInputTransformation as well,

public class TwoInputTransformation<IN1, IN2, OUT> extends StreamTransformation<OUT> {

    private final StreamTransformation<IN1> input1;

    private final StreamTransformation<IN2> input2;

    private final TwoInputStreamOperator<IN1, IN2, OUT> operator;
}
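As a hedged illustration (not from the post), the kind of user code that ends up as a TwoInputTransformation is a two-input operation such as connect(...).map(...). The inputs and the CoMapFunction below are made up, env is the StreamExecutionEnvironment from the opening example, and the mapping to TwoInputTransformation is an inference from the class above rather than something the post states:

import org.apache.flink.streaming.api.functions.co.CoMapFunction;

// Hypothetical inputs, just for illustration:
DataStream<Long> longs = env.fromElements(1L, 2L, 3L);
DataStream<String> strings = env.fromElements("a", "b");

// A two-input operation: under the hood it is carried by a two-input operator together
// with the two input transformations.
DataStream<String> merged = longs
        .connect(strings)
        .map(new CoMapFunction<Long, String, String>() {
            @Override
            public String map1(Long value) {
                return "long: " + value;
            }

            @Override
            public String map2(String value) {
                return "string: " + value;
            }
        });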

 

Next, compare SourceTransformation with SinkTransformation:

public class SourceTransformation<T> extends StreamTransformation<T> {

    private final StreamSource<T> operator;
}

public class SinkTransformation<T> extends StreamTransformation<Object> {

    private final StreamTransformation<T> input;

    private final StreamSink<T> operator;
}

This makes the role of the transformations easy to understand:

A source has no input, so there is no transformation representing an input.
A sink does have an input, but its operator is not an ordinary StreamOperator; it is a StreamSink, i.e. the end point of the stream.

 

transform

This method takes a user-defined operator, together with type information, and transforms the current stream into a stream of the user-specified type.

/**
 * Method for passing user defined operators along with the type
 * information that will transform the DataStream.
 *
 * @param operatorName name of the operator, for logging purposes
 * @param outTypeInfo the output type of the operator
 * @param operator the object containing the transformation logic
 * @param <R> type of the return stream
 * @return the data stream constructed
 */
public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();

    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation,
            operatorName,
            operator,
            outTypeInfo,
            environment.getParallelism());

    @SuppressWarnings({ "unchecked", "rawtypes" })
    SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}

So the parameters are the user-defined pieces: the output TypeInformation and the OneInputStreamOperator.

The implementation creates a OneInputTransformation whose input is this.transformation and whose operator is the one passed in.

Through this resultTransform, the current stream is converted into the target stream.

The result is then wrapped in a SingleOutputStreamOperator. What is that?

/**
 * The SingleOutputStreamOperator represents a user defined transformation
 * applied on a {@link DataStream} with one predefined output type.
 *
 * @param <T> The type of the elements in this Stream
 * @param <O> Type of the operator.
 */
public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<T, O>> extends DataStream<T> {

    protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
        super(environment, transformation);
    }
}

In plain terms, it just wraps the user-defined transformation.

The naming in this part of Flink is a bit confusing: Operator and Transformation are two concepts that are easy to mix up.
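Because SingleOutputStreamOperator extends DataStream, it is also where per-operator settings such as name(...) and setParallelism(...) are applied; a rough usage sketch (the operator name "summing-window" and the parallelism of 4 are made up for illustration):

DataStream<Tuple2<Long, Long>> result = stream
        .keyBy(0)
        .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
        .reduce(new SummingReducer())      // returns a SingleOutputStreamOperator
        .name("summing-window")            // per-operator settings hang off that wrapper
        .setParallelism(4);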

 

In the example above, .keyBy(0) produces a KeyedStream.

KeyedStream

For a KeyedStream the crucial pieces are the keySelector and the keyType: how the key is produced, and what type it has.
/**
 * A {@code KeyedStream} represents a {@link DataStream} on which operator state is
 * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
 * {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of
 * partitioning methods such as shuffle, forward and keyBy.
 *
 * <p>
 * Reduce-style operations, such as {@link #reduce}, {@link #sum} and {@link #fold} work on elements
 * that have the same key.
 *
 * @param <T> The type of the elements in the Keyed Stream.
 * @param <KEY> The type of the key in the Keyed Stream.
 */
public class KeyedStream<T, KEY> extends DataStream<T> {

    /** The key selector that can get the key by which the stream is partitioned from the elements */
    private final KeySelector<T, KEY> keySelector;

    /** The type of the key by which the stream is partitioned */
    private final TypeInformation<KEY> keyType;
}
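For illustration (this snippet is not from the post): keyBy(0) keys the tuple stream by its first field. The roughly equivalent explicit form below shows where the keySelector and keyType come from; the selector extracts the key, and its declared type (Long here) becomes the keyType.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical explicit-KeySelector version of keyBy(0):
KeyedStream<Tuple2<Long, Long>, Long> keyed = stream.keyBy(
        new KeySelector<Tuple2<Long, Long>, Long>() {
            @Override
            public Long getKey(Tuple2<Long, Long> value) {
                return value.f0;   // the key; its type Long becomes the stream's keyType
            }
        });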

 
Look at its transform: it calls DataStream.transform and, at the same time, injects the keySelector and keyType.
// ------------------------------------------------------------------------
//  basic transformations
// ------------------------------------------------------------------------

@Override
public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo, operator);

    // inject the key selector and key type
    OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
    transform.setStateKeySelector(keySelector);
    transform.setStateKeyType(keyType);

    return returnStream;
}

 

A crucial role of KeyedStream is to act as a stepping stone towards WindowedStream, so it provides a set of methods for creating windowed streams.

// ------------------------------------------------------------------------
//  Windowing
// ------------------------------------------------------------------------

/**
 * Windows this {@code KeyedStream} into tumbling time windows.
 *
 * <p>
 * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
 * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
 * set using
 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
 *
 * @param size The size of the window.
 */
public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
    return window(TumblingTimeWindows.of(size));
}
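The example actually uses the two-argument timeWindow(size, slide). By analogy with the tumbling shortcut above, it presumably delegates to a sliding window assigner in the same way; this is a hedged sketch, not verified against the exact Flink version:

// Hedged sketch of the sliding counterpart used by the example:
public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) {
    return window(SlidingTimeWindows.of(size, slide));
}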

 

WindowedStream

In the example,

.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))

 

/**
 * A {@code WindowedStream} represents a data stream where elements are grouped by
 * key, and for each key, the stream of elements is split into windows based on a
 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
 * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
 *
 * <p>
 * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
 * different points for each key.
 *
 * <p>
 * If an {@link Evictor} is specified it will be used to evict elements from the window after
 * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
 * When using an evictor window performance will degrade significantly, since
 * pre-aggregation of window results cannot be used.
 *
 * <p>
 * Note that the {@code WindowedStream} is purely an API construct, during runtime
 * the {@code WindowedStream} will be collapsed together with the
 * {@code KeyedStream} and the operation over the window into one single operation.
 *
 * @param <T> The type of elements in the stream.
 * @param <K> The type of the key by which elements are grouped.
 * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
 */
public class WindowedStream<T, K, W extends Window> {

    /** The keyed data stream that is windowed by this stream */
    private final KeyedStream<T, K> input;

    /** The window assigner */
    private final WindowAssigner<? super T, W> windowAssigner;

    /** The trigger that is used for window evaluation/emission. */
    private Trigger<? super T, ? super W> trigger;

    /** The evictor that is used for evicting elements before window evaluation. */
    private Evictor<? super T, ? super W> evictor;

Notice that WindowedStream does not extend DataStream directly; instead it takes the KeyedStream as its input.

And of course the things a window needs, the WindowAssigner, the Trigger and the Evictor, are all there.
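Tying this back to the example: per the shortcut above, the .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS)) call roughly amounts to applying a sliding-window assigner to the KeyedStream. The snippet below is an illustrative sketch (assuming processing time and the SlidingTimeWindows assigner); a custom Trigger or Evictor could then be set on the resulting WindowedStream:

// Roughly what the example's timeWindow(...) call expands to (illustrative sketch):
WindowedStream<Tuple2<Long, Long>, Tuple, TimeWindow> windowed = stream
        .keyBy(0)   // keyBy(int) keys by a tuple field, so the key type is Tuple
        .window(SlidingTimeWindows.of(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS)));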

 

Continuing with the example, .reduce(new SummingReducer()):

let's look at WindowedStream's reduce operation.

/**
 * Applies a reduce function to the window. The window function is called for each evaluation
 * of the window for each key individually. The output of the reduce function is interpreted
 * as a regular non-windowed stream.
 *
 * <p>
 * This window will try and pre-aggregate data as much as the window policies permit. For example,
 * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
 * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
 * so a few elements are stored per key (one per slide interval).
 * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
 * aggregation tree.
 *
 * @param function The reduce function.
 * @return The data stream that is the result of applying the reduce function to the window.
 */
public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> function) {

    //clean the closure
    function = input.getExecutionEnvironment().clean(function);

    String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
    KeySelector<T, K> keySel = input.getKeySelector();

    OneInputStreamOperator<T, T> operator;

    boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;

    if (evictor != null) {
        operator = new EvictingWindowOperator<>(windowAssigner,
                windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                keySel,
                input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                new HeapWindowBuffer.Factory<>(),
                new ReduceWindowFunction<>(function),
                trigger,
                evictor).enableSetProcessingTime(setProcessingTime);

    } else {
        operator = new WindowOperator<>(windowAssigner,
                windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                keySel,
                input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                // pre-aggregating: this buffer does not cache the raw elements, it stores
                // the aggregated value directly, which saves space
                new PreAggregatingHeapWindowBuffer.Factory<>(function),
                new ReduceWindowFunction<>(function),
                trigger).enableSetProcessingTime(setProcessingTime);
    }

    return input.transform(opName, input.getType(), operator);
}

The key point is that, depending on whether an Evictor is present, a different WindowOperator is created. (For the example's 2500 ms window with a 500 ms slide, pre-aggregation at slide granularity means roughly five partial aggregates are kept per key.)

It then calls input.transform to turn the WindowedStream into a SingleOutputStreamOperator, where input is the KeyedStream:

// ------------------------------------------------------------------------
//  basic transformations
// ------------------------------------------------------------------------

@Override
public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo, operator);

    // inject the key selector and key type
    OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
    transform.setStateKeySelector(keySelector);
    transform.setStateKeyType(keyType);

    return returnStream;
}

Note that the parameter here is a OneInputStreamOperator, and WindowOperator does in fact implement that interface.

For a OneInputStreamOperator we only need to implement two methods, processElement and processWatermark, which focus on how each incoming element is handled:

/**
 * Interface for stream operators with one input. Use
 * {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class if
 * you want to implement a custom operator.
 *
 * @param <IN> The input type of the operator
 * @param <OUT> The output type of the operator
 */
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {

    /**
     * Processes one element that arrived at this operator.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     */
    void processElement(StreamRecord<IN> element) throws Exception;

    /**
     * Processes a {@link Watermark}.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     *
     * @see org.apache.flink.streaming.api.watermark.Watermark
     */
    void processWatermark(Watermark mark) throws Exception;
}
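For illustration only (this operator appears neither in the post nor in Flink), here is a rough sketch of what implementing this interface and plugging it in via DataStream.transform could look like, assuming the usual AbstractStreamOperator base class with its protected output field:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Hypothetical operator, just to show the processElement/processWatermark contract:
public class DoubleValues extends AbstractStreamOperator<Long>
        implements OneInputStreamOperator<Long, Long> {

    @Override
    public void processElement(StreamRecord<Long> element) throws Exception {
        // emit the element downstream with its value doubled
        output.collect(element.replace(element.getValue() * 2));
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // just forward watermarks
        output.emitWatermark(mark);
    }
}

// Wiring it in via DataStream.transform (input being some DataStream<Long>):
DataStream<Long> doubled = input.transform("DoubleValues", BasicTypeInfo.LONG_TYPE_INFO, new DoubleValues());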

Back to the call chain: KeyedStream.transform then calls super.transform, i.e. DataStream's transform.

 

Finally in the example,

.addSink(new SinkFunction<Tuple2<Long, Long>>() {...});

actually calls SingleOutputStreamOperator.addSink, i.e. DataStream.addSink.

/**
 * Adds the given sink to this DataStream. Only streams with sinks added
 * will be executed once the {@link StreamExecutionEnvironment#execute()}
 * method is called.
 *
 * @param sinkFunction The object containing the sink's invoke function.
 * @return The closed DataStream.
 */
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

    StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

    DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

    getExecutionEnvironment().addOperator(sink.getTransformation());

    return sink;
}

 

The SinkFunction interface:

public interface SinkFunction<IN> extends Function, Serializable {

    /**
     * Function for standard sink behaviour. This function is called for every record.
     *
     * @param value The input record.
     * @throws Exception
     */
    void invoke(IN value) throws Exception;
}
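The body of the example's anonymous SinkFunction is elided in the post ({...}). Purely as an illustration, a sink that prints every record would look like this, with result standing in for the reduced stream from the example:

// Hypothetical stand-in for the example's anonymous sink: print every record.
result.addSink(new SinkFunction<Tuple2<Long, Long>>() {
    @Override
    public void invoke(Tuple2<Long, Long> value) throws Exception {
        System.out.println(value);   // invoke() is called once per record
    }
});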

 

StreamSink is itself a OneInputStreamOperator, so the main thing is its processElement method:

public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
        implements OneInputStreamOperator<IN, Object> {

    public StreamSink(SinkFunction<IN> sinkFunction) {
        super(sinkFunction);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        userFunction.invoke(element.getValue());
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // ignore it for now, we are a sink, after all
    }
}

 

DataStreamSink is just a wrapper around the SinkTransformation:

/**
 * A Stream Sink. This is used for emitting elements from a streaming topology.
 *
 * @param <T> The type of the elements in the Stream
 */
public class DataStreamSink<T> {

    SinkTransformation<T> transformation;

    @SuppressWarnings("unchecked")
    protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
        this.transformation = new SinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator,
                inputStream.getExecutionEnvironment().getParallelism());
    }
}

 

Finally, the SinkTransformation is added to the environment's List<StreamTransformation<?>> transformations.
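The addOperator call seen in addSink above is what does this. A hedged sketch of the relevant bits of StreamExecutionEnvironment, written from memory rather than quoted from the post:

// Sketch: addOperator, despite its name, simply appends the transformation to the list.
protected final List<StreamTransformation<?>> transformations = new ArrayList<>();

public void addOperator(StreamTransformation<?> transformation) {
    this.transformations.add(transformation);
}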

 

At the end we reach env.execute(), where, per the StreamTransformation javadoc quoted earlier, this graph of transformations is translated into a StreamGraph by the StreamGraphGenerator.
