package org.apache.flink.streaming.runtime.operators.sink;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.util.CollectionUtil;

/**
 * {@link SinkWriterStateHandler} for a {@link StatefulSink}.
 *
 * <p>The handler keeps the writer's state in an operator list state registered under the name
 * {@code "writer_raw_states"}. When the operator is restored, the writer is re-created from that
 * state plus the contents of every list state whose name the sink reports through
 * {@link StatefulSink.WithCompatibleState#getCompatibleWriterStateNames()}.
 *
 * @param <InputT> type of the elements consumed by the sink writer
 * @param <WriterStateT> type of the state snapshotted by the sink writer
 */
@Internal
final class StatefulSinkWriterStateHandler<InputT, WriterStateT> implements SinkWriterStateHandler<InputT> {
    /** Descriptor of the raw (serialized) list state that backs {@link #writerState}. */
    private static final ListStateDescriptor<byte[]> WRITER_RAW_STATES_DESC = new ListStateDescriptor<>("writer_raw_states", BytePrimitiveArraySerializer.INSTANCE);

    /** Serializer obtained from the sink; used to (de)serialize every writer state entry. */
    private final SimpleVersionedSerializer<WriterStateT> writerStateSimpleVersionedSerializer;

    /**
     * Names of additional list states that are merged into the recovered writer state on
     * restore. Empty unless the sink implements {@link StatefulSink.WithCompatibleState}.
     */
    private final Collection<String> previousSinkStateNames;

    /** The sink used to create or restore the writer. */
    private final StatefulSink<InputT, WriterStateT> sink;

    /**
     * List states opened for {@link #previousSinkStateNames} during a restore. They are retained
     * so that {@link #snapshotState(long)} can clear them.
     */
    private final List<ListState<WriterStateT>> previousSinkStates = new ArrayList<>();

    /** Typed view over the raw writer state; assigned in {@code createWriter}. */
    private ListState<WriterStateT> writerState;

    /** The writer produced by {@code createWriter}; its snapshots are stored in {@link #writerState}. */
    private StatefulSink.StatefulSinkWriter<InputT, WriterStateT> sinkWriter;

    /**
     * Creates a handler for the given sink and captures its writer state serializer and, if the
     * sink implements {@link StatefulSink.WithCompatibleState}, its compatible state names.
     *
     * @param statefulSink the sink whose writer state is managed by this handler
     */
    public StatefulSinkWriterStateHandler(StatefulSink<InputT, WriterStateT> statefulSink) {
        this.sink = statefulSink;
        Collection<String> compatibleWriterStateNames = statefulSink instanceof StatefulSink.WithCompatibleState ? ((StatefulSink.WithCompatibleState) statefulSink).getCompatibleWriterStateNames() : Collections.emptyList();
        this.writerStateSimpleVersionedSerializer = statefulSink.getWriterStateSerializer();
        this.previousSinkStateNames = compatibleWriterStateNames;
    }

    /**
     * Registers the writer state and creates the sink writer.
     *
     * <p>If {@code stateInitializationContext.isRestored()} is true, the writer is obtained from
     * {@code sink.restoreWriter} with the entries of the writer state followed by the entries of
     * each compatible state, in the iteration order of the compatible state names. Otherwise it
     * is obtained from {@code sink.createWriter}.
     *
     * @param initContext context handed through to the sink when creating or restoring the writer
     * @param stateInitializationContext provides the operator state store and the restore flag
     * @return the created or restored writer
     * @throws Exception if accessing the state or creating the writer fails
     */
    @Override
    public SinkWriter<InputT> createWriter(Sink.InitContext initContext, StateInitializationContext stateInitializationContext) throws Exception {
        ListState<byte[]> rawWriterState = stateInitializationContext.getOperatorStateStore().getListState(WRITER_RAW_STATES_DESC);
        this.writerState = new SimpleVersionedListState<>(rawWriterState, this.writerStateSimpleVersionedSerializer);
        if (stateInitializationContext.isRestored()) {
            // Copy into a fresh list so the compatible states can be appended to it below.
            List<WriterStateT> recoveredStates = new ArrayList<>(CollectionUtil.iterableToList(this.writerState.get()));
            for (String previousSinkStateName : this.previousSinkStateNames) {
                ListStateDescriptor<byte[]> previousStateDescriptor = new ListStateDescriptor<>(previousSinkStateName, BytePrimitiveArraySerializer.INSTANCE);
                ListState<byte[]> rawPreviousState = stateInitializationContext.getOperatorStateStore().getListState(previousStateDescriptor);
                ListState<WriterStateT> previousSinkState = new SimpleVersionedListState<>(rawPreviousState, this.writerStateSimpleVersionedSerializer);
                // Remember the state so snapshotState can clear it later.
                this.previousSinkStates.add(previousSinkState);
                Iterables.addAll(recoveredStates, previousSinkState.get());
            }
            this.sinkWriter = this.sink.restoreWriter(initContext, recoveredStates);
        } else {
            this.sinkWriter = this.sink.createWriter(initContext);
        }
        return this.sinkWriter;
    }

    /**
     * Replaces the content of the writer state with the writer's snapshot for the given
     * checkpoint and then clears every compatible state opened during restore.
     *
     * <p>NOTE(review): the clearing presumably exists because the restored entries were already
     * handed to the writer and are covered by its own snapshot from here on; confirm.
     *
     * @param checkpointId identifier passed through to the writer's {@code snapshotState}
     * @throws Exception if snapshotting the writer or updating the state fails
     */
    @Override
    public void snapshotState(long checkpointId) throws Exception {
        this.writerState.update(this.sinkWriter.snapshotState(checkpointId));
        this.previousSinkStates.forEach(ListState::clear);
    }
}
