package it.agilelab.bigdata.wasp.consumers.spark.plugins.elastic;

import akka.actor.ActorRef;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkStructuredStreamingWriter;
import it.agilelab.bigdata.wasp.core.WaspSystem$;
import it.agilelab.bigdata.wasp.core.bl.IndexBL;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.models.IndexModel;
import it.agilelab.bigdata.wasp.core.models.configuration.ElasticConfigModel;
import it.agilelab.bigdata.wasp.core.utils.ConfigManager$;
import it.agilelab.bigdata.wasp.core.utils.ElasticConfiguration;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ElasticWriters.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0005a\u0001B\u0001\u0003\u0001M\u0011Q%\u00127bgRL7m\u00159be.\u001cFO];diV\u0014X\rZ*ue\u0016\fW.\u001b8h/JLG/\u001a:\u000b\u0005\r!\u0011aB3mCN$\u0018n\u0019\u0006\u0003\u000b\u0019\tq\u0001\u001d7vO&t7O\u0003\u0002\b\u0011\u0005)1\u000f]1sW*\u0011\u0011BC\u0001\nG>t7/^7feNT!a\u0003\u0007\u0002\t]\f7\u000f\u001d\u0006\u0003\u001b9\tqAY5hI\u0006$\u0018M\u0003\u0002\u0010!\u0005A\u0011mZ5mK2\f'MC\u0001\u0012\u0003\tIGo\u0001\u0001\u0014\u000b\u0001!\"\u0004\t\u0015\u0011\u0005UAR\"\u0001\f\u000b\u0003]\tQa]2bY\u0006L!!\u0007\f\u0003\r\u0005s\u0017PU3g!\tYb$D\u0001\u001d\u0015\tib!A\u0004xe&$XM]:\n\u0005}a\"AH*qCJ\\7\u000b\u001e:vGR,(/\u001a3TiJ,\u0017-\\5oO^\u0013\u0018\u000e^3s!\t\tc%D\u0001#\u0015\t\u0019C%A\u0003vi&d7O\u0003\u0002&\u0015\u0005!1m\u001c:f\u0013\t9#E\u0001\u000bFY\u0006\u001cH/[2D_:4\u0017nZ;sCRLwN\u001c\t\u0003S1j\u0011A\u000b\u0006\u0003W\u0011\nq\u0001\\8hO&tw-\u0003\u0002.U\t9Aj\\4hS:<\u0007\u0002C\u0018\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0019\u0002\u000f%tG-\u001a=C\u0019B\u0011\u0011\u0007N\u0007\u0002e)\u00111\u0007J\u0001\u0003E2L!!\u000e\u001a\u0003\u000f%sG-\u001a=C\u0019\"Aq\u0007\u0001B\u0001B\u0003%\u0001(\u0001\u0002tgB\u0011\u0011(Q\u0007\u0002u)\u00111\bP\u0001\u0004gFd'BA\u0004>\u0015\tqt(\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0001\u0006\u0019qN]4\n\u0005\tS$\u0001D*qCJ\\7+Z:tS>t\u0007\u0002\u0003#\u0001\u0005\u0003\u0005\u000b\u0011B#\u0002\t9\fW.\u001a\t\u0003\r&s!!F$\n\u0005!3\u0012A\u0002)sK\u0012,g-\u0003\u0002K\u0017\n11\u000b\u001e:j]\u001eT!\u0001\u0013\f\t\u00115\u0003!\u0011!Q\u0001\n9\u000b\u0011#\u001a7bgRL7-\u00113nS:\f5\r^8s!\tyE+D\u0001Q\u0015\t\t&+A\u0003bGR|'OC\u0001T\u0003\u0011\t7n[1\n\u0005U\u0003&\u0001C!di>\u0014(+\u001a4\t\u000b]\u0003A\u0011\u0001-\u0002\rqJg.\u001b;?)\u0015I6\fX/_!\tQ\u0006!D\u0001\u0003\u0011\u0015yc\u000b1\u00011\u0011\u00159d\u000b1\u00019\u0011\u0015!e\u000b1\u0001F\u0011\u0015ie\u000b1\u0001O\u0011\u0015\u0001\u0007\u0001\"\u0011b\u0003\u00159(/\u001b;f)\u0011\u0011\u0007\u000e @\u0011\u0005\r4W\"\u00013\u000b\u0005\u0015T\u0014!C:ue\u0016\fW.\u001b8h\u0013\t9GM\u0001\bTiJ,\u0017-\\5oOF+XM]=\t\u000b%|\u0006\u0019\u00016\u0002\rM$(/Z1n!\tY\u0017P\u0004\u0002mo:\u0011QN\u001e\b\u0003]Vt!a\u001c;\u000f\u0005A\u001cX\"A9\u000b\u0005I\u0014\u0012A\u0002\u001fs_>$h(C\u0001A\u0013\tqt(\u0003\u0002\b{%\u00111\bP\u0005\u0003qj\nq\u0001]1dW\u0006<W-\u0003\u0002{w\nIA)\u0019;b\rJ\fW.\u001a\u0006\u0003qjBQ!`0A\u0002\u0015\u000b\u0011\"];feft\u0015-\\3\t\u000b}|\u0006\u0019A#\u0002\u001b\rDWmY6q_&tG\u000fR5s\u0001")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/elastic/ElasticSparkStructuredStreamingWriter.class */
public class ElasticSparkStructuredStreamingWriter implements SparkStructuredStreamingWriter, ElasticConfiguration, Logging {
    private final IndexBL indexBL;
    private final String name;
    private final ActorRef elasticAdminActor;
    private final WaspLogger logger;
    private final ElasticConfigModel elasticConfig;
    private volatile boolean bitmap$0;

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private ElasticConfigModel elasticConfig$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.elasticConfig = ElasticConfiguration.class.elasticConfig(this);
                this.bitmap$0 = true;
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.elasticConfig;
        }
    }

    public ElasticConfigModel elasticConfig() {
        return this.bitmap$0 ? this.elasticConfig : elasticConfig$lzycompute();
    }

    public StreamingQuery write(Dataset<Row> dataset, String str, String str2) {
        Option byName = this.indexBL.getByName(this.name);
        if (!byName.isDefined()) {
            String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"The index '", "' does not exits pay ATTENTION spark won't start"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.name}));
            logger().error(new ElasticSpark$$$$67fbc633c3ae3cb75c5f5e6c259e59c$$$$nfun$write$8(this, s));
            throw new Exception(s);
        }
        IndexModel indexModel = (IndexModel) byName.get();
        String eventuallyTimedName = indexModel.eventuallyTimedName();
        String resource = indexModel.resource();
        logger().info(new ElasticSpark$$$$525febd48f39272e97cea37f83bb395$$$$nfun$write$6(this, indexModel, eventuallyTimedName));
        if (indexModel.schema().isEmpty()) {
            throw new Exception(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"There no define schema in the index configuration: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{indexModel})));
        }
        String lowerCase = indexModel.name().toLowerCase();
        String name = indexModel.name();
        if (lowerCase != null ? !lowerCase.equals(name) : name != null) {
            throw new Exception(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"The index name must be all lowercase: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{indexModel})));
        }
        Map $plus$plus = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("checkpointLocation"), str2)})).$plus$plus(Option$.MODULE$.option2Iterable(((IndexModel) byName.get()).idField().map(new ElasticSparkStructuredStreamingWriter$$anonfun$3(this))));
        if (BoxesRunTime.unboxToBoolean(WaspSystem$.MODULE$.$qmark$qmark(this.elasticAdminActor, new CheckOrCreateIndex(eventuallyTimedName, indexModel.name(), indexModel.dataType(), indexModel.getJsonSchema()), WaspSystem$.MODULE$.$qmark$qmark$default$3()))) {
            DataStreamWriter queryName = dataset.writeStream().options($plus$plus).format("es").queryName(str);
            return ConfigManager$.MODULE$.getSparkStreamingConfig().triggerIntervalMs().isDefined() ? queryName.trigger(Trigger.ProcessingTime(BoxesRunTime.unboxToLong(ConfigManager$.MODULE$.getSparkStreamingConfig().triggerIntervalMs().get()))).start(resource) : queryName.start(resource);
        }
        String s2 = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Error creating elastic index: ", " with this index name ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{indexModel, eventuallyTimedName}));
        logger().error(new ElasticSpark$$$$f4933410a99568a9a6f191adb819714$$$$nfun$write$7(this, s2));
        throw new Exception(s2);
    }

    public ElasticSparkStructuredStreamingWriter(IndexBL indexBL, SparkSession sparkSession, String str, ActorRef actorRef) {
        this.indexBL = indexBL;
        this.name = str;
        this.elasticAdminActor = actorRef;
        ElasticConfiguration.class.$init$(this);
        Logging.class.$init$(this);
    }
}
