/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.ml.engine.ingest;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.ml.common.transport.batch.MLBatchIngestionInput;
import org.opensearch.ml.engine.annotation.Ingester;
import org.opensearch.ml.engine.ingest.AbstractIngestion;
import org.opensearch.transport.client.Client;

@Ingester(value="openai")
public class OpenAIDataIngestion
extends AbstractIngestion {
    @Generated
    private static final Logger log = LogManager.getLogger(OpenAIDataIngestion.class);
    private static final String API_KEY = "openAI_key";
    private static final String API_URL = "https://api.openai.com/v1/files/";
    public static final String SOURCE = "source";

    public OpenAIDataIngestion(Client client) {
        super(client);
    }

    @Override
    public double ingest(MLBatchIngestionInput mlBatchIngestionInput, int bulkSize) {
        List sources = (List)mlBatchIngestionInput.getDataSources().get(SOURCE);
        if (Objects.isNull(sources) || sources.isEmpty()) {
            return 100.0;
        }
        boolean isSoleSource = sources.size() == 1;
        List<Double> successRates = Collections.synchronizedList(new ArrayList());
        for (int sourceIndex = 0; sourceIndex < sources.size(); ++sourceIndex) {
            successRates.add(this.ingestSingleSource((String)sources.get(sourceIndex), mlBatchIngestionInput, sourceIndex, isSoleSource, bulkSize));
        }
        return this.calculateSuccessRate(successRates);
    }

    private double ingestSingleSource(String fileId, MLBatchIngestionInput mlBatchIngestionInput, int sourceIndex, boolean isSoleSource, int bulkSize) {
        double successRate = 0.0;
        try {
            String apiKey = (String)mlBatchIngestionInput.getCredential().get(API_KEY);
            URL url = new URL(API_URL + fileId + "/content");
            HttpURLConnection connection = (HttpURLConnection)url.openConnection();
            connection.setRequestMethod("GET");
            connection.setRequestProperty("Authorization", "Bearer " + apiKey);
            try (InputStreamReader inputStreamReader = AccessController.doPrivileged(() -> new InputStreamReader(connection.getInputStream()));
                 BufferedReader reader = new BufferedReader(inputStreamReader);){
                CompletableFuture<Void> future;
                String line;
                ArrayList<String> linesBuffer = new ArrayList<String>();
                int lineCount = 0;
                AtomicInteger successfulBatches = new AtomicInteger(0);
                AtomicInteger failedBatches = new AtomicInteger(0);
                ArrayList<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>();
                while ((line = reader.readLine()) != null) {
                    linesBuffer.add(line);
                    if (++lineCount % bulkSize != 0) continue;
                    future = new CompletableFuture();
                    this.batchIngest(linesBuffer, mlBatchIngestionInput, this.getBulkResponseListener(successfulBatches, failedBatches, future), sourceIndex, isSoleSource);
                    futures.add(future);
                    linesBuffer.clear();
                }
                if (!linesBuffer.isEmpty()) {
                    future = new CompletableFuture<Void>();
                    this.batchIngest(linesBuffer, mlBatchIngestionInput, this.getBulkResponseListener(successfulBatches, failedBatches, future), sourceIndex, isSoleSource);
                    futures.add(future);
                }
                reader.close();
                CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
                allFutures.join();
                int totalBatches = successfulBatches.get() + failedBatches.get();
                successRate = totalBatches == 0 ? 100.0 : (double)successfulBatches.get() / (double)totalBatches * 100.0;
            }
        }
        catch (PrivilegedActionException e) {
            throw new RuntimeException("Failed to read from OpenAI file API: ", e);
        }
        catch (Exception e) {
            log.error(e.getMessage());
            throw new OpenSearchStatusException("Failed to batch ingest: " + e.getMessage(), RestStatus.INTERNAL_SERVER_ERROR, new Object[0]);
        }
        return successRate;
    }
}

