package io.rsocket.kotlin.transport.netty;

import io.netty.buffer.ByteBuf;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.functions.Function;
import io.rsocket.kotlin.DuplexConnection;
import io.rsocket.kotlin.Frame;
import kotlin.Metadata;
import kotlin.jvm.internal.Intrinsics;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;

/* compiled from: NettyDuplexConnection.kt */
@Metadata(mv = {1, 1, 11}, bv = {1, 0, 2}, k = 1, d1 = {"��@\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0006\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\u0018��2\u00020\u0001B\u001d\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007¢\u0006\u0002\u0010\bJ\b\u0010\t\u001a\u00020\nH\u0016J\b\u0010\u000b\u001a\u00020\fH\u0016J\b\u0010\r\u001a\u00020\fH\u0016J\u000e\u0010\u000e\u001a\b\u0012\u0004\u0012\u00020\u00100\u000fH\u0016J\u0016\u0010\u0011\u001a\u00020\f2\f\u0010\u0012\u001a\b\u0012\u0004\u0012\u00020\u00100\u0013H\u0016J\u0010\u0010\u0014\u001a\u00020\f2\u0006\u0010\u0012\u001a\u00020\u0010H\u0016R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0015"}, d2 = {"Lio/rsocket/kotlin/transport/netty/NettyDuplexConnection;", "Lio/rsocket/kotlin/DuplexConnection;", "inbound", "Lreactor/ipc/netty/NettyInbound;", "outbound", "Lreactor/ipc/netty/NettyOutbound;", "context", "Lreactor/ipc/netty/NettyContext;", "(Lreactor/ipc/netty/NettyInbound;Lreactor/ipc/netty/NettyOutbound;Lreactor/ipc/netty/NettyContext;)V", "availability", "", "close", "Lio/reactivex/Completable;", "onClose", "receive", "Lio/reactivex/Flowable;", "Lio/rsocket/kotlin/Frame;", "send", "frame", "Lorg/reactivestreams/Publisher;", "sendOne", "rsocket-transport-netty"})
/* loaded from: input_file:io/rsocket/kotlin/transport/netty/NettyDuplexConnection.class */
public final class NettyDuplexConnection implements DuplexConnection {
    private final NettyInbound inbound;
    private final NettyOutbound outbound;
    private final NettyContext context;

    @NotNull
    public Completable send(@NotNull Publisher<Frame> publisher) {
        Intrinsics.checkParameterIsNotNull(publisher, "frame");
        Completable ignoreElements = Flowable.fromPublisher(publisher).concatMap(new Function<T, Publisher<? extends R>>() { // from class: io.rsocket.kotlin.transport.netty.NettyDuplexConnection$send$1
            public final Flowable<Frame> apply(@NotNull Frame frame) {
                Intrinsics.checkParameterIsNotNull(frame, "it");
                return NettyDuplexConnection.this.sendOne(frame).toFlowable();
            }
        }).ignoreElements();
        Intrinsics.checkExpressionValueIsNotNull(ignoreElements, "Flowable.fromPublisher(f…        .ignoreElements()");
        return ignoreElements;
    }

    @NotNull
    public Completable sendOne(@NotNull Frame frame) {
        Intrinsics.checkParameterIsNotNull(frame, "frame");
        Mono then = this.outbound.sendObject(frame.content()).then();
        Intrinsics.checkExpressionValueIsNotNull(then, "outbound.sendObject(frame.content()).then()");
        return ExtKt.toCompletable(then);
    }

    @NotNull
    public Flowable<Frame> receive() {
        Flux map = this.inbound.receive().map(new java.util.function.Function<ByteBuf, Frame>() { // from class: io.rsocket.kotlin.transport.netty.NettyDuplexConnection$receive$1
            @Override // java.util.function.Function
            @NotNull
            public final Frame apply(ByteBuf byteBuf) {
                Frame.Companion companion = Frame.Companion;
                ByteBuf retain = byteBuf.retain();
                Intrinsics.checkExpressionValueIsNotNull(retain, "buf.retain()");
                return companion.from(retain);
            }
        });
        Intrinsics.checkExpressionValueIsNotNull(map, "inbound.receive()\n      …rame.from(buf.retain()) }");
        return ExtKt.toFlowable(map);
    }

    public double availability() {
        return this.context.isDisposed() ? 0.0d : 1.0d;
    }

    @NotNull
    public Completable close() {
        Completable fromRunnable = Completable.fromRunnable(new Runnable() { // from class: io.rsocket.kotlin.transport.netty.NettyDuplexConnection$close$1
            @Override // java.lang.Runnable
            public final void run() {
                NettyContext nettyContext;
                NettyContext nettyContext2;
                nettyContext = NettyDuplexConnection.this.context;
                if (nettyContext.isDisposed()) {
                    return;
                }
                nettyContext2 = NettyDuplexConnection.this.context;
                nettyContext2.channel().close();
            }
        });
        Intrinsics.checkExpressionValueIsNotNull(fromRunnable, "Completable.fromRunnable…          }\n            }");
        return fromRunnable;
    }

    @NotNull
    public Completable onClose() {
        Mono onClose = this.context.onClose();
        Intrinsics.checkExpressionValueIsNotNull(onClose, "context.onClose()");
        return ExtKt.toCompletable(onClose);
    }

    public NettyDuplexConnection(@NotNull NettyInbound nettyInbound, @NotNull NettyOutbound nettyOutbound, @NotNull NettyContext nettyContext) {
        Intrinsics.checkParameterIsNotNull(nettyInbound, "inbound");
        Intrinsics.checkParameterIsNotNull(nettyOutbound, "outbound");
        Intrinsics.checkParameterIsNotNull(nettyContext, "context");
        this.inbound = nettyInbound;
        this.outbound = nettyOutbound;
        this.context = nettyContext;
    }
}
