A Look at Flink's StateTtlConfig
Published: 2019-06-24

This post takes a look at Flink's StateTtlConfig.

Example

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

  • Here a StateTtlConfig is created through its builder and then handed to the state descriptor via StateDescriptor's enableTimeToLive method
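
To make the two options in the example more concrete, here is a minimal plain-Java sketch of what UpdateType and StateVisibility mean for a single value. It is a toy model and not Flink's implementation: the class TtlCell, its Visibility enum, and the injected clock are hypothetical names introduced only for illustration, and the expiry check (now - lastAccess >= ttl) is an assumption of this sketch.

```java
import java.util.function.LongSupplier;

/** Toy model of one TTL-guarded value. Hypothetical, not a Flink class. */
class TtlCell<T> {
    enum UpdateType { OnCreateAndWrite, OnReadAndWrite }
    enum Visibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }

    private final long ttlMillis;
    private final UpdateType updateType;
    private final Visibility visibility;
    private final LongSupplier clock; // stand-in for a processing-time source

    private T value;
    private long lastAccess;

    TtlCell(long ttlMillis, UpdateType updateType, Visibility visibility, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.updateType = updateType;
        this.visibility = visibility;
        this.clock = clock;
    }

    /** Every write (including the first one) refreshes the last-access timestamp. */
    void update(T newValue) {
        value = newValue;
        lastAccess = clock.getAsLong();
    }

    /** Reads the value, applying the configured visibility and update rules. */
    T value() {
        if (value == null) {
            return null;
        }
        long now = clock.getAsLong();
        if (now - lastAccess >= ttlMillis) {
            T stale = value;
            value = null; // expired state is dropped when a read finds it
            return visibility == Visibility.ReturnExpiredIfNotCleanedUp ? stale : null;
        }
        if (updateType == UpdateType.OnReadAndWrite) {
            lastAccess = now; // reads also prolong the TTL in this mode
        }
        return value;
    }
}
```

With OnCreateAndWrite and NeverReturnExpired, as in the example above, a value written at time 0 with a TTL of 1000 ms is still readable at 999 ms and reads as null from 1000 ms on. Under OnReadAndWrite, each successful read would push the expiry further out.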

StateTtlConfig

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/StateTtlConfig.java

/**
 * Configuration of state TTL logic.
 *
 * <p>Note: The map state with TTL currently supports {@code null} user values
 * only if the user value serializer can handle {@code null} values.
 * If the serializer does not support {@code null} values,
 * it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
 * at the cost of an extra byte in the serialized form.
 */
public class StateTtlConfig implements Serializable {

    private static final long serialVersionUID = -7592693245044289793L;

    public static final StateTtlConfig DISABLED =
        newBuilder(Time.milliseconds(Long.MAX_VALUE)).setUpdateType(UpdateType.Disabled).build();

    /**
     * This option value configures when to update last access timestamp which prolongs state TTL.
     */
    public enum UpdateType {
        /** TTL is disabled. State does not expire. */
        Disabled,
        /** Last access timestamp is initialised when state is created and updated on every write operation. */
        OnCreateAndWrite,
        /** The same as OnCreateAndWrite but also updated on read. */
        OnReadAndWrite
    }

    /**
     * This option configures whether expired user value can be returned or not.
     */
    public enum StateVisibility {
        /** Return expired user value if it is not cleaned up yet. */
        ReturnExpiredIfNotCleanedUp,
        /** Never return expired user value. */
        NeverReturnExpired
    }

    /**
     * This option configures time scale to use for ttl.
     */
    public enum TimeCharacteristic {
        /** Processing time, see also TimeCharacteristic.ProcessingTime. */
        ProcessingTime
    }

    private final UpdateType updateType;
    private final StateVisibility stateVisibility;
    private final TimeCharacteristic timeCharacteristic;
    private final Time ttl;
    private final CleanupStrategies cleanupStrategies;

    private StateTtlConfig(
        UpdateType updateType,
        StateVisibility stateVisibility,
        TimeCharacteristic timeCharacteristic,
        Time ttl,
        CleanupStrategies cleanupStrategies) {
        this.updateType = Preconditions.checkNotNull(updateType);
        this.stateVisibility = Preconditions.checkNotNull(stateVisibility);
        this.timeCharacteristic = Preconditions.checkNotNull(timeCharacteristic);
        this.ttl = Preconditions.checkNotNull(ttl);
        this.cleanupStrategies = cleanupStrategies;
        Preconditions.checkArgument(ttl.toMilliseconds() > 0, "TTL is expected to be positive");
    }

    @Nonnull
    public UpdateType getUpdateType() {
        return updateType;
    }

    @Nonnull
    public StateVisibility getStateVisibility() {
        return stateVisibility;
    }

    @Nonnull
    public Time getTtl() {
        return ttl;
    }

    @Nonnull
    public TimeCharacteristic getTimeCharacteristic() {
        return timeCharacteristic;
    }

    public boolean isEnabled() {
        return updateType != UpdateType.Disabled;
    }

    @Nonnull
    public CleanupStrategies getCleanupStrategies() {
        return cleanupStrategies;
    }

    @Override
    public String toString() {
        return "StateTtlConfig{" +
            "updateType=" + updateType +
            ", stateVisibility=" + stateVisibility +
            ", timeCharacteristic=" + timeCharacteristic +
            ", ttl=" + ttl +
            '}';
    }

    @Nonnull
    public static Builder newBuilder(@Nonnull Time ttl) {
        return new Builder(ttl);
    }

    /**
     * Builder for the {@link StateTtlConfig}.
     */
    public static class Builder {

        private UpdateType updateType = OnCreateAndWrite;
        private StateVisibility stateVisibility = NeverReturnExpired;
        private TimeCharacteristic timeCharacteristic = ProcessingTime;
        private Time ttl;
        private CleanupStrategies cleanupStrategies = new CleanupStrategies();

        public Builder(@Nonnull Time ttl) {
            this.ttl = ttl;
        }

        /**
         * Sets the ttl update type.
         *
         * @param updateType The ttl update type configures when to update last access timestamp which prolongs state TTL.
         */
        @Nonnull
        public Builder setUpdateType(UpdateType updateType) {
            this.updateType = updateType;
            return this;
        }

        @Nonnull
        public Builder updateTtlOnCreateAndWrite() {
            return setUpdateType(UpdateType.OnCreateAndWrite);
        }

        @Nonnull
        public Builder updateTtlOnReadAndWrite() {
            return setUpdateType(UpdateType.OnReadAndWrite);
        }

        /**
         * Sets the state visibility.
         *
         * @param stateVisibility The state visibility configures whether expired user value can be returned or not.
         */
        @Nonnull
        public Builder setStateVisibility(@Nonnull StateVisibility stateVisibility) {
            this.stateVisibility = stateVisibility;
            return this;
        }

        @Nonnull
        public Builder returnExpiredIfNotCleanedUp() {
            return setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp);
        }

        @Nonnull
        public Builder neverReturnExpired() {
            return setStateVisibility(StateVisibility.NeverReturnExpired);
        }

        /**
         * Sets the time characteristic.
         *
         * @param timeCharacteristic The time characteristic configures time scale to use for ttl.
         */
        @Nonnull
        public Builder setTimeCharacteristic(@Nonnull TimeCharacteristic timeCharacteristic) {
            this.timeCharacteristic = timeCharacteristic;
            return this;
        }

        @Nonnull
        public Builder useProcessingTime() {
            return setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        }

        /** Cleanup expired state in full snapshot on checkpoint. */
        @Nonnull
        public Builder cleanupFullSnapshot() {
            cleanupStrategies.strategies.put(
                CleanupStrategies.Strategies.FULL_STATE_SCAN_SNAPSHOT,
                new CleanupStrategies.CleanupStrategy() { });
            return this;
        }

        /**
         * Sets the ttl time.
         * @param ttl The ttl time.
         */
        @Nonnull
        public Builder setTtl(@Nonnull Time ttl) {
            this.ttl = ttl;
            return this;
        }

        @Nonnull
        public StateTtlConfig build() {
            return new StateTtlConfig(
                updateType,
                stateVisibility,
                timeCharacteristic,
                ttl,
                cleanupStrategies);
        }
    }

    /**
     * TTL cleanup strategies.
     *
     * <p>This class configures when to cleanup expired state with TTL.
     * By default, state is always cleaned up on explicit read access if found expired.
     * Currently cleanup of state full snapshot can be additionally activated.
     */
    public static class CleanupStrategies implements Serializable {

        private static final long serialVersionUID = -1617740467277313524L;

        /** Fixed strategies ordinals in {@code strategies} config field. */
        enum Strategies {
            FULL_STATE_SCAN_SNAPSHOT
        }

        /** Base interface for cleanup strategies configurations. */
        interface CleanupStrategy extends Serializable {
        }

        final EnumMap<Strategies, CleanupStrategy> strategies = new EnumMap<>(Strategies.class);

        public boolean inFullSnapshot() {
            return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
        }
    }
}

  • StateTtlConfig configures the TTL properties of state. It defines three enums: UpdateType (Disabled, OnCreateAndWrite, OnReadAndWrite), StateVisibility (ReturnExpiredIfNotCleanedUp, NeverReturnExpired), and TimeCharacteristic (ProcessingTime)
  • StateTtlConfig also defines CleanupStrategies, the cleanup strategies for TTL state. By default, expired state is cleaned up when a read finds it expired; in addition, cleanup under FULL_STATE_SCAN_SNAPSHOT (dropping expired state from the full snapshot taken at checkpoint time) can be switched on
  • StateTtlConfig also provides a Builder for conveniently setting UpdateType, StateVisibility, TimeCharacteristic, Time, and CleanupStrategies
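
The full-snapshot cleanup option can be pictured as a filter applied while state is copied into a snapshot. The sketch below is plain Java under stated assumptions, not Flink code: SnapshotCleanupSketch, Stamped, and the snapshot method are hypothetical names, and the sketch assumes that only the snapshot copy is filtered while the live state is left as it is.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of "cleanup in full snapshot". Hypothetical, not a Flink class. */
class SnapshotCleanupSketch {

    /** A user value together with its last-access timestamp. */
    static final class Stamped {
        final String value;
        final long lastAccess;

        Stamped(String value, long lastAccess) {
            this.value = value;
            this.lastAccess = lastAccess;
        }
    }

    /**
     * Copies the live state into a snapshot. When cleanupInFullSnapshot is true,
     * entries whose TTL has elapsed at time 'now' are left out of the copy.
     */
    static Map<String, Stamped> snapshot(
            Map<String, Stamped> live, long ttlMillis, long now, boolean cleanupInFullSnapshot) {
        Map<String, Stamped> copy = new HashMap<>();
        for (Map.Entry<String, Stamped> entry : live.entrySet()) {
            boolean expired = now - entry.getValue().lastAccess >= ttlMillis;
            if (cleanupInFullSnapshot && expired) {
                continue; // skip expired entries in the snapshot only
            }
            copy.put(entry.getKey(), entry.getValue());
        }
        return copy;
    }
}
```

In this model the option shrinks the snapshot rather than the running state. Expired entries in the live map still wait for a read to remove them.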

AbstractKeyedStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java

/**
 * @see KeyedStateBackend
 */
@Override
@SuppressWarnings("unchecked")
public <N, S extends State, V> S getOrCreateKeyedState(
    final TypeSerializer<N> namespaceSerializer,
    StateDescriptor<S, V> stateDescriptor) throws Exception {
    checkNotNull(namespaceSerializer, "Namespace serializer");
    checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
        "This operation cannot use partitioned state.");

    InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
    if (kvState == null) {
        if (!stateDescriptor.isSerializerInitialized()) {
            stateDescriptor.initializeSerializerUnlessSet(executionConfig);
        }
        kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
            namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
        keyValueStatesByName.put(stateDescriptor.getName(), kvState);
        publishQueryableStateIfEnabled(stateDescriptor, kvState);
    }
    return (S) kvState;
}

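  • AbstractKeyedStateBackend's getOrCreateKeyedState method first looks the state up by name in keyValueStatesByName; only on a miss does it call TtlStateFactory.createStateAndWrapWithTtlIfEnabled to create the InternalKvState, which it then caches under the descriptor name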
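
The lookup-then-create flow in getOrCreateKeyedState is a plain get-or-create cache keyed by state name. A minimal sketch of that pattern in standalone Java follows; StateRegistrySketch, getOrCreate, and createdCount are hypothetical names for illustration only, and the factory function stands in for the call to the TTL state factory.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy get-or-create registry keyed by state name. Hypothetical, not a Flink class. */
class StateRegistrySketch {
    private final Map<String, Object> statesByName = new HashMap<>();
    private int created = 0;

    /** Returns the cached state for 'name', creating it through 'factory' on first use. */
    @SuppressWarnings("unchecked")
    <S> S getOrCreate(String name, Function<String, S> factory) {
        Object state = statesByName.get(name);
        if (state == null) {
            state = factory.apply(name); // only runs on a cache miss
            statesByName.put(name, state);
            created++;
        }
        return (S) state;
    }

    /** Number of times the factory was actually invoked. */
    int createdCount() {
        return created;
    }
}
```

Asking twice for the same name returns the same object, and the second factory is never invoked. That matches the source shown above: the TTL wrapping decision is made only when the state is first created.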

TtlStateFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/TtlStateFactory.java

/**
 * This state factory wraps state objects, produced by backends, with TTL logic.
 */
public class TtlStateFactory<N, SV, S extends State, IS extends S> {
    public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, SV> stateDesc,
        KeyedStateFactory originalStateFactory,
        TtlTimeProvider timeProvider) throws Exception {
        Preconditions.checkNotNull(namespaceSerializer);
        Preconditions.checkNotNull(stateDesc);
        Preconditions.checkNotNull(originalStateFactory);
        Preconditions.checkNotNull(timeProvider);
        return stateDesc.getTtlConfig().isEnabled() ?
            new TtlStateFactory<N, SV, S, IS>(
                namespaceSerializer, stateDesc, originalStateFactory, timeProvider)
                .createState() :
            originalStateFactory.createInternalState(namespaceSerializer, stateDesc);
    }

    private final Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> stateFactories;

    private final TypeSerializer<N> namespaceSerializer;
    private final StateDescriptor<S, SV> stateDesc;
    private final KeyedStateFactory originalStateFactory;
    private final StateTtlConfig ttlConfig;
    private final TtlTimeProvider timeProvider;
    private final long ttl;

    private TtlStateFactory(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, SV> stateDesc,
        KeyedStateFactory originalStateFactory,
        TtlTimeProvider timeProvider) {
        this.namespaceSerializer = namespaceSerializer;
        this.stateDesc = stateDesc;
        this.originalStateFactory = originalStateFactory;
        this.ttlConfig = stateDesc.getTtlConfig();
        this.timeProvider = timeProvider;
        this.ttl = ttlConfig.getTtl().toMilliseconds();
        this.stateFactories = createStateFactories();
    }

    private Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> createStateFactories() {
        return Stream.of(
            Tuple2.of(ValueStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createValueState),
            Tuple2.of(ListStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createListState),
            Tuple2.of(MapStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createMapState),
            Tuple2.of(ReducingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createReducingState),
            Tuple2.of(AggregatingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createAggregatingState),
            Tuple2.of(FoldingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createFoldingState)
        ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
    }

    private IS createState() throws Exception {
        SupplierWithException<IS, Exception> stateFactory = stateFactories.get(stateDesc.getClass());
        if (stateFactory == null) {
            String message = String.format("State %s is not supported by %s",
                stateDesc.getClass(), TtlStateFactory.class);
            throw new FlinkRuntimeException(message);
        }
        return stateFactory.get();
    }

    @SuppressWarnings("unchecked")
    private IS createValueState() throws Exception {
        ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
            stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlValueState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private <T> IS createListState() throws Exception {
        ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
        ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
            stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
        return (IS) new TtlListState<>(
            originalStateFactory.createInternalState(
                namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, listStateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private <UK, UV> IS createMapState() throws Exception {
        MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
        MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
            stateDesc.getName(),
            mapStateDesc.getKeySerializer(),
            new TtlSerializer<>(mapStateDesc.getValueSerializer()));
        return (IS) new TtlMapState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, mapStateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private IS createReducingState() throws Exception {
        ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc;
        ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = new ReducingStateDescriptor<>(
            stateDesc.getName(),
            new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
            new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlReducingState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private <IN, OUT> IS createAggregatingState() throws Exception {
        AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor =
            (AggregatingStateDescriptor<IN, SV, OUT>) stateDesc;
        TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = new TtlAggregateFunction<>(
            aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
        AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
            stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlAggregatingState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction);
    }

    @SuppressWarnings({"deprecation", "unchecked"})
    private <T> IS createFoldingState() throws Exception {
        FoldingStateDescriptor<T, SV> foldingStateDescriptor = (FoldingStateDescriptor<T, SV>) stateDesc;
        SV initAcc = stateDesc.getDefaultValue();
        TtlValue<SV> ttlInitAcc = initAcc == null ? null : new TtlValue<>(initAcc, Long.MAX_VALUE);
        FoldingStateDescriptor<T, TtlValue<SV>> ttlDescriptor = new FoldingStateDescriptor<>(
            stateDesc.getName(),
            ttlInitAcc,
            new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc),
            new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlFoldingState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer());
    }

    //......
}

  • TtlStateFactory's createStateAndWrapWithTtlIfEnabled method decides how to create the state based on stateDesc.getTtlConfig().isEnabled(): if TTL is enabled it calls new TtlStateFactory<N, SV, S, IS>(namespaceSerializer, stateDesc, originalStateFactory, timeProvider).createState(), otherwise it calls originalStateFactory.createInternalState(namespaceSerializer, stateDesc)
  • createStateFactories builds a map from each StateDescriptor type to its creation method; createState looks up the SupplierWithException for the descriptor's class and invokes it, which avoids a chain of if/else checks, and it throws a FlinkRuntimeException when the descriptor type has no entry
  • ValueStateDescriptor maps to createValueState, which creates a TtlValueState; ListStateDescriptor maps to createListState, which creates a TtlListState; MapStateDescriptor maps to createMapState, which creates a TtlMapState; ReducingStateDescriptor maps to createReducingState, which creates a TtlReducingState; AggregatingStateDescriptor maps to createAggregatingState, which creates a TtlAggregatingState; FoldingStateDescriptor maps to createFoldingState, which creates a TtlFoldingState
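
The class-to-supplier table used by createStateFactories and createState can be reduced to a few lines of standalone Java. The sketch below is an illustration under assumptions, not Flink code: DispatchSketch and its nested descriptor types are hypothetical, java.util.function.Supplier stands in for SupplierWithException, and the created "states" are just strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Toy dispatch table from descriptor class to factory method. Hypothetical names. */
class DispatchSketch {
    interface Descriptor {}
    static final class ValueDesc implements Descriptor {}
    static final class ListDesc implements Descriptor {}
    static final class UnknownDesc implements Descriptor {}

    private final Map<Class<? extends Descriptor>, Supplier<String>> factories = new HashMap<>();

    DispatchSketch() {
        // One entry per supported descriptor type, pointing at its creation method.
        factories.put(ValueDesc.class, this::createValueState);
        factories.put(ListDesc.class, this::createListState);
    }

    /** Wraps with TTL only when enabled; otherwise falls back to the plain state. */
    String create(Descriptor desc, boolean ttlEnabled) {
        if (!ttlEnabled) {
            return "plain state";
        }
        Supplier<String> factory = factories.get(desc.getClass());
        if (factory == null) {
            throw new IllegalArgumentException("State " + desc.getClass() + " is not supported");
        }
        return factory.get();
    }

    private String createValueState() {
        return "TTL value state";
    }

    private String createListState() {
        return "TTL list state";
    }
}
```

Supporting another descriptor type in this layout means adding one map entry and one method. The lookup in create stays unchanged, which is the benefit the map-based approach has over an if/else chain.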

Summary

  • StateTtlConfig configures the TTL properties of state, mainly UpdateType, StateVisibility, TimeCharacteristic, Time, and CleanupStrategies
  • AbstractKeyedStateBackend's getOrCreateKeyedState method uses TtlStateFactory.createStateAndWrapWithTtlIfEnabled to create the InternalKvState
  • TtlStateFactory's createStateAndWrapWithTtlIfEnabled method creates the state according to stateDesc.getTtlConfig().isEnabled(); when TTL is enabled, createState creates the TTL state that matches the StateDescriptor type

doc

Reposted from: http://gyvjo.baihongyu.com/