package org.elasticsearch.xpack.ml.inference.pytorch.process;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Instant;
import java.util.Iterator;
import java.util.LongSummaryStatistics;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.core.ml.utils.Intervals;
import org.elasticsearch.xpack.ml.inference.pytorch.results.ErrorResult;
import org.elasticsearch.xpack.ml.inference.pytorch.results.PyTorchInferenceResult;
import org.elasticsearch.xpack.ml.inference.pytorch.results.PyTorchResult;
import org.elasticsearch.xpack.ml.inference.pytorch.results.ThreadSettings;

/* loaded from: input_file:org/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor.class */
public class PyTorchResultProcessor {
    private static final Logger logger;
    static long REPORTING_PERIOD_MS;
    private final ConcurrentMap<String, PendingResult> pendingResults;
    private final String deploymentId;
    private final Consumer<ThreadSettings> threadSettingsConsumer;
    private volatile boolean isStopping;
    private final LongSummaryStatistics timingStats;
    private int errorCount;
    private long cacheHitCount;
    private long peakThroughput;
    private LongSummaryStatistics lastPeriodSummaryStats;
    private long lastPeriodCacheHitCount;
    private RecentStats lastPeriodStats;
    private long currentPeriodEndTimeMs;
    private long lastResultTimeMs;
    private final long startTime;
    private final LongSupplier currentTimeMsSupplier;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$PendingResult.class */
    public static class PendingResult {
        public final ActionListener<PyTorchResult> listener;

        public PendingResult(ActionListener<PyTorchResult> actionListener) {
            this.listener = (ActionListener) Objects.requireNonNull(actionListener);
        }
    }

    /* loaded from: input_file:org/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$RecentStats.class */
    public static final class RecentStats extends Record {
        private final long requestsProcessed;
        private final Double avgInferenceTime;
        private final long cacheHitCount;

        public RecentStats(long j, Double d, long j2) {
            this.requestsProcessed = j;
            this.avgInferenceTime = d;
            this.cacheHitCount = j2;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, RecentStats.class), RecentStats.class, "requestsProcessed;avgInferenceTime;cacheHitCount", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$RecentStats;->requestsProcessed:J", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$RecentStats;->avgInferenceTime:Ljava/lang/Double;", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$RecentStats;->cacheHitCount:J").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, RecentStats.class), RecentStats.class, "requestsProcessed;avgInferenceTime;cacheHitCount", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$RecentStats;->requestsProcessed:J", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$RecentStats;->avgInferenceTime:Ljava/lang/Double;", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$RecentStats;->cacheHitCount:J").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, RecentStats.class, Object.class), RecentStats.class, "requestsProcessed;avgInferenceTime;cacheHitCount", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$RecentStats;->requestsProcessed:J", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$RecentStats;->avgInferenceTime:Ljava/lang/Double;", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$RecentStats;->cacheHitCount:J").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public long requestsProcessed() {
            return this.requestsProcessed;
        }

        public Double avgInferenceTime() {
            return this.avgInferenceTime;
        }

        public long cacheHitCount() {
            return this.cacheHitCount;
        }
    }

    /* loaded from: input_file:org/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats.class */
    public static final class ResultStats extends Record {
        private final LongSummaryStatistics timingStats;
        private final int errorCount;
        private final long cacheHitCount;
        private final int numberOfPendingResults;
        private final Instant lastUsed;
        private final long peakThroughput;
        private final RecentStats recentStats;

        public ResultStats(LongSummaryStatistics longSummaryStatistics, int i, long j, int i2, Instant instant, long j2, RecentStats recentStats) {
            this.timingStats = longSummaryStatistics;
            this.errorCount = i;
            this.cacheHitCount = j;
            this.numberOfPendingResults = i2;
            this.lastUsed = instant;
            this.peakThroughput = j2;
            this.recentStats = recentStats;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, ResultStats.class), ResultStats.class, "timingStats;errorCount;cacheHitCount;numberOfPendingResults;lastUsed;peakThroughput;recentStats", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->timingStats:Ljava/util/LongSummaryStatistics;", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->errorCount:I", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->cacheHitCount:J", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->numberOfPendingResults:I", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->lastUsed:Ljava/time/Instant;", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->peakThroughput:J", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->recentStats:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$RecentStats;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, ResultStats.class), ResultStats.class, "timingStats;errorCount;cacheHitCount;numberOfPendingResults;lastUsed;peakThroughput;recentStats", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->timingStats:Ljava/util/LongSummaryStatistics;", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->errorCount:I", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->cacheHitCount:J", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->numberOfPendingResults:I", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->lastUsed:Ljava/time/Instant;", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->peakThroughput:J", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->recentStats:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$RecentStats;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, ResultStats.class, Object.class), ResultStats.class, "timingStats;errorCount;cacheHitCount;numberOfPendingResults;lastUsed;peakThroughput;recentStats", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->timingStats:Ljava/util/LongSummaryStatistics;", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->errorCount:I", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->cacheHitCount:J", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->numberOfPendingResults:I", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->lastUsed:Ljava/time/Instant;", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->peakThroughput:J", "FIELD:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$ResultStats;->recentStats:Lorg/elasticsearch/xpack/ml/inference/pytorch/process/PyTorchResultProcessor$RecentStats;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public LongSummaryStatistics timingStats() {
            return this.timingStats;
        }

        public int errorCount() {
            return this.errorCount;
        }

        public long cacheHitCount() {
            return this.cacheHitCount;
        }

        public int numberOfPendingResults() {
            return this.numberOfPendingResults;
        }

        public Instant lastUsed() {
            return this.lastUsed;
        }

        public long peakThroughput() {
            return this.peakThroughput;
        }

        public RecentStats recentStats() {
            return this.recentStats;
        }
    }

    public PyTorchResultProcessor(String str, Consumer<ThreadSettings> consumer) {
        this(str, consumer, System::currentTimeMillis);
    }

    PyTorchResultProcessor(String str, Consumer<ThreadSettings> consumer, LongSupplier longSupplier) {
        this.pendingResults = new ConcurrentHashMap();
        this.deploymentId = (String) Objects.requireNonNull(str);
        this.timingStats = new LongSummaryStatistics();
        this.lastPeriodSummaryStats = new LongSummaryStatistics();
        this.threadSettingsConsumer = (Consumer) Objects.requireNonNull(consumer);
        this.currentTimeMsSupplier = longSupplier;
        this.startTime = longSupplier.getAsLong();
        this.currentPeriodEndTimeMs = this.startTime + REPORTING_PERIOD_MS;
    }

    public void registerRequest(String str, ActionListener<PyTorchResult> actionListener) {
        this.pendingResults.computeIfAbsent(str, str2 -> {
            return new PendingResult(actionListener);
        });
    }

    public void ignoreResponseWithoutNotifying(String str) {
        this.pendingResults.remove(str);
    }

    public void process(PyTorchProcess pyTorchProcess) {
        try {
            try {
                Iterator<PyTorchResult> readResults = pyTorchProcess.readResults();
                while (readResults.hasNext()) {
                    PyTorchResult next = readResults.next();
                    if (next.inferenceResult() != null) {
                        processInferenceResult(next);
                    }
                    ThreadSettings threadSettings = next.threadSettings();
                    if (threadSettings != null) {
                        this.threadSettingsConsumer.accept(threadSettings);
                        processThreadSettings(next);
                    }
                    if (next.errorResult() != null) {
                        processErrorResult(next);
                    }
                }
                this.pendingResults.forEach((str, pendingResult) -> {
                    pendingResult.listener.onResponse(new PyTorchResult(null, null, new ErrorResult(str, "inference canceled as process is stopping")));
                });
                this.pendingResults.clear();
            } catch (Exception e) {
                if (!this.isStopping) {
                    logger.error(() -> {
                        return "[" + this.deploymentId + "] Error processing results";
                    }, e);
                }
                this.pendingResults.forEach((str2, pendingResult2) -> {
                    pendingResult2.listener.onResponse(new PyTorchResult(null, null, new ErrorResult(str2, this.isStopping ? "inference canceled as process is stopping" : "inference native process died unexpectedly with failure [" + e.getMessage() + "]")));
                });
                this.pendingResults.clear();
                this.pendingResults.forEach((str3, pendingResult3) -> {
                    pendingResult3.listener.onResponse(new PyTorchResult(null, null, new ErrorResult(str3, "inference canceled as process is stopping")));
                });
                this.pendingResults.clear();
            }
            logger.debug(() -> {
                return "[" + this.deploymentId + "] Results processing finished";
            });
        } catch (Throwable th) {
            this.pendingResults.forEach((str32, pendingResult32) -> {
                pendingResult32.listener.onResponse(new PyTorchResult(null, null, new ErrorResult(str32, "inference canceled as process is stopping")));
            });
            this.pendingResults.clear();
            throw th;
        }
    }

    void processInferenceResult(PyTorchResult pyTorchResult) {
        PyTorchInferenceResult inferenceResult = pyTorchResult.inferenceResult();
        if (!$assertionsDisabled && inferenceResult == null) {
            throw new AssertionError();
        }
        logger.trace(() -> {
            return Strings.format("[%s] Parsed result with id [%s]", new Object[]{this.deploymentId, inferenceResult.getRequestId()});
        });
        processResult(inferenceResult);
        PendingResult remove = this.pendingResults.remove(inferenceResult.getRequestId());
        if (remove == null) {
            logger.debug(() -> {
                return Strings.format("[%s] no pending result for [%s]", new Object[]{this.deploymentId, inferenceResult.getRequestId()});
            });
        } else {
            remove.listener.onResponse(pyTorchResult);
        }
    }

    void processThreadSettings(PyTorchResult pyTorchResult) {
        ThreadSettings threadSettings = pyTorchResult.threadSettings();
        if (!$assertionsDisabled && threadSettings == null) {
            throw new AssertionError();
        }
        logger.trace(() -> {
            return Strings.format("[%s] Parsed result with id [%s]", new Object[]{this.deploymentId, threadSettings.requestId()});
        });
        PendingResult remove = this.pendingResults.remove(threadSettings.requestId());
        if (remove == null) {
            logger.debug(() -> {
                return Strings.format("[%s] no pending result for [%s]", new Object[]{this.deploymentId, threadSettings.requestId()});
            });
        } else {
            remove.listener.onResponse(pyTorchResult);
        }
    }

    void processErrorResult(PyTorchResult pyTorchResult) {
        ErrorResult errorResult = pyTorchResult.errorResult();
        if (!$assertionsDisabled && errorResult == null) {
            throw new AssertionError();
        }
        this.errorCount++;
        logger.trace(() -> {
            return Strings.format("[%s] Parsed error with id [%s]", new Object[]{this.deploymentId, errorResult.requestId()});
        });
        PendingResult remove = this.pendingResults.remove(errorResult.requestId());
        if (remove == null) {
            logger.debug(() -> {
                return Strings.format("[%s] no pending result for [%s]", new Object[]{this.deploymentId, errorResult.requestId()});
            });
        } else {
            remove.listener.onResponse(pyTorchResult);
        }
    }

    public synchronized ResultStats getResultStats() {
        long alignToFloor = this.startTime + Intervals.alignToFloor(this.currentTimeMsSupplier.getAsLong() - this.startTime, REPORTING_PERIOD_MS);
        RecentStats recentStats = null;
        if (this.lastResultTimeMs >= alignToFloor) {
            recentStats = this.lastPeriodStats;
        } else if (this.lastResultTimeMs >= alignToFloor - REPORTING_PERIOD_MS) {
            recentStats = new RecentStats(this.lastPeriodSummaryStats.getCount(), Double.valueOf(this.lastPeriodSummaryStats.getAverage()), this.lastPeriodCacheHitCount);
            this.peakThroughput = Math.max(this.peakThroughput, this.lastPeriodSummaryStats.getCount());
        }
        if (recentStats == null) {
            recentStats = new RecentStats(0L, null, 0L);
        }
        return new ResultStats(new LongSummaryStatistics(this.timingStats.getCount(), this.timingStats.getMin(), this.timingStats.getMax(), this.timingStats.getSum()), this.errorCount, this.cacheHitCount, this.pendingResults.size(), this.lastResultTimeMs > 0 ? Instant.ofEpochMilli(this.lastResultTimeMs) : null, this.peakThroughput, recentStats);
    }

    private synchronized void processResult(PyTorchInferenceResult pyTorchInferenceResult) {
        this.timingStats.accept(pyTorchInferenceResult.getTimeMs());
        this.lastResultTimeMs = this.currentTimeMsSupplier.getAsLong();
        if (this.lastResultTimeMs > this.currentPeriodEndTimeMs) {
            this.peakThroughput = Math.max(this.peakThroughput, this.lastPeriodSummaryStats.getCount());
            if (this.lastResultTimeMs > this.currentPeriodEndTimeMs + REPORTING_PERIOD_MS) {
                this.lastPeriodStats = null;
            } else {
                this.lastPeriodStats = new RecentStats(this.lastPeriodSummaryStats.getCount(), Double.valueOf(this.lastPeriodSummaryStats.getAverage()), this.lastPeriodCacheHitCount);
            }
            this.lastPeriodCacheHitCount = 0L;
            this.lastPeriodSummaryStats = new LongSummaryStatistics();
            this.lastPeriodSummaryStats.accept(pyTorchInferenceResult.getTimeMs());
            this.currentPeriodEndTimeMs = this.startTime + Intervals.alignToCeil(this.lastResultTimeMs - this.startTime, REPORTING_PERIOD_MS);
        } else {
            this.lastPeriodSummaryStats.accept(pyTorchInferenceResult.getTimeMs());
        }
        if (pyTorchInferenceResult.isCacheHit()) {
            this.cacheHitCount++;
            this.lastPeriodCacheHitCount++;
        }
    }

    public void stop() {
        this.isStopping = true;
    }

    static {
        $assertionsDisabled = !PyTorchResultProcessor.class.desiredAssertionStatus();
        logger = LogManager.getLogger(PyTorchResultProcessor.class);
        REPORTING_PERIOD_MS = TimeValue.timeValueMinutes(1L).millis();
    }
}
