package com.netease.nim.camellia.redis.proxy.command.async;

import com.netease.nim.camellia.redis.proxy.command.AuthCommandProcessor;
import com.netease.nim.camellia.redis.proxy.command.ClientCommandUtil;
import com.netease.nim.camellia.redis.proxy.command.Command;
import com.netease.nim.camellia.redis.proxy.command.HelloCommandUtil;
import com.netease.nim.camellia.redis.proxy.command.async.bigkey.BigKeyHunter;
import com.netease.nim.camellia.redis.proxy.command.async.connectlimit.ConnectLimiterHolder;
import com.netease.nim.camellia.redis.proxy.command.async.converter.Converters;
import com.netease.nim.camellia.redis.proxy.command.async.hotkey.HotKeyHunter;
import com.netease.nim.camellia.redis.proxy.command.async.hotkey.HotKeyHunterManager;
import com.netease.nim.camellia.redis.proxy.command.async.hotkeycache.HotKeyCache;
import com.netease.nim.camellia.redis.proxy.command.async.hotkeycache.HotKeyCacheManager;
import com.netease.nim.camellia.redis.proxy.command.async.hotkeycache.HotValue;
import com.netease.nim.camellia.redis.proxy.command.async.info.ProxyInfoUtils;
import com.netease.nim.camellia.redis.proxy.command.async.interceptor.CommandInterceptResponse;
import com.netease.nim.camellia.redis.proxy.command.async.interceptor.CommandInterceptor;
import com.netease.nim.camellia.redis.proxy.command.async.spendtime.CommandSpendTimeConfig;
import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
import com.netease.nim.camellia.redis.proxy.monitor.ChannelMonitor;
import com.netease.nim.camellia.redis.proxy.monitor.RedisMonitor;
import com.netease.nim.camellia.redis.proxy.netty.ChannelInfo;
import com.netease.nim.camellia.redis.proxy.reply.BulkReply;
import com.netease.nim.camellia.redis.proxy.reply.ErrorReply;
import com.netease.nim.camellia.redis.proxy.reply.Reply;
import com.netease.nim.camellia.redis.proxy.reply.StatusReply;
import com.netease.nim.camellia.redis.proxy.util.ErrorLogCollector;
import com.netease.nim.camellia.redis.proxy.util.Utils;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/nim/camellia/redis/proxy/command/async/CommandsTransponder.class */
public class CommandsTransponder {
    private static final Logger logger = LoggerFactory.getLogger(CommandsTransponder.class);
    private final AuthCommandProcessor authCommandProcessor;
    private final AsyncCamelliaRedisTemplateChooser chooser;
    private final CommandSpendTimeConfig commandSpendTimeConfig;
    private final CommandInterceptor commandInterceptor;
    private final HotKeyHunterManager hotKeyHunterManager;
    private final HotKeyCacheManager hotKeyCacheManager;
    private final BigKeyHunter bigKeyHunter;
    private final Converters converters;
    private boolean eventLoopSetSuccess = false;

    public CommandsTransponder(AsyncCamelliaRedisTemplateChooser asyncCamelliaRedisTemplateChooser, CommandInvokeConfig commandInvokeConfig) {
        this.chooser = asyncCamelliaRedisTemplateChooser;
        this.authCommandProcessor = commandInvokeConfig.getAuthCommandProcessor();
        this.commandSpendTimeConfig = commandInvokeConfig.getCommandSpendTimeConfig();
        this.commandInterceptor = commandInvokeConfig.getCommandInterceptor();
        this.hotKeyHunterManager = commandInvokeConfig.getHotKeyHunterManager();
        this.hotKeyCacheManager = commandInvokeConfig.getHotKeyCacheManager();
        this.bigKeyHunter = commandInvokeConfig.getBigKeyHunter();
        this.converters = commandInvokeConfig.getConverters();
    }

    public void transpond(ChannelInfo channelInfo, List<Command> list) {
        CommandInterceptResponse commandInterceptResponse;
        HotKeyHunter hotKeyHunter;
        if (!this.eventLoopSetSuccess) {
            RedisClientHub.updateEventLoop(channelInfo.getCtx().channel().eventLoop());
            this.eventLoopSetSuccess = true;
        }
        try {
            boolean z = false;
            AsyncTaskQueue asyncTaskQueue = channelInfo.getAsyncTaskQueue();
            if (logger.isDebugEnabled()) {
                ArrayList arrayList = new ArrayList(list.size());
                Iterator<Command> it = list.iterator();
                while (it.hasNext()) {
                    arrayList.add(it.next().getName());
                }
                logger.debug("receive commands, commands.size = {}, consid = {}, commands = {}", new Object[]{Integer.valueOf(list.size()), asyncTaskQueue.getChannelInfo().getConsid(), arrayList});
            }
            ArrayList arrayList2 = new ArrayList(list.size());
            ChannelHandlerContext ctx = channelInfo.getCtx();
            boolean z2 = this.commandInterceptor != null;
            for (Command command : list) {
                command.setChannelInfo(channelInfo);
                if (RedisMonitor.isMonitorEnable()) {
                    RedisMonitor.incr(channelInfo.getBid(), channelInfo.getBgroup(), command.getName());
                }
                if (this.hotKeyHunterManager != null && (hotKeyHunter = this.hotKeyHunterManager.get(channelInfo.getBid(), channelInfo.getBgroup())) != null) {
                    try {
                        List<byte[]> keys = command.getKeys();
                        if (keys != null) {
                            hotKeyHunter.incr(keys);
                        }
                    } catch (Exception e) {
                        ErrorLogCollector.collect(CommandsTransponder.class, "hot key hunter error", e);
                    }
                }
                if (this.converters != null) {
                    try {
                        this.converters.convertRequest(command);
                    } catch (Exception e2) {
                        ErrorLogCollector.collect(CommandsTransponder.class, "convert request error", e2);
                    }
                }
                AsyncTask asyncTask = new AsyncTask(asyncTaskQueue, command, this.commandSpendTimeConfig, this.bigKeyHunter, this.converters);
                if (!asyncTaskQueue.add(asyncTask)) {
                    asyncTaskQueue.clear();
                    logger.warn("AsyncTaskQueue full, client connect will be disconnect, remote.ip = {}", ctx.channel().remoteAddress());
                    ctx.writeAndFlush(ErrorReply.TOO_BUSY).addListener(channelFuture -> {
                        ctx.close();
                    });
                    return;
                }
                if (z2) {
                    try {
                        commandInterceptResponse = this.commandInterceptor.check(command);
                    } catch (Exception e3) {
                        String str = "ERR command intercept error [" + e3.getMessage() + "]";
                        ErrorLogCollector.collect(CommandsTransponder.class, str, e3);
                        commandInterceptResponse = new CommandInterceptResponse(false, str);
                    }
                    if (!commandInterceptResponse.isPass()) {
                        String errorMsg = commandInterceptResponse.getErrorMsg();
                        if (errorMsg == null) {
                            errorMsg = CommandInterceptResponse.DEFAULT_FAIL.getErrorMsg();
                        }
                        asyncTask.replyCompleted(new ErrorReply(errorMsg));
                        z = true;
                    }
                }
                RedisCommand redisCommand = command.getRedisCommand();
                if (redisCommand == RedisCommand.PING) {
                    asyncTask.replyCompleted(StatusReply.PONG);
                    z = true;
                } else if (redisCommand == RedisCommand.AUTH) {
                    boolean z3 = (channelInfo.getBid() == null || channelInfo.getBgroup() == null) ? false : true;
                    Reply invokeAuthCommand = this.authCommandProcessor.invokeAuthCommand(channelInfo, command);
                    if (!z3 && !checkConnectLimit(channelInfo)) {
                        return;
                    }
                    asyncTask.replyCompleted(invokeAuthCommand);
                    z = true;
                } else if (redisCommand == RedisCommand.HELLO) {
                    boolean z4 = (channelInfo.getBid() == null || channelInfo.getBgroup() == null) ? false : true;
                    Reply invokeHelloCommand = HelloCommandUtil.invokeHelloCommand(channelInfo, this.authCommandProcessor, command);
                    if (!z4 && !checkConnectLimit(channelInfo)) {
                        return;
                    }
                    asyncTask.replyCompleted(invokeHelloCommand);
                    z = true;
                } else if (this.authCommandProcessor.isPasswordRequired() && channelInfo.getChannelStats() != ChannelInfo.ChannelStats.AUTH_OK) {
                    asyncTask.replyCompleted(ErrorReply.NO_AUTH);
                    z = true;
                } else if (redisCommand == RedisCommand.SELECT) {
                    if (command.getObjects().length != 2) {
                        asyncTask.replyCompleted(ErrorReply.argNumWrong(redisCommand));
                    } else if ("0".equals(Utils.bytesToString(command.getObjects()[1]))) {
                        asyncTask.replyCompleted(StatusReply.OK);
                    } else {
                        asyncTask.replyCompleted(new ErrorReply("ERR DB index is out of range"));
                    }
                    z = true;
                } else if (redisCommand == RedisCommand.INFO) {
                    CompletableFuture<Reply> infoReply = ProxyInfoUtils.getInfoReply(command, this.chooser);
                    asyncTask.getClass();
                    infoReply.thenAccept(asyncTask::replyCompleted);
                    z = true;
                } else if (redisCommand == RedisCommand.CLIENT) {
                    boolean z5 = (channelInfo.getBid() == null || channelInfo.getBgroup() == null) ? false : true;
                    Reply invokeClientCommand = ClientCommandUtil.invokeClientCommand(channelInfo, command);
                    if (!z5 && !checkConnectLimit(channelInfo)) {
                        return;
                    }
                    asyncTask.replyCompleted(invokeClientCommand);
                    z = true;
                } else {
                    if (redisCommand == RedisCommand.QUIT) {
                        channelInfo.getCtx().close();
                        return;
                    }
                    if (redisCommand == RedisCommand.SUBSCRIBE || redisCommand == RedisCommand.PSUBSCRIBE) {
                        channelInfo.setInSubscribe(true);
                    }
                    if (channelInfo.isInSubscribe() && redisCommand != RedisCommand.SUBSCRIBE && redisCommand != RedisCommand.PSUBSCRIBE && redisCommand != RedisCommand.UNSUBSCRIBE && redisCommand != RedisCommand.PUNSUBSCRIBE) {
                        asyncTaskQueue.reply(new ErrorReply("Command " + redisCommand.name() + " not allowed while subscribed. Allowed commands are: [PSUBSCRIBE, QUIT, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE]"));
                        z = true;
                    } else if (redisCommand == null || redisCommand.getSupportType() == RedisCommand.CommandSupportType.NOT_SUPPORT) {
                        asyncTask.replyCompleted(ErrorReply.NOT_SUPPORT);
                        z = true;
                    } else {
                        if (command.getRedisCommand() == RedisCommand.GET && this.hotKeyCacheManager != null && command.getObjects().length >= 2) {
                            byte[] bArr = command.getObjects()[1];
                            HotKeyCache hotKeyCache = this.hotKeyCacheManager.get(channelInfo.getBid(), channelInfo.getBgroup());
                            if (hotKeyCache != null) {
                                asyncTask.setHotKeyCache(hotKeyCache);
                                HotValue cache = hotKeyCache.getCache(bArr);
                                if (cache != null) {
                                    asyncTask.replyCompleted(new BulkReply(cache.getValue()), true);
                                    z = true;
                                }
                            }
                        }
                        if (this.bigKeyHunter != null) {
                            try {
                                this.bigKeyHunter.checkRequest(command);
                            } catch (Exception e4) {
                                ErrorLogCollector.collect(CommandsTransponder.class, e4.getMessage(), e4);
                            }
                        }
                        arrayList2.add(asyncTask);
                    }
                }
            }
            if (arrayList2.isEmpty()) {
                return;
            }
            if (z) {
                list = new ArrayList(arrayList2.size());
                Iterator<AsyncTask> it2 = arrayList2.iterator();
                while (it2.hasNext()) {
                    list.add(it2.next().getCommand());
                }
            }
            flush(channelInfo.getBid(), channelInfo.getBgroup(), arrayList2, list);
        } catch (Exception e5) {
            logger.error("commands transponder error, client connect will be force closed, bid = {}, bgroup = {}, addr = {}", new Object[]{channelInfo.getBid(), channelInfo.getBgroup(), channelInfo.getCtx().channel().remoteAddress(), e5});
            channelInfo.getCtx().close();
        }
    }

    private boolean checkConnectLimit(ChannelInfo channelInfo) {
        try {
            if (ConnectLimiterHolder.connectLimiter == null) {
                return true;
            }
            Long bid = channelInfo.getBid();
            String bgroup = channelInfo.getBgroup();
            if (bid == null || bgroup == null) {
                return true;
            }
            int bidBgroupConnect = ChannelMonitor.bidBgroupConnect(bid.longValue(), bgroup);
            int connectThreshold = ConnectLimiterHolder.connectLimiter.connectThreshold(bid.longValue(), bgroup);
            if (bidBgroupConnect < connectThreshold) {
                ChannelMonitor.initBidBgroup(bid.longValue(), bgroup, channelInfo);
                return true;
            }
            ChannelHandlerContext ctx = channelInfo.getCtx();
            ctx.writeAndFlush(ErrorReply.TOO_MANY_CONNECTS).addListener(channelFuture -> {
                ctx.close();
            });
            logger.warn("too many connects, connect will be force closed, bid = {}, bgroup = {}, current = {}, max = {}, consid = {}, client.addr = {}", new Object[]{bid, bgroup, Integer.valueOf(bidBgroupConnect), Integer.valueOf(connectThreshold), channelInfo.getConsid(), channelInfo.getCtx().channel().remoteAddress()});
            return false;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return true;
        }
    }

    private void flush(Long l, String str, List<AsyncTask> list, List<Command> list2) {
        AsyncCamelliaRedisTemplate asyncCamelliaRedisTemplate = null;
        try {
            try {
                asyncCamelliaRedisTemplate = this.chooser.choose(l, str);
            } catch (Exception e) {
                ErrorLogCollector.collect(CommandsTransponder.class, "AsyncCamelliaRedisTemplateChooser choose error, bid = " + l + ", bgroup = " + str + ", ex = " + e.toString(), e);
            }
            if (asyncCamelliaRedisTemplate == null) {
                Iterator<AsyncTask> it = list.iterator();
                while (it.hasNext()) {
                    it.next().replyCompleted(ErrorReply.NOT_AVAILABLE);
                }
            } else {
                try {
                    List<CompletableFuture<Reply>> sendCommand = asyncCamelliaRedisTemplate.sendCommand(list2);
                    for (int i = 0; i < list.size(); i++) {
                        AsyncTask asyncTask = list.get(i);
                        CompletableFuture<Reply> completableFuture = sendCommand.get(i);
                        asyncTask.getClass();
                        completableFuture.thenAccept(asyncTask::replyCompleted);
                    }
                } catch (Exception e2) {
                    ErrorLogCollector.collect(CommandsTransponder.class, "AsyncCamelliaRedisTemplateChooser sendCommand error, bid = " + l + ", bgroup = " + str + ", ex = " + e2.toString(), e2);
                    Iterator<AsyncTask> it2 = list.iterator();
                    while (it2.hasNext()) {
                        it2.next().replyCompleted(ErrorReply.NOT_AVAILABLE);
                    }
                    list.clear();
                    return;
                }
            }
            list.clear();
        } catch (Throwable th) {
            list.clear();
            throw th;
        }
    }
}
