package it.agilelab.bigdata.wasp.consumers.spark.plugins.elastic;

import akka.actor.ActorRef;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkBatchWriter;
import it.agilelab.bigdata.wasp.core.WaspSystem$;
import it.agilelab.bigdata.wasp.core.bl.IndexBL;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.models.IndexModel;
import it.agilelab.bigdata.wasp.core.models.configuration.ElasticConfigModel;
import it.agilelab.bigdata.wasp.core.utils.ElasticConfiguration;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.elasticsearch.spark.sql.EsSparkSQL$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ElasticWriters.scala */
@ScalaSignature(bytes = "\u0006\u0001e4A!\u0001\u0002\u0001'\tiR\t\\1ti&\u001c7/Z1sG\"\u001c\u0006/\u0019:l\u0005\u0006$8\r[,sSR,'O\u0003\u0002\u0004\t\u00059Q\r\\1ti&\u001c'BA\u0003\u0007\u0003\u001d\u0001H.^4j]NT!a\u0002\u0005\u0002\u000bM\u0004\u0018M]6\u000b\u0005%Q\u0011!C2p]N,X.\u001a:t\u0015\tYA\"\u0001\u0003xCN\u0004(BA\u0007\u000f\u0003\u001d\u0011\u0017n\u001a3bi\u0006T!a\u0004\t\u0002\u0011\u0005<\u0017\u000e\\3mC\nT\u0011!E\u0001\u0003SR\u001c\u0001aE\u0003\u0001)i\u0001\u0003\u0006\u0005\u0002\u001615\taCC\u0001\u0018\u0003\u0015\u00198-\u00197b\u0013\tIbC\u0001\u0004B]f\u0014VM\u001a\t\u00037yi\u0011\u0001\b\u0006\u0003;\u0019\tqa\u001e:ji\u0016\u00148/\u0003\u0002 9\t\u00012\u000b]1sW\n\u000bGo\u00195Xe&$XM\u001d\t\u0003C\u0019j\u0011A\t\u0006\u0003G\u0011\nQ!\u001e;jYNT!!\n\u0006\u0002\t\r|'/Z\u0005\u0003O\t\u0012A#\u00127bgRL7mQ8oM&<WO]1uS>t\u0007CA\u0015-\u001b\u0005Q#BA\u0016%\u0003\u001dawnZ4j]\u001eL!!\f\u0016\u0003\u000f1{wmZ5oO\"Aq\u0006\u0001B\u0001B\u0003%\u0001'A\u0004j]\u0012,\u0007P\u0011'\u0011\u0005E\"T\"\u0001\u001a\u000b\u0005M\"\u0013A\u00012m\u0013\t)$GA\u0004J]\u0012,\u0007P\u0011'\t\u0011]\u0002!\u0011!Q\u0001\na\n!a]2\u0011\u0005ezT\"\u0001\u001e\u000b\u0005\u001dY$B\u0001\u001f>\u0003\u0019\t\u0007/Y2iK*\ta(A\u0002pe\u001eL!\u0001\u0011\u001e\u0003\u0019M\u0003\u0018M]6D_:$X\r\u001f;\t\u0011\t\u0003!\u0011!Q\u0001\n\r\u000bAA\\1nKB\u0011Ai\u0012\b\u0003+\u0015K!A\u0012\f\u0002\rA\u0013X\rZ3g\u0013\tA\u0015J\u0001\u0004TiJLgn\u001a\u0006\u0003\rZA\u0001b\u0013\u0001\u0003\u0002\u0003\u0006I\u0001T\u0001\u0012K2\f7\u000f^5d\u0003\u0012l\u0017N\\!di>\u0014\bCA'S\u001b\u0005q%BA(Q\u0003\u0015\t7\r^8s\u0015\u0005\t\u0016\u0001B1lW\u0006L!a\u0015(\u0003\u0011\u0005\u001bGo\u001c:SK\u001aDQ!\u0016\u0001\u0005\u0002Y\u000ba\u0001P5oSRtD#B,Z5nc\u0006C\u0001-\u0001\u001b\u0005\u0011\u0001\"B\u0018U\u0001\u0004\u0001\u0004\"B\u001cU\u0001\u0004A\u0004\"\u0002\"U\u0001\u0004\u0019\u0005\"B&U\u0001\u0004a\u0005\"\u00020\u0001\t\u0003z\u0016!B<sSR,GC\u00011d!\t)\u0012-\u0003\u0002c-\t!QK\\5u\u0011\u0015!W\f1\u0001f\u0003\u0011!\u0017\r^1\u0011\u0005\u00194hBA4t\u001d\tA\u0017O\u0004\u0002ja:\u0011!n\u001c\b\u0003W:l\u0011\u0001\u001c\u0006\u0003[J\ta\u0001\u0010:p_Rt\u0014\"\u0001 \n\u0005qj\u0014BA\u0004<\u0013\t\u0011((A\u0002tc2L!\u0001^;\u0002\u000fA\f7m[1hK*\u0011!OO\u0005\u0003ob\u0014\u0011\u0002R1uC\u001a\u0013\u0018-\\3\u000b\u0005Q,\b")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/elastic/ElasticsearchSparkBatchWriter.class */
public class ElasticsearchSparkBatchWriter implements SparkBatchWriter, ElasticConfiguration, Logging {
    private final IndexBL indexBL;
    private final SparkContext sc;
    public final String it$agilelab$bigdata$wasp$consumers$spark$plugins$elastic$ElasticsearchSparkBatchWriter$$name;
    private final ActorRef elasticAdminActor;
    private final WaspLogger logger;
    private final ElasticConfigModel elasticConfig;
    private volatile boolean bitmap$0;

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private ElasticConfigModel elasticConfig$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.elasticConfig = ElasticConfiguration.class.elasticConfig(this);
                this.bitmap$0 = true;
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.elasticConfig;
        }
    }

    public ElasticConfigModel elasticConfig() {
        return this.bitmap$0 ? this.elasticConfig : elasticConfig$lzycompute();
    }

    public void write(Dataset<Row> dataset) {
        Option byName = this.indexBL.getByName(this.it$agilelab$bigdata$wasp$consumers$spark$plugins$elastic$ElasticsearchSparkBatchWriter$$name);
        if (!byName.isDefined()) {
            logger().warn(new ElasticsearchSparkBatchWriter$$anonfun$write$13(this));
            return;
        }
        IndexModel indexModel = (IndexModel) byName.get();
        String eventuallyTimedName = indexModel.eventuallyTimedName();
        logger().info(new ElasticsearchSparkBatchWriter$$anonfun$write$9(this, indexModel, eventuallyTimedName));
        if (indexModel.schema().isEmpty()) {
            throw new Exception(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"There no define schema in the index configuration: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{indexModel})));
        }
        String lowerCase = indexModel.name().toLowerCase();
        String name = indexModel.name();
        if (lowerCase != null ? !lowerCase.equals(name) : name != null) {
            throw new Exception(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"The index name must be all lowercase: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{indexModel})));
        }
        if (!BoxesRunTime.unboxToBoolean(WaspSystem$.MODULE$.$qmark$qmark(this.elasticAdminActor, new CheckOrCreateIndex(eventuallyTimedName, indexModel.name(), indexModel.dataType(), indexModel.getJsonSchema()), WaspSystem$.MODULE$.$qmark$qmark$default$3()))) {
            String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Error creating index ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{indexModel}));
            logger().error(new ElasticsearchSparkBatchWriter$$anonfun$write$12(this, s));
            throw new Exception(s);
        }
        Map $plus$plus = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("es.nodes"), this.sc.broadcast(((TraversableOnce) elasticConfig().connections().filter(new ElasticsearchSparkBatchWriter$$anonfun$4(this))).mkString(","), ClassTag$.MODULE$.apply(String.class)).value()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("es.batch.size.entries"), "1")})).$plus$plus(Option$.MODULE$.option2Iterable(((IndexModel) byName.get()).idField().map(new ElasticsearchSparkBatchWriter$$anonfun$5(this))));
        logger().info(new ElasticsearchSparkBatchWriter$$anonfun$write$10(this, dataset));
        logger().info(new ElasticsearchSparkBatchWriter$$anonfun$write$11(this, indexModel, $plus$plus));
        EsSparkSQL$.MODULE$.saveToEs(dataset, indexModel.resource(), $plus$plus);
    }

    public ElasticsearchSparkBatchWriter(IndexBL indexBL, SparkContext sparkContext, String str, ActorRef actorRef) {
        this.indexBL = indexBL;
        this.sc = sparkContext;
        this.it$agilelab$bigdata$wasp$consumers$spark$plugins$elastic$ElasticsearchSparkBatchWriter$$name = str;
        this.elasticAdminActor = actorRef;
        ElasticConfiguration.class.$init$(this);
        Logging.class.$init$(this);
    }
}
