序
本文主要研究一下flink的StateTtlConfig
实例
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// Build a TTL configuration with a one-second time-to-live. The last-access
// timestamp is set on creation and refreshed on every write; expired values
// are never handed back to the caller.
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

// TTL is switched on per state by passing the config to the state descriptor.
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
- 这里利用builder创建StateTtlConfig,之后通过StateDescriptor的enableTimeToLive方法传递该config
StateTtlConfig
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/StateTtlConfig.java
/**
 * Configuration of state TTL logic.
 *
 * <p>Note: The map state with TTL currently supports {@code null} user values
 * only if the user value serializer can handle {@code null} values.
 * If the serializer does not support {@code null} values,
 * it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
 * at the cost of an extra byte in the serialized form.
 */
public class StateTtlConfig implements Serializable {

    private static final long serialVersionUID = -7592693245044289793L;

    /** Shared configuration instance whose update type is {@link UpdateType#Disabled}. */
    public static final StateTtlConfig DISABLED =
        newBuilder(Time.milliseconds(Long.MAX_VALUE)).setUpdateType(UpdateType.Disabled).build();

    /**
     * This option value configures when to update last access timestamp which prolongs state TTL.
     */
    public enum UpdateType {
        /** TTL is disabled. State does not expire. */
        Disabled,
        /** Last access timestamp is initialised when state is created and updated on every write operation. */
        OnCreateAndWrite,
        /** The same as {@code OnCreateAndWrite} but also updated on read. */
        OnReadAndWrite
    }

    /**
     * This option configures whether expired user value can be returned or not.
     */
    public enum StateVisibility {
        /** Return expired user value if it is not cleaned up yet. */
        ReturnExpiredIfNotCleanedUp,
        /** Never return expired user value. */
        NeverReturnExpired
    }

    /**
     * This option configures time scale to use for ttl.
     */
    public enum TimeCharacteristic {
        /** Processing time, see also {@code TimeCharacteristic.ProcessingTime}. */
        ProcessingTime
    }

    private final UpdateType updateType;
    private final StateVisibility stateVisibility;
    private final TimeCharacteristic timeCharacteristic;
    private final Time ttl;
    private final CleanupStrategies cleanupStrategies;

    /**
     * Creates the configuration; instances are obtained through {@link Builder#build()}.
     *
     * <p>All arguments except {@code cleanupStrategies} are null-checked, and the TTL
     * must be strictly positive in milliseconds.
     */
    private StateTtlConfig(
        UpdateType updateType,
        StateVisibility stateVisibility,
        TimeCharacteristic timeCharacteristic,
        Time ttl,
        CleanupStrategies cleanupStrategies) {
        this.updateType = Preconditions.checkNotNull(updateType);
        this.stateVisibility = Preconditions.checkNotNull(stateVisibility);
        this.timeCharacteristic = Preconditions.checkNotNull(timeCharacteristic);
        this.ttl = Preconditions.checkNotNull(ttl);
        this.cleanupStrategies = cleanupStrategies;
        Preconditions.checkArgument(ttl.toMilliseconds() > 0, "TTL is expected to be positive");
    }

    /** Returns the configured update type. */
    @Nonnull
    public UpdateType getUpdateType() {
        return updateType;
    }

    /** Returns the configured state visibility. */
    @Nonnull
    public StateVisibility getStateVisibility() {
        return stateVisibility;
    }

    /** Returns the configured time-to-live. */
    @Nonnull
    public Time getTtl() {
        return ttl;
    }

    /** Returns the configured time characteristic. */
    @Nonnull
    public TimeCharacteristic getTimeCharacteristic() {
        return timeCharacteristic;
    }

    /** Returns {@code true} unless the update type is {@link UpdateType#Disabled}. */
    public boolean isEnabled() {
        return updateType != UpdateType.Disabled;
    }

    /** Returns the configured cleanup strategies. */
    @Nonnull
    public CleanupStrategies getCleanupStrategies() {
        return cleanupStrategies;
    }

    @Override
    public String toString() {
        return "StateTtlConfig{" +
            "updateType=" + updateType +
            ", stateVisibility=" + stateVisibility +
            ", timeCharacteristic=" + timeCharacteristic +
            ", ttl=" + ttl +
            '}';
    }

    /**
     * Creates a {@link Builder} seeded with the given TTL.
     *
     * @param ttl The ttl time.
     */
    @Nonnull
    public static Builder newBuilder(@Nonnull Time ttl) {
        return new Builder(ttl);
    }

    /**
     * Builder for the {@link StateTtlConfig}.
     */
    public static class Builder {

        // Defaults are written with their enum type so that this excerpt does not
        // depend on static imports declared elsewhere in the file.
        private UpdateType updateType = UpdateType.OnCreateAndWrite;
        private StateVisibility stateVisibility = StateVisibility.NeverReturnExpired;
        private TimeCharacteristic timeCharacteristic = TimeCharacteristic.ProcessingTime;
        private Time ttl;
        private CleanupStrategies cleanupStrategies = new CleanupStrategies();

        public Builder(@Nonnull Time ttl) {
            this.ttl = ttl;
        }

        /**
         * Sets the ttl update type.
         *
         * @param updateType The ttl update type configures when to update last access timestamp which prolongs state TTL.
         */
        @Nonnull
        public Builder setUpdateType(UpdateType updateType) {
            this.updateType = updateType;
            return this;
        }

        /** Shortcut for {@code setUpdateType(UpdateType.OnCreateAndWrite)}. */
        @Nonnull
        public Builder updateTtlOnCreateAndWrite() {
            return setUpdateType(UpdateType.OnCreateAndWrite);
        }

        /** Shortcut for {@code setUpdateType(UpdateType.OnReadAndWrite)}. */
        @Nonnull
        public Builder updateTtlOnReadAndWrite() {
            return setUpdateType(UpdateType.OnReadAndWrite);
        }

        /**
         * Sets the state visibility.
         *
         * @param stateVisibility The state visibility configures whether expired user value can be returned or not.
         */
        @Nonnull
        public Builder setStateVisibility(@Nonnull StateVisibility stateVisibility) {
            this.stateVisibility = stateVisibility;
            return this;
        }

        /** Shortcut for {@code setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp)}. */
        @Nonnull
        public Builder returnExpiredIfNotCleanedUp() {
            return setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp);
        }

        /** Shortcut for {@code setStateVisibility(StateVisibility.NeverReturnExpired)}. */
        @Nonnull
        public Builder neverReturnExpired() {
            return setStateVisibility(StateVisibility.NeverReturnExpired);
        }

        /**
         * Sets the time characteristic.
         *
         * @param timeCharacteristic The time characteristic configures time scale to use for ttl.
         */
        @Nonnull
        public Builder setTimeCharacteristic(@Nonnull TimeCharacteristic timeCharacteristic) {
            this.timeCharacteristic = timeCharacteristic;
            return this;
        }

        /** Shortcut for {@code setTimeCharacteristic(TimeCharacteristic.ProcessingTime)}. */
        @Nonnull
        public Builder useProcessingTime() {
            return setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        }

        /** Cleanup expired state in full snapshot on checkpoint. */
        @Nonnull
        public Builder cleanupFullSnapshot() {
            // The strategy carries no settings of its own; the presence of the key is
            // what CleanupStrategies#inFullSnapshot() checks.
            cleanupStrategies.strategies.put(
                CleanupStrategies.Strategies.FULL_STATE_SCAN_SNAPSHOT,
                new CleanupStrategies.CleanupStrategy() {
                });
            return this;
        }

        /**
         * Sets the ttl time.
         *
         * @param ttl The ttl time.
         */
        @Nonnull
        public Builder setTtl(@Nonnull Time ttl) {
            this.ttl = ttl;
            return this;
        }

        /** Creates the {@link StateTtlConfig} from the values collected so far. */
        @Nonnull
        public StateTtlConfig build() {
            return new StateTtlConfig(
                updateType,
                stateVisibility,
                timeCharacteristic,
                ttl,
                cleanupStrategies);
        }
    }

    /**
     * TTL cleanup strategies.
     *
     * <p>This class configures when to cleanup expired state with TTL.
     * By default, state is always cleaned up on explicit read access if found expired.
     * Currently cleanup of state full snapshot can be additionally activated.
     */
    public static class CleanupStrategies implements Serializable {

        private static final long serialVersionUID = -1617740467277313524L;

        /** Fixed strategies ordinals in {@code strategies} config field. */
        enum Strategies {
            FULL_STATE_SCAN_SNAPSHOT
        }

        /** Base interface for cleanup strategies configurations. */
        interface CleanupStrategy extends Serializable {
        }

        /** Activated strategies, keyed by strategy kind. */
        final EnumMap<Strategies, CleanupStrategy> strategies = new EnumMap<>(Strategies.class);

        /** Returns {@code true} if the full-snapshot cleanup strategy has been registered. */
        public boolean inFullSnapshot() {
            return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
        }
    }
}
- StateTtlConfig用于设置state的TTL属性,这里定义了三个枚举,分别是UpdateType(`Disabled、OnCreateAndWrite、OnReadAndWrite`)、StateVisibility(`ReturnExpiredIfNotCleanedUp、NeverReturnExpired`)、TimeCharacteristic(`ProcessingTime`)
- StateTtlConfig定义了CleanupStrategies,即TTL state的清理策略,默认在读取到expired的state时会进行清理,目前还额外提供在FULL_STATE_SCAN_SNAPSHOT的时候进行清理(`在checkpoint时清理full snapshot中的expired state`)的选项
- StateTtlConfig还提供了一个Builder,用于快速设置UpdateType、StateVisibility、TimeCharacteristic、Time、CleanupStrategies
AbstractKeyedStateBackend
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
/**
 * Returns the keyed state registered under the descriptor's name, creating and
 * caching it on first request.
 *
 * @param namespaceSerializer serializer for the state namespace; must not be null
 * @param stateDescriptor descriptor of the requested state
 * @return the cached or newly created state, cast to the requested state type
 * @throws Exception if state creation fails
 * @see KeyedStateBackend
 */
@Override
@SuppressWarnings("unchecked")
public <N, S extends State, V> S getOrCreateKeyedState(
        final TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, V> stateDescriptor) throws Exception {
    checkNotNull(namespaceSerializer, "Namespace serializer");
    checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
            "This operation cannot use partitioned state.");

    InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
    if (kvState == null) {
        if (!stateDescriptor.isSerializerInitialized()) {
            stateDescriptor.initializeSerializerUnlessSet(executionConfig);
        }
        // The factory decides, from the descriptor's TTL config, whether to return the
        // backend's plain state or a TTL-wrapping one.
        kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
            namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
        keyValueStatesByName.put(stateDescriptor.getName(), kvState);
        publishQueryableStateIfEnabled(stateDescriptor, kvState);
    }
    return (S) kvState;
}
- AbstractKeyedStateBackend的getOrCreateKeyedState方法里头使用TtlStateFactory.createStateAndWrapWithTtlIfEnabled来创建InternalKvState
TtlStateFactory
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
/** * This state factory wraps state objects, produced by backends, with TTL logic. */public class TtlStateFactory{ public static IS createStateAndWrapWithTtlIfEnabled( TypeSerializer namespaceSerializer, StateDescriptor stateDesc, KeyedStateFactory originalStateFactory, TtlTimeProvider timeProvider) throws Exception { Preconditions.checkNotNull(namespaceSerializer); Preconditions.checkNotNull(stateDesc); Preconditions.checkNotNull(originalStateFactory); Preconditions.checkNotNull(timeProvider); return stateDesc.getTtlConfig().isEnabled() ? new TtlStateFactory( namespaceSerializer, stateDesc, originalStateFactory, timeProvider) .createState() : originalStateFactory.createInternalState(namespaceSerializer, stateDesc); } private final Map , SupplierWithException > stateFactories; private final TypeSerializer namespaceSerializer; private final StateDescriptor stateDesc; private final KeyedStateFactory originalStateFactory; private final StateTtlConfig ttlConfig; private final TtlTimeProvider timeProvider; private final long ttl; private TtlStateFactory( TypeSerializernamespaceSerializer, StateDescriptor stateDesc, KeyedStateFactory originalStateFactory, TtlTimeProvider timeProvider) { this.namespaceSerializer = namespaceSerializer; this.stateDesc = stateDesc; this.originalStateFactory = originalStateFactory; this.ttlConfig = stateDesc.getTtlConfig(); this.timeProvider = timeProvider; this.ttl = ttlConfig.getTtl().toMilliseconds(); this.stateFactories = createStateFactories(); } private Map, SupplierWithException > createStateFactories() { return Stream.of( Tuple2.of(ValueStateDescriptor.class, (SupplierWithException ) this::createValueState), Tuple2.of(ListStateDescriptor.class, (SupplierWithException ) this::createListState), Tuple2.of(MapStateDescriptor.class, (SupplierWithException ) this::createMapState), Tuple2.of(ReducingStateDescriptor.class, (SupplierWithException ) this::createReducingState), Tuple2.of(AggregatingStateDescriptor.class, (SupplierWithException 
) this::createAggregatingState), Tuple2.of(FoldingStateDescriptor.class, (SupplierWithException ) this::createFoldingState) ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); } private IS createState() throws Exception { SupplierWithException stateFactory = stateFactories.get(stateDesc.getClass()); if (stateFactory == null) { String message = String.format("State %s is not supported by %s", stateDesc.getClass(), TtlStateFactory.class); throw new FlinkRuntimeException(message); } return stateFactory.get(); } @SuppressWarnings("unchecked") private IS createValueState() throws Exception { ValueStateDescriptor > ttlDescriptor = new ValueStateDescriptor<>( stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer())); return (IS) new TtlValueState<>( originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()), ttlConfig, timeProvider, stateDesc.getSerializer()); } @SuppressWarnings("unchecked") private IS createListState() throws Exception { ListStateDescriptor listStateDesc = (ListStateDescriptor ) stateDesc; ListStateDescriptor > ttlDescriptor = new ListStateDescriptor<>( stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer())); return (IS) new TtlListState<>( originalStateFactory.createInternalState( namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()), ttlConfig, timeProvider, listStateDesc.getSerializer()); } @SuppressWarnings("unchecked") private IS createMapState() throws Exception { MapStateDescriptor mapStateDesc = (MapStateDescriptor ) stateDesc; MapStateDescriptor > ttlDescriptor = new MapStateDescriptor<>( stateDesc.getName(), mapStateDesc.getKeySerializer(), new TtlSerializer<>(mapStateDesc.getValueSerializer())); return (IS) new TtlMapState<>( originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()), ttlConfig, timeProvider, mapStateDesc.getSerializer()); } @SuppressWarnings("unchecked") private IS 
createReducingState() throws Exception { ReducingStateDescriptor reducingStateDesc = (ReducingStateDescriptor ) stateDesc; ReducingStateDescriptor > ttlDescriptor = new ReducingStateDescriptor<>( stateDesc.getName(), new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider), new TtlSerializer<>(stateDesc.getSerializer())); return (IS) new TtlReducingState<>( originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()), ttlConfig, timeProvider, stateDesc.getSerializer()); } @SuppressWarnings("unchecked") private IS createAggregatingState() throws Exception { AggregatingStateDescriptor aggregatingStateDescriptor = (AggregatingStateDescriptor ) stateDesc; TtlAggregateFunction ttlAggregateFunction = new TtlAggregateFunction<>( aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider); AggregatingStateDescriptor , OUT> ttlDescriptor = new AggregatingStateDescriptor<>( stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer())); return (IS) new TtlAggregatingState<>( originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()), ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction); } @SuppressWarnings({"deprecation", "unchecked"}) private IS createFoldingState() throws Exception { FoldingStateDescriptor foldingStateDescriptor = (FoldingStateDescriptor ) stateDesc; SV initAcc = stateDesc.getDefaultValue(); TtlValue ttlInitAcc = initAcc == null ? 
null : new TtlValue<>(initAcc, Long.MAX_VALUE); FoldingStateDescriptor > ttlDescriptor = new FoldingStateDescriptor<>( stateDesc.getName(), ttlInitAcc, new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc), new TtlSerializer<>(stateDesc.getSerializer())); return (IS) new TtlFoldingState<>( originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()), ttlConfig, timeProvider, stateDesc.getSerializer()); } //......}
- TtlStateFactory的createStateAndWrapWithTtlIfEnabled方法这里会根据stateDesc.getTtlConfig().isEnabled()来创建state,如果开启ttl则调用new TtlStateFactory<N, SV, S, IS>(namespaceSerializer, stateDesc, originalStateFactory, timeProvider).createState(),否则调用originalStateFactory.createInternalState(namespaceSerializer, stateDesc)
- 这里createStateFactories创建了不同类型的StateDescriptor对应创建方法的map,在createState的时候,根据指定类型自动调用对应的SupplierWithException,省去if else的判断
- ValueStateDescriptor对应createValueState方法,创建的是TtlValueState;ListStateDescriptor对应createListState方法,创建的是TtlListState;MapStateDescriptor对应createMapState方法,创建的是TtlMapState;ReducingStateDescriptor对应createReducingState方法,创建的是TtlReducingState;AggregatingStateDescriptor对应createAggregatingState方法,创建的是TtlAggregatingState;FoldingStateDescriptor对应createFoldingState方法,创建的是TtlFoldingState
小结
- StateTtlConfig用于设置state的TTL属性,这里主要设置UpdateType、StateVisibility、TimeCharacteristic、Time、CleanupStrategies这几个属性
- AbstractKeyedStateBackend的getOrCreateKeyedState方法里头使用TtlStateFactory.createStateAndWrapWithTtlIfEnabled来创建InternalKvState
- TtlStateFactory的createStateAndWrapWithTtlIfEnabled方法这里会根据stateDesc.getTtlConfig().isEnabled()来创建对应的state;TtlStateFactory的createState会根据不同类型的StateDescriptor创建对应类型的ttl state