package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka;

import it.agilelab.bigdata.wasp.consumers.spark.readers.SparkStructuredStreamingReader;
import it.agilelab.bigdata.wasp.consumers.spark.utils.SparkUtils$;
import it.agilelab.bigdata.wasp.core.bl.TopicBL;
import it.agilelab.bigdata.wasp.core.bl.TopicBLImp;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.models.DatastoreModel;
import it.agilelab.bigdata.wasp.core.models.MultiTopicModel;
import it.agilelab.bigdata.wasp.core.models.MultiTopicModel$;
import it.agilelab.bigdata.wasp.core.models.StreamingReaderModel;
import it.agilelab.bigdata.wasp.core.models.StructuredStreamingETLModel;
import it.agilelab.bigdata.wasp.core.models.TopicModel;
import it.agilelab.bigdata.wasp.core.models.configuration.KafkaConfigModel;
import it.agilelab.bigdata.wasp.core.utils.AvroToRow;
import it.agilelab.bigdata.wasp.core.utils.ConfigManager$;
import it.agilelab.bigdata.wasp.core.utils.WaspDB$;
import it.agilelab.bigdata.wasp.spark.sql.kafka011.KafkaSparkSQLSchemas$;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaSparkStreamingReaders.scala */
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/kafka/KafkaSparkStructuredStreamingReader$.class */
public final class KafkaSparkStructuredStreamingReader$ implements SparkStructuredStreamingReader, Logging {
    public static final KafkaSparkStructuredStreamingReader$ MODULE$ = null;
    private final WaspLogger logger;

    static {
        new KafkaSparkStructuredStreamingReader$();
    }

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    public Dataset<Row> createStructuredStream(StructuredStreamingETLModel structuredStreamingETLModel, StreamingReaderModel streamingReaderModel, SparkSession sparkSession) {
        Dataset<Row> selectExpr;
        logger().info(new KafkaSparkSt$$$$7f2df191b17a4cfcbada896d20fa7a$$$$uredStream$1(structuredStreamingETLModel, streamingReaderModel));
        logger().info(new KafkaSparkSt$$$$e89cea1aeafa9dcad64483db721f828$$$$uredStream$2(streamingReaderModel));
        Seq<TopicModel> retrieveTopicModelsRecursively = retrieveTopicModelsRecursively(new TopicBLImp(WaspDB$.MODULE$.getDB()), streamingReaderModel.datastoreModelName());
        MultiTopicModel$.MODULE$.validateTopicModels(retrieveTopicModelsRecursively);
        logger().info(new KafkaSparkSt$$$$487829b8e3449310b689d08ca62d9e2c$$$$uredStream$3(retrieveTopicModelsRecursively));
        KafkaConfigModel kafkaConfig = ConfigManager$.MODULE$.getKafkaConfig();
        logger().info(new KafkaSparkSt$$$$9745dcde7b70db7c491754d39618d9f$$$$uredStream$4(kafkaConfig));
        if (!BoxesRunTime.unboxToBoolean(((TraversableOnce) retrieveTopicModelsRecursively.map(new KafkaSparkStructuredStreamingReader$$anonfun$3(), Seq$.MODULE$.canBuildFrom())).reduce(new KafkaSparkStructuredStreamingReader$$anonfun$4()))) {
            String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Unable to check/create one or more topic; topics: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{retrieveTopicModelsRecursively}));
            logger().error(new KafkaSparkSt$$$$59e3d03a8f1c29ec6de25d9cb6e21434$$$$redStream$10(s));
            throw new Exception(s);
        }
        TopicModel topicModel = (TopicModel) retrieveTopicModelsRecursively.head();
        Option map = streamingReaderModel.rateLimit().map(new KafkaSparkStructuredStreamingReader$$anonfun$1(SparkUtils$.MODULE$.getTriggerIntervalMs(ConfigManager$.MODULE$.getSparkStreamingConfig(), structuredStreamingETLModel))).map(new KafkaSparkStructuredStreamingReader$$anonfun$5());
        Map empty = Map$.MODULE$.empty();
        empty.$plus$plus$eq(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("subscribe"), ((TraversableOnce) retrieveTopicModelsRecursively.map(new KafkaSparkSt$$$$18f4a3408ac2bd9d2c9f4e59e24a54fa$$$$uredStream$5(), Seq$.MODULE$.canBuildFrom())).mkString(",")), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafka.bootstrap.servers"), ((TraversableOnce) kafkaConfig.connections().map(new KafkaSparkSt$$$$6e2fd9d66dfcf8c2a872573b5acb17$$$$uredStream$6(), Seq$.MODULE$.canBuildFrom())).mkString(",")), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafkaConsumer.pollTimeoutMs"), BoxesRunTime.boxToInteger(kafkaConfig.ingestRateToMills()).toString())})));
        empty.$plus$plus$eq(Option$.MODULE$.option2Iterable(map));
        empty.$plus$plus$eq(((TraversableOnce) kafkaConfig.others().map(new KafkaSparkSt$$$$647f32a3525b5f1be3e8338f881cae$$$$uredStream$7(), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
        empty.$plus$plus$eq(streamingReaderModel.options());
        logger().info(new KafkaSparkSt$$$$c15ca2e085904911deab388021aade20$$$$uredStream$8(empty));
        Dataset load = sparkSession.readStream().format("kafka").options(empty).load();
        String s2 = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"struct(", ") as kafkaMetadata"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{((Seq) ((TraversableLike) KafkaSparkSQLSchemas$.MODULE$.INPUT_SCHEMA().map(new KafkaSparkStructuredStreamingReader$$anonfun$6(), Seq$.MODULE$.canBuildFrom())).filter(new KafkaSparkStructuredStreamingReader$$anonfun$7())).mkString(", ")}));
        String str = topicModel.topicDataType();
        if ("avro".equals(str)) {
            logger().debug(new KafkaSparkStructuredStreamingReader$$anonfun$8(topicModel));
            AvroToRow avroToRow = new AvroToRow(topicModel.getJsonSchema());
            selectExpr = load.withColumn("value_parsed", functions$.MODULE$.udf(new KafkaSparkStructuredStreamingReader$$anonfun$9(avroToRow), avroToRow.getSchemaSpark()).apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("value")}))).drop("value").selectExpr(Predef$.MODULE$.wrapRefArray(new String[]{s2, "value_parsed.*"}));
        } else if ("json".equals(str)) {
            KafkaSparkStructuredStreamingReader$$anonfun$10 kafkaSparkStructuredStreamingReader$$anonfun$10 = new KafkaSparkStructuredStreamingReader$$anonfun$10();
            selectExpr = load.withColumn("value_parsed", functions$.MODULE$.udf(kafkaSparkStructuredStreamingReader$$anonfun$10, package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaSparkSt$$$$80f88086d941557d4588d9aa26b282d$$$$pecreator1$1
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    Universe universe = mirror.universe();
                    return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
                }
            }), package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaSparkSt$$$$9ecea4210a12dacb84fe8112a339748$$$$pecreator2$1
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    Universe universe = mirror.universe();
                    return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Array"), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Types.TypeApi[]{mirror.staticClass("scala.Byte").asType().toTypeConstructor()})));
                }
            })).apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("value")}))).drop("value").withColumn("value", functions$.MODULE$.from_json(functions$.MODULE$.col("value_parsed"), topicModel.getDataType())).selectExpr(Predef$.MODULE$.wrapRefArray(new String[]{s2, "value.*"}));
        } else if ("plaintext".equals(str)) {
            KafkaSparkStructuredStreamingReader$$anonfun$11 kafkaSparkStructuredStreamingReader$$anonfun$11 = new KafkaSparkStructuredStreamingReader$$anonfun$11();
            selectExpr = load.withColumn("value_string", functions$.MODULE$.udf(kafkaSparkStructuredStreamingReader$$anonfun$11, package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaSparkSt$$$$20cebb34dfec4737371136e85c3c284$$$$pecreator3$1
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    Universe universe = mirror.universe();
                    return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
                }
            }), package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaSparkSt$$$$ced04eecb9afe521fbe0b168ffc221b4$$$$pecreator4$1
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    Universe universe = mirror.universe();
                    return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Array"), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Types.TypeApi[]{mirror.staticClass("scala.Byte").asType().toTypeConstructor()})));
                }
            })).apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("value")}))).selectExpr(Predef$.MODULE$.wrapRefArray(new String[]{s2, "value_string AS value"}));
        } else {
            if (!"binary".equals(str)) {
                throw new UnsupportedOperationException(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Unsupported topic data type \"", "\""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{topicModel.topicDataType()})));
            }
            selectExpr = load.selectExpr(Predef$.MODULE$.wrapRefArray(new String[]{s2, "value"}));
        }
        Dataset<Row> dataset = selectExpr;
        logger().debug(new KafkaSparkSt$$$$9a2fbcd661bd353d4da8b482ff03b$$$$uredStream$9(dataset));
        return dataset;
    }

    private Seq<TopicModel> retrieveTopicModelsRecursively(TopicBL topicBL, String str) {
        return it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$innerRetrieveTopicModelsRecursively$1(str, topicBL);
    }

    public final Seq it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$innerRetrieveTopicModelsRecursively$1(String str, TopicBL topicBL) {
        Seq seq;
        TopicModel topicModel = (DatastoreModel) topicBL.getByName(str).get();
        if (topicModel instanceof TopicModel) {
            seq = (Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new TopicModel[]{topicModel}));
        } else {
            if (!(topicModel instanceof MultiTopicModel)) {
                throw new MatchError(topicModel);
            }
            seq = (Seq) ((MultiTopicModel) topicModel).topicModelNames().flatMap(new KafkaSparkSt$$$$95ed5b11a379e9cbcfa5164deded1e$$$$ursively$1$1(topicBL), Seq$.MODULE$.canBuildFrom());
        }
        return seq;
    }

    private KafkaSparkStructuredStreamingReader$() {
        MODULE$ = this;
        Logging.class.$init$(this);
    }
}
