package org.elasticsearch.xpack.ml.datafeed.delayeddatacheck;

import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.utils.Intervals;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory;

/* loaded from: input_file:org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.class */
public class DatafeedDelayedDataDetector implements DelayedDataDetector {
    private static final String DATE_BUCKETS = "date_buckets";
    private final long bucketSpan;
    private final long window;
    private final Client client;
    private final String timeField;
    private final String jobId;
    private final QueryBuilder datafeedQuery;
    private final String[] datafeedIndices;
    private final IndicesOptions indicesOptions;
    private final Map<String, Object> runtimeMappings;

    /* JADX INFO: Access modifiers changed from: package-private */
    public DatafeedDelayedDataDetector(long j, long j2, String str, String str2, QueryBuilder queryBuilder, String[] strArr, IndicesOptions indicesOptions, Map<String, Object> map, Client client) {
        this.bucketSpan = j;
        this.window = j2;
        this.jobId = str;
        this.timeField = str2;
        this.datafeedQuery = queryBuilder;
        this.datafeedIndices = strArr;
        this.indicesOptions = (IndicesOptions) Objects.requireNonNull(indicesOptions);
        this.runtimeMappings = (Map) Objects.requireNonNull(map);
        this.client = client;
    }

    @Override // org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector
    public List<DelayedDataDetectorFactory.BucketWithMissingData> detectMissingData(long j) {
        long alignToFloor = Intervals.alignToFloor(j, this.bucketSpan);
        long alignToFloor2 = Intervals.alignToFloor(j - this.window, this.bucketSpan);
        if (alignToFloor <= alignToFloor2) {
            return Collections.emptyList();
        }
        List<Bucket> checkBucketEvents = checkBucketEvents(alignToFloor2, alignToFloor);
        Map<Long, Long> checkCurrentBucketEventCount = checkCurrentBucketEventCount(alignToFloor2, alignToFloor);
        return (List) checkBucketEvents.stream().filter(bucket -> {
            return calculateMissing(checkCurrentBucketEventCount, bucket) > 0;
        }).map(bucket2 -> {
            return DelayedDataDetectorFactory.BucketWithMissingData.fromMissingAndBucket(calculateMissing(checkCurrentBucketEventCount, bucket2), bucket2);
        }).collect(Collectors.toList());
    }

    @Override // org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector
    public long getWindow() {
        return this.window;
    }

    private List<Bucket> checkBucketEvents(long j, long j2) {
        GetBucketsAction.Request request = new GetBucketsAction.Request(this.jobId);
        request.setStart(Long.toString(j));
        request.setEnd(Long.toString(j2));
        request.setSort("timestamp");
        request.setDescending(false);
        request.setExcludeInterim(true);
        request.setPageParams(new PageParams(0, (int) ((j2 - j) / this.bucketSpan)));
        ThreadContext.StoredContext stashWithOrigin = this.client.threadPool().getThreadContext().stashWithOrigin("ml");
        try {
            List<Bucket> results = ((GetBucketsAction.Response) this.client.execute(GetBucketsAction.INSTANCE, request).actionGet()).getBuckets().results();
            if (stashWithOrigin != null) {
                stashWithOrigin.close();
            }
            return results;
        } catch (Throwable th) {
            if (stashWithOrigin != null) {
                try {
                    stashWithOrigin.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private Map<Long, Long> checkCurrentBucketEventCount(long j, long j2) {
        SearchRequest indicesOptions = new SearchRequest(this.datafeedIndices).source(new SearchSourceBuilder().size(0).aggregation(new DateHistogramAggregationBuilder(DATE_BUCKETS).fixedInterval(new DateHistogramInterval(this.bucketSpan + "ms")).field(this.timeField)).query(ExtractorUtils.wrapInTimeRangeQuery(this.datafeedQuery, this.timeField, j, j2)).runtimeMappings(this.runtimeMappings)).indicesOptions(this.indicesOptions);
        ThreadContext.StoredContext stashWithOrigin = this.client.threadPool().getThreadContext().stashWithOrigin("ml");
        try {
            List<Histogram.Bucket> buckets = ((SearchResponse) this.client.execute(SearchAction.INSTANCE, indicesOptions).actionGet()).getAggregations().get(DATE_BUCKETS).getBuckets();
            Map<Long, Long> newMapWithExpectedSize = Maps.newMapWithExpectedSize(buckets.size());
            for (Histogram.Bucket bucket : buckets) {
                long histogramKeyToEpoch = toHistogramKeyToEpoch(bucket.getKey());
                if (histogramKeyToEpoch < 0) {
                    throw new IllegalStateException("Histogram key [" + bucket.getKey() + "] cannot be converted to a timestamp");
                }
                newMapWithExpectedSize.put(Long.valueOf(histogramKeyToEpoch), Long.valueOf(bucket.getDocCount()));
            }
            if (stashWithOrigin != null) {
                stashWithOrigin.close();
            }
            return newMapWithExpectedSize;
        } catch (Throwable th) {
            if (stashWithOrigin != null) {
                try {
                    stashWithOrigin.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static long toHistogramKeyToEpoch(Object obj) {
        if (obj instanceof ZonedDateTime) {
            return ((ZonedDateTime) obj).toInstant().toEpochMilli();
        }
        if (obj instanceof Double) {
            return ((Double) obj).longValue();
        }
        if (obj instanceof Long) {
            return ((Long) obj).longValue();
        }
        return -1L;
    }

    private static long calculateMissing(Map<Long, Long> map, Bucket bucket) {
        return map.getOrDefault(Long.valueOf(bucket.getEpoch() * 1000), 0L).longValue() - bucket.getEventCount();
    }
}
