package org.apache.flink.table.store.table.source.snapshot;

import java.io.Serializable;
import java.util.HashMap;
import javax.annotation.Nullable;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.DataTable;
import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.class */
public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator {
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousDataFileSnapshotEnumerator.class);
    private final SnapshotManager snapshotManager;
    private final DataTableScan scan;
    private final StartingScanner startingScanner;
    private final FollowUpScanner followUpScanner;

    @Nullable
    private Long nextSnapshotId;

    /* loaded from: input_file:org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator$Factory.class */
    public interface Factory extends Serializable {
        ContinuousDataFileSnapshotEnumerator create(DataTable dataTable, DataTableScan dataTableScan, @Nullable Long l);
    }

    public ContinuousDataFileSnapshotEnumerator(Path path, DataTableScan dataTableScan, StartingScanner startingScanner, FollowUpScanner followUpScanner, @Nullable Long l) {
        this.snapshotManager = new SnapshotManager(path);
        this.scan = dataTableScan;
        this.startingScanner = startingScanner;
        this.followUpScanner = followUpScanner;
        this.nextSnapshotId = l;
    }

    @Override // org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator
    @Nullable
    public DataTableScan.DataFilePlan enumerate() {
        return this.nextSnapshotId == null ? tryFirstEnumerate() : nextEnumerate();
    }

    private DataTableScan.DataFilePlan tryFirstEnumerate() {
        DataTableScan.DataFilePlan plan = this.startingScanner.getPlan(this.snapshotManager, this.scan);
        if (plan != null) {
            this.nextSnapshotId = Long.valueOf(plan.snapshotId.longValue() + 1);
        }
        return plan;
    }

    private DataTableScan.DataFilePlan nextEnumerate() {
        while (this.snapshotManager.snapshotExists(this.nextSnapshotId.longValue())) {
            if (this.followUpScanner.shouldScanSnapshot(this.snapshotManager.snapshot(this.nextSnapshotId.longValue()))) {
                LOG.debug("Find snapshot id {}.", this.nextSnapshotId);
                DataTableScan.DataFilePlan plan = this.followUpScanner.getPlan(this.nextSnapshotId.longValue(), this.scan);
                Long l = this.nextSnapshotId;
                this.nextSnapshotId = Long.valueOf(this.nextSnapshotId.longValue() + 1);
                return plan;
            }
            Long l2 = this.nextSnapshotId;
            this.nextSnapshotId = Long.valueOf(this.nextSnapshotId.longValue() + 1);
        }
        LOG.debug("Next snapshot id {} does not exist, wait for the snapshot generation.", this.nextSnapshotId);
        return null;
    }

    public static ContinuousDataFileSnapshotEnumerator createWithSnapshotStarting(DataTable dataTable, DataTableScan dataTableScan) {
        return new ContinuousDataFileSnapshotEnumerator(dataTable.location(), dataTableScan, dataTable.options().startupMode() == CoreOptions.StartupMode.COMPACTED_FULL ? new CompactedStartingScanner() : new FullStartingScanner(), createFollowUpScanner(dataTable, dataTableScan), null);
    }

    public static ContinuousDataFileSnapshotEnumerator create(DataTable dataTable, DataTableScan dataTableScan, @Nullable Long l) {
        return new ContinuousDataFileSnapshotEnumerator(dataTable.location(), dataTableScan, createStartingScanner(dataTable), createFollowUpScanner(dataTable, dataTableScan), l);
    }

    private static StartingScanner createStartingScanner(DataTable dataTable) {
        CoreOptions.StartupMode startupMode = dataTable.options().startupMode();
        Long scanTimestampMills = dataTable.options().scanTimestampMills();
        if (startupMode == CoreOptions.StartupMode.LATEST_FULL) {
            return new FullStartingScanner();
        }
        if (startupMode == CoreOptions.StartupMode.LATEST) {
            return new ContinuousLatestStartingScanner();
        }
        if (startupMode == CoreOptions.StartupMode.COMPACTED_FULL) {
            return new CompactedStartingScanner();
        }
        if (startupMode == CoreOptions.StartupMode.FROM_TIMESTAMP) {
            Preconditions.checkNotNull(scanTimestampMills, String.format("%s can not be null when you use %s for %s", CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), CoreOptions.StartupMode.FROM_TIMESTAMP, CoreOptions.SCAN_MODE.key()));
            return new ContinuousFromTimestampStartingScanner(scanTimestampMills.longValue());
        }
        if (startupMode != CoreOptions.StartupMode.FROM_SNAPSHOT) {
            throw new UnsupportedOperationException("Unknown startup mode " + startupMode.name());
        }
        Long scanSnapshotId = dataTable.options().scanSnapshotId();
        Preconditions.checkNotNull(scanSnapshotId, String.format("%s can not be null when you use %s for %s", CoreOptions.SCAN_SNAPSHOT_ID.key(), CoreOptions.StartupMode.FROM_SNAPSHOT, CoreOptions.SCAN_MODE.key()));
        return new ContinuousFromSnapshotStartingScanner(scanSnapshotId.longValue());
    }

    private static FollowUpScanner createFollowUpScanner(DataTable dataTable, DataTableScan dataTableScan) {
        CoreOptions.ChangelogProducer changelogProducer = dataTable.options().changelogProducer();
        if (changelogProducer == CoreOptions.ChangelogProducer.NONE) {
            return new DeltaFollowUpScanner();
        }
        if (changelogProducer == CoreOptions.ChangelogProducer.INPUT) {
            return new InputChangelogFollowUpScanner();
        }
        if (changelogProducer != CoreOptions.ChangelogProducer.FULL_COMPACTION) {
            throw new UnsupportedOperationException("Unknown changelog producer " + changelogProducer.name());
        }
        dataTableScan.withLevel(dataTable.options().numLevels() - 1);
        return new CompactionChangelogFollowUpScanner();
    }

    public static void validate(TableSchema tableSchema) {
        CoreOptions coreOptions = new CoreOptions(tableSchema.options());
        CoreOptions.MergeEngine mergeEngine = coreOptions.mergeEngine();
        HashMap<CoreOptions.MergeEngine, String> hashMap = new HashMap<CoreOptions.MergeEngine, String>() { // from class: org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator.1
            {
                put(CoreOptions.MergeEngine.PARTIAL_UPDATE, "Partial update");
                put(CoreOptions.MergeEngine.AGGREGATE, "Pre-aggregate");
            }
        };
        if (tableSchema.primaryKeys().size() > 0 && hashMap.containsKey(mergeEngine) && coreOptions.changelogProducer() != CoreOptions.ChangelogProducer.FULL_COMPACTION) {
            throw new ValidationException(hashMap.get(mergeEngine) + " continuous reading is not supported. You can use full compaction changelog producer to support streaming reading.");
        }
    }
}
