package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka;

import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkStructuredStreamingWriter;
import it.agilelab.bigdata.wasp.core.bl.TopicBL;
import it.agilelab.bigdata.wasp.core.datastores.TopicCategory;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.models.DatastoreModel;
import it.agilelab.bigdata.wasp.core.models.MultiTopicModel;
import it.agilelab.bigdata.wasp.core.models.MultiTopicModel$;
import it.agilelab.bigdata.wasp.core.models.TopicModel;
import it.agilelab.bigdata.wasp.core.models.configuration.TinyKafkaConfig;
import it.agilelab.bigdata.wasp.core.utils.ConfigManager$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaSparkStructuredStreamingWriter.scala */
@ScalaSignature(bytes = "\u0006\u0001u4A!\u0001\u0002\u0001'\t\u00193*\u00194lCN\u0003\u0018M]6TiJ,8\r^;sK\u0012\u001cFO]3b[&twm\u0016:ji\u0016\u0014(BA\u0002\u0005\u0003\u0015Y\u0017MZ6b\u0015\t)a!A\u0004qYV<\u0017N\\:\u000b\u0005\u001dA\u0011!B:qCJ\\'BA\u0005\u000b\u0003%\u0019wN\\:v[\u0016\u00148O\u0003\u0002\f\u0019\u0005!q/Y:q\u0015\tia\"A\u0004cS\u001e$\u0017\r^1\u000b\u0005=\u0001\u0012\u0001C1hS2,G.\u00192\u000b\u0003E\t!!\u001b;\u0004\u0001M!\u0001\u0001\u0006\u000e!!\t)\u0002$D\u0001\u0017\u0015\u00059\u0012!B:dC2\f\u0017BA\r\u0017\u0005\u0019\te.\u001f*fMB\u00111DH\u0007\u00029)\u0011QDB\u0001\boJLG/\u001a:t\u0013\tyBD\u0001\u0010Ta\u0006\u00148n\u0015;sk\u000e$XO]3e'R\u0014X-Y7j]\u001e<&/\u001b;feB\u0011\u0011EJ\u0007\u0002E)\u00111\u0005J\u0001\bY><w-\u001b8h\u0015\t)#\"\u0001\u0003d_J,\u0017BA\u0014#\u0005\u001daunZ4j]\u001eD\u0001\"\u000b\u0001\u0003\u0002\u0003\u0006IAK\u0001\bi>\u0004\u0018n\u0019\"M!\tYc&D\u0001-\u0015\tiC%\u0001\u0002cY&\u0011q\u0006\f\u0002\b)>\u0004\u0018n\u0019\"M\u0011!\t\u0004A!A!\u0002\u0013\u0011\u0014a\u0006;pa&\u001cG)\u0019;bgR|'/Z'pI\u0016dg*Y7f!\t\u0019dG\u0004\u0002\u0016i%\u0011QGF\u0001\u0007!J,G-\u001a4\n\u0005]B$AB*ue&twM\u0003\u00026-!A!\b\u0001B\u0001B\u0003%1(\u0001\u0002tgB\u0011A\bR\u0007\u0002{)\u0011ahP\u0001\u0004gFd'BA\u0004A\u0015\t\t%)\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0007\u0006\u0019qN]4\n\u0005\u0015k$\u0001D*qCJ\\7+Z:tS>t\u0007\"B$\u0001\t\u0003A\u0015A\u0002\u001fj]&$h\b\u0006\u0003J\u00172k\u0005C\u0001&\u0001\u001b\u0005\u0011\u0001\"B\u0015G\u0001\u0004Q\u0003\"B\u0019G\u0001\u0004\u0011\u0004\"\u0002\u001eG\u0001\u0004Y\u0004\"B(\u0001\t\u0003\u0002\u0016!B<sSR,GCA)[!\r\u0011VkV\u0007\u0002'*\u0011A+P\u0001\ngR\u0014X-Y7j]\u001eL!AV*\u0003!\u0011\u000bG/Y*ue\u0016\fWn\u0016:ji\u0016\u0014\bC\u0001\u001fY\u0013\tIVHA\u0002S_^DQa\u0017(A\u0002q\u000baa\u001d;sK\u0006l\u0007CA/l\u001d\tq\u0016N\u0004\u0002`Q:\u0011\u0001m\u001a\b\u0003C\u001at!AY3\u000e\u0003\rT!\u0001\u001a\n\u0002\rq\u0012xn\u001c;?\u0013\u0005\u0019\u0015BA!C\u0013\t9\u0001)\u0003\u0002?\u007f%\u0011!.P\u0001\ba\u0006\u001c7.Y4f\u0013\taWNA\u0005ECR\fgI]1nK*\u0011!.\u0010\u0005\u0006_\u0002!I\u0001]\u0001\rC\u0012$7*\u00194lC\u000e{gN\u001a\u000b\u0004#F\u001c\b\"\u0002:o\u0001\u0004\t\u0016a\u00013to\")AO\u001ca\u0001k\u0006\u0019Ao[2\u0011\u0005Y\\X\"A<\u000b\u0005aL\u0018!D2p]\u001aLw-\u001e:bi&|gN\u0003\u0002{I\u00051Qn\u001c3fYNL!\u0001`<\u0003\u001fQKg._&bM.\f7i\u001c8gS\u001e\u0004")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/kafka/KafkaSparkStructuredStreamingWriter.class */
public class KafkaSparkStructuredStreamingWriter implements SparkStructuredStreamingWriter, Logging {
    private final TopicBL topicBL;
    private final String topicDatastoreModelName;
    private final WaspLogger logger;

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    public DataStreamWriter<Row> write(Dataset<Row> dataset) {
        TinyKafkaConfig tinyConfig = ConfigManager$.MODULE$.getKafkaConfig().toTinyConfig();
        Option<DatastoreModel<TopicCategory>> byName = this.topicBL.getByName(this.topicDatastoreModelName);
        Tuple2<Option<String>, Seq<TopicModel>> retrieveTopicFieldNameAndTopicModels = KafkaWriters$.MODULE$.retrieveTopicFieldNameAndTopicModels(byName, this.topicBL, this.topicDatastoreModelName);
        if (retrieveTopicFieldNameAndTopicModels == null) {
            throw new MatchError(retrieveTopicFieldNameAndTopicModels);
        }
        Tuple2 tuple2 = new Tuple2((Option) retrieveTopicFieldNameAndTopicModels._1(), (Seq) retrieveTopicFieldNameAndTopicModels._2());
        Option<String> option = (Option) tuple2._1();
        Seq<TopicModel> seq = (Seq) tuple2._2();
        DatastoreModel datastoreModel = (DatastoreModel) byName.get();
        TopicModel topicModel = (TopicModel) seq.head();
        MultiTopicModel$.MODULE$.validateTopicModels(seq);
        logger().info(new KafkaSparkSt$$$$838f6f65a2946eb9201f555b776739f8$$$$nfun$write$1(this, datastoreModel));
        if (datastoreModel instanceof MultiTopicModel) {
            logger().info(new KafkaSparkSt$$$$582c96fdf4f44b461cd5393ad3481d1$$$$nfun$write$2(this, seq, datastoreModel));
        }
        KafkaWriters$.MODULE$.askToCheckOrCreateTopics(seq);
        logger().debug(new KafkaSparkSt$$$$8d88591d53cc1691e89626f5dbab1af$$$$nfun$write$3(this, dataset));
        KafkaWriters$.MODULE$.prepareDfToWrite(dataset, option, seq, topicModel, topicModel.keyFieldName(), topicModel.headersFieldName(), topicModel.valueFieldsNames());
        DataStreamWriter<Row> format = dataset.writeStream().format("kafka");
        return addKafkaConf(option.isDefined() ? format : format.option("topic", topicModel.name()), tinyConfig).option("kafka.compression.type", ((TopicModel) seq.head()).topicCompression().kafkaProp());
    }

    private DataStreamWriter<Row> addKafkaConf(DataStreamWriter<Row> dataStreamWriter, TinyKafkaConfig tinyKafkaConfig) {
        return dataStreamWriter.option("kafka.bootstrap.servers", ((TraversableOnce) tinyKafkaConfig.connections().map(new KafkaSparkStructuredStreamingWriter$$anonfun$1(this), Seq$.MODULE$.canBuildFrom())).mkString(",")).option("kafka.partitioner.class", tinyKafkaConfig.partitioner_fqcn()).option("kafka.batch.size", BoxesRunTime.boxToInteger(tinyKafkaConfig.batch_send_size()).toString()).option("kafka.acks", tinyKafkaConfig.acks()).options(((TraversableOnce) tinyKafkaConfig.others().map(new KafkaSparkSt$$$$15874718397b1bf75e64d75be932da3e$$$$dKafkaConf$1(this), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
    }

    public KafkaSparkStructuredStreamingWriter(TopicBL topicBL, String str, SparkSession sparkSession) {
        this.topicBL = topicBL;
        this.topicDatastoreModelName = str;
        Logging.class.$init$(this);
    }
}
