package it.agilelab.bigdata.wasp.consumers.spark.plugins.elastic;

import akka.actor.ActorRef;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkLegacyStreamingWriter;
import it.agilelab.bigdata.wasp.core.WaspSystem$;
import it.agilelab.bigdata.wasp.core.bl.IndexBL;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.models.IndexModel;
import it.agilelab.bigdata.wasp.core.models.configuration.ElasticConfigModel;
import it.agilelab.bigdata.wasp.core.utils.ElasticConfiguration;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ElasticWriters.scala */
@ScalaSignature(bytes = "\u0006\u000154A!\u0001\u0002\u0001'\t9S\t\\1ti&\u001c7/Z1sG\"\u001c\u0006/\u0019:l\u0019\u0016<\u0017mY=TiJ,\u0017-\\5oO^\u0013\u0018\u000e^3s\u0015\t\u0019A!A\u0004fY\u0006\u001cH/[2\u000b\u0005\u00151\u0011a\u00029mk\u001eLgn\u001d\u0006\u0003\u000f!\tQa\u001d9be.T!!\u0003\u0006\u0002\u0013\r|gn];nKJ\u001c(BA\u0006\r\u0003\u00119\u0018m\u001d9\u000b\u00055q\u0011a\u00022jO\u0012\fG/\u0019\u0006\u0003\u001fA\t\u0001\"Y4jY\u0016d\u0017M\u0019\u0006\u0002#\u0005\u0011\u0011\u000e^\u0002\u0001'\u0015\u0001AC\u0007\u0011)!\t)\u0002$D\u0001\u0017\u0015\u00059\u0012!B:dC2\f\u0017BA\r\u0017\u0005\u0019\te.\u001f*fMB\u00111DH\u0007\u00029)\u0011QDB\u0001\boJLG/\u001a:t\u0013\tyBD\u0001\u000eTa\u0006\u00148\u000eT3hC\u000eL8\u000b\u001e:fC6LgnZ,sSR,'\u000f\u0005\u0002\"M5\t!E\u0003\u0002$I\u0005)Q\u000f^5mg*\u0011QEC\u0001\u0005G>\u0014X-\u0003\u0002(E\t!R\t\\1ti&\u001c7i\u001c8gS\u001e,(/\u0019;j_:\u0004\"!\u000b\u0017\u000e\u0003)R!a\u000b\u0013\u0002\u000f1|wmZ5oO&\u0011QF\u000b\u0002\b\u0019><w-\u001b8h\u0011!y\u0003A!A!\u0002\u0013\u0001\u0014aB5oI\u0016D(\t\u0014\t\u0003cQj\u0011A\r\u0006\u0003g\u0011\n!A\u00197\n\u0005U\u0012$aB%oI\u0016D(\t\u0014\u0005\to\u0001\u0011\t\u0011)A\u0005q\u0005\u00191o]2\u0011\u0005e\nU\"\u0001\u001e\u000b\u0005mb\u0014!C:ue\u0016\fW.\u001b8h\u0015\t9QH\u0003\u0002?\u007f\u00051\u0011\r]1dQ\u0016T\u0011\u0001Q\u0001\u0004_J<\u0017B\u0001\";\u0005A\u0019FO]3b[&twmQ8oi\u0016DH\u000f\u0003\u0005E\u0001\t\u0005\t\u0015!\u0003F\u0003\u0011q\u0017-\\3\u0011\u0005\u0019KeBA\u000bH\u0013\tAe#\u0001\u0004Qe\u0016$WMZ\u0005\u0003\u0015.\u0013aa\u0015;sS:<'B\u0001%\u0017\u0011!i\u0005A!A!\u0002\u0013q\u0015!E3mCN$\u0018nY!e[&t\u0017i\u0019;peB\u0011q\nV\u0007\u0002!*\u0011\u0011KU\u0001\u0006C\u000e$xN\u001d\u0006\u0002'\u0006!\u0011m[6b\u0013\t)\u0006K\u0001\u0005BGR|'OU3g\u0011\u00159\u0006\u0001\"\u0001Y\u0003\u0019a\u0014N\\5u}Q)\u0011l\u0017/^=B\u0011!\fA\u0007\u0002\u0005!)qF\u0016a\u0001a!)qG\u0016a\u0001q!)AI\u0016a\u0001\u000b\")QJ\u0016a\u0001\u001d\")\u0001\r\u0001C!C\u0006)qO]5uKR\u0011!-\u001a\t\u0003+\rL!\u0001\u001a\f\u0003\tUs\u0017\u000e\u001e\u0005\u0006M~\u0003\raZ\u0001\u0007gR\u0014X-Y7\u0011\u0007!\\W)D\u0001j\u0015\tQ'(A\u0004egR\u0014X-Y7\n\u00051L'a\u0002#TiJ,\u0017-\u001c")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/elastic/ElasticsearchSparkLegacyStreamingWriter.class */
public class ElasticsearchSparkLegacyStreamingWriter implements SparkLegacyStreamingWriter, ElasticConfiguration, Logging {
    private final IndexBL indexBL;
    private final StreamingContext ssc;
    public final String it$agilelab$bigdata$wasp$consumers$spark$plugins$elastic$ElasticsearchSparkLegacyStreamingWriter$$name;
    private final ActorRef elasticAdminActor;
    private final WaspLogger logger;
    private final ElasticConfigModel elasticConfig;
    private volatile boolean bitmap$0;

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private ElasticConfigModel elasticConfig$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.elasticConfig = ElasticConfiguration.class.elasticConfig(this);
                this.bitmap$0 = true;
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.elasticConfig;
        }
    }

    public ElasticConfigModel elasticConfig() {
        return this.bitmap$0 ? this.elasticConfig : elasticConfig$lzycompute();
    }

    public void write(DStream<String> dStream) {
        Option byName = this.indexBL.getByName(this.it$agilelab$bigdata$wasp$consumers$spark$plugins$elastic$ElasticsearchSparkLegacyStreamingWriter$$name);
        if (!byName.isDefined()) {
            logger().warn(new Elasticsearc$$$$6e36a59f5c8f67e599a6bc6951f8952e$$$$nfun$write$5(this));
            return;
        }
        IndexModel indexModel = (IndexModel) byName.get();
        String eventuallyTimedName = indexModel.eventuallyTimedName();
        logger().info(new Elasticsearc$$$$43dc67d1669d449dc3ef6128d5d078e2$$$$nfun$write$1(this, indexModel, eventuallyTimedName));
        if (!BoxesRunTime.unboxToBoolean(WaspSystem$.MODULE$.$qmark$qmark(this.elasticAdminActor, new CheckOrCreateIndex(eventuallyTimedName, indexModel.name(), indexModel.dataType(), indexModel.getJsonSchema()), WaspSystem$.MODULE$.$qmark$qmark$default$3()))) {
            String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Error creating index ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{indexModel}));
            logger().error(new Elasticsearc$$$$97489d98665988c56ed86f369bdd77$$$$nfun$write$4(this, s));
            throw new Exception(s);
        }
        Broadcast broadcast = this.ssc.sparkContext().broadcast(indexModel.resource(), ClassTag$.MODULE$.apply(String.class));
        Broadcast broadcast2 = this.ssc.sparkContext().broadcast(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("es.nodes"), ((TraversableOnce) elasticConfig().connections().filter(new Elasticsearc$$$$e637555fc838c3be81db65cf8d4c0$$$$r$$anonfun$1(this))).mkString(",")), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("es.input.json"), "true"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("es.batch.size.entries"), "1")})).$plus$plus(Option$.MODULE$.option2Iterable(((IndexModel) byName.get()).idField().map(new Elasticsearc$$$$d4d517178779bd7e3ee1c42c737a462b$$$$r$$anonfun$2(this)))), ClassTag$.MODULE$.apply(Map.class));
        logger().info(new Elasticsearc$$$$5b26e7e535bbf819db73925be5340bd$$$$nfun$write$2(this, broadcast, broadcast2));
        dStream.foreachRDD(new Elasticsearc$$$$4f1e954aa65868d32bd66f39b4c29d4$$$$nfun$write$3(this, broadcast, broadcast2));
    }

    public ElasticsearchSparkLegacyStreamingWriter(IndexBL indexBL, StreamingContext streamingContext, String str, ActorRef actorRef) {
        this.indexBL = indexBL;
        this.ssc = streamingContext;
        this.it$agilelab$bigdata$wasp$consumers$spark$plugins$elastic$ElasticsearchSparkLegacyStreamingWriter$$name = str;
        this.elasticAdminActor = actorRef;
        ElasticConfiguration.class.$init$(this);
        Logging.class.$init$(this);
    }
}
