package io.rsocket.rpc.showcase.service;

import com.google.protobuf.Empty;
import io.netty.buffer.ByteBuf;
import io.rsocket.rpc.showcase.service.protobuf.SimpleRequest;
import io.rsocket.rpc.showcase.service.protobuf.SimpleResponse;
import io.rsocket.rpc.showcase.service.protobuf.SimpleService;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/rsocket/rpc/showcase/service/DefaultSimpleService.class */
public class DefaultSimpleService implements SimpleService {
    @Override // io.rsocket.rpc.showcase.service.protobuf.SimpleService
    public Mono<Empty> fireAndForget(SimpleRequest simpleRequest, ByteBuf byteBuf) {
        System.out.println("got message -> " + simpleRequest.getRequestMessage());
        return Mono.just(Empty.getDefaultInstance());
    }

    @Override // io.rsocket.rpc.showcase.service.protobuf.SimpleService
    public Mono<SimpleResponse> requestReply(SimpleRequest simpleRequest, ByteBuf byteBuf) {
        return Mono.fromCallable(() -> {
            return SimpleResponse.newBuilder().setResponseMessage("we got the message -> " + simpleRequest.getRequestMessage()).m188build();
        });
    }

    @Override // io.rsocket.rpc.showcase.service.protobuf.SimpleService
    public Mono<SimpleResponse> streamingRequestSingleResponse(Publisher<SimpleRequest> publisher, ByteBuf byteBuf) {
        return Flux.from(publisher).windowTimeout(10, Duration.ofSeconds(500L)).take(1L).flatMap(Function.identity()).reduce(new ConcurrentHashMap(), (concurrentHashMap, simpleRequest) -> {
            for (char c : simpleRequest.getRequestMessage().toCharArray()) {
                ((AtomicInteger) concurrentHashMap.computeIfAbsent(Character.valueOf(c), ch -> {
                    return new AtomicInteger();
                })).incrementAndGet();
            }
            return concurrentHashMap;
        }).map(concurrentHashMap2 -> {
            StringBuilder sb = new StringBuilder();
            concurrentHashMap2.forEach((ch, atomicInteger) -> {
                sb.append("character -> ").append(ch).append(", count -> ").append(atomicInteger.get()).append("\n");
            });
            return SimpleResponse.newBuilder().setResponseMessage(sb.toString()).m188build();
        });
    }

    @Override // io.rsocket.rpc.showcase.service.protobuf.SimpleService
    public Flux<SimpleResponse> requestStream(SimpleRequest simpleRequest, ByteBuf byteBuf) {
        String requestMessage = simpleRequest.getRequestMessage();
        return Flux.interval(Duration.ofMillis(200L)).onBackpressureDrop().map(l -> {
            return l + " - got message - " + requestMessage;
        }).map(str -> {
            return SimpleResponse.newBuilder().setResponseMessage(str).m188build();
        });
    }

    @Override // io.rsocket.rpc.showcase.service.protobuf.SimpleService
    public Flux<SimpleResponse> streamingRequestAndResponse(Publisher<SimpleRequest> publisher, ByteBuf byteBuf) {
        return Flux.from(publisher).flatMap(simpleRequest -> {
            return requestReply(simpleRequest, byteBuf);
        });
    }
}
