package it.agilelab.bigdata.wasp.consumers.spark.plugins.elastic;

import akka.actor.ActorRef;
import akka.actor.Props$;
import akka.pattern.AskableActorRef$;
import akka.pattern.package$;
import akka.util.Timeout;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.WaspConsumersSparkPlugin;
import it.agilelab.bigdata.wasp.consumers.spark.readers.SparkBatchReader;
import it.agilelab.bigdata.wasp.consumers.spark.readers.SparkLegacyStreamingReader;
import it.agilelab.bigdata.wasp.consumers.spark.readers.SparkStructuredStreamingReader;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkBatchWriter;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkLegacyStreamingWriter;
import it.agilelab.bigdata.wasp.core.WaspSystem$;
import it.agilelab.bigdata.wasp.core.bl.IndexBL;
import it.agilelab.bigdata.wasp.core.bl.IndexBLImp;
import it.agilelab.bigdata.wasp.core.datastores.DatastoreProduct;
import it.agilelab.bigdata.wasp.core.datastores.DatastoreProduct$ElasticProduct$;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.models.IndexModel;
import it.agilelab.bigdata.wasp.core.models.LegacyStreamingETLModel;
import it.agilelab.bigdata.wasp.core.models.ReaderModel;
import it.agilelab.bigdata.wasp.core.models.StreamingReaderModel;
import it.agilelab.bigdata.wasp.core.models.StructuredStreamingETLModel;
import it.agilelab.bigdata.wasp.core.models.WriterModel;
import it.agilelab.bigdata.wasp.core.models.configuration.ValidationRule;
import it.agilelab.bigdata.wasp.core.utils.ConfigManager$;
import it.agilelab.bigdata.wasp.core.utils.WaspDB;
import java.util.concurrent.TimeUnit;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.StreamingContext;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Nil$;
import scala.concurrent.Await$;
import scala.concurrent.duration.Duration$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: ElasticConsumersSpark.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015h\u0001B\u0001\u0003\u0001M\u0011Q#\u00127bgRL7mQ8ogVlWM]:Ta\u0006\u00148N\u0003\u0002\u0004\t\u00059Q\r\\1ti&\u001c'BA\u0003\u0007\u0003\u001d\u0001H.^4j]NT!a\u0002\u0005\u0002\u000bM\u0004\u0018M]6\u000b\u0005%Q\u0011!C2p]N,X.\u001a:t\u0015\tYA\"\u0001\u0003xCN\u0004(BA\u0007\u000f\u0003\u001d\u0011\u0017n\u001a3bi\u0006T!a\u0004\t\u0002\u0011\u0005<\u0017\u000e\\3mC\nT\u0011!E\u0001\u0003SR\u001c\u0001a\u0005\u0003\u0001)iq\u0002CA\u000b\u0019\u001b\u00051\"\"A\f\u0002\u000bM\u001c\u0017\r\\1\n\u0005e1\"AB!osJ+g\r\u0005\u0002\u001c95\tA!\u0003\u0002\u001e\t\tAr+Y:q\u0007>t7/^7feN\u001c\u0006/\u0019:l!2,x-\u001b8\u0011\u0005}!S\"\u0001\u0011\u000b\u0005\u0005\u0012\u0013a\u00027pO\u001eLgn\u001a\u0006\u0003G)\tAaY8sK&\u0011Q\u0005\t\u0002\b\u0019><w-\u001b8h\u0011\u00159\u0003\u0001\"\u0001)\u0003\u0019a\u0014N\\5u}Q\t\u0011\u0006\u0005\u0002+\u00015\t!\u0001C\u0005-\u0001\u0001\u0007\t\u0019!C\u0001[\u00059\u0011N\u001c3fq\ncU#\u0001\u0018\u0011\u0005=\u0012T\"\u0001\u0019\u000b\u0005E\u0012\u0013A\u00012m\u0013\t\u0019\u0004GA\u0004J]\u0012,\u0007P\u0011'\t\u0013U\u0002\u0001\u0019!a\u0001\n\u00031\u0014aC5oI\u0016D(\tT0%KF$\"a\u000e\u001e\u0011\u0005UA\u0014BA\u001d\u0017\u0005\u0011)f.\u001b;\t\u000fm\"\u0014\u0011!a\u0001]\u0005\u0019\u0001\u0010J\u0019\t\ru\u0002\u0001\u0015)\u0003/\u0003!Ig\u000eZ3y\u00052\u0003\u0003\"C 
\u0001\u0001\u0004\u0005\r\u0011\"\u0001A\u0003I)G.Y:uS\u000e\fE-\\5o\u0003\u000e$xN]0\u0016\u0003\u0005\u0003\"AQ$\u000e\u0003\rS!\u0001R#\u0002\u000b\u0005\u001cGo\u001c:\u000b\u0003\u0019\u000bA!Y6lC&\u0011\u0001j\u0011\u0002\t\u0003\u000e$xN\u001d*fM\"I!\n\u0001a\u0001\u0002\u0004%\taS\u0001\u0017K2\f7\u000f^5d\u0003\u0012l\u0017N\\!di>\u0014xl\u0018\u0013fcR\u0011q\u0007\u0014\u0005\bw%\u000b\t\u00111\u0001B\u0011\u0019q\u0005\u0001)Q\u0005\u0003\u0006\u0019R\r\\1ti&\u001c\u0017\tZ7j]\u0006\u001bGo\u001c:`A!)\u0001\u000b\u0001C!#\u0006\u0001B-\u0019;bgR|'/\u001a)s_\u0012,8\r^\u000b\u0002%B\u00111KV\u0007\u0002)*\u0011QKI\u0001\u000bI\u0006$\u0018m\u001d;pe\u0016\u001c\u0018BA,U\u0005A!\u0015\r^1ti>\u0014X\r\u0015:pIV\u001cG\u000fC\u0003Z\u0001\u0011\u0005#,\u0001\u0006j]&$\u0018.\u00197ju\u0016$\"aN.\t\u000bqC\u0006\u0019A/\u0002\r]\f7\u000f\u001d#C!\tq\u0016-D\u0001`\u0015\t\u0001'%A\u0003vi&d7/\u0003\u0002c?\n1q+Y:q\t\nCQ\u0001\u001a\u0001\u0005B\u0015\f!cZ3u-\u0006d\u0017\u000eZ1uS>t'+\u001e7fgV\ta\rE\u0002h_Jt!\u0001[7\u000f\u0005%dW\"\u00016\u000b\u0005-\u0014\u0012A\u0002\u001fs_>$h(C\u0001\u0018\u0013\tqg#A\u0004qC\u000e\\\u0017mZ3\n\u0005A\f(aA*fc*\u0011aN\u0006\t\u0003gbl\u0011\u0001\u001e\u0006\u0003kZ\fQbY8oM&<WO]1uS>t'BA<#\u0003\u0019iw\u000eZ3mg&\u0011\u0011\u0010\u001e\u0002\u000f-\u0006d\u0017\u000eZ1uS>t'+\u001e7f\u0011\u0015Y\b\u0001\"\u0011}\u0003u9W\r^*qCJ\\G*Z4bGf\u001cFO]3b[&twm\u0016:ji\u0016\u0014HcB?\u0002\b\u0005\u0005\u0012Q\u0006\t\u0004}\u0006\rQ\"A@\u000b\u0007\u0005\u0005a!A\u0004xe&$XM]:\n\u0007\u0005\u0015qP\u0001\u000eTa\u0006\u00148\u000eT3hC\u000eL8\u000b\u001e:fC6LgnZ,sSR,'\u000fC\u0004\u0002\ni\u0004\r!a\u0003\u0002\u0007M\u001c8\r\u0005\u0003\u0002\u000e\u0005uQBAA\b\u0015\u0011\t\t\"a\u0005\u0002\u0013M$(/Z1nS:<'bA\u0004\u0002\u0016)!\u0011qCA\r\u0003\u0019\t\u0007/Y2iK*\u0011\u00111D\u0001\u0004_J<\u0017\u0002BA\u0010\u0003\u001f\u0011\u0001c\u0015;sK\u0006l\u0017N\\4D_:$X\r\u001f;\t\u000f\u0005\r\"\u00101\u0001\u0002&\u0
0059B.Z4bGf\u001cFO]3b[&tw-\u0012+M\u001b>$W\r\u001c\t\u0005\u0003O\tI#D\u0001w\u0013\r\tYC\u001e\u0002\u0018\u0019\u0016<\u0017mY=TiJ,\u0017-\\5oO\u0016#F*T8eK2Dq!a\f{\u0001\u0004\t\t$A\u0006xe&$XM]'pI\u0016d\u0007\u0003BA\u0014\u0003gI1!!\u000ew\u0005-9&/\u001b;fe6{G-\u001a7\t\u000f\u0005e\u0002\u0001\"\u0011\u0002<\u0005ir-\u001a;Ta\u0006\u00148\u000eT3hC\u000eL8\u000b\u001e:fC6Lgn\u001a*fC\u0012,'\u000f\u0006\u0005\u0002>\u0005%\u00131JA'!\u0011\ty$!\u0012\u000e\u0005\u0005\u0005#bAA\"\r\u00059!/Z1eKJ\u001c\u0018\u0002BA$\u0003\u0003\u0012!d\u00159be.dUmZ1dsN#(/Z1nS:<'+Z1eKJD\u0001\"!\u0003\u00028\u0001\u0007\u00111\u0002\u0005\t\u0003G\t9\u00041\u0001\u0002&!A\u0011qJA\u001c\u0001\u0004\t\t&A\u0006sK\u0006$WM]'pI\u0016d\u0007\u0003BA\u0014\u0003'J1!!\u0016w\u0005-\u0011V-\u00193fe6{G-\u001a7\t\u000f\u0005e\u0003\u0001\"\u0011\u0002\\\u0005\ts-\u001a;Ta\u0006\u00148n\u0015;sk\u000e$XO]3e'R\u0014X-Y7j]\u001e<&/\u001b;feRA\u0011QLA2\u0003g\ni\bE\u0002+\u0003?J1!!\u0019\u0003\u0005-*E.Y:uS\u000e\u001cX-\u0019:dQN\u0003\u0018M]6TiJ,8\r^;sK\u0012\u001cFO]3b[&twm\u0016:ji\u0016\u0014\b\u0002CA3\u0003/\u0002\r!a\u001a\u0002\u0005M\u001c\b\u0003BA5\u0003_j!!a\u001b\u000b\t\u00055\u00141C\u0001\u0004gFd\u0017\u0002BA9\u0003W\u0012Ab\u00159be.\u001cVm]:j_:D\u0001\"!\u001e\u0002X\u0001\u0007\u0011qO\u0001\u001cgR\u0014Xo\u0019;ve\u0016$7\u000b\u001e:fC6LgnZ#U\u00196{G-\u001a7\u0011\t\u0005\u001d\u0012\u0011P\u0005\u0004\u0003w2(aG*ueV\u001cG/\u001e:fIN#(/Z1nS:<W\t\u0016'N_\u0012,G\u000e\u0003\u0005\u00020\u0005]\u0003\u0019AA\u0019\u0011\u001d\t\t\t\u0001C!\u0003\u0007\u000b\u0011eZ3u'B\f'o[*ueV\u001cG/\u001e:fIN#(/Z1nS:<'+Z1eKJ$\u0002\"!\"\u0002\f\u00065\u0015q\u0012\t\u0005\u0003\u007f\t9)\u0003\u0003\u0002\n\u0006\u0005#AH*qCJ\\7\u000b\u001e:vGR,(/\u001a3TiJ,\u0017-\\5oOJ+\u0017\rZ3s\u0011!\t)'a 
A\u0002\u0005\u001d\u0004\u0002CA;\u0003\u007f\u0002\r!a\u001e\t\u0011\u0005E\u0015q\u0010a\u0001\u0003'\u000bAc\u001d;sK\u0006l\u0017N\\4SK\u0006$WM]'pI\u0016d\u0007\u0003BA\u0014\u0003+K1!a&w\u0005Q\u0019FO]3b[&twMU3bI\u0016\u0014Xj\u001c3fY\"9\u00111\u0014\u0001\u0005B\u0005u\u0015aE4fiN\u0003\u0018M]6CCR\u001c\u0007n\u0016:ji\u0016\u0014HCBAP\u0003K\u000b\t\fE\u0002\u007f\u0003CK1!a)��\u0005A\u0019\u0006/\u0019:l\u0005\u0006$8\r[,sSR,'\u000f\u0003\u0005\u0002(\u0006e\u0005\u0019AAU\u0003\t\u00198\r\u0005\u0003\u0002,\u00065VBAA\n\u0013\u0011\ty+a\u0005\u0003\u0019M\u0003\u0018M]6D_:$X\r\u001f;\t\u0011\u0005=\u0012\u0011\u0014a\u0001\u0003cAq!!.\u0001\t\u0003\n9,A\nhKR\u001c\u0006/\u0019:l\u0005\u0006$8\r\u001b*fC\u0012,'\u000f\u0006\u0004\u0002:\u0006}\u0016\u0011\u0019\t\u0005\u0003\u007f\tY,\u0003\u0003\u0002>\u0006\u0005#\u0001E*qCJ\\')\u0019;dQJ+\u0017\rZ3s\u0011!\t9+a-A\u0002\u0005%\u0006\u0002CA(\u0003g\u0003\r!!\u0015\t\u000f\u0005\u0015\u0007\u0001\"\u0003\u0002H\u0006q1\u000f^1siV\u0004X\t\\1ti&\u001cG\u0003BAe\u00037$2aNAf\u0011!\ti-a1A\u0004\u0005=\u0017a\u0002;j[\u0016|W\u000f\u001e\t\u0005\u0003#\f9.\u0004\u0002\u0002T*\u0019\u0011Q[#\u0002\tU$\u0018\u000e\\\u0005\u0005\u00033\f\u0019NA\u0004US6,w.\u001e;\t\u0011\u0005u\u00171\u0019a\u0001\u0003?\fQc]3sm&\u001cWm\u001d+j[\u0016|W\u000f^'jY2L7\u000fE\u0002\u0016\u0003CL1!a9\u0017\u0005\u0011auN\\4")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/elastic/ElasticConsumersSpark.class */
/**
 * Elasticsearch plugin for the WASP Spark consumers.
 *
 * <p>{@link #initialize(WaspDB)} creates the {@link IndexBL} and the Elastic admin actor; the
 * factory methods then hand both to the Elasticsearch writers/readers they build. Elasticsearch
 * is only offered as a batch source here: both streaming-reader factories always throw
 * {@link UnsupportedOperationException}.
 *
 * <p>NOTE(review): this class is generated from the compiled form of
 * {@code ElasticConsumersSpark.scala}. The {@code ...$$anonfun$...} types passed to the logger
 * are compiler-generated classes declared elsewhere; presumably they wrap the lazily evaluated
 * log messages - confirm against the Scala source.
 */
public class ElasticConsumersSpark implements WaspConsumersSparkPlugin, Logging {
    /** Index business logic; assigned in {@link #initialize(WaspDB)}, {@code null} before that. */
    private IndexBL indexBL;

    /** Elastic admin actor; assigned in {@link #initialize(WaspDB)}, {@code null} before that. */
    private ActorRef elasticAdminActor_;

    // Not final: the value is assigned through the Logging trait setter below rather than
    // directly in the constructor, which Java does not allow for a final field.
    private WaspLogger logger;

    /** Returns the logger installed through the {@code Logging} trait setter. */
    public WaspLogger logger() {
        return this.logger;
    }

    /** Trait setter for the {@code logger} value (name mangled by the Scala compiler). */
    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    /** Returns the index business logic set by {@link #initialize(WaspDB)}. */
    public IndexBL indexBL() {
        return this.indexBL;
    }

    /** Scala-style setter for {@code indexBL}. */
    public void indexBL_$eq(IndexBL indexBL) {
        this.indexBL = indexBL;
    }

    /** Returns the Elastic admin actor set by {@link #initialize(WaspDB)}. */
    public ActorRef elasticAdminActor_() {
        return this.elasticAdminActor_;
    }

    /** Scala-style setter for {@code elasticAdminActor_}. */
    public void elasticAdminActor__$eq(ActorRef actorRef) {
        this.elasticAdminActor_ = actorRef;
    }

    /** Returns the datastore product served by this plugin: the Elastic product singleton. */
    public DatastoreProduct datastoreProduct() {
        return DatastoreProduct$ElasticProduct$.MODULE$;
    }

    /**
     * Creates the index business logic and the Elastic admin actor, then waits for the actor to
     * complete its initialization.
     *
     * @param waspDB database handle used to build the {@link IndexBLImp}
     */
    public void initialize(WaspDB waspDB) {
        logger().info(new ElasticConsumersSpark$$anonfun$initialize$1(this));
        indexBL_$eq(new IndexBLImp(waspDB));
        logger().info(new ElasticConsumersSpark$$anonfun$initialize$2(this));
        elasticAdminActor__$eq(WaspSystem$.MODULE$.actorSystem().actorOf(Props$.MODULE$.apply(new ElasticConsumersSpark$$anonfun$initialize$3(this), ClassTag$.MODULE$.apply(ElasticAdminActor.class)), ElasticAdminActor$.MODULE$.name()));
        // Read the services timeout once: it bounds the wait in startupElastic, and the ask
        // timeout is set one second shorter than that wait.
        long servicesTimeoutMillis = WaspSystem$.MODULE$.waspConfig().servicesTimeoutMillis();
        startupElastic(servicesTimeoutMillis, new Timeout(servicesTimeoutMillis - 1000, TimeUnit.MILLISECONDS));
    }

    /** Returns an empty sequence: this plugin contributes no validation rules. */
    public Seq<ValidationRule> getValidationRules() {
        return Seq$.MODULE$.apply(Nil$.MODULE$);
    }

    /**
     * Builds the legacy-streaming Elasticsearch writer for the datastore model named by
     * {@code writerModel}.
     *
     * @param streamingContext        streaming context handed to the writer
     * @param legacyStreamingETLModel not used by this implementation
     * @param writerModel             supplies the datastore model name
     */
    public SparkLegacyStreamingWriter getSparkLegacyStreamingWriter(StreamingContext streamingContext, LegacyStreamingETLModel legacyStreamingETLModel, WriterModel writerModel) {
        logger().info(new ElasticConsu$$$$6bb7e2cfb211f8a5214f31c1bc5df3b$$$$mingWriter$1(this, writerModel));
        return new ElasticsearchSparkLegacyStreamingWriter(indexBL(), streamingContext, writerModel.datastoreModelName(), elasticAdminActor_());
    }

    /**
     * Always fails: this plugin does not provide a legacy streaming reader.
     *
     * @throws UnsupportedOperationException always, after logging the same message as an error
     */
    public SparkLegacyStreamingReader getSparkLegacyStreamingReader(StreamingContext streamingContext, LegacyStreamingETLModel legacyStreamingETLModel, ReaderModel readerModel) {
        String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"The datastore product ", " is not a valid streaming source! Reader model ", " is not valid."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{datastoreProduct(), readerModel}));
        logger().error(new ElasticConsu$$$$28a39f5ed0959fc3d1e9e6385360b4d6$$$$mingReader$1(this, s));
        throw new UnsupportedOperationException(s);
    }

    /**
     * Builds the structured-streaming Elasticsearch writer for the datastore model named by
     * {@code writerModel}.
     *
     * <p>This restores the method's original name, which had been replaced by
     * {@link #m29getSparkStructuredStreamingWriter} when the class was regenerated. The narrower
     * return type is a covariant return. NOTE(review): assumes
     * {@code WaspConsumersSparkPlugin} declares this method with a supertype of
     * {@code ElasticsearchSparkStructuredStreamingWriter} - confirm against the interface.
     *
     * @param sparkSession                session handed to the writer
     * @param structuredStreamingETLModel not used by this implementation
     * @param writerModel                 supplies the datastore model name
     */
    public ElasticsearchSparkStructuredStreamingWriter getSparkStructuredStreamingWriter(SparkSession sparkSession, StructuredStreamingETLModel structuredStreamingETLModel, WriterModel writerModel) {
        logger().info(new ElasticConsu$$$$ea3933ad55471228b8fdd9342c9b177$$$$mingWriter$1(this, writerModel));
        return new ElasticsearchSparkStructuredStreamingWriter(indexBL(), sparkSession, writerModel.datastoreModelName(), elasticAdminActor_());
    }

    /**
     * Alias of {@link #getSparkStructuredStreamingWriter} under the generated name; kept only so
     * that existing references to this name keep resolving.
     */
    public ElasticsearchSparkStructuredStreamingWriter m29getSparkStructuredStreamingWriter(SparkSession sparkSession, StructuredStreamingETLModel structuredStreamingETLModel, WriterModel writerModel) {
        return getSparkStructuredStreamingWriter(sparkSession, structuredStreamingETLModel, writerModel);
    }

    /**
     * Always fails: this plugin does not provide a structured streaming reader.
     *
     * @throws UnsupportedOperationException always, after logging the same message as an error
     */
    public SparkStructuredStreamingReader getSparkStructuredStreamingReader(SparkSession sparkSession, StructuredStreamingETLModel structuredStreamingETLModel, StreamingReaderModel streamingReaderModel) {
        String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"The datastore product ", " is not a valid streaming source! Reader model ", " is not valid."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{datastoreProduct(), streamingReaderModel}));
        logger().error(new ElasticConsu$$$$54863daaf58f8a197d35b1a927b0e4bc$$$$mingReader$1(this, s));
        throw new UnsupportedOperationException(s);
    }

    /**
     * Builds the batch Elasticsearch writer.
     *
     * <p>Unlike the streaming writers, this passes {@code writerModel.name()} rather than
     * {@code writerModel.datastoreModelName()}. NOTE(review): confirm the difference is intended.
     */
    public SparkBatchWriter getSparkBatchWriter(SparkContext sparkContext, WriterModel writerModel) {
        logger().info(new ElasticConsu$$$$8444be95e8d8fac664e54d9cbc7eba83$$$$atchWriter$1(this, writerModel));
        return new ElasticsearchSparkBatchWriter(indexBL(), sparkContext, writerModel.name(), elasticAdminActor_());
    }

    /**
     * Builds a batch reader for the index model named by {@code readerModel}.
     *
     * <p>The index model must exist, have a non-empty schema and an all-lowercase name; the
     * admin actor is then asked to check or create the index before the reader is returned.
     *
     * @param sparkContext not used by this implementation
     * @param readerModel  supplies the name of the index model to look up
     * @throws Exception if the index model is missing, has no schema, has a name that is not
     *                   all lowercase, or the check-or-create request answers {@code false}
     */
    public SparkBatchReader getSparkBatchReader(SparkContext sparkContext, ReaderModel readerModel) {
        Option byName = indexBL().getByName(readerModel.name());
        if (!byName.isDefined()) {
            String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Index model not found: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{readerModel}));
            logger().error(new ElasticConsu$$$$c91fdfe059fd1d7223e397e08b4d532$$$$atchReader$3(this, s));
            throw new Exception(s);
        }
        IndexModel indexModel = (IndexModel) byName.get();
        String eventuallyTimedName = indexModel.eventuallyTimedName();
        logger().info(new ElasticConsu$$$$92a6cdcc7ad55446d7d1fc1583136$$$$atchReader$1(this, indexModel, eventuallyTimedName));
        if (indexModel.schema().isEmpty()) {
            throw new Exception(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"There no define schema in the index configuration: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{indexModel})));
        }
        String lowerCase = indexModel.name().toLowerCase();
        String name = indexModel.name();
        // toLowerCase() never returns null, so a plain equals() is enough here.
        if (!lowerCase.equals(name)) {
            throw new Exception(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"The index name must be all lowercase: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{indexModel})));
        }
        // Ask the admin actor to check/create the index; the reply is unboxed as a boolean.
        boolean indexReady = BoxesRunTime.unboxToBoolean(WaspSystem$.MODULE$.$qmark$qmark(elasticAdminActor_(), new CheckOrCreateIndex(eventuallyTimedName, indexModel.name(), indexModel.dataType(), indexModel.getJsonSchema()), WaspSystem$.MODULE$.$qmark$qmark$default$3()));
        if (indexReady) {
            return new ElasticsearchSparkBatchReader(indexModel);
        }
        String s2 = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Error creating elastic index: ", " with this index name ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{indexModel, eventuallyTimedName}));
        logger().error(new ElasticConsu$$$$fcaa236ee5ca995a9b8a19e93f2910$$$$atchReader$2(this, s2));
        throw new Exception(s2);
    }

    /**
     * Sends an {@code Initialization} message carrying the Elastic configuration to the admin
     * actor and waits for the reply.
     *
     * @param servicesTimeoutMillis maximum time to wait for the reply, in milliseconds
     * @param timeout               timeout applied to the ask itself
     * @throws Exception    wrapping the cause, if the ask completed with a failure
     * @throws UnknownError if the future reports no value after the wait
     */
    private void startupElastic(long servicesTimeoutMillis, Timeout timeout) {
        logger().info(new ElasticConsumersSpark$$anonfun$startupElastic$1(this));
        ActorRef ask = package$.MODULE$.ask(elasticAdminActor_());
        Initialization initialization = new Initialization(ConfigManager$.MODULE$.getElasticConfig());
        Option value = Await$.MODULE$.ready(AskableActorRef$.MODULE$.$qmark$extension1(ask, initialization, timeout, AskableActorRef$.MODULE$.$qmark$default$3$extension(ask, initialization)), Duration$.MODULE$.apply(servicesTimeoutMillis, TimeUnit.MILLISECONDS)).value();
        if (value instanceof Some) {
            Try outcome = (Try) ((Some) value).x();
            if (outcome instanceof Failure) {
                Throwable exception = ((Failure) outcome).exception();
                logger().error(new ElasticConsumersSpark$$anonfun$startupElastic$2(this, exception));
                throw new Exception(exception);
            }
            if (outcome instanceof Success) {
                logger().info(new ElasticConsumersSpark$$anonfun$startupElastic$3(this));
                return;
            }
            // A Try that is neither Failure nor Success: same fall-through as an unmatched value.
            throw new MatchError(value);
        }
        if (None$.MODULE$.equals(value)) {
            throw new UnknownError("Unknown error during Elastic connection initialization");
        }
        throw new MatchError(value);
    }

    /** Creates the plugin and runs the {@code Logging} trait initializer. */
    public ElasticConsumersSpark() {
        // NOTE(review): left exactly as generated. This presumably stands for the Scala trait
        // initializer (Logging$class.$init$) that installs the logger - confirm before compiling.
        Logging.class.$init$(this);
    }
}
