package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka;

import com.typesafe.config.Config;
import it.agilelab.bigdata.wasp.consumers.spark.utils.AvroConverterExpression;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkStructuredStreamingWriter;
import it.agilelab.bigdata.wasp.core.bl.TopicBL;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.models.TopicModel;
import it.agilelab.bigdata.wasp.core.models.configuration.TinyKafkaConfig;
import it.agilelab.bigdata.wasp.core.utils.ConfigManager$;
import org.apache.avro.Schema;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaWriters.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0005e\u0001B\u0001\u0003\u0001M\u00111eS1gW\u0006\u001c\u0006/\u0019:l'R\u0014Xo\u0019;ve\u0016$7\u000b\u001e:fC6LgnZ,sSR,'O\u0003\u0002\u0004\t\u0005)1.\u00194lC*\u0011QAB\u0001\ba2,x-\u001b8t\u0015\t9\u0001\"A\u0003ta\u0006\u00148N\u0003\u0002\n\u0015\u0005I1m\u001c8tk6,'o\u001d\u0006\u0003\u00171\tAa^1ta*\u0011QBD\u0001\bE&<G-\u0019;b\u0015\ty\u0001#\u0001\u0005bO&dW\r\\1c\u0015\u0005\t\u0012AA5u\u0007\u0001\u0019B\u0001\u0001\u000b\u001bAA\u0011Q\u0003G\u0007\u0002-)\tq#A\u0003tG\u0006d\u0017-\u0003\u0002\u001a-\t1\u0011I\\=SK\u001a\u0004\"a\u0007\u0010\u000e\u0003qQ!!\b\u0004\u0002\u000f]\u0014\u0018\u000e^3sg&\u0011q\u0004\b\u0002\u001f'B\f'o[*ueV\u001cG/\u001e:fIN#(/Z1nS:<wK]5uKJ\u0004\"!\t\u0014\u000e\u0003\tR!a\t\u0013\u0002\u000f1|wmZ5oO*\u0011QEC\u0001\u0005G>\u0014X-\u0003\u0002(E\t9Aj\\4hS:<\u0007\u0002C\u0015\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0016\u0002\u000fQ|\u0007/[2C\u0019B\u00111FL\u0007\u0002Y)\u0011Q\u0006J\u0001\u0003E2L!a\f\u0017\u0003\u000fQ{\u0007/[2C\u0019\"A\u0011\u0007\u0001B\u0001B\u0003%!'A\fu_BL7\rR1uCN$xN]3N_\u0012,GNT1nKB\u00111G\u000e\b\u0003+QJ!!\u000e\f\u0002\rA\u0013X\rZ3g\u0013\t9\u0004H\u0001\u0004TiJLgn\u001a\u0006\u0003kYA\u0001B\u000f\u0001\u0003\u0002\u0003\u0006IaO\u0001\u0003gN\u0004\"\u0001\u0010#\u000e\u0003uR!AP 
\u0002\u0007M\fHN\u0003\u0002\b\u0001*\u0011\u0011IQ\u0001\u0007CB\f7\r[3\u000b\u0003\r\u000b1a\u001c:h\u0013\t)UH\u0001\u0007Ta\u0006\u00148nU3tg&|g\u000eC\u0003H\u0001\u0011\u0005\u0001*\u0001\u0004=S:LGO\u0010\u000b\u0005\u0013.cU\n\u0005\u0002K\u00015\t!\u0001C\u0003*\r\u0002\u0007!\u0006C\u00032\r\u0002\u0007!\u0007C\u0003;\r\u0002\u00071\bC\u0003P\u0001\u0011\u0005\u0003+A\u0003xe&$X\r\u0006\u0002R5B\u0019!+V,\u000e\u0003MS!\u0001V\u001f\u0002\u0013M$(/Z1nS:<\u0017B\u0001,T\u0005A!\u0015\r^1TiJ,\u0017-\\,sSR,'\u000f\u0005\u0002=1&\u0011\u0011,\u0010\u0002\u0004%><\b\"B.O\u0001\u0004a\u0016AB:ue\u0016\fW\u000e\u0005\u0002^W:\u0011a,\u001b\b\u0003?\"t!\u0001Y4\u000f\u0005\u00054gB\u00012f\u001b\u0005\u0019'B\u00013\u0013\u0003\u0019a$o\\8u}%\t1)\u0003\u0002B\u0005&\u0011q\u0001Q\u0005\u0003}}J!A[\u001f\u0002\u000fA\f7m[1hK&\u0011A.\u001c\u0002\n\t\u0006$\u0018M\u0012:b[\u0016T!A[\u001f\t\u000b=\u0004A\u0011\u00029\u0002)\r|gN^3siN#(/Z1n\r>\u0014\u0018I\u001e:p)%\t8\u000f\u001f>}\u0003\u001f\t\t\u0002\u0005\u0002sW:\u0011A(\u001b\u0005\u0006i:\u0004\r!^\u0001\rW\u0016Lh)[3mI:\u000bW.\u001a\t\u0004+Y\u0014\u0014BA<\u0017\u0005\u0019y\u0005\u000f^5p]\")\u0011P\u001ca\u0001k\u0006\u0001\u0002.Z1eKJ\u001ch)[3mI:\u000bW.\u001a\u0005\u0006w:\u0004\r!^\u0001\u000fi>\u0004\u0018n\u0019$jK2$g*Y7f\u0011\u0015ih\u000e1\u0001\u007f\u0003A1\u0018\r\\;f\r&,G\u000eZ:OC6,7\u000fE\u0002\u0016m~\u0004R!!\u0001\u0002\nIrA!a\u0001\u0002\b9\u0019!-!\u0002\n\u0003]I!A\u001b\f\n\t\u0005-\u0011Q\u0002\u0002\u0004'\u0016\f(B\u00016\u0017\u0011\u0015Yf\u000e1\u0001]\u0011\u001d\t\u0019B\u001ca\u0001\u0003+\t1\u0003\u001d:pi>$\u0018\u0010]3U_BL7-T8eK2\u0004B!a\u0006\u0002\u001e5\u0011\u0011\u0011\u0004\u0006\u0004\u00037!\u0013AB7pI\u0016d7/\u0003\u0003\u0002 
\u0005e!A\u0003+pa&\u001cWj\u001c3fY\"9\u00111\u0005\u0001\u0005\n\u0005\u0015\u0012\u0001F2p]Z,'\u000f^*ue\u0016\fWNR8s\u0015N|g\u000eF\u0007r\u0003O\tI#a\u000b\u0002.\u0005=\u0012\u0011\u0007\u0005\u0007i\u0006\u0005\u0002\u0019A;\t\re\f\t\u00031\u0001v\u0011\u0019Y\u0018\u0011\u0005a\u0001k\"1Q0!\tA\u0002yDaaWA\u0011\u0001\u0004a\u0006\u0002CA\n\u0003C\u0001\r!!\u0006\t\u000f\u0005U\u0002\u0001\"\u0003\u00028\u0005I2m\u001c8wKJ$8\u000b\u001e:fC64uN\u001d)mC&tG/\u001a=u)5\t\u0018\u0011HA\u001e\u0003{\ty$!\u0011\u0002D!1A/a\rA\u0002UDa!_A\u001a\u0001\u0004)\bBB>\u00024\u0001\u0007Q\u000f\u0003\u0004~\u0003g\u0001\rA \u0005\u00077\u0006M\u0002\u0019\u0001/\t\u0011\u0005M\u00111\u0007a\u0001\u0003+Aq!a\u0012\u0001\t\u0013\tI%\u0001\fd_:4XM\u001d;TiJ,\u0017-\u001c$pe\nKg.\u0019:z)5\t\u00181JA'\u0003\u001f\n\t&a\u0015\u0002V!1A/!\u0012A\u0002UDa!_A#\u0001\u0004)\bBB>\u0002F\u0001\u0007Q\u000f\u0003\u0004~\u0003\u000b\u0002\rA \u0005\u00077\u0006\u0015\u0003\u0019\u0001/\t\u0011\u0005M\u0011Q\ta\u0001\u0003+Aq!!\u0017\u0001\t\u0013\tY&A\rbI\u0012$v\u000e]5d\u001d\u0006lWm\u00115fG.LeMT3fI\u0016$Gc\u0002/\u0002^\u0005}\u0013Q\r\u0005\u0007w\u0006]\u0003\u0019A;\t\u0011\u0005\u0005\u0014q\u000ba\u0001\u0003G\na\u0001^8qS\u000e\u001c\bCBA\u0001\u0003\u0013\t)\u0002\u0003\u0004\\\u0003/\u0002\r\u0001\u0018\u0005\b\u0003S\u0002A\u0011BA6\u00031\tG\rZ&bM.\f7i\u001c8g)\u0015\t\u0016QNA9\u0011\u001d\ty'a\u001aA\u0002E\u000b1\u0001Z:x\u0011!\t\u0019(a\u001aA\u0002\u0005U\u0014a\u0001;lGB!\u0011qOA?\u001b\t\tIH\u0003\u0003\u0002|\u0005e\u0011!D2p]\u001aLw-\u001e:bi&|g.\u0003\u0003\u0002��\u0005e$a\u0004+j]f\\\u0015MZ6b\u0007>tg-[4")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/kafka/KafkaSparkStructuredStreamingWriter.class */
/**
 * Decompiler (JADX) rendering of a Scala class that implements {@code SparkStructuredStreamingWriter}
 * and mixes in the {@code Logging} trait.
 *
 * <p>The private helpers in this class reshape a {@code Dataset<Row>} into the set of columns that is
 * then written out (a {@code value} column plus optional columns derived from the three leading
 * {@code Option<String>} arguments), and configure a {@code DataStreamWriter} from a
 * {@code TinyKafkaConfig}.
 *
 * <p>NOTE(review): this is decompiler output, not hand-written Java. The {@code $$anonfun$N} and
 * {@code KafkaSparkSt$$$$...} classes referenced below are compiler-generated closures whose bodies
 * are not visible here, so any statement about what they compute is an assumption to be confirmed
 * against the original Scala source.
 */
public class KafkaSparkStructuredStreamingWriter implements SparkStructuredStreamingWriter, Logging {
    // Constructor argument, stored as-is. Public with a name-mangled identifier in this rendering;
    // presumably so that the generated closure classes can read it — TODO confirm.
    public final TopicBL it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingWriter$$topicBL;
    // Constructor argument, stored as-is (same visibility/mangling remark as the field above).
    public final String it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingWriter$$topicDatastoreModelName;
    // Logger backing the Logging trait; exposed through logger() and assigned through the trait setter below.
    private final WaspLogger logger;

    /**
     * Returns the logger held by this instance.
     *
     * @return the value of the {@code logger} field
     */
    public WaspLogger logger() {
        return this.logger;
    }

    /**
     * Setter generated for the {@code logger} member of the {@code Logging} trait.
     *
     * <p>NOTE(review): the field is declared {@code final} above, so this assignment would be rejected
     * by javac; it is a decompiler artifact of how the trait field is initialised.
     *
     * @param waspLogger the logger to store in the {@code logger} field
     */
    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    /**
     * Entry point declared with a {@code Dataset<Row>} parameter and a {@code DataStreamWriter<Row>} return type.
     *
     * <p>The decompiler could not recover the body (see the JADX warnings below), so in this rendering
     * the method unconditionally throws. The actual behaviour is not visible here; presumably it
     * dispatches to the private {@code convertStreamFor*} helpers and {@code addKafkaConf} — TODO
     * confirm against the original Scala source.
     *
     * @param r11 the input dataset (name assigned by the decompiler)
     * @return never returns normally in this rendering
     * @throws UnsupportedOperationException always, in this decompiled rendering
     */
    /* JADX WARN: Removed duplicated region for block: B:46:0x0355  */
    /* JADX WARN: Removed duplicated region for block: B:8:0x00ee  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public org.apache.spark.sql.streaming.DataStreamWriter<org.apache.spark.sql.Row> write(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> r11) {
        /*
            Method dump skipped, instructions count: 944
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaSparkStructuredStreamingWriter.write(org.apache.spark.sql.Dataset):org.apache.spark.sql.streaming.DataStreamWriter");
    }

    /**
     * Selects from {@code dataset} the optional leading columns plus a single column aliased
     * {@code "value"} that wraps an {@code AvroConverterExpression}.
     *
     * <p>NOTE(review): the original parameter names were lost in decompilation. Presumably
     * {@code option}, {@code option2}, {@code option3} are the key, headers and topic field names in
     * some order — confirm against the Scala source.
     *
     * @param option     optional string mapped to a column by a generated closure; contributes nothing when empty
     * @param option2    optional string mapped to a column by a generated closure; contributes nothing when empty
     * @param option3    optional string mapped to a column by a generated closure; contributes nothing when empty
     * @param option4    optional sequence of strings; when empty a default is computed by a closure that receives {@code dataset}
     * @param dataset    the dataset to select from
     * @param topicModel supplies the topic name, the JSON schema and the {@code useAvroSchemaManager} flag
     * @return the result of the {@code select}
     */
    private Dataset<Row> convertStreamForAvro(Option<String> option, Option<String> option2, Option<String> option3, Option<Seq<String>> option4, Dataset<Row> dataset, TopicModel topicModel) {
        // Use the supplied sequence, or fall back to a default derived from the dataset by the closure.
        Seq seq = (Seq) option4.getOrElse(new KafkaSparkStructuredStreamingWriter$$anonfun$3(this, dataset));
        // Build a StructType from the sequence; the mapping closure is given the dataset schema.
        StructType apply = StructType$.MODULE$.apply((Seq) seq.map(new KafkaSparkStructuredStreamingWriter$$anonfun$4(this, dataset.schema()), Seq$.MODULE$.canBuildFrom()));
        // Holds the schema-manager config only when the topic uses the Avro schema manager; otherwise None.
        // (The static type Some is a decompiler typing artifact; get() is only called in the "true" branch below.)
        Some some = topicModel.useAvroSchemaManager() ? new Some(ConfigManager$.MODULE$.getAvroSchemaManagerConfig()) : None$.MODULE$;
        // Second projection of the same sequence, passed to the converter factory together with the StructType.
        Seq seq2 = (Seq) seq.map(new KafkaSparkStructuredStreamingWriter$$anonfun$5(this), Seq$.MODULE$.canBuildFrom());
        String name = topicModel.name();
        // Concatenate the columns derived from the three options (each present only if its option is defined)
        // and append the Avro "value" column. The converter factory is chosen by useAvroSchemaManager():
        // one variant receives the parsed Avro schema and the config, the other receives Some(jsonSchema);
        // both receive the topic name and the literal "wasp".
        Dataset<Row> select = dataset.select((Seq) ((TraversableOnce) ((TraversableLike) Option$.MODULE$.option2Iterable(option.map(new KafkaSparkStructuredStreamingWriter$$anonfun$8(this))).$plus$plus(Option$.MODULE$.option2Iterable(option2.map(new KafkaSparkStructuredStreamingWriter$$anonfun$9(this))), Iterable$.MODULE$.canBuildFrom())).$plus$plus(Option$.MODULE$.option2Iterable(option3.map(new KafkaSparkStructuredStreamingWriter$$anonfun$10(this))), Iterable$.MODULE$.canBuildFrom())).toSeq().$plus$plus(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Column[]{new Column((AvroConverterExpression) (topicModel.useAvroSchemaManager() ? new KafkaSparkStructuredStreamingWriter$$anonfun$6(this, name, "wasp", new Schema.Parser().parse(topicModel.getJsonSchema()), (Config) some.get()) : new KafkaSparkStructuredStreamingWriter$$anonfun$7(this, name, "wasp", new Some(topicModel.getJsonSchema()))).apply(seq2, apply)).as("value")})), Seq$.MODULE$.canBuildFrom()));
        // Debug message is produced by a generated closure that receives the selected dataset.
        logger().debug(new KafkaSparkSt$$$$f931fda93d9eb388257444e8e9c769$$$$eamForAvro$1(this, select));
        return select;
    }

    /**
     * Builds a list of SQL select expressions ending with {@code to_json(struct(...)) AS value} and
     * applies it to {@code dataset} via {@code selectExpr}.
     *
     * @param option     optional string mapped to an expression by a generated closure; contributes nothing when empty
     * @param option2    optional string mapped to an expression by a generated closure; contributes nothing when empty
     * @param option3    optional string mapped to an expression by a generated closure; contributes nothing when empty
     * @param option4    optional sequence of strings that is transformed by a closure and joined with {@code ", "}
     *                   inside {@code struct(...)}; a default from another closure is used when empty
     * @param dataset    the dataset to project
     * @param topicModel not referenced in the visible body of this method
     * @return the result of {@code selectExpr}
     */
    private Dataset<Row> convertStreamForJson(Option<String> option, Option<String> option2, Option<String> option3, Option<Seq<String>> option4, Dataset<Row> dataset, TopicModel topicModel) {
        // Expressions from the three options (in that order), followed by the to_json(struct(...)) value expression.
        List list = (List) ((SeqLike) ((List) option.map(new KafkaSparkStructuredStreamingWriter$$anonfun$13(this)).toList().$plus$plus(option2.map(new KafkaSparkStructuredStreamingWriter$$anonfun$14(this)).toList(), List$.MODULE$.canBuildFrom())).$plus$plus(option3.map(new KafkaSparkStructuredStreamingWriter$$anonfun$15(this)).toList(), List$.MODULE$.canBuildFrom())).$colon$plus(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"to_json(struct(", ")) AS value"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{((TraversableOnce) option4.map(new KafkaSparkStructuredStreamingWriter$$anonfun$11(this)).getOrElse(new KafkaSparkStructuredStreamingWriter$$anonfun$12(this))).mkString(", ")})), List$.MODULE$.canBuildFrom());
        logger().debug(new KafkaSparkSt$$$$f92cbcd15aee4fcec78c7409ab92c7d$$$$eamForJson$1(this, list));
        return dataset.selectExpr(list);
    }

    /**
     * Projects {@code dataset} so that a single string-typed field becomes the {@code value} column,
     * after passing it through a UDF.
     *
     * <p>Preconditions are enforced with {@code Predef.require}: {@code option4} must be defined and
     * contain exactly one element, a matching field must be found in the dataset schema, and that
     * field's data type must equal {@code StringType}.
     *
     * @param option     optional string mapped to an expression by a generated closure; contributes nothing when empty
     * @param option2    optional string mapped to an expression by a generated closure; contributes nothing when empty
     * @param option3    optional string mapped to an expression by a generated closure; contributes nothing when empty
     * @param option4    must hold exactly one element, which is used as the source expression for the value
     * @param dataset    the dataset to project
     * @param topicModel not referenced in the visible body of this method
     * @return the projected dataset with a {@code value} column and without the temporary {@code value_string} column
     */
    private Dataset<Row> convertStreamForPlaintext(Option<String> option, Option<String> option2, Option<String> option3, Option<Seq<String>> option4, Dataset<Row> dataset, TopicModel topicModel) {
        Predef$.MODULE$.require(option4.isDefined() && ((SeqLike) option4.get()).size() == 1, new KafkaSparkSt$$$$11f993fd5cd82cb777d2dce394b9ce2$$$$rPlaintext$1(this, option4));
        String str = (String) ((IterableLike) option4.get()).head();
        // Look the field up in the schema; the predicate closure receives str (presumably it matches on field name — TODO confirm).
        Option find = dataset.schema().find(new KafkaSparkStructuredStreamingWriter$$anonfun$16(this, str));
        Predef$.MODULE$.require(find.isDefined(), new KafkaSparkSt$$$$ea40ab559c575716e2fc173b16ab7d0$$$$rPlaintext$2(this, dataset, str));
        DataType dataType = ((StructField) find.get()).dataType();
        Predef$ predef$ = Predef$.MODULE$;
        StringType$ stringType$ = StringType$.MODULE$;
        // Null-safe equality check (decompiled form of Scala ==) of the field type against StringType.
        predef$.require(dataType != null ? dataType.equals(stringType$) : stringType$ == null, new KafkaSparkSt$$$$96626d2a9986f32e676da5e4524c48ad$$$$rPlaintext$3(this, str, dataType));
        // Expressions from the three options, followed by "<str> AS value_string".
        List list = (List) ((SeqLike) ((List) option.map(new KafkaSparkStructuredStreamingWriter$$anonfun$17(this)).toList().$plus$plus(option2.map(new KafkaSparkStructuredStreamingWriter$$anonfun$18(this)).toList(), List$.MODULE$.canBuildFrom())).$plus$plus(option3.map(new KafkaSparkStructuredStreamingWriter$$anonfun$19(this)).toList(), List$.MODULE$.canBuildFrom())).$colon$plus(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", " AS value_string"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str})), List$.MODULE$.canBuildFrom());
        logger().debug(new KafkaSparkSt$$$$b5d93998ca1133f36bbefadcadacd8c$$$$rPlaintext$4(this, list));
        // Add "value" as udf(value_string), then drop the temporary column. The two type tags passed to udf
        // describe scala.Array[scala.Byte] and scala.Predef.String; presumably the UDF turns the string
        // into bytes — TODO confirm, the closure body is not visible here.
        return dataset.selectExpr(list).withColumn("value", functions$.MODULE$.udf(new KafkaSparkStructuredStreamingWriter$$anonfun$20(this), package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(KafkaSparkStructuredStreamingWriter.class.getClassLoader()), new TypeCreator(this) { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaSparkSt$$$$bd9ebc0fb8af6b4821fb19e955c49$$$$pecreator1$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Array"), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Types.TypeApi[]{mirror.staticClass("scala.Byte").asType().toTypeConstructor()})));
            }
        }), package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(KafkaSparkStructuredStreamingWriter.class.getClassLoader()), new TypeCreator(this) { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaSparkSt$$$$215be2727edb16e3c6765865f9df32e$$$$pecreator2$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        })).apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("value_string")}))).drop("value_string");
    }

    /**
     * Projects {@code dataset} so that a single binary-typed field is aliased directly as {@code value}.
     *
     * <p>Preconditions are enforced with {@code Predef.require}: {@code option4} must be defined and
     * contain exactly one element, a matching field must be found in the dataset schema, and that
     * field's data type must equal {@code BinaryType}.
     *
     * @param option     optional string mapped to an expression by a generated closure; contributes nothing when empty
     * @param option2    optional string mapped to an expression by a generated closure; contributes nothing when empty
     * @param option3    optional string mapped to an expression by a generated closure; contributes nothing when empty
     * @param option4    must hold exactly one element, which is used as the source expression for the value
     * @param dataset    the dataset to project
     * @param topicModel not referenced in the visible body of this method
     * @return the result of {@code selectExpr}
     */
    private Dataset<Row> convertStreamForBinary(Option<String> option, Option<String> option2, Option<String> option3, Option<Seq<String>> option4, Dataset<Row> dataset, TopicModel topicModel) {
        Predef$.MODULE$.require(option4.isDefined() && ((SeqLike) option4.get()).size() == 1, new KafkaSparkSt$$$$81b6276541cdb983d7526da4623b4c$$$$mForBinary$1(this, option4));
        String str = (String) ((IterableLike) option4.get()).head();
        // Look the field up in the schema; the predicate closure receives str (presumably it matches on field name — TODO confirm).
        Option find = dataset.schema().find(new KafkaSparkStructuredStreamingWriter$$anonfun$21(this, str));
        Predef$.MODULE$.require(find.isDefined(), new KafkaSparkSt$$$$6e7ada4b5dba97c9b77845b1f8315ca8$$$$mForBinary$2(this, dataset, str));
        DataType dataType = ((StructField) find.get()).dataType();
        Predef$ predef$ = Predef$.MODULE$;
        BinaryType$ binaryType$ = BinaryType$.MODULE$;
        // Null-safe equality check (decompiled form of Scala ==) of the field type against BinaryType.
        predef$.require(dataType != null ? dataType.equals(binaryType$) : binaryType$ == null, new KafkaSparkSt$$$$162688f172d49832de7c54e065a297$$$$mForBinary$3(this, str, dataType));
        // Expressions from the three options, followed by "<str> AS value"; no UDF is needed here.
        List list = (List) ((SeqLike) ((List) option.map(new KafkaSparkStructuredStreamingWriter$$anonfun$22(this)).toList().$plus$plus(option2.map(new KafkaSparkStructuredStreamingWriter$$anonfun$23(this)).toList(), List$.MODULE$.canBuildFrom())).$plus$plus(option3.map(new KafkaSparkStructuredStreamingWriter$$anonfun$24(this)).toList(), List$.MODULE$.canBuildFrom())).$colon$plus(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", " AS value"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str})), List$.MODULE$.canBuildFrom());
        logger().debug(new KafkaSparkSt$$$$e31b64d0d6b7c574c6b3bfd956ed26e8$$$$mForBinary$4(this, list));
        return dataset.selectExpr(list);
    }

    /**
     * Optionally replaces the {@code topic} column of {@code dataset} with the result of a UDF applied to it.
     *
     * <p>When {@code option} is empty the dataset is returned unchanged. Otherwise a set is built from
     * {@code seq} (each model mapped through a generated closure) and handed to the UDF closure.
     * Presumably the set contains the allowed topic names and the UDF validates each row's topic
     * against it — TODO confirm, the closure bodies are not visible here.
     *
     * @param option  when empty, no check column is added
     * @param seq     topic models from which the set given to the UDF closure is derived
     * @param dataset the dataset to transform
     * @return {@code dataset} itself, or a dataset whose {@code topic} column is recomputed through the UDF
     */
    private Dataset<Row> addTopicNameCheckIfNeeded(Option<String> option, Seq<TopicModel> seq, Dataset<Row> dataset) {
        if (option.isEmpty()) {
            return dataset;
        }
        KafkaSparkStructuredStreamingWriter$$anonfun$26 kafkaSparkStructuredStreamingWriter$$anonfun$26 = new KafkaSparkStructuredStreamingWriter$$anonfun$26(this, ((TraversableOnce) seq.map(new KafkaSparkStructuredStreamingWriter$$anonfun$25(this), Seq$.MODULE$.canBuildFrom())).toSet());
        // Both type tags passed to udf describe scala.Predef.String; the UDF is applied to the existing "topic" column.
        return dataset.withColumn("topic", functions$.MODULE$.udf(kafkaSparkStructuredStreamingWriter$$anonfun$26, package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(KafkaSparkStructuredStreamingWriter.class.getClassLoader()), new TypeCreator(this) { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaSparkSt$$$$e7abdd45f3217579d0d8e55a903a1ade$$$$pecreator3$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        }), package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(KafkaSparkStructuredStreamingWriter.class.getClassLoader()), new TypeCreator(this) { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaSparkSt$$$$4b5ab85620f1ac7aa7a7f6d4d48d36a$$$$pecreator4$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        })).apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("topic")})));
    }

    /**
     * Applies the settings held by {@code tinyKafkaConfig} to {@code dataStreamWriter} as writer options.
     *
     * <p>Options set, in order: {@code kafka.bootstrap.servers} (connections mapped through a closure
     * and joined with {@code ","}), {@code value.serializer}, {@code key.serializer},
     * {@code kafka.partitioner.class}, {@code kafka.batch.size}, {@code kafka.acks}, and finally every
     * entry derived from {@code tinyKafkaConfig.others()}.
     *
     * <p>NOTE(review): unlike the other keys, {@code value.serializer} and {@code key.serializer} are
     * set without the {@code kafka.} prefix — confirm this is intended.
     *
     * @param dataStreamWriter the writer to configure
     * @param tinyKafkaConfig  source of the option values
     * @return the writer returned by the final {@code options(...)} call
     */
    private DataStreamWriter<Row> addKafkaConf(DataStreamWriter<Row> dataStreamWriter, TinyKafkaConfig tinyKafkaConfig) {
        return dataStreamWriter.option("kafka.bootstrap.servers", ((TraversableOnce) tinyKafkaConfig.connections().map(new KafkaSparkStructuredStreamingWriter$$anonfun$27(this), Seq$.MODULE$.canBuildFrom())).mkString(",")).option("value.serializer", tinyKafkaConfig.default_encoder()).option("key.serializer", tinyKafkaConfig.encoder_fqcn()).option("kafka.partitioner.class", tinyKafkaConfig.partitioner_fqcn()).option("kafka.batch.size", BoxesRunTime.boxToInteger(tinyKafkaConfig.batch_send_size()).toString()).option("kafka.acks", tinyKafkaConfig.acks()).options(((TraversableOnce) tinyKafkaConfig.others().map(new KafkaSparkSt$$$$15874718397b1bf75e64d75be932da3e$$$$dKafkaConf$1(this), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
    }

    /**
     * Stores the two collaborators in their fields and runs the {@code Logging} trait initialiser.
     *
     * @param topicBL      stored in the mangled {@code topicBL} field
     * @param str          stored in the mangled {@code topicDatastoreModelName} field
     * @param sparkSession not referenced in the visible constructor body
     */
    public KafkaSparkStructuredStreamingWriter(TopicBL topicBL, String str, SparkSession sparkSession) {
        this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingWriter$$topicBL = topicBL;
        this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingWriter$$topicDatastoreModelName = str;
        // Decompiler rendering of the Logging trait initialiser call; presumably this is what assigns
        // the logger through the trait setter above — TODO confirm.
        Logging.class.$init$(this);
    }
}
