package org.apache.flink.table.runtime.operators.join.lookup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.runtime.collector.ListenableCollector;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.class */
/**
 * Keyed wrapper around a {@link LookupJoinRunner} that remembers, per key, the rows collected by
 * the most recent lookup.
 *
 * <p>For an accumulate message the previous state of the key is cleared, a fresh lookup is
 * executed and every collected row is written to state. For any other message no lookup is done:
 * the rows remembered in state are emitted again with {@link RowKind#DELETE} and the state is
 * cleared afterwards.
 *
 * <p>Depending on {@code lookupKeyContainsPrimaryKey} either a single-row state
 * ({@code "unique-value"}) or a list state ({@code "values"}) is used.
 */
public class KeyedLookupJoinWrapper extends KeyedProcessFunction<RowData, RowData, RowData> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(KeyedLookupJoinWrapper.class);
    private static final String STATE_CLEARED_WARN_MSG = "The state is cleared because of state ttl. This will result in incorrect result. You can increase the state ttl to avoid this.";
    private final LookupJoinRunner lookupJoinRunner;
    private final StateTtlConfig ttlConfig;
    private final TypeSerializer<RowData> serializer;
    private final boolean lookupKeyContainsPrimaryKey;
    // Always true and not read anywhere in this class; missing state is handled by logging a
    // warning (see stateStaledErrorHandle) rather than by failing.
    private final boolean lenient = true;
    /** All-null row stored in state when a lookup collected nothing, so that "lookup miss" can be told apart from "no state". */
    private transient BinaryRowData emptyRow;
    /** Rows of the last lookup; only initialized when the lookup key does not contain the primary key. */
    private transient ValueState<List<RowData>> state;
    /** Row of the last lookup; only initialized when the lookup key contains the primary key. */
    private transient ValueState<RowData> uniqueState;
    private transient FetchedRecordListener collectListener;

    /**
     * Listener registered on the runner's collector that mirrors every collected row into the
     * keyed state and records whether anything was collected since the last {@link #reset()}.
     */
    public class FetchedRecordListener implements ListenableCollector.CollectListener<RowData> {
        boolean collected;

        FetchedRecordListener() {
        }

        /** Clears the "collected" flag; called before each element is processed. */
        void reset() {
            this.collected = false;
        }

        /**
         * Marks that a record was collected and stores it in state. A {@code null} record is
         * stored as the all-null empty row.
         *
         * @param rowData the collected record, may be {@code null}
         */
        @Override // org.apache.flink.table.runtime.collector.ListenableCollector.CollectListener
        public void onCollect(RowData rowData) {
            this.collected = true;
            RowData toStore = (rowData == null) ? KeyedLookupJoinWrapper.this.emptyRow : rowData;
            KeyedLookupJoinWrapper.this.updateState(toStore);
        }
    }

    /**
     * @param lookupJoinRunner the runner performing the actual lookup
     * @param stateTtlConfig TTL configuration applied to the state when it is enabled
     * @param typeSerializer serializer for the rows kept in state
     * @param z whether the lookup key contains the primary key; selects single-row state instead
     *     of list state
     */
    public KeyedLookupJoinWrapper(LookupJoinRunner lookupJoinRunner, StateTtlConfig stateTtlConfig, TypeSerializer<RowData> typeSerializer, boolean z) {
        this.lookupJoinRunner = lookupJoinRunner;
        this.ttlConfig = stateTtlConfig;
        this.serializer = typeSerializer;
        this.lookupKeyContainsPrimaryKey = z;
    }

    /**
     * Opens the wrapped runner, creates the state handle matching
     * {@code lookupKeyContainsPrimaryKey}, builds the empty row and registers the collect
     * listener on the runner's collector.
     *
     * @param configuration configuration forwarded to the superclass and to the runner
     * @throws Exception if the superclass, the runner or state creation fails
     */
    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.lookupJoinRunner.setRuntimeContext(getRuntimeContext());
        this.lookupJoinRunner.open(configuration);
        if (this.lookupKeyContainsPrimaryKey) {
            ValueStateDescriptor<RowData> uniqueDescriptor =
                    new ValueStateDescriptor<>("unique-value", this.serializer);
            if (this.ttlConfig.isEnabled()) {
                uniqueDescriptor.enableTimeToLive(this.ttlConfig);
            }
            this.uniqueState = getRuntimeContext().getState(uniqueDescriptor);
        } else {
            ValueStateDescriptor<List<RowData>> listDescriptor =
                    new ValueStateDescriptor<>("values", new ListSerializer<>(this.serializer));
            // TTL has to be configured on the descriptor BEFORE the state is requested;
            // enabling it after getState() does not affect the state that was already created.
            // NOTE(review): this state previously ran without TTL; Flink documents that restoring
            // non-TTL state with a TTL-enabled descriptor fails - confirm savepoint compatibility.
            if (this.ttlConfig.isEnabled()) {
                listDescriptor.enableTimeToLive(this.ttlConfig);
            }
            this.state = getRuntimeContext().getState(listDescriptor);
        }
        this.emptyRow = initEmptyRow(this.lookupJoinRunner.tableFieldsCount);
        this.collectListener = new FetchedRecordListener();
        this.lookupJoinRunner.collector.setCollectListener(this.collectListener);
    }

    /**
     * Builds a binary row of the given arity whose fields are all set to null.
     *
     * @param arity number of fields of the row
     * @return a new row backed by its own byte array
     */
    private BinaryRowData initEmptyRow(int arity) {
        BinaryRowData row = new BinaryRowData(arity);
        int size = row.getFixedLengthPartSize();
        row.pointTo(MemorySegmentFactory.wrap(new byte[size]), 0, size);
        for (int pos = 0; pos < arity; pos++) {
            row.setNullAt(pos);
        }
        return row;
    }

    /**
     * Processes one input row.
     *
     * <p>Accumulate messages clear the key's state, trigger a lookup (collected rows are stored
     * through the collect listener) and store the empty row when nothing was collected. Other
     * messages emit one DELETE row per row found in state, or fall back to
     * {@code stateStaledErrorHandle} when the state is missing, and finally clear the state.
     *
     * @param rowData the input row
     * @param context the keyed process function context (not used here)
     * @param collector the downstream collector
     * @throws Exception if the lookup or the state access fails
     */
    @Override // org.apache.flink.streaming.api.functions.KeyedProcessFunction
    public void processElement(RowData rowData, KeyedProcessFunction<RowData, RowData, RowData>.Context context, Collector<RowData> collector) throws Exception {
        this.lookupJoinRunner.prepareCollector(rowData, collector);
        this.collectListener.reset();
        if (RowDataUtil.isAccumulateMsg(rowData)) {
            // Drop whatever an earlier lookup stored for this key before fetching again.
            deleteState();
            this.lookupJoinRunner.doFetch(rowData);
            // Nothing was collected: remember the miss explicitly with the empty row.
            if (!this.collectListener.collected) {
                updateState(this.emptyRow);
            }
            this.lookupJoinRunner.padNullForLeftJoin(rowData, collector);
            return;
        }
        if (this.lookupKeyContainsPrimaryKey) {
            RowData storedRow = this.uniqueState.value();
            // null means "no state at all"; a lookup miss is stored as the (non-null) empty row.
            if (storedRow == null) {
                stateStaledErrorHandle(rowData, collector);
            } else {
                collectDeleteRow(rowData, storedRow, collector);
            }
        } else {
            List<RowData> storedRows = this.state.value();
            if (storedRows == null) {
                stateStaledErrorHandle(rowData, collector);
            } else {
                for (RowData storedRow : storedRows) {
                    collectDeleteRow(rowData, storedRow, collector);
                }
            }
        }
        deleteState();
    }

    /**
     * Emits the runner's reusable output row, filled with the input row and the stored row and
     * marked as {@link RowKind#DELETE}.
     */
    private void collectDeleteRow(RowData rowData, RowData storedRow, Collector<RowData> collector) {
        this.lookupJoinRunner.outRow.replace(rowData, storedRow);
        this.lookupJoinRunner.outRow.setRowKind(RowKind.DELETE);
        collector.collect(this.lookupJoinRunner.outRow);
    }

    /**
     * Closes the wrapped runner and then the superclass; the superclass is closed even when the
     * runner fails to close.
     *
     * @throws Exception if closing the runner or the superclass fails
     */
    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void close() throws Exception {
        try {
            this.lookupJoinRunner.close();
        } finally {
            super.close();
        }
    }

    /** Clears the state (single-row or list, whichever is in use) of the current key. */
    void deleteState() {
        if (this.lookupKeyContainsPrimaryKey) {
            this.uniqueState.clear();
        } else {
            this.state.clear();
        }
    }

    /**
     * Stores a looked-up row for the current key: overwrites the single-row state, or appends to
     * the list state (creating the list when the state is empty).
     *
     * @param rowData the row to remember
     * @throws RuntimeException wrapping the {@link IOException} if the state access fails
     */
    void updateState(RowData rowData) {
        try {
            if (this.lookupKeyContainsPrimaryKey) {
                this.uniqueState.update(rowData);
            } else {
                List<RowData> rows = this.state.value();
                if (rows == null) {
                    rows = new ArrayList<>();
                }
                rows.add(rowData);
                this.state.update(rows);
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to update state!", e);
        }
    }

    /**
     * Handles a non-accumulate message for which no state exists: logs a warning and, for a left
     * outer join, lets the runner pad nulls for the input row.
     */
    private void stateStaledErrorHandle(RowData rowData, Collector<RowData> collector) {
        LOG.warn(STATE_CLEARED_WARN_MSG);
        if (this.lookupJoinRunner.isLeftOuterJoin) {
            this.lookupJoinRunner.padNullForLeftJoin(rowData, collector);
        }
    }
}
