package org.elasticsearch.xpack.ml.datafeed;

import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ElasticsearchWrapperException;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
import org.elasticsearch.xpack.core.ml.action.PostDataAction;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/elasticsearch/xpack/ml/datafeed/DatafeedJob.class */
public class DatafeedJob {
    private static final Logger LOGGER = LogManager.getLogger(DatafeedJob.class);
    private static final int NEXT_TASK_DELAY_MS = 100;
    private final AnomalyDetectionAuditor auditor;
    private final AnnotationPersister annotationPersister;
    private final String jobId;
    private final DataDescription dataDescription;
    private final long frequencyMs;
    private final long queryDelayMs;
    private final Client client;
    private final DataExtractorFactory dataExtractorFactory;
    private final DatafeedTimingStatsReporter timingStatsReporter;
    private final Supplier<Long> currentTimeSupplier;
    private final DelayedDataDetector delayedDataDetector;
    private final Integer maxEmptySearches;
    private final long delayedDataCheckFreq;
    private volatile long lookbackStartTimeMs;
    private volatile long latestFinalBucketEndTimeMs;
    private volatile long lastDataCheckTimeMs;
    private volatile Tuple<String, Annotation> lastDataCheckAnnotationWithId;
    private volatile Long lastEndTimeMs;
    private AtomicBoolean running = new AtomicBoolean(true);
    private volatile boolean isIsolated;
    private volatile boolean haveEverSeenData;
    private volatile long consecutiveDelayedDataBuckets;
    private volatile SearchInterval searchInterval;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/elasticsearch/xpack/ml/datafeed/DatafeedJob$AnalysisProblemException.class */
    public static class AnalysisProblemException extends ElasticsearchException implements ElasticsearchWrapperException {
        final boolean shouldStop;
        final long nextDelayInMsSinceEpoch;

        AnalysisProblemException(long j, boolean z, Throwable th) {
            super(th);
            this.shouldStop = z;
            this.nextDelayInMsSinceEpoch = j;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/elasticsearch/xpack/ml/datafeed/DatafeedJob$EmptyDataCountException.class */
    public static class EmptyDataCountException extends RuntimeException {
        final long nextDelayInMsSinceEpoch;
        final boolean haveEverSeenData;

        EmptyDataCountException(long j, boolean z) {
            this.nextDelayInMsSinceEpoch = j;
            this.haveEverSeenData = z;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/elasticsearch/xpack/ml/datafeed/DatafeedJob$ExtractionProblemException.class */
    public static class ExtractionProblemException extends ElasticsearchException implements ElasticsearchWrapperException {
        final long nextDelayInMsSinceEpoch;

        ExtractionProblemException(long j, Throwable th) {
            super(th);
            this.nextDelayInMsSinceEpoch = j;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DatafeedJob(String str, DataDescription dataDescription, long j, long j2, DataExtractorFactory dataExtractorFactory, DatafeedTimingStatsReporter datafeedTimingStatsReporter, Client client, AnomalyDetectionAuditor anomalyDetectionAuditor, AnnotationPersister annotationPersister, Supplier<Long> supplier, DelayedDataDetector delayedDataDetector, Integer num, long j3, long j4, boolean z, long j5) {
        this.jobId = str;
        this.dataDescription = (DataDescription) Objects.requireNonNull(dataDescription);
        this.frequencyMs = j;
        this.queryDelayMs = j2;
        this.dataExtractorFactory = dataExtractorFactory;
        this.timingStatsReporter = datafeedTimingStatsReporter;
        this.client = client;
        this.auditor = anomalyDetectionAuditor;
        this.annotationPersister = annotationPersister;
        this.currentTimeSupplier = supplier;
        this.delayedDataDetector = delayedDataDetector;
        this.maxEmptySearches = num;
        this.latestFinalBucketEndTimeMs = j3;
        long max = Math.max(j3, j4);
        if (max > 0) {
            this.lastEndTimeMs = Long.valueOf(max);
        }
        this.haveEverSeenData = z;
        this.delayedDataCheckFreq = j5;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void isolate() {
        this.isIsolated = true;
        this.timingStatsReporter.disallowPersisting();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isIsolated() {
        return this.isIsolated;
    }

    public String getJobId() {
        return this.jobId;
    }

    public Integer getMaxEmptySearches() {
        return this.maxEmptySearches;
    }

    public void finishReportingTimingStats() {
        this.timingStatsReporter.finishReporting();
    }

    @Nullable
    public SearchInterval getSearchInterval() {
        return this.searchInterval;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Long runLookBack(long j, Long l) throws Exception {
        this.lookbackStartTimeMs = skipToStartTime(j);
        Optional ofNullable = Optional.ofNullable(l);
        long longValue = ((Long) ofNullable.orElse(Long.valueOf(this.currentTimeSupplier.get().longValue() - this.queryDelayMs))).longValue();
        boolean isPresent = ofNullable.isPresent();
        if (longValue <= this.lookbackStartTimeMs) {
            if (isPresent) {
                return null;
            }
            this.auditor.info(this.jobId, Messages.getMessage("Datafeed started in real-time"));
            return Long.valueOf(nextRealtimeTimestamp());
        }
        Object[] objArr = new Object[3];
        objArr[0] = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(this.lookbackStartTimeMs);
        objArr[1] = l == null ? "real-time" : DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(longValue);
        objArr[2] = TimeValue.timeValueMillis(this.frequencyMs).getStringRep();
        String message = Messages.getMessage("Datafeed started (from: {0} to: {1}) with frequency [{2}]", objArr);
        this.auditor.info(this.jobId, message);
        LOGGER.info("[{}] {}", this.jobId, message);
        FlushJobAction.Request request = new FlushJobAction.Request(this.jobId);
        request.setCalcInterim(true);
        run(this.lookbackStartTimeMs, longValue, request);
        if (shouldPersistAfterLookback(isPresent)) {
            sendPersistRequest();
        }
        if (!isRunning() || this.isIsolated) {
            if (this.isIsolated) {
                return null;
            }
            LOGGER.debug("[{}] Lookback finished after being stopped", this.jobId);
            return null;
        }
        this.auditor.info(this.jobId, Messages.getMessage("Datafeed lookback completed"));
        LOGGER.info("[{}] Lookback has finished", this.jobId);
        if (isPresent) {
            return null;
        }
        this.auditor.info(this.jobId, Messages.getMessage("Datafeed continued in real-time"));
        return Long.valueOf(nextRealtimeTimestamp());
    }

    private long skipToStartTime(long j) {
        if (this.lastEndTimeMs == null) {
            return j;
        }
        if (this.lastEndTimeMs.longValue() + 1 > j) {
            return this.lastEndTimeMs.longValue() + 1;
        }
        FlushJobAction.Request request = new FlushJobAction.Request(this.jobId);
        request.setSkipTime(String.valueOf(j));
        FlushJobAction.Response flushJob = flushJob(request);
        LOGGER.info("[{}] Skipped to time [{}]", this.jobId, Long.valueOf(flushJob.getLastFinalizedBucketEnd().toEpochMilli()));
        return flushJob.getLastFinalizedBucketEnd().toEpochMilli();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long runRealtime() throws Exception {
        long max = this.lastEndTimeMs == null ? this.lookbackStartTimeMs : Math.max(this.lookbackStartTimeMs, this.lastEndTimeMs.longValue() + 1);
        long intervalStartEpochMs = toIntervalStartEpochMs(this.currentTimeSupplier.get().longValue() - this.queryDelayMs);
        FlushJobAction.Request request = new FlushJobAction.Request(this.jobId);
        request.setWaitForNormalization(false);
        request.setCalcInterim(true);
        request.setAdvanceTime(String.valueOf(intervalStartEpochMs));
        run(max, intervalStartEpochMs, request);
        checkForMissingDataIfNecessary();
        return nextRealtimeTimestamp();
    }

    private void checkForMissingDataIfNecessary() {
        if (isRunning() && !this.isIsolated && checkForMissingDataTriggered()) {
            this.lastDataCheckTimeMs = this.currentTimeSupplier.get().longValue();
            List<DelayedDataDetectorFactory.BucketWithMissingData> detectMissingData = this.delayedDataDetector.detectMissingData(this.latestFinalBucketEndTimeMs);
            if (detectMissingData.isEmpty()) {
                return;
            }
            long sum = detectMissingData.stream().mapToLong((v0) -> {
                return v0.getMissingDocumentCount();
            }).sum();
            Bucket bucket = detectMissingData.get(detectMissingData.size() - 1).getBucket();
            Date date = new Date((bucket.getEpoch() + bucket.getBucketSpan()) * 1000);
            String message = Messages.getMessage("Datafeed has missed {0} documents due to ingest latency, latest bucket with missing data is [{1}]. Consider increasing query_delay", new Object[]{Long.valueOf(sum), XContentElasticsearchExtension.DEFAULT_FORMATTER.format(bucket.getTimestamp().toInstant())});
            Annotation createDelayedDataAnnotation = createDelayedDataAnnotation(detectMissingData.get(0).getBucket().getTimestamp(), date, message);
            if (this.lastDataCheckAnnotationWithId != null && createDelayedDataAnnotation.getAnnotation().equals(((Annotation) this.lastDataCheckAnnotationWithId.v2()).getAnnotation()) && createDelayedDataAnnotation.getTimestamp().equals(((Annotation) this.lastDataCheckAnnotationWithId.v2()).getTimestamp()) && createDelayedDataAnnotation.getEndTimestamp().equals(((Annotation) this.lastDataCheckAnnotationWithId.v2()).getEndTimestamp())) {
                return;
            }
            if (this.lastDataCheckAnnotationWithId == null) {
                this.consecutiveDelayedDataBuckets = 0L;
            } else if (bucket.getEpoch() * 1000 <= ((Annotation) this.lastDataCheckAnnotationWithId.v2()).getEndTimestamp().getTime() + 1) {
                this.consecutiveDelayedDataBuckets++;
            } else {
                this.consecutiveDelayedDataBuckets = 0L;
            }
            if (shouldWriteDelayedDataAudit()) {
                this.auditor.warning(this.jobId, message);
            }
            if (this.lastDataCheckAnnotationWithId == null) {
                this.lastDataCheckAnnotationWithId = this.annotationPersister.persistAnnotation(null, createDelayedDataAnnotation);
            } else {
                this.lastDataCheckAnnotationWithId = this.annotationPersister.persistAnnotation((String) this.lastDataCheckAnnotationWithId.v1(), updateAnnotation(createDelayedDataAnnotation));
            }
        }
    }

    private boolean shouldWriteDelayedDataAudit() {
        if (this.consecutiveDelayedDataBuckets < 3) {
            return true;
        }
        return this.consecutiveDelayedDataBuckets < 100 ? this.consecutiveDelayedDataBuckets % 10 == 0 : this.consecutiveDelayedDataBuckets % 100 == 0;
    }

    private Annotation createDelayedDataAnnotation(Date date, Date date2, String str) {
        Date date3 = new Date(this.currentTimeSupplier.get().longValue());
        return new Annotation.Builder().setAnnotation(str).setCreateTime(date3).setCreateUsername("_xpack").setTimestamp(date).setEndTimestamp(date2).setJobId(this.jobId).setModifiedTime(date3).setModifiedUsername("_xpack").setType(Annotation.Type.ANNOTATION).setEvent(Annotation.Event.DELAYED_DATA).build();
    }

    private Annotation updateAnnotation(Annotation annotation) {
        return new Annotation.Builder((Annotation) this.lastDataCheckAnnotationWithId.v2()).setAnnotation(annotation.getAnnotation()).setTimestamp(annotation.getTimestamp()).setEndTimestamp(annotation.getEndTimestamp()).setModifiedTime(new Date(this.currentTimeSupplier.get().longValue())).setModifiedUsername("_xpack").build();
    }

    private boolean checkForMissingDataTriggered() {
        return this.currentTimeSupplier.get().longValue() > this.lastDataCheckTimeMs + Math.min(this.delayedDataCheckFreq, this.delayedDataDetector.getWindow());
    }

    public boolean stop() {
        return this.running.compareAndSet(true, false);
    }

    public boolean isRunning() {
        return this.running.get();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v25, types: [java.lang.Throwable] */
    private void run(long j, long j2, FlushJobAction.Request request) throws IOException {
        Instant lastFinalizedBucketEnd;
        if (j2 <= j) {
            return;
        }
        LOGGER.trace("[{}] Searching data in: [{}, {})", this.jobId, Long.valueOf(j), Long.valueOf(j2));
        AnalysisProblemException analysisProblemException = null;
        long j3 = 0;
        DataExtractor newExtractor = this.dataExtractorFactory.newExtractor(j, j2);
        while (newExtractor.hasNext()) {
            if ((this.isIsolated || !isRunning()) && !newExtractor.isCancelled()) {
                newExtractor.cancel();
            }
            if (this.isIsolated) {
                return;
            }
            try {
                DataExtractor.Result next = newExtractor.next();
                Optional data = next.data();
                this.searchInterval = next.searchInterval();
                if (this.isIsolated) {
                    return;
                }
                if (data.isPresent()) {
                    try {
                        InputStream inputStream = (InputStream) data.get();
                        try {
                            DataCounts postData = postData(inputStream, XContentType.JSON);
                            LOGGER.trace(() -> {
                                return Strings.format("[%s] Processed another %s records with latest timestamp [%s]", new Object[]{this.jobId, Long.valueOf(postData.getProcessedRecordCount()), postData.getLatestRecordTimeStamp()});
                            });
                            this.timingStatsReporter.reportDataCounts(postData);
                            if (inputStream != null) {
                                inputStream.close();
                            }
                            j3 += postData.getProcessedRecordCount();
                            this.haveEverSeenData |= j3 > 0;
                            if (postData.getLatestRecordTimeStamp() != null) {
                                this.lastEndTimeMs = Long.valueOf(postData.getLatestRecordTimeStamp().getTime());
                            }
                        } finally {
                        }
                    } catch (Exception e) {
                        if (e instanceof InterruptedException) {
                            Thread.currentThread().interrupt();
                        }
                        if (this.isIsolated) {
                            return;
                        }
                        LOGGER.error(() -> {
                            return "[" + this.jobId + "] error while posting data";
                        }, e);
                        analysisProblemException = new AnalysisProblemException(nextRealtimeTimestamp(), isConflictException(e), e);
                    }
                }
            } catch (Exception e2) {
                LOGGER.error(() -> {
                    return "[" + this.jobId + "] error while extracting data";
                }, e2);
                if (!e2.toString().contains("doc values")) {
                    throw new ExtractionProblemException(nextRealtimeTimestamp(), e2);
                }
                throw new ExtractionProblemException(nextRealtimeTimestamp(), new IllegalArgumentException("One or more fields do not have doc values; please enable doc values for all analysis fields for datafeeds using aggregations"));
            }
        }
        this.lastEndTimeMs = Long.valueOf(Math.max(this.lastEndTimeMs == null ? 0L : this.lastEndTimeMs.longValue(), newExtractor.getEndTime() - 1));
        LOGGER.debug("[{}] Complete iterating data extractor [{}], [{}], [{}], [{}], [{}]", this.jobId, analysisProblemException, Long.valueOf(j3), this.lastEndTimeMs, Boolean.valueOf(isRunning()), Boolean.valueOf(newExtractor.isCancelled()));
        if (analysisProblemException != null) {
            throw analysisProblemException;
        }
        if (isRunning() && !this.isIsolated && (lastFinalizedBucketEnd = flushJob(request).getLastFinalizedBucketEnd()) != null) {
            this.latestFinalBucketEndTimeMs = lastFinalizedBucketEnd.toEpochMilli();
        }
        if (j3 == 0) {
            throw new EmptyDataCountException(nextRealtimeTimestamp(), this.haveEverSeenData);
        }
    }

    private DataCounts postData(InputStream inputStream, XContentType xContentType) throws IOException {
        PostDataAction.Request request = new PostDataAction.Request(this.jobId);
        request.setDataDescription(this.dataDescription);
        request.setContent(Streams.readFully(inputStream), xContentType);
        ThreadContext.StoredContext stashWithOrigin = this.client.threadPool().getThreadContext().stashWithOrigin("ml");
        try {
            DataCounts dataCounts = ((PostDataAction.Response) this.client.execute(PostDataAction.INSTANCE, request).actionGet()).getDataCounts();
            if (stashWithOrigin != null) {
                stashWithOrigin.close();
            }
            return dataCounts;
        } catch (Throwable th) {
            if (stashWithOrigin != null) {
                try {
                    stashWithOrigin.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private boolean isConflictException(Exception exc) {
        return (exc instanceof ElasticsearchStatusException) && ((ElasticsearchStatusException) exc).status() == RestStatus.CONFLICT;
    }

    private long nextRealtimeTimestamp() {
        return toIntervalStartEpochMs(this.currentTimeSupplier.get().longValue() + this.frequencyMs) + (this.queryDelayMs % this.frequencyMs) + 100;
    }

    private long toIntervalStartEpochMs(long j) {
        return (j / this.frequencyMs) * this.frequencyMs;
    }

    private FlushJobAction.Response flushJob(FlushJobAction.Request request) {
        try {
            LOGGER.trace("[" + this.jobId + "] Sending flush request");
            ThreadContext.StoredContext stashWithOrigin = this.client.threadPool().getThreadContext().stashWithOrigin("ml");
            try {
                FlushJobAction.Response response = (FlushJobAction.Response) this.client.execute(FlushJobAction.INSTANCE, request).actionGet();
                if (stashWithOrigin != null) {
                    stashWithOrigin.close();
                }
                return response;
            } finally {
            }
        } catch (Exception e) {
            LOGGER.debug("[" + this.jobId + "] error while flushing job", e);
            throw new AnalysisProblemException(nextRealtimeTimestamp(), isConflictException(e), e);
        }
    }

    private boolean shouldPersistAfterLookback(boolean z) {
        return (z || this.isIsolated || !isRunning()) ? false : true;
    }

    private void sendPersistRequest() {
        try {
            LOGGER.trace("[" + this.jobId + "] Sending persist request");
            ThreadContext.StoredContext stashWithOrigin = this.client.threadPool().getThreadContext().stashWithOrigin("ml");
            try {
                this.client.execute(PersistJobAction.INSTANCE, new PersistJobAction.Request(this.jobId));
                if (stashWithOrigin != null) {
                    stashWithOrigin.close();
                }
            } finally {
            }
        } catch (Exception e) {
            LOGGER.debug("[" + this.jobId + "] error while persisting job", e);
        }
    }

    Long lastEndTimeMs() {
        return this.lastEndTimeMs;
    }
}
