package org.apache.flink.table.store.connector.sink;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SerializableFunction;
import org.apache.flink.util.function.SerializableSupplier;

/* loaded from: input_file:org/apache/flink/table/store/connector/sink/CommitterOperator.class */
public class CommitterOperator extends AbstractStreamOperator<Committable> implements OneInputStreamOperator<Committable, Committable>, BoundedOneInput {
    private static final long serialVersionUID = 1;
    private final boolean streamingCheckpointEnabled;
    private final SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>> committableSerializer;
    private ListState<ManifestCommittable> streamingCommitterState;
    private final SerializableFunction<String, Committer> committerFactory;
    private Committer committer;
    private final Deque<Committable> inputs = new ArrayDeque();
    private boolean endInput = false;
    private final NavigableMap<Long, ManifestCommittable> committablesPerCheckpoint = new TreeMap();

    public CommitterOperator(boolean z, SerializableFunction<String, Committer> serializableFunction, SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>> serializableSupplier) {
        this.streamingCheckpointEnabled = z;
        this.committableSerializer = serializableSupplier;
        this.committerFactory = (SerializableFunction) Preconditions.checkNotNull(serializableFunction);
        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        ListState listState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("commit_user_state", String.class));
        ArrayList arrayList = new ArrayList();
        Iterable iterable = (Iterable) listState.get();
        arrayList.getClass();
        iterable.forEach((v1) -> {
            r1.add(v1);
        });
        if (stateInitializationContext.isRestored()) {
            Preconditions.checkState(arrayList.size() == 1, "Expecting 1 commit user name when recovering from checkpoint but found " + arrayList.size() + ". This is unexpected.");
        } else {
            Preconditions.checkState(arrayList.isEmpty(), "Expecting 0 commit user name for a fresh sink state but found " + arrayList.size() + ". This is unexpected.");
            String uuid = UUID.randomUUID().toString();
            listState.add(uuid);
            arrayList.add(uuid);
        }
        this.committer = (Committer) this.committerFactory.apply(arrayList.get(0));
        this.streamingCommitterState = new SimpleVersionedListState(stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE)), (SimpleVersionedSerializer) this.committableSerializer.get());
        ArrayList arrayList2 = new ArrayList();
        Iterable iterable2 = (Iterable) this.streamingCommitterState.get();
        arrayList2.getClass();
        iterable2.forEach((v1) -> {
            r1.add(v1);
        });
        this.streamingCommitterState.clear();
        commit(true, arrayList2);
    }

    private void commit(boolean z, List<ManifestCommittable> list) throws Exception {
        if (!z) {
            this.committer.commit(list);
            return;
        }
        List<ManifestCommittable> filterRecoveredCommittables = this.committer.filterRecoveredCommittables(list);
        if (filterRecoveredCommittables.isEmpty()) {
            return;
        }
        this.committer.commit(filterRecoveredCommittables);
        throw new RuntimeException("This exception is intentionally thrown after committing the restored checkpoints. By restarting the job we hope that writers can start writing based on these new commits.");
    }

    private ManifestCommittable toCommittables(long j, List<Committable> list) throws Exception {
        return this.committer.combine(j, list);
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        pollInputs();
        this.streamingCommitterState.update(committables(this.committablesPerCheckpoint));
    }

    private List<ManifestCommittable> committables(NavigableMap<Long, ManifestCommittable> navigableMap) {
        return new ArrayList(navigableMap.values());
    }

    public void endInput() throws Exception {
        this.endInput = true;
        if (this.streamingCheckpointEnabled) {
            return;
        }
        pollInputs();
        commitUpToCheckpoint(Long.MAX_VALUE);
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        super.notifyCheckpointComplete(j);
        commitUpToCheckpoint(this.endInput ? Long.MAX_VALUE : j);
    }

    private void commitUpToCheckpoint(long j) throws Exception {
        NavigableMap<Long, ManifestCommittable> headMap = this.committablesPerCheckpoint.headMap(Long.valueOf(j), true);
        commit(false, committables(headMap));
        headMap.clear();
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void processElement(StreamRecord<Committable> streamRecord) {
        this.output.collect(streamRecord);
        this.inputs.add(streamRecord.getValue());
    }

    public void close() throws Exception {
        this.committablesPerCheckpoint.clear();
        this.inputs.clear();
        super.close();
    }

    private void pollInputs() throws Exception {
        HashMap hashMap = new HashMap();
        for (Committable committable : this.inputs) {
            ((List) hashMap.computeIfAbsent(Long.valueOf(committable.checkpointId()), l -> {
                return new ArrayList();
            })).add(committable);
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            Long l2 = (Long) entry.getKey();
            List<Committable> list = (List) entry.getValue();
            if (this.committablesPerCheckpoint.containsKey(l2)) {
                throw new RuntimeException(String.format("Repeatedly commit the same checkpoint files. \nThe previous files is %s, \nand the subsequent files is %s", this.committablesPerCheckpoint.get(l2), list));
            }
            this.committablesPerCheckpoint.put(l2, toCommittables(l2.longValue(), list));
        }
        this.inputs.clear();
    }
}
