package com.aizuda.snailjob.client.core.client;

import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.client.common.Lifecycle;
import com.aizuda.snailjob.client.common.annotation.Mapping;
import com.aizuda.snailjob.client.common.annotation.SnailEndPoint;
import com.aizuda.snailjob.client.common.config.SnailJobProperties;
import com.aizuda.snailjob.client.common.log.support.SnailJobLogManager;
import com.aizuda.snailjob.client.common.rpc.client.RequestMethod;
import com.aizuda.snailjob.client.core.IdempotentIdGenerate;
import com.aizuda.snailjob.client.core.cache.FutureCache;
import com.aizuda.snailjob.client.core.cache.RetryerInfoCache;
import com.aizuda.snailjob.client.core.callback.future.CallbackTaskExecutorFutureCallback;
import com.aizuda.snailjob.client.core.callback.future.RetryTaskExecutorFutureCallback;
import com.aizuda.snailjob.client.core.context.CallbackContext;
import com.aizuda.snailjob.client.core.context.RemoteRetryContext;
import com.aizuda.snailjob.client.core.exception.SnailRetryClientException;
import com.aizuda.snailjob.client.core.executor.RemoteCallbackExecutor;
import com.aizuda.snailjob.client.core.executor.RemoteRetryExecutor;
import com.aizuda.snailjob.client.core.loader.SnailRetrySpiLoader;
import com.aizuda.snailjob.client.core.log.RetryLogMeta;
import com.aizuda.snailjob.client.core.retryer.RetryerInfo;
import com.aizuda.snailjob.client.core.serializer.JacksonSerializer;
import com.aizuda.snailjob.client.core.timer.StopTaskTimerTask;
import com.aizuda.snailjob.client.core.timer.TimerManager;
import com.aizuda.snailjob.client.model.GenerateRetryIdempotentIdDTO;
import com.aizuda.snailjob.client.model.request.DispatchRetryRequest;
import com.aizuda.snailjob.client.model.request.RetryCallbackRequest;
import com.aizuda.snailjob.client.model.request.StopRetryRequest;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.IdempotentIdContext;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.common.log.enums.LogTypeEnum;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.validation.Valid;
import java.lang.reflect.Method;
import java.text.MessageFormat;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.ReflectionUtils;
import org.springframework.validation.annotation.Validated;

@SnailEndPoint
@Validated
/* loaded from: input_file:com/aizuda/snailjob/client/core/client/SnailRetryEndPoint.class */
public class SnailRetryEndPoint implements Lifecycle {
    private final RemoteRetryExecutor remoteRetryExecutor;
    private final RemoteCallbackExecutor remoteCallbackExecutor;
    private final SnailJobProperties snailJobProperties;
    private ThreadPoolExecutor dispatcherThreadPool = null;

    @Mapping(path = "/retry/dispatch/v1", method = RequestMethod.POST)
    public Result<Boolean> dispatch(@Valid DispatchRetryRequest dispatchRetryRequest) {
        RemoteRetryContext bulidRemoteRetryContext = bulidRemoteRetryContext(dispatchRetryRequest);
        RetryerInfo retryerInfo = RetryerInfoCache.get(dispatchRetryRequest.getSceneName(), dispatchRetryRequest.getExecutorName());
        if (Objects.isNull(retryerInfo)) {
            SnailJobLog.REMOTE.error("场景:[{}]配置不存在, 请检查您的场景和执行器是否存在", new Object[]{dispatchRetryRequest.getSceneName()});
            return new Result<>(StatusEnum.NO.getStatus().intValue(), MessageFormat.format("场景:[{0}]配置不存在, 请检查您的场景和执行器是否存在", dispatchRetryRequest.getSceneName()));
        }
        initLogContext(bulidRemoteRetryContext);
        try {
            bulidRemoteRetryContext.setDeSerialize((Object[]) SnailRetrySpiLoader.loadRetryArgSerializer().deSerialize(dispatchRetryRequest.getArgsStr(), retryerInfo.getExecutor().getClass(), retryerInfo.getMethod()));
            ListeningExecutorService listeningDecorator = MoreExecutors.listeningDecorator(this.dispatcherThreadPool);
            ListenableFuture submit = listeningDecorator.submit(() -> {
                return this.remoteRetryExecutor.doRetry(bulidRemoteRetryContext);
            });
            FutureCache.addFuture(dispatchRetryRequest.getRetryTaskId(), submit);
            Futures.addCallback(submit, new RetryTaskExecutorFutureCallback(bulidRemoteRetryContext), listeningDecorator);
            TimerManager.add(new StopTaskTimerTask(dispatchRetryRequest.getRetryTaskId()), dispatchRetryRequest.getExecutorTimeout().intValue(), TimeUnit.SECONDS);
            SnailJobLog.REMOTE.info("重试任务:[{}] 调度成功. ", new Object[]{dispatchRetryRequest.getRetryTaskId()});
            return new Result<>(Boolean.TRUE);
        } catch (JsonProcessingException e) {
            SnailJobLog.REMOTE.error("参数解析异常 args:[{}]", new Object[]{dispatchRetryRequest.getArgsStr(), e});
            return new Result<>(StatusEnum.NO.getStatus().intValue(), MessageFormat.format("参数解析异常 args:[{0}]", dispatchRetryRequest.getArgsStr()));
        }
    }

    private static RemoteRetryContext bulidRemoteRetryContext(DispatchRetryRequest dispatchRetryRequest) {
        RemoteRetryContext remoteRetryContext = new RemoteRetryContext();
        remoteRetryContext.setRetryTaskId(dispatchRetryRequest.getRetryTaskId());
        remoteRetryContext.setRetryId(dispatchRetryRequest.getRetryId());
        remoteRetryContext.setRetryCount(dispatchRetryRequest.getRetryCount());
        remoteRetryContext.setArgsStr(dispatchRetryRequest.getArgsStr());
        remoteRetryContext.setGroupName(dispatchRetryRequest.getGroupName());
        remoteRetryContext.setNamespaceId(dispatchRetryRequest.getNamespaceId());
        remoteRetryContext.setScene(dispatchRetryRequest.getSceneName());
        remoteRetryContext.setExecutorName(dispatchRetryRequest.getExecutorName());
        return remoteRetryContext;
    }

    private static void initLogContext(RemoteRetryContext remoteRetryContext) {
        RetryLogMeta retryLogMeta = new RetryLogMeta();
        retryLogMeta.setGroupName(remoteRetryContext.getGroupName());
        retryLogMeta.setNamespaceId(remoteRetryContext.getNamespaceId());
        retryLogMeta.setRetryId(remoteRetryContext.getRetryId());
        retryLogMeta.setRetryTaskId(remoteRetryContext.getRetryTaskId());
        SnailJobLogManager.initLogInfo(retryLogMeta, LogTypeEnum.RETRY);
    }

    @Mapping(path = "/retry/callback/v1", method = RequestMethod.POST)
    public Result<Boolean> callback(@Valid RetryCallbackRequest retryCallbackRequest) {
        CallbackContext buildCallbackContext = buildCallbackContext(retryCallbackRequest);
        try {
            initLogContext(buildCallbackContext);
            RetryerInfo retryerInfo = RetryerInfoCache.get(retryCallbackRequest.getSceneName(), retryCallbackRequest.getExecutorName());
            if (Objects.isNull(retryerInfo)) {
                SnailJobLog.REMOTE.error("场景:[{}]配置不存在, 请检查您的场景和执行器是否存在", new Object[]{retryCallbackRequest.getSceneName()});
                return new Result<>(0, "回调失败", Boolean.FALSE);
            }
            buildCallbackContext.setDeSerialize((Object[]) SnailRetrySpiLoader.loadRetryArgSerializer().deSerialize(retryCallbackRequest.getArgsStr(), retryerInfo.getExecutor().getClass(), retryerInfo.getMethod()));
            buildCallbackContext.setRetryerInfo(retryerInfo);
            ListeningExecutorService listeningDecorator = MoreExecutors.listeningDecorator(this.dispatcherThreadPool);
            ListenableFuture submit = listeningDecorator.submit(() -> {
                this.remoteCallbackExecutor.doRetryCallback(buildCallbackContext);
                return Boolean.TRUE;
            });
            FutureCache.addFuture(retryCallbackRequest.getRetryTaskId(), submit);
            Futures.addCallback(submit, new CallbackTaskExecutorFutureCallback(buildCallbackContext), listeningDecorator);
            TimerManager.add(new StopTaskTimerTask(retryCallbackRequest.getRetryTaskId()), retryCallbackRequest.getExecutorTimeout().intValue(), TimeUnit.SECONDS);
            SnailJobLog.REMOTE.info("回调任务:[{}] 调度成功. ", new Object[]{retryCallbackRequest.getRetryTaskId()});
            return new Result<>(Boolean.TRUE);
        } catch (JsonProcessingException e) {
            SnailJobLog.REMOTE.error("参数解析异常", new Object[]{e});
            return new Result<>(0, "回调失败", Boolean.FALSE);
        }
    }

    private static CallbackContext buildCallbackContext(RetryCallbackRequest retryCallbackRequest) {
        CallbackContext callbackContext = new CallbackContext();
        callbackContext.setRetryTaskId(retryCallbackRequest.getRetryTaskId());
        callbackContext.setRetryId(retryCallbackRequest.getRetryId());
        callbackContext.setGroupName(retryCallbackRequest.getGroupName());
        callbackContext.setNamespaceId(retryCallbackRequest.getNamespaceId());
        callbackContext.setSceneName(retryCallbackRequest.getSceneName());
        callbackContext.setRetryStatus(retryCallbackRequest.getRetryStatus());
        return callbackContext;
    }

    private static void initLogContext(CallbackContext callbackContext) {
        RetryLogMeta retryLogMeta = new RetryLogMeta();
        retryLogMeta.setGroupName(callbackContext.getGroupName());
        retryLogMeta.setNamespaceId(callbackContext.getNamespaceId());
        retryLogMeta.setRetryTaskId(callbackContext.getRetryTaskId());
        retryLogMeta.setRetryId(callbackContext.getRetryId());
        SnailJobLogManager.initLogInfo(retryLogMeta, LogTypeEnum.RETRY);
    }

    @Mapping(path = "/retry/generate/idempotent-id/v1", method = RequestMethod.POST)
    public Result<String> idempotentIdGenerate(@Valid GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO) {
        String scene = generateRetryIdempotentIdDTO.getScene();
        String executorName = generateRetryIdempotentIdDTO.getExecutorName();
        String argsStr = generateRetryIdempotentIdDTO.getArgsStr();
        RetryerInfo retryerInfo = RetryerInfoCache.get(scene, executorName);
        Assert.notNull(retryerInfo, () -> {
            return new SnailRetryClientException("重试信息不存在 scene:[{}] executorName:[{}]", scene, executorName);
        });
        Method method = retryerInfo.getMethod();
        try {
            Object[] objArr = (Object[]) new JacksonSerializer().deSerialize(argsStr, retryerInfo.getExecutor().getClass(), retryerInfo.getMethod());
            try {
                Class<? extends IdempotentIdGenerate> idempotentIdGenerate = retryerInfo.getIdempotentIdGenerate();
                return new Result<>((String) ReflectionUtils.invokeMethod(idempotentIdGenerate.getMethod("idGenerate", IdempotentIdContext.class), idempotentIdGenerate.newInstance(), new Object[]{new IdempotentIdContext(scene, executorName, objArr, method.getName())}));
            } catch (Exception e) {
                SnailJobLog.LOCAL.error("幂等id生成异常：{},{}", new Object[]{scene, argsStr, e});
                throw new SnailRetryClientException("idempotentId生成异常：{},{}", scene, argsStr);
            }
        } catch (JsonProcessingException e2) {
            throw new SnailRetryClientException("参数解析异常", (Throwable) e2);
        }
    }

    @Mapping(path = "/retry/stop/v1", method = RequestMethod.POST)
    public Result<Boolean> stop(@Valid StopRetryRequest stopRetryRequest) {
        FutureCache.remove(stopRetryRequest.getRetryTaskId());
        return new Result<>(Boolean.TRUE);
    }

    public void start() {
        if (Objects.nonNull(this.dispatcherThreadPool)) {
            return;
        }
        SnailJobProperties.ThreadPoolConfig dispatcherThreadPool = this.snailJobProperties.getRetry().getDispatcherThreadPool();
        this.dispatcherThreadPool = new ThreadPoolExecutor(dispatcherThreadPool.getCorePoolSize(), dispatcherThreadPool.getMaximumPoolSize(), dispatcherThreadPool.getKeepAliveTime(), dispatcherThreadPool.getTimeUnit(), (BlockingQueue<Runnable>) new LinkedBlockingQueue(dispatcherThreadPool.getQueueCapacity()), (ThreadFactory) new CustomizableThreadFactory("snail-retry-dispatcher-"));
    }

    public void close() {
        if (Objects.nonNull(this.dispatcherThreadPool)) {
            this.dispatcherThreadPool.shutdown();
        }
    }

    @Generated
    public SnailRetryEndPoint(RemoteRetryExecutor remoteRetryExecutor, RemoteCallbackExecutor remoteCallbackExecutor, SnailJobProperties snailJobProperties) {
        this.remoteRetryExecutor = remoteRetryExecutor;
        this.remoteCallbackExecutor = remoteCallbackExecutor;
        this.snailJobProperties = snailJobProperties;
    }
}
