package org.apache.flink.runtime.taskexecutor.slot;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceBudgetManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotPayload;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.class */
public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTable<T> {
    private static final Logger LOG = LoggerFactory.getLogger(TaskSlotTableImpl.class);
    private final int numberSlots;
    private final ResourceProfile defaultSlotResourceProfile;
    private final int memoryPageSize;
    private final TimerService<AllocationID> timerService;
    private final Map<Integer, TaskSlot<T>> taskSlots;
    private final Map<AllocationID, TaskSlot<T>> allocatedSlots;
    private final Map<ExecutionAttemptID, TaskSlotMapping<T>> taskSlotMappings;
    private final Map<JobID, Set<AllocationID>> slotsPerJob;

    @Nullable
    private SlotActions slotActions;
    private volatile State state;
    private int dynamicSlotIndex;
    private final ResourceBudgetManager budgetManager;
    private final CompletableFuture<Void> closingFuture;
    private ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor("TaskSlotTableImpl is not initialized with proper main thread executor, call to TaskSlotTableImpl#start is required");
    private final Executor memoryVerificationExecutor;

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl$PayloadIterator.class */
    private final class PayloadIterator implements Iterator<T> {
        private final Iterator<TaskSlot<T>> taskSlotIterator;
        private Iterator<T> currentTasks;

        private PayloadIterator(JobID jobID) {
            this.taskSlotIterator = new TaskSlotIterator(jobID, TaskSlotState.ACTIVE);
            this.currentTasks = null;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            while (true) {
                if ((this.currentTasks == null || !this.currentTasks.hasNext()) && this.taskSlotIterator.hasNext()) {
                    this.currentTasks = this.taskSlotIterator.next().getTasks();
                }
            }
            return this.currentTasks != null && this.currentTasks.hasNext();
        }

        @Override // java.util.Iterator
        public T next() {
            while (true) {
                if (this.currentTasks != null && this.currentTasks.hasNext()) {
                    return this.currentTasks.next();
                }
                try {
                    this.currentTasks = this.taskSlotIterator.next().getTasks();
                } catch (NoSuchElementException e) {
                    throw new NoSuchElementException("No more tasks.");
                }
            }
        }

        @Override // java.util.Iterator
        public void remove() {
            throw new UnsupportedOperationException("Cannot remove tasks via this iterator.");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl$State.class */
    public enum State {
        CREATED,
        RUNNING,
        CLOSING,
        CLOSED
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl$TaskSlotIterator.class */
    public final class TaskSlotIterator implements Iterator<TaskSlot<T>> {
        private final Iterator<AllocationID> allSlots;
        private final TaskSlotState state;
        private TaskSlot<T> currentSlot;

        private TaskSlotIterator(TaskSlotTableImpl taskSlotTableImpl, TaskSlotState taskSlotState) {
            this((Iterator<AllocationID>) ((Set) taskSlotTableImpl.slotsPerJob.values().stream().flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toSet())).iterator(), taskSlotState);
        }

        private TaskSlotIterator(TaskSlotTableImpl taskSlotTableImpl, JobID jobID, TaskSlotState taskSlotState) {
            this((Iterator<AllocationID>) (taskSlotTableImpl.slotsPerJob.get(jobID) == null ? Collections.emptyIterator() : ((Set) taskSlotTableImpl.slotsPerJob.get(jobID)).iterator()), taskSlotState);
        }

        private TaskSlotIterator(Iterator<AllocationID> it, TaskSlotState taskSlotState) {
            this.allSlots = (Iterator) Preconditions.checkNotNull(it);
            this.state = (TaskSlotState) Preconditions.checkNotNull(taskSlotState);
            this.currentSlot = null;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            while (this.currentSlot == null && this.allSlots.hasNext()) {
                TaskSlot<T> taskSlot = TaskSlotTableImpl.this.getTaskSlot(this.allSlots.next());
                if (taskSlot != null && taskSlot.getState() == this.state) {
                    this.currentSlot = taskSlot;
                }
            }
            return this.currentSlot != null;
        }

        @Override // java.util.Iterator
        public TaskSlot<T> next() {
            if (this.currentSlot != null) {
                TaskSlot<T> taskSlot = this.currentSlot;
                this.currentSlot = null;
                return taskSlot;
            }
            while (true) {
                try {
                    TaskSlot<T> taskSlot2 = TaskSlotTableImpl.this.getTaskSlot(this.allSlots.next());
                    if (taskSlot2 != null && taskSlot2.getState() == this.state) {
                        return taskSlot2;
                    }
                } catch (NoSuchElementException e) {
                    throw new NoSuchElementException("No more task slots.");
                }
            }
        }

        @Override // java.util.Iterator
        public void remove() {
            throw new UnsupportedOperationException("Cannot remove task slots via this iterator.");
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl$TaskSlotMapping.class */
    private static final class TaskSlotMapping<T extends TaskSlotPayload> {
        private final T task;
        private final TaskSlot<T> taskSlot;

        private TaskSlotMapping(T t, TaskSlot<T> taskSlot) {
            this.task = (T) Preconditions.checkNotNull(t);
            this.taskSlot = (TaskSlot) Preconditions.checkNotNull(taskSlot);
        }

        public T getTask() {
            return this.task;
        }

        public TaskSlot<T> getTaskSlot() {
            return this.taskSlot;
        }
    }

    public TaskSlotTableImpl(int i, ResourceProfile resourceProfile, ResourceProfile resourceProfile2, int i2, TimerService<AllocationID> timerService, Executor executor) {
        Preconditions.checkArgument(0 < i, "The number of task slots must be greater than 0.");
        this.numberSlots = i;
        this.dynamicSlotIndex = i;
        this.defaultSlotResourceProfile = (ResourceProfile) Preconditions.checkNotNull(resourceProfile2);
        this.memoryPageSize = i2;
        this.taskSlots = new HashMap(i);
        this.timerService = (TimerService) Preconditions.checkNotNull(timerService);
        this.budgetManager = new ResourceBudgetManager((ResourceProfile) Preconditions.checkNotNull(resourceProfile));
        this.allocatedSlots = new HashMap(i);
        this.taskSlotMappings = new HashMap(4 * i);
        this.slotsPerJob = new HashMap(4);
        this.slotActions = null;
        this.state = State.CREATED;
        this.closingFuture = new CompletableFuture<>();
        this.memoryVerificationExecutor = executor;
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public void start(SlotActions slotActions, ComponentMainThreadExecutor componentMainThreadExecutor) {
        Preconditions.checkState(this.state == State.CREATED, "The %s has to be just created before starting", TaskSlotTableImpl.class.getSimpleName());
        this.slotActions = (SlotActions) Preconditions.checkNotNull(slotActions);
        this.mainThreadExecutor = (ComponentMainThreadExecutor) Preconditions.checkNotNull(componentMainThreadExecutor);
        this.timerService.start(this);
        this.state = State.RUNNING;
    }

    @Override // org.apache.flink.util.AutoCloseableAsync
    public CompletableFuture<Void> closeAsync() {
        if (this.state == State.CREATED) {
            this.state = State.CLOSED;
            this.closingFuture.complete(null);
        } else if (this.state == State.RUNNING) {
            this.state = State.CLOSING;
            FlinkException flinkException = new FlinkException("Closing task slot table");
            FutureUtils.forward(FutureUtils.waitForAll((Collection) new ArrayList(this.allocatedSlots.values()).stream().map(taskSlot -> {
                return freeSlotInternal(taskSlot, flinkException);
            }).collect(Collectors.toList())).thenRunAsync(() -> {
                this.state = State.CLOSED;
                this.timerService.stop();
            }, (Executor) this.mainThreadExecutor), this.closingFuture);
        }
        return this.closingFuture;
    }

    @VisibleForTesting
    public boolean isClosed() {
        return this.state == State.CLOSED;
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public Set<AllocationID> getAllocationIdsPerJob(JobID jobID) {
        Set<AllocationID> set = this.slotsPerJob.get(jobID);
        return set == null ? Collections.emptySet() : Collections.unmodifiableSet(set);
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public Set<AllocationID> getActiveTaskSlotAllocationIds() {
        return createAllocationIdSet(new TaskSlotIterator(TaskSlotState.ACTIVE));
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public Set<AllocationID> getActiveTaskSlotAllocationIdsPerJob(JobID jobID) {
        return createAllocationIdSet(new TaskSlotIterator(jobID, TaskSlotState.ACTIVE));
    }

    private Set<AllocationID> createAllocationIdSet(Iterator<TaskSlot<T>> it) {
        HashSet hashSet = new HashSet();
        while (it.hasNext()) {
            hashSet.add(it.next().getAllocationId());
        }
        return hashSet;
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public SlotReport createSlotReport(ResourceID resourceID) {
        SlotStatus slotStatus;
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.numberSlots; i++) {
            SlotID slotID = new SlotID(resourceID, i);
            if (this.taskSlots.containsKey(Integer.valueOf(i))) {
                TaskSlot<T> taskSlot = this.taskSlots.get(Integer.valueOf(i));
                slotStatus = new SlotStatus(slotID, taskSlot.getResourceProfile(), taskSlot.getJobId(), taskSlot.getAllocationId());
            } else {
                slotStatus = new SlotStatus(slotID, this.defaultSlotResourceProfile, null, null);
            }
            arrayList.add(slotStatus);
        }
        for (TaskSlot<T> taskSlot2 : this.allocatedSlots.values()) {
            if (isDynamicIndex(taskSlot2.getIndex())) {
                arrayList.add(new SlotStatus(new SlotID(resourceID, taskSlot2.getIndex()), taskSlot2.getResourceProfile(), taskSlot2.getJobId(), taskSlot2.getAllocationId()));
            }
        }
        return new SlotReport(arrayList);
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    @VisibleForTesting
    public boolean allocateSlot(int i, JobID jobID, AllocationID allocationID, Time time) {
        return allocateSlot(i, jobID, allocationID, this.defaultSlotResourceProfile, time);
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public boolean allocateSlot(int i, JobID jobID, AllocationID allocationID, ResourceProfile resourceProfile, Time time) {
        checkRunning();
        Preconditions.checkArgument(i < this.numberSlots);
        int nextDynamicSlotIndex = i < 0 ? nextDynamicSlotIndex() : i;
        ResourceProfile resourceProfile2 = resourceProfile.equals(ResourceProfile.UNKNOWN) ? this.defaultSlotResourceProfile : resourceProfile;
        TaskSlot<T> taskSlot = this.allocatedSlots.get(allocationID);
        if (taskSlot != null) {
            return isDuplicatedSlot(taskSlot, jobID, resourceProfile2, nextDynamicSlotIndex);
        }
        if (isIndexAlreadyTaken(nextDynamicSlotIndex)) {
            LOG.info("The slot with index {} is already assigned to another allocation with id {}.", Integer.valueOf(nextDynamicSlotIndex), this.taskSlots.get(Integer.valueOf(nextDynamicSlotIndex)).getAllocationId());
            return false;
        }
        if (!this.budgetManager.reserve(resourceProfile2)) {
            LOG.info("Cannot allocate the requested resources. Trying to allocate {}, while the currently remaining available resources are {}, total is {}.", new Object[]{resourceProfile2, this.budgetManager.getAvailableBudget(), this.budgetManager.getTotalBudget()});
            return false;
        }
        TaskSlot<T> taskSlot2 = new TaskSlot<>(nextDynamicSlotIndex, resourceProfile2, this.memoryPageSize, jobID, allocationID, this.memoryVerificationExecutor);
        this.taskSlots.put(Integer.valueOf(nextDynamicSlotIndex), taskSlot2);
        this.allocatedSlots.put(allocationID, taskSlot2);
        this.timerService.registerTimeout(allocationID, time.getSize(), time.getUnit());
        Set<AllocationID> set = this.slotsPerJob.get(jobID);
        if (set == null) {
            set = new HashSet(4);
            this.slotsPerJob.put(jobID, set);
        }
        set.add(allocationID);
        return true;
    }

    private boolean isDuplicatedSlot(TaskSlot taskSlot, JobID jobID, ResourceProfile resourceProfile, int i) {
        LOG.info("Slot with allocationId {} already exist, with resource profile {}, job id {} and index {}. The required index is {}.", new Object[]{taskSlot.getAllocationId(), taskSlot.getResourceProfile(), taskSlot.getJobId(), Integer.valueOf(taskSlot.getIndex()), Integer.valueOf(i)});
        return taskSlot.getJobId().equals(jobID) && taskSlot.getResourceProfile().equals(resourceProfile) && (isDynamicIndex(i) || taskSlot.getIndex() == i);
    }

    private boolean isIndexAlreadyTaken(int i) {
        return this.taskSlots.get(Integer.valueOf(i)) != null;
    }

    private boolean isDynamicIndex(int i) {
        return i >= this.numberSlots;
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public boolean markSlotActive(AllocationID allocationID) throws SlotNotFoundException {
        checkRunning();
        TaskSlot<T> taskSlot = getTaskSlot(allocationID);
        if (taskSlot != null) {
            return markExistingSlotActive(taskSlot);
        }
        throw new SlotNotFoundException(allocationID);
    }

    private boolean markExistingSlotActive(TaskSlot<T> taskSlot) {
        if (!taskSlot.markActive()) {
            return false;
        }
        LOG.info("Activate slot {}.", taskSlot.getAllocationId());
        this.timerService.unregisterTimeout(taskSlot.getAllocationId());
        return true;
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public boolean markSlotInactive(AllocationID allocationID, Time time) throws SlotNotFoundException {
        checkStarted();
        TaskSlot<T> taskSlot = getTaskSlot(allocationID);
        if (taskSlot == null) {
            throw new SlotNotFoundException(allocationID);
        }
        if (!taskSlot.markInactive()) {
            return false;
        }
        this.timerService.registerTimeout(allocationID, time.getSize(), time.getUnit());
        return true;
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public int freeSlot(AllocationID allocationID, Throwable th) throws SlotNotFoundException {
        checkStarted();
        TaskSlot<T> taskSlot = getTaskSlot(allocationID);
        if (taskSlot == null) {
            throw new SlotNotFoundException(allocationID);
        }
        if (freeSlotInternal(taskSlot, th).isDone()) {
            return taskSlot.getIndex();
        }
        return -1;
    }

    private CompletableFuture<Void> freeSlotInternal(TaskSlot<T> taskSlot, Throwable th) {
        AllocationID allocationId = taskSlot.getAllocationId();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Free slot {}.", taskSlot, th);
        } else {
            LOG.info("Free slot {}.", taskSlot);
        }
        if (taskSlot.isEmpty()) {
            this.allocatedSlots.remove(allocationId);
            this.timerService.unregisterTimeout(allocationId);
            JobID jobId = taskSlot.getJobId();
            Set<AllocationID> set = this.slotsPerJob.get(jobId);
            if (set == null) {
                throw new IllegalStateException("There are no more slots allocated for the job " + jobId + ". This indicates a programming bug.");
            }
            set.remove(allocationId);
            if (set.isEmpty()) {
                this.slotsPerJob.remove(jobId);
            }
            this.taskSlots.remove(Integer.valueOf(taskSlot.getIndex()));
            this.budgetManager.release(taskSlot.getResourceProfile());
        }
        return taskSlot.closeAsync(th);
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public boolean isValidTimeout(AllocationID allocationID, UUID uuid) {
        checkStarted();
        return this.state == State.RUNNING && this.timerService.isValid(allocationID, uuid);
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public boolean isAllocated(int i, JobID jobID, AllocationID allocationID) {
        TaskSlot<T> taskSlot = this.taskSlots.get(Integer.valueOf(i));
        if (taskSlot != null) {
            return taskSlot.isAllocated(jobID, allocationID);
        }
        return false;
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public boolean tryMarkSlotActive(JobID jobID, AllocationID allocationID) {
        TaskSlot<T> taskSlot = getTaskSlot(allocationID);
        if (taskSlot == null || !taskSlot.isAllocated(jobID, allocationID)) {
            return false;
        }
        return markExistingSlotActive(taskSlot);
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public boolean isSlotFree(int i) {
        return !this.taskSlots.containsKey(Integer.valueOf(i));
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public boolean hasAllocatedSlots(JobID jobID) {
        return getAllocatedSlots(jobID).hasNext();
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobID) {
        return new TaskSlotIterator(jobID, TaskSlotState.ALLOCATED);
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    @Nullable
    public JobID getOwningJob(AllocationID allocationID) {
        TaskSlot<T> taskSlot = getTaskSlot(allocationID);
        if (taskSlot != null) {
            return taskSlot.getJobId();
        }
        return null;
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public boolean addTask(T t) throws SlotNotFoundException, SlotNotActiveException {
        checkRunning();
        Preconditions.checkNotNull(t);
        TaskSlot<T> taskSlot = getTaskSlot(t.getAllocationId());
        if (taskSlot == null) {
            throw new SlotNotFoundException(t.getAllocationId());
        }
        if (!taskSlot.isActive(t.getJobID(), t.getAllocationId())) {
            throw new SlotNotActiveException(t.getJobID(), t.getAllocationId());
        }
        if (!taskSlot.add(t)) {
            return false;
        }
        this.taskSlotMappings.put(t.getExecutionId(), new TaskSlotMapping<>(t, taskSlot));
        return true;
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public T removeTask(ExecutionAttemptID executionAttemptID) {
        checkStarted();
        TaskSlotMapping<T> remove = this.taskSlotMappings.remove(executionAttemptID);
        if (remove == null) {
            return null;
        }
        T task = remove.getTask();
        TaskSlot<T> taskSlot = remove.getTaskSlot();
        taskSlot.remove(task.getExecutionId());
        if (taskSlot.isReleasing() && taskSlot.isEmpty()) {
            this.slotActions.freeSlot(taskSlot.getAllocationId());
        }
        return task;
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public T getTask(ExecutionAttemptID executionAttemptID) {
        TaskSlotMapping<T> taskSlotMapping = this.taskSlotMappings.get(executionAttemptID);
        if (taskSlotMapping != null) {
            return taskSlotMapping.getTask();
        }
        return null;
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public Iterator<T> getTasks(JobID jobID) {
        return new PayloadIterator(jobID);
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public AllocationID getCurrentAllocation(int i) {
        TaskSlot<T> taskSlot = this.taskSlots.get(Integer.valueOf(i));
        if (taskSlot == null) {
            return null;
        }
        return taskSlot.getAllocationId();
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
    public MemoryManager getTaskMemoryManager(AllocationID allocationID) throws SlotNotFoundException {
        TaskSlot<T> taskSlot = getTaskSlot(allocationID);
        if (taskSlot != null) {
            return taskSlot.getMemoryManager();
        }
        throw new SlotNotFoundException(allocationID);
    }

    @Override // org.apache.flink.runtime.taskexecutor.slot.TimeoutListener
    public void notifyTimeout(AllocationID allocationID, UUID uuid) {
        checkStarted();
        if (this.slotActions != null) {
            this.slotActions.timeoutSlot(allocationID, uuid);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @Nullable
    public TaskSlot<T> getTaskSlot(AllocationID allocationID) {
        Preconditions.checkNotNull(allocationID);
        return this.allocatedSlots.get(allocationID);
    }

    private int nextDynamicSlotIndex() {
        int i = this.dynamicSlotIndex;
        this.dynamicSlotIndex = i + 1;
        return i;
    }

    private void checkRunning() {
        Preconditions.checkState(this.state == State.RUNNING, "The %s has to be running.", TaskSlotTableImpl.class.getSimpleName());
    }

    private void checkStarted() {
        Preconditions.checkState(this.state != State.CREATED, "The %s has to be started (not created).", TaskSlotTableImpl.class.getSimpleName());
    }
}
