package io.rsocket.kotlin.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.functions.Function;
import io.rsocket.kotlin.DuplexConnection;
import io.rsocket.kotlin.Frame;
import kotlin.Metadata;
import kotlin.jvm.internal.Intrinsics;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;

/* compiled from: WebsocketDuplexConnection.kt */
@Metadata(mv = {1, 1, 11}, bv = {1, 0, 2}, k = 1, d1 = {"��R\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0006\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\b\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\u0018��2\u00020\u0001B\u001d\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007¢\u0006\u0002\u0010\bJ\b\u0010\t\u001a\u00020\nH\u0016J\b\u0010\u000b\u001a\u00020\fH\u0016J \u0010\r\u001a\u00020\u000e2\u0006\u0010\u000f\u001a\u00020\u00102\u0006\u0010\u0011\u001a\u00020\u00122\u0006\u0010\u0013\u001a\u00020\u0012H\u0002J\b\u0010\u0014\u001a\u00020\fH\u0016J\u000e\u0010\u0015\u001a\b\u0012\u0004\u0012\u00020\u00170\u0016H\u0016J\u0016\u0010\u0018\u001a\u00020\f2\f\u0010\u0019\u001a\b\u0012\u0004\u0012\u00020\u00170\u001aH\u0016J\u0010\u0010\u001b\u001a\u00020\f2\u0006\u0010\u0019\u001a\u00020\u0017H\u0016R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u001c"}, d2 = {"Lio/rsocket/kotlin/transport/netty/WebsocketDuplexConnection;", "Lio/rsocket/kotlin/DuplexConnection;", "inbound", "Lreactor/ipc/netty/NettyInbound;", "outbound", "Lreactor/ipc/netty/NettyOutbound;", "context", "Lreactor/ipc/netty/NettyContext;", "(Lreactor/ipc/netty/NettyInbound;Lreactor/ipc/netty/NettyOutbound;Lreactor/ipc/netty/NettyContext;)V", "availability", "", "close", "Lio/reactivex/Completable;", "encodeLength", "", "byteBuf", "Lio/netty/buffer/ByteBuf;", "offset", "", "length", "onClose", "receive", "Lio/reactivex/Flowable;", "Lio/rsocket/kotlin/Frame;", "send", "frame", "Lorg/reactivestreams/Publisher;", "sendOne", "rsocket-transport-netty"})
/* loaded from: input_file:io/rsocket/kotlin/transport/netty/WebsocketDuplexConnection.class */
public final class WebsocketDuplexConnection implements DuplexConnection {
    private final NettyInbound inbound;
    private final NettyOutbound outbound;
    private final NettyContext context;

    @NotNull
    public Completable send(@NotNull Publisher<Frame> publisher) {
        Intrinsics.checkParameterIsNotNull(publisher, "frame");
        Completable ignoreElements = Flowable.fromPublisher(publisher).concatMap(new Function<T, Publisher<? extends R>>() { // from class: io.rsocket.kotlin.transport.netty.WebsocketDuplexConnection$send$1
            public final Flowable<Frame> apply(@NotNull Frame frame) {
                Intrinsics.checkParameterIsNotNull(frame, "it");
                return WebsocketDuplexConnection.this.sendOne(frame).toFlowable();
            }
        }).ignoreElements();
        Intrinsics.checkExpressionValueIsNotNull(ignoreElements, "Flowable.fromPublisher(f…        .ignoreElements()");
        return ignoreElements;
    }

    @NotNull
    public Completable sendOne(@NotNull Frame frame) {
        Intrinsics.checkParameterIsNotNull(frame, "frame");
        Mono then = this.outbound.sendObject(new BinaryWebSocketFrame(frame.content().skipBytes(3))).then();
        Intrinsics.checkExpressionValueIsNotNull(then, "outbound.sendObject(\n   …)\n                .then()");
        return ExtKt.toCompletable(then);
    }

    @NotNull
    public Flowable<Frame> receive() {
        Flux map = this.inbound.receive().map(new java.util.function.Function<ByteBuf, Frame>() { // from class: io.rsocket.kotlin.transport.netty.WebsocketDuplexConnection$receive$1
            @Override // java.util.function.Function
            @NotNull
            public final Frame apply(ByteBuf byteBuf) {
                NettyContext nettyContext;
                nettyContext = WebsocketDuplexConnection.this.context;
                ByteBuf compositeBuffer = nettyContext.channel().alloc().compositeBuffer();
                ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(new byte[3]);
                WebsocketDuplexConnection websocketDuplexConnection = WebsocketDuplexConnection.this;
                Intrinsics.checkExpressionValueIsNotNull(wrappedBuffer, "length");
                websocketDuplexConnection.encodeLength(wrappedBuffer, 0, byteBuf.readableBytes());
                compositeBuffer.addComponents(true, new ByteBuf[]{wrappedBuffer, byteBuf.retain()});
                Frame.Companion companion = Frame.Companion;
                Intrinsics.checkExpressionValueIsNotNull(compositeBuffer, "composite");
                return companion.from(compositeBuffer);
            }
        });
        Intrinsics.checkExpressionValueIsNotNull(map, "inbound.receive()\n      …posite)\n                }");
        return ExtKt.toFlowable(map);
    }

    public double availability() {
        return this.context.isDisposed() ? 0.0d : 1.0d;
    }

    @NotNull
    public Completable close() {
        Completable fromRunnable = Completable.fromRunnable(new Runnable() { // from class: io.rsocket.kotlin.transport.netty.WebsocketDuplexConnection$close$1
            @Override // java.lang.Runnable
            public final void run() {
                NettyContext nettyContext;
                NettyContext nettyContext2;
                nettyContext = WebsocketDuplexConnection.this.context;
                if (nettyContext.isDisposed()) {
                    return;
                }
                nettyContext2 = WebsocketDuplexConnection.this.context;
                nettyContext2.channel().close();
            }
        });
        Intrinsics.checkExpressionValueIsNotNull(fromRunnable, "Completable.fromRunnable…          }\n            }");
        return fromRunnable;
    }

    @NotNull
    public Completable onClose() {
        Mono onClose = this.context.onClose();
        Intrinsics.checkExpressionValueIsNotNull(onClose, "context.onClose()");
        return ExtKt.toCompletable(onClose);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void encodeLength(ByteBuf byteBuf, int i, int i2) {
        if ((i2 & (16777215 ^ (-1))) != 0) {
            throw new IllegalArgumentException("Length is larger than 24 bits");
        }
        byteBuf.setByte(i, i2 >> 16);
        byteBuf.setByte(i + 1, i2 >> 8);
        byteBuf.setByte(i + 2, i2);
    }

    public WebsocketDuplexConnection(@NotNull NettyInbound nettyInbound, @NotNull NettyOutbound nettyOutbound, @NotNull NettyContext nettyContext) {
        Intrinsics.checkParameterIsNotNull(nettyInbound, "inbound");
        Intrinsics.checkParameterIsNotNull(nettyOutbound, "outbound");
        Intrinsics.checkParameterIsNotNull(nettyContext, "context");
        this.inbound = nettyInbound;
        this.outbound = nettyOutbound;
        this.context = nettyContext;
    }
}
