/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
import org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator;
import org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.ToInternal;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.internals.PositionSerde;
import org.apache.kafka.streams.state.internals.ThreadCache;

/**
 * Processor context that can be switched between an active and a standby mode.
 *
 * <p>In active mode the context holds a {@link StreamTask} and a {@link RecordCollector} and
 * supports forwarding, committing, scheduling and changelog writes. In standby mode both
 * references are cleared and every operation guarded by
 * {@code throwUnsupportedOperationExceptionIfStandby} throws
 * {@link UnsupportedOperationException}.
 *
 * <p>Cache flush listeners are remembered per namespace so that they can be re-registered
 * whenever the context is handed a new {@link ThreadCache}.
 */
public final class ProcessorContextImpl
extends AbstractProcessorContext<Object, Object>
implements RecordCollector.Supplier {
    /** Internal config key read once in the constructor to decide whether changelog records carry position headers. */
    private static final String CONSISTENCY_ENABLED_CONFIG = "__iq.consistency.offset.vector.enabled__";
    /** Header key under which the serialized {@link Position} is attached to changelog records. */
    private static final String POSITION_HEADER_KEY = "c";

    // Both references are null while the context is in standby mode (see transitionToStandby).
    private StreamTask streamTask;
    private RecordCollector collector;
    private final ProcessorStateManager stateManager;
    private final boolean consistencyEnabled;
    // Listeners registered so far, keyed by cache namespace; replayed onto every new cache.
    final Map<String, ThreadCache.DirtyEntryFlushListener> cacheNameToFlushListener = new HashMap<String, ThreadCache.DirtyEntryFlushListener>();

    /**
     * Creates a context bound to the given state manager.
     *
     * @param id       task id, passed to the superclass
     * @param config   streams configuration, passed to the superclass
     * @param stateMgr state manager that owns the stores and determines the task type checks
     * @param metrics  metrics implementation, passed to the superclass
     * @param cache    initial thread cache, passed to the superclass
     */
    public ProcessorContextImpl(TaskId id, StreamsConfig config, ProcessorStateManager stateMgr, StreamsMetricsImpl metrics, ThreadCache cache) {
        super(id, config, metrics, cache);
        this.stateManager = stateMgr;
        this.consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(appConfigs(), CONSISTENCY_ENABLED_CONFIG, false);
    }

    /**
     * Switches this context to active mode and installs the given task, collector and cache.
     * All previously registered flush listeners are re-added to the new cache.
     *
     * @throws IllegalStateException if the state manager does not report {@code ACTIVE}
     */
    @Override
    public void transitionToActive(StreamTask streamTask, RecordCollector recordCollector, ThreadCache newCache) {
        if (stateManager.taskType() != Task.TaskType.ACTIVE) {
            throw new IllegalStateException("Tried to transition processor context to active but the state manager's type was " + stateManager.taskType());
        }
        this.streamTask = streamTask;
        this.collector = recordCollector;
        this.cache = newCache;
        addAllFlushListenersToNewCache();
    }

    /**
     * Switches this context to standby mode: the task and collector references are cleared,
     * the given cache is installed, and all registered flush listeners are re-added to it.
     *
     * @throws IllegalStateException if the state manager does not report {@code STANDBY}
     */
    @Override
    public void transitionToStandby(ThreadCache newCache) {
        if (stateManager.taskType() != Task.TaskType.STANDBY) {
            throw new IllegalStateException("Tried to transition processor context to standby but the state manager's type was " + stateManager.taskType());
        }
        this.streamTask = null;
        this.collector = null;
        this.cache = newCache;
        addAllFlushListenersToNewCache();
    }

    /**
     * Remembers the listener for the namespace and registers it on the current cache.
     * A later registration for the same namespace replaces the remembered listener.
     */
    @Override
    public void registerCacheFlushListener(String namespace, ThreadCache.DirtyEntryFlushListener listener) {
        cacheNameToFlushListener.put(namespace, listener);
        cache.addDirtyEntryFlushListener(namespace, listener);
    }

    /** Re-registers every remembered flush listener on the currently installed cache. */
    private void addAllFlushListenersToNewCache() {
        for (Map.Entry<String, ThreadCache.DirtyEntryFlushListener> cacheEntry : cacheNameToFlushListener.entrySet()) {
            cache.addDirtyEntryFlushListener(cacheEntry.getKey(), cacheEntry.getValue());
        }
    }

    /** Returns the state manager supplied at construction time. */
    @Override
    public ProcessorStateManager stateManager() {
        return stateManager;
    }

    /** Returns the current record collector; {@code null} after a transition to standby. */
    @Override
    public RecordCollector recordCollector() {
        return collector;
    }

    /**
     * Sends a key/value change for the named store to that store's registered changelog partition.
     *
     * <p>When the consistency flag read at construction is enabled, the record carries the
     * changelog version header plus the serialized {@code position}; otherwise no headers are sent.
     *
     * @throws UnsupportedOperationException if the context is in standby mode
     */
    @Override
    public void logChange(String storeName, Bytes key, byte[] value, long timestamp, Position position) {
        throwUnsupportedOperationExceptionIfStandby("logChange");
        TopicPartition changelogPartition = stateManager().registeredChangelogPartitionFor(storeName);
        final Headers headers;
        if (consistencyEnabled) {
            headers = new RecordHeaders();
            headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
            headers.add(new RecordHeader(POSITION_HEADER_KEY, PositionSerde.serialize(position).array()));
        } else {
            headers = null;
        }
        // The trailing String and context arguments are intentionally null for changelog writes.
        collector.send(
            changelogPartition.topic(),
            key,
            value,
            headers,
            changelogPartition.partition(),
            timestamp,
            BYTES_KEY_SERIALIZER,
            BYTEARRAY_VALUE_SERIALIZER,
            (String) null,
            (InternalProcessorContext<Void, Void>) null);
    }

    /**
     * Looks up a state store by name on behalf of the current processor node.
     *
     * <p>Global stores are returned wrapped read-only. Any other store must be listed in the
     * current node's {@code stateStores}; it is returned wrapped read-write.
     *
     * @throws UnsupportedOperationException if the context is in standby mode
     * @throws StreamsException if there is no current node, or the store is not connected to it
     */
    @Override
    @SuppressWarnings("unchecked") // the caller chooses S; the decorators hand back a StateStore
    public <S extends StateStore> S getStateStore(String name) {
        throwUnsupportedOperationExceptionIfStandby("getStateStore");
        if (currentNode() == null) {
            throw new StreamsException("Accessing from an unknown node");
        }
        StateStore globalStore = stateManager.globalStore(name);
        if (globalStore != null) {
            return (S) AbstractReadOnlyDecorator.getReadOnlyStore(globalStore);
        }
        if (!currentNode().stateStores.contains(name)) {
            throw new StreamsException("Processor " + currentNode().name() + " has no access to StateStore " + name + " as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.processValues()', or '.transformValues()' to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.");
        }
        StateStore store = stateManager.store(name);
        return (S) AbstractReadWriteDecorator.wrapWithReadWriteStore(store);
    }

    /** Forwards the pair to all children using the context's current timestamp and headers. */
    @Override
    public <K, V> void forward(K key, V value) {
        Record<K, V> toForward = new Record<K, V>(key, value, timestamp(), headers());
        forward(toForward);
    }

    /**
     * Forwards the pair according to {@code to}: its timestamp is used when present, otherwise
     * the context's current timestamp; its child name (possibly {@code null}) selects the target.
     */
    @Override
    public <K, V> void forward(K key, V value, To to) {
        ToInternal toInternal = new ToInternal(to);
        long forwardTimestamp = toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp();
        Record<K, V> toForward = new Record<K, V>(key, value, forwardTimestamp, headers());
        forward(toForward, toInternal.child());
    }

    /** Copies the fixed-key record into a {@link Record} and forwards it to all children. */
    @Override
    public <K, V> void forward(FixedKeyRecord<K, V> record) {
        forward(new Record<K, V>(record.key(), record.value(), record.timestamp(), record.headers()));
    }

    /** Copies the fixed-key record into a {@link Record} and forwards it to the named child. */
    @Override
    public <K, V> void forward(FixedKeyRecord<K, V> record, String childName) {
        forward(new Record<K, V>(record.key(), record.value(), record.timestamp(), record.headers()), childName);
    }

    /** Forwards the record to all children of the current node. */
    @Override
    public <K, V> void forward(Record<K, V> record) {
        forward(record, null);
    }

    /**
     * Forwards the record to one named child, or to every child when {@code childName} is
     * {@code null}.
     *
     * <p>If a record context is set and the record's timestamp or headers instance differs from
     * the context's, a temporary record context carrying the record's timestamp and headers is
     * installed for the duration of the call. The previous record context and current node are
     * always restored afterwards, even if a child throws.
     *
     * @throws UnsupportedOperationException if the context is in standby mode
     * @throws StreamsException if there is no current node, or {@code childName} is not a child of it
     */
    @Override
    @SuppressWarnings("unchecked") // children are stored with wildcard key/value types
    public <K, V> void forward(Record<K, V> record, String childName) {
        throwUnsupportedOperationExceptionIfStandby("forward");
        ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
        if (previousNode == null) {
            throw new StreamsException("Current node is unknown. This can happen if 'forward()' is called in an illegal scope. The root cause could be that a 'Processor' instance is shared. To avoid this error, make sure that your suppliers return new instances each time 'get()' of Supplier is called and do not return the same object reference multiple times.");
        }
        ProcessorRecordContext previousContext = recordContext;
        try {
            // Headers are compared by identity on purpose: any different instance triggers a new context.
            if (recordContext != null && (record.timestamp() != timestamp() || record.headers() != headers())) {
                recordContext = new ProcessorRecordContext(record.timestamp(), recordContext.offset(), recordContext.partition(), recordContext.topic(), record.headers(), recordContext.sourceRawKey(), recordContext.sourceRawValue());
            }
            if (childName == null) {
                // Snapshot the child list before forwardInternal starts changing the current node.
                List<? extends ProcessorNode<?, ?, ?, ?>> children = currentNode().children();
                for (ProcessorNode<?, ?, ?, ?> child : children) {
                    forwardInternal((ProcessorNode<K, V, ?, ?>) child, record);
                }
            } else {
                ProcessorNode<?, ?, ?, ?> child = currentNode().child(childName);
                if (child == null) {
                    throw new StreamsException("Unknown downstream node: " + childName + " either does not exist or is not connected to this processor.");
                }
                forwardInternal((ProcessorNode<K, V, ?, ?>) child, record);
            }
        }
        finally {
            recordContext = previousContext;
            setCurrentNode(previousNode);
        }
    }

    /**
     * Makes {@code child} the current node, lets it process the record, and asks the stream
     * task to record end-to-end latency if the child is a terminal node.
     */
    private <K, V> void forwardInternal(ProcessorNode<K, V, ?, ?> child, Record<K, V> record) {
        setCurrentNode(child);
        child.process(record);
        if (child.isTerminalNode()) {
            streamTask.maybeRecordE2ELatency(record.timestamp(), currentSystemTimeMs(), child.name());
        }
    }

    /**
     * Requests a commit from the stream task.
     *
     * @throws UnsupportedOperationException if the context is in standby mode
     */
    @Override
    public void commit() {
        throwUnsupportedOperationExceptionIfStandby("commit");
        streamTask.requestCommit();
    }

    /**
     * Schedules a punctuation callback on the stream task.
     *
     * @throws UnsupportedOperationException if the context is in standby mode
     * @throws IllegalArgumentException if the validated interval is shorter than 1 millisecond
     */
    @Override
    public Cancellable schedule(Duration interval, PunctuationType type, Punctuator callback) throws IllegalArgumentException {
        throwUnsupportedOperationExceptionIfStandby("schedule");
        String msgPrefix = ApiUtils.prepareMillisCheckFailMsgPrefix(interval, "interval");
        long intervalMs = ApiUtils.validateMillisecondDuration(interval, msgPrefix);
        if (intervalMs < 1L) {
            throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond.");
        }
        return streamTask.schedule(intervalMs, type, callback);
    }

    /** Delegates to the superclass; throws {@link UnsupportedOperationException} in standby mode. */
    @Override
    public String topic() {
        throwUnsupportedOperationExceptionIfStandby("topic");
        return super.topic();
    }

    /** Delegates to the superclass; throws {@link UnsupportedOperationException} in standby mode. */
    @Override
    public int partition() {
        throwUnsupportedOperationExceptionIfStandby("partition");
        return super.partition();
    }

    /** Delegates to the superclass; throws {@link UnsupportedOperationException} in standby mode. */
    @Override
    public long offset() {
        throwUnsupportedOperationExceptionIfStandby("offset");
        return super.offset();
    }

    /** Delegates to the superclass; throws {@link UnsupportedOperationException} in standby mode. */
    @Override
    public long timestamp() {
        throwUnsupportedOperationExceptionIfStandby("timestamp");
        return super.timestamp();
    }

    /**
     * Returns the stream time reported by the stream task.
     *
     * @throws UnsupportedOperationException if the context is in standby mode, where no stream
     *         task is attached (previously this surfaced as a bare {@code NullPointerException})
     */
    @Override
    public long currentStreamTimeMs() {
        throwUnsupportedOperationExceptionIfStandby("currentStreamTimeMs");
        return streamTask.streamTime();
    }

    /** Delegates to the superclass; throws {@link UnsupportedOperationException} in standby mode. */
    @Override
    public ProcessorNode<?, ?, ?, ?> currentNode() {
        throwUnsupportedOperationExceptionIfStandby("currentNode");
        return super.currentNode();
    }

    /** Delegates to the superclass; throws {@link UnsupportedOperationException} in standby mode. */
    @Override
    public void setRecordContext(ProcessorRecordContext recordContext) {
        throwUnsupportedOperationExceptionIfStandby("setRecordContext");
        super.setRecordContext(recordContext);
    }

    /** Delegates to the superclass; throws {@link UnsupportedOperationException} in standby mode. */
    @Override
    public ProcessorRecordContext recordContext() {
        throwUnsupportedOperationExceptionIfStandby("recordContext");
        return super.recordContext();
    }

    /**
     * Guard used by every operation that is only meaningful for active tasks.
     *
     * @param operationName method name to embed in the exception message
     * @throws UnsupportedOperationException if {@code taskType()} reports {@code STANDBY}
     */
    private void throwUnsupportedOperationExceptionIfStandby(String operationName) {
        if (taskType() == Task.TaskType.STANDBY) {
            throw new UnsupportedOperationException("this should not happen: " + operationName + "() is not supported in standby tasks.");
        }
    }
}

