package com.af.v4.system.common.cdc.connector.sink;

import com.af.v4.system.common.core.exception.ServiceException;
import com.af.v4.system.common.core.utils.SpringUtils;
import com.af.v4.system.common.mq.RocketMQProducer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.rocketmq.client.producer.SendStatus;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/af/v4/system/common/cdc/connector/sink/Sink.class */
public abstract class Sink extends RichSinkFunction<String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(Sink.class);
    private static final Logger CDC_ERROR_LOGGER = LoggerFactory.getLogger("cdcErrorLog");
    protected String globalConfigStr;
    protected RocketMQProducer rocketMQProducer;

    public Sink(String str) {
        this.globalConfigStr = str;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.rocketMQProducer = (RocketMQProducer) SpringUtils.getBean(RocketMQProducer.class);
    }

    public void invoke(String str, SinkFunction.Context context) {
        JSONObject jSONObject = new JSONObject(str);
        JSONObject jSONObject2 = new JSONObject(this.globalConfigStr);
        JSONObject jSONObject3 = jSONObject2.getJSONObject("sink");
        String string = jSONObject3.getString("table");
        JSONObject jSONObject4 = new JSONObject();
        if (jSONObject.has("type")) {
            jSONObject4.put("type", jSONObject.get("type").toString());
        }
        JSONObject jSONObject5 = jSONObject.getJSONObject("data");
        try {
            jSONObject4.put("dataSourceName", jSONObject3.getString("name"));
            jSONObject4.put("entityData", jSONObject5);
            jSONObject4.put("table", string);
            exec(jSONObject2, jSONObject4);
            JSONObject syncSend = this.rocketMQProducer.syncSend("AF_LOGIC_TOPIC", jSONObject2.getString("dataSyncLogic"), jSONObject4.toString());
            if (syncSend.getString("sendStatus").equals(SendStatus.SEND_OK.name())) {
            } else {
                throw new ServiceException("数据同步失败，信息：" + syncSend);
            }
        } catch (Exception e) {
            LOGGER.error("数据处理失败.", e);
            CDC_ERROR_LOGGER.info(jSONObject4 + ",");
        }
    }

    public abstract void exec(JSONObject jSONObject, JSONObject jSONObject2) throws Exception;
}
