/*
 * Decompiled with CFR 0.152.
 */
package com.af.v4.system.common.redis.core;

import com.af.v4.system.common.redis.RedisService;
import com.af.v4.system.common.redis.annotation.BeforeRetry;
import com.af.v4.system.common.redis.annotation.FastCacheQueue;
import com.af.v4.system.common.redis.annotation.FilterCondition;
import com.af.v4.system.common.redis.annotation.MaxRetry;
import com.af.v4.system.common.redis.core.ConsumerRegistration;
import com.af.v4.system.common.redis.core.FastCacheQueueProcess;
import com.af.v4.system.common.redis.entity.FastCacheQueueEntity;
import jakarta.annotation.PreDestroy;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

/**
 * Starts, runs and stops the consumer threads that drain Redis-backed queues.
 *
 * <p>Each consumer thread is identified by the key {@code queueName + "$" + threadNumber}.
 * The key indexes both {@link #consumerThreads} (the thread itself) and
 * {@link #runningFlags} (the cooperative stop flag polled by the consumer loop).
 */
@Component
public class FastCacheQueueManager {
    private static final Logger log = LoggerFactory.getLogger(FastCacheQueueManager.class);
    private final RedisService redisService;
    private final ApplicationContext context;
    /** Consumer threads keyed by {@code queueName$threadNumber}. */
    private final Map<String, Thread> consumerThreads = new ConcurrentHashMap<String, Thread>();
    /** Per-thread run flag; a consumer loop exits once its flag is no longer {@code TRUE}. */
    private final Map<String, Boolean> runningFlags = new ConcurrentHashMap<String, Boolean>();

    public FastCacheQueueManager(RedisService redisService, ApplicationContext context) {
        this.redisService = redisService;
        this.context = context;
    }

    /**
     * Registers consumers for every entry of the {@code "queue"} array in {@code config}.
     *
     * <p>Each entry is read for {@code "name"} and {@code "consumers"} (default 1). The queue
     * name is matched against the values of the {@link FastCacheQueue} annotation found on
     * {@link FastCacheQueueProcess} beans; entries without a matching processor are logged and
     * skipped. Methods annotated with {@link FilterCondition} / {@link BeforeRetry} and a
     * class-level {@link MaxRetry} are wired into the registration.
     *
     * @param config  configuration object; nothing happens when it has no non-empty "queue" array
     * @param manager manager handed to {@code ConsumerRegistration.registerWith}
     */
    public void registerFromConfig(JSONObject config, FastCacheQueueManager manager) {
        JSONArray queueArr = config.optJSONArray("queue");
        if (queueArr == null || queueArr.isEmpty()) {
            return;
        }
        Map<String, FastCacheQueueProcess> processMap = this.collectProcessors();
        for (int i = 0; i < queueArr.length(); ++i) {
            JSONObject item = queueArr.getJSONObject(i);
            String queueName = item.optString("name");
            int consumers = item.optInt("consumers", 1);
            FastCacheQueueProcess processor = processMap.get(queueName);
            if (processor == null) {
                log.warn(">>> \u672a\u627e\u5230\u5904\u7406\u5668: {}\uff0c\u8df3\u8fc7\u961f\u5217\u6ce8\u518c", queueName);
                continue;
            }
            ConsumerRegistration reg = new ConsumerRegistration(queueName).onProcess(processor::onProcess).onSuccess(processor::onSuccess).onFailure(processor::onFailure);
            this.bindAnnotatedHooks(reg, processor);
            MaxRetry maxRetry = processor.getClass().getAnnotation(MaxRetry.class);
            if (maxRetry != null) {
                reg.maxRetries(maxRetry.value());
            }
            reg.registerWith(manager, consumers);
        }
    }

    /** Maps every queue name declared via {@link FastCacheQueue} to the bean that carries it. */
    private Map<String, FastCacheQueueProcess> collectProcessors() {
        Map<String, FastCacheQueueProcess> processMap = new HashMap<String, FastCacheQueueProcess>();
        for (FastCacheQueueProcess bean : this.context.getBeansOfType(FastCacheQueueProcess.class).values()) {
            FastCacheQueue anno = bean.getClass().getAnnotation(FastCacheQueue.class);
            if (anno == null) {
                continue;
            }
            for (String name : anno.value()) {
                processMap.put(name, bean);
            }
        }
        return processMap;
    }

    /** Wires the processor's {@link FilterCondition} and {@link BeforeRetry} methods into {@code reg} via reflection. */
    private void bindAnnotatedHooks(ConsumerRegistration reg, FastCacheQueueProcess processor) {
        for (Method method : processor.getClass().getDeclaredMethods()) {
            if (method.isAnnotationPresent(FilterCondition.class)) {
                method.setAccessible(true);
                reg.filter(data -> {
                    try {
                        return (Boolean) method.invoke(processor, data);
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
            }
            if (method.isAnnotationPresent(BeforeRetry.class)) {
                method.setAccessible(true);
                reg.beforeRetry((data, retryContext) -> {
                    try {
                        method.invoke(processor, data, retryContext);
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
            }
        }
    }

    /**
     * Starts one consumer thread for {@code queueName}, named {@code queueName$threadNumber}.
     *
     * @param queueName    queue to pop from
     * @param threadNumber index used to build the thread key
     * @param reg          callbacks and settings applied to every popped message
     */
    protected void register(String queueName, int threadNumber, ConsumerRegistration reg) {
        String threadKey = queueName + "$" + threadNumber;
        this.runningFlags.put(threadKey, true);
        Thread thread = new Thread(() -> this.runConsumerLoop(queueName, threadKey, reg), threadKey);
        // Publish the thread before starting it so a concurrent shutdown can already see it.
        this.consumerThreads.put(threadKey, thread);
        thread.start();
    }

    /** Pops and handles messages until the run flag for {@code threadKey} is cleared. */
    private void runConsumerLoop(String queueName, String threadKey, ConsumerRegistration reg) {
        log.info(">>> \u542f\u52a8\u6d88\u8d39\u7ebf\u7a0b \u25b6\ufe0f {}", threadKey);
        while (Boolean.TRUE.equals(this.runningFlags.get(threadKey))) {
            try {
                Object raw = this.redisService.blockingPopFromQueue(queueName);
                this.handleMessage(queueName, threadKey, reg, raw);
            }
            catch (Exception ex) {
                // Trailing throwable makes SLF4J print the stack trace (and cause chain) too.
                log.error(">>> \u7ebf\u7a0b\u5f02\u5e38 \u25b6\ufe0f {}\uff0c\u539f\u56e0\uff1a{}", threadKey, ex.getMessage(), ex);
            }
        }
        log.info(">>> \u6d88\u8d39\u7ebf\u7a0b\u9000\u51fa \u25b6\ufe0f {}", threadKey);
    }

    /**
     * Validates, filters and processes a single popped value.
     *
     * <p>Declared {@code throws Exception} because the callback and Redis method signatures are
     * not visible from this class; the consumer loop catches and logs whatever escapes.
     */
    private void handleMessage(String queueName, String threadKey, ConsumerRegistration reg, Object raw) throws Exception {
        if (reg.verbose) {
            log.info(">>> \u961f\u5217 {} \u8fd4\u56de\u6570\u636e\uff1a{}", queueName, raw);
        }
        if (!(raw instanceof FastCacheQueueEntity)) {
            if (reg.verbose) {
                log.warn(">>> \u961f\u5217 {} \u8fd4\u56de\u6570\u636e\u4e0d\u662f FastCacheQueueEntity\uff0c\u8df3\u8fc7\u5904\u7406", queueName);
            }
            return;
        }
        FastCacheQueueEntity entity = (FastCacheQueueEntity) raw;
        JSONObject data = entity.getData();
        int retry = entity.getCurrRetryTimes();
        // A positive per-message limit wins over the registration default.
        int maxRetry = entity.getMaxRetryTimes() > 0 ? entity.getMaxRetryTimes() : reg.defaultMaxRetries;
        if (reg.filter != null && !reg.filter.test(data)) {
            if (reg.verbose) {
                log.info(">>> \u6d88\u606f\u88ab\u62e6\u622a \u25b6\ufe0f {}", threadKey);
            }
            return;
        }
        try {
            if (reg.verbose) {
                log.info("\u6d88\u8d39\u5f00\u59cb \u25b6\ufe0f {}", threadKey);
            }
            Object result = null;
            if (reg.onProcess != null) {
                if (reg.verbose) {
                    log.info(">>> \u6d88\u8d39\u5f00\u59cb \u25b6\ufe0f {}", threadKey);
                }
                result = reg.onProcess.apply(data);
            }
            // Guarded like onProcess/onFailure: a missing success callback must not turn a
            // successful run into an NPE that is then handled as a processing failure.
            if (reg.onSuccess != null) {
                reg.onSuccess.accept(data, result);
            }
            if (reg.verbose) {
                log.info(">>> \u6d88\u8d39\u6210\u529f \u25b6\ufe0f {}", threadKey);
            }
        }
        catch (Exception e) {
            this.handleFailure(queueName, threadKey, reg, entity, data, retry, maxRetry, e);
        }
    }

    /**
     * Re-queues the message for another attempt when retries remain and the retry condition
     * (if any) accepts the error; otherwise logs the failure, invokes {@code onFailure} and,
     * when {@code requeueOnFailure} is set, pushes the original entity back onto the queue.
     */
    private void handleFailure(String queueName, String threadKey, ConsumerRegistration reg, FastCacheQueueEntity entity, JSONObject data, int retry, int maxRetry, Exception e) throws Exception {
        int nextRetry = retry + 1;
        boolean allowRetry = maxRetry > 0 && nextRetry < maxRetry && (reg.condition == null || reg.condition.test(data, e));
        if (allowRetry) {
            FastCacheQueueEntity retryEntity = new FastCacheQueueEntity.Builder().data(data).currRetryTimes(nextRetry).maxRetryTimes(maxRetry).build();
            if (reg.beforeRetry != null) {
                JSONObject retryContext = new JSONObject().put("retryCount", nextRetry).put("maxRetry", maxRetry).put("exception", (Object) e.getMessage());
                reg.beforeRetry.accept(data, retryContext);
            }
            this.redisService.pushToQueue(queueName, retryEntity);
            return;
        }
        log.error("\ud83d\uded1 >>> \u6d88\u8d39\u5931\u8d25 \u25b6\ufe0f {}\uff0c\u5f02\u5e38\uff1a{}", threadKey, e.getMessage(), e);
        if (reg.onFailure != null) {
            reg.onFailure.accept(data, e);
        }
        if (reg.requeueOnFailure) {
            this.redisService.pushToQueue(queueName, entity);
        }
    }

    /**
     * Clears the run flag of every consumer thread of {@code queueName}, removes it from the
     * registry and waits up to one second for each to finish.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is restored;
     * the remaining threads are still flagged and removed.
     *
     * @param queueName queue whose consumer threads should stop
     */
    public void shutdownQueue(String queueName) {
        log.info(">>> \u505c\u6b62\u961f\u5217 \u25b6\ufe0f {}", queueName);
        String prefix = queueName + "$";
        for (String key : this.consumerThreads.keySet()) {
            if (!key.startsWith(prefix)) {
                continue;
            }
            this.runningFlags.put(key, false);
            Thread t = this.consumerThreads.remove(key);
            if (t == null) {
                continue;
            }
            try {
                t.join(1000L);
                log.info(">>> \u5df2\u5173\u95ed\u7ebf\u7a0b \u25b6\ufe0f {}", key);
            }
            catch (InterruptedException interrupted) {
                // Preserve the interrupt for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Clears every run flag and waits up to one second per consumer thread. Invoked on bean
     * destruction via {@link PreDestroy}.
     *
     * <p>If the calling thread is interrupted while waiting, waiting stops early and the
     * interrupt status is restored.
     */
    @PreDestroy
    public void shutdownAll() {
        log.info(">>> \u6b63\u5728\u5173\u95ed\u6240\u6709\u961f\u5217\u7ebf\u7a0b...");
        this.runningFlags.replaceAll((key, running) -> false);
        for (Thread t : this.consumerThreads.values()) {
            try {
                t.join(1000L);
            }
            catch (InterruptedException interrupted) {
                // Preserve the interrupt for the caller and stop waiting on further threads.
                Thread.currentThread().interrupt();
                break;
            }
        }
        log.info(">>> \u6240\u6709\u7ebf\u7a0b\u5df2\u5173\u95ed");
    }
}

