package com.af.v4.system.common.cdc.connector.source;

import com.af.v4.system.common.cdc.enums.ConfigureEnum;
import com.af.v4.system.common.cdc.utils.JsonDebeziumDeserializationSchema;
import com.af.v4.system.common.datasource.DynamicDataSource;
import com.af.v4.system.common.datasource.MyDruidDataSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import java.util.Map;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/af/v4/system/common/cdc/connector/source/Source.class */
/**
 * Base class for change-data-capture jobs: it prepares a Flink streaming environment, attaches
 * the Debezium source produced by {@link #buildDataSource} to a reflectively created sink, and
 * runs the job.
 *
 * <p>Subclasses supply the concrete Debezium source through {@link #buildDataSource}.
 */
public abstract class Source {
    private static final Logger LOGGER = LoggerFactory.getLogger(Source.class);

    /**
     * Checkpoint storage location handed to Flink.
     *
     * <p>NOTE(review): this is a hard-coded, Windows-style local path; confirm it is valid for
     * every environment this job is deployed to.
     */
    private static final String CHECKPOINT_STORAGE = "file:///D:/flink-checkpoint";

    /** Execution environment of the job; assigned by {@link #start(JSONObject)}. */
    protected StreamExecutionEnvironment env;

    /** Deserialization schema instance made available to subclasses. */
    protected JsonDebeziumDeserializationSchema schema = new JsonDebeziumDeserializationSchema();

    /**
     * Builds the Debezium source function for this job.
     *
     * @param jSONObject the complete job configuration passed to {@link #start(JSONObject)}
     * @param jSONObject2 the {@code "source"} object of the job configuration
     * @param myDruidDataSource the entry registered in {@link DynamicDataSource} under the
     *     source's {@code "name"}; {@code null} when no entry exists for that name
     * @return the source function that is added to the execution environment
     */
    protected abstract DebeziumSourceFunction buildDataSource(JSONObject jSONObject, JSONObject jSONObject2, MyDruidDataSource myDruidDataSource);

    /**
     * Configures the execution environment, wires source and sink together, and executes the job.
     *
     * <p>Keys read from the configuration: {@code "source"} (object with {@code "name"}),
     * {@code "sink"} (object with {@code "className"} and {@code "name"}) and {@code "jobName"}.
     * The sink class is loaded by name and instantiated through its declared constructor taking
     * a single {@code String}, which receives the whole configuration serialized as JSON text.
     *
     * @param jSONObject the job configuration
     * @throws Exception if the configuration is incomplete, the sink cannot be instantiated, or
     *     job execution fails
     */
    // buildDataSource is declared with a raw return type, which makes the addSource chain raw.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void start(JSONObject jSONObject) throws Exception {
        LOGGER.info("准备捕获变更数据......");
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.env.setParallelism(ConfigureEnum.NumEnum.PARALLELISM.getNum().intValue());
        configureCheckpointing();
        configureRestartStrategy();

        Map<?, ?> dataSources = DynamicDataSource.getDataSourceList();
        JSONObject sourceConfig = jSONObject.getJSONObject("source");
        JSONObject sinkConfig = jSONObject.getJSONObject("sink");
        String sourceName = sourceConfig.getString("name");

        MyDruidDataSource dataSource = (MyDruidDataSource) dataSources.get(sourceName);
        if (dataSource == null) {
            // Not fatal here: the subclass decides how to deal with a missing data source.
            LOGGER.warn("No data source registered under name '{}'", sourceName);
        }

        DebeziumSourceFunction sourceFunction = buildDataSource(jSONObject, sourceConfig, dataSource);
        this.env.addSource(sourceFunction, sourceName)
                .addSink(instantiateSink(sinkConfig.getString("className"), jSONObject.toString()))
                .name(sinkConfig.getString("name"));
        this.env.execute(jSONObject.getString("jobName"));
    }

    /**
     * Loads the named sink class and instantiates it via its declared {@code (String)} constructor.
     *
     * @param className fully qualified name of the sink class
     * @param configJson argument passed to the sink constructor
     * @return the newly created sink
     * @throws ReflectiveOperationException if the class or constructor cannot be found, or the
     *     constructor cannot be invoked or fails
     * @throws ClassCastException if the created object is not a {@link SinkFunction}
     */
    private static SinkFunction<?> instantiateSink(String className, String configJson) throws ReflectiveOperationException {
        Object sink = Class.forName(className).getDeclaredConstructor(String.class).newInstance(configJson);
        return (SinkFunction<?>) sink;
    }

    /**
     * Enables exactly-once checkpointing with the configured interval and timeout, selects the
     * embedded RocksDB state backend, sets the checkpoint storage location, and retains
     * externalized checkpoints when the job is cancelled.
     */
    private void configureCheckpointing() {
        this.env.enableCheckpointing(ConfigureEnum.TimeEnum.CHECKPOINT_INTERVAL.getTime().longValue(), CheckpointingMode.EXACTLY_ONCE);
        this.env.setStateBackend(new EmbeddedRocksDBStateBackend());

        CheckpointConfig checkpointConfig = this.env.getCheckpointConfig();
        checkpointConfig.setCheckpointTimeout(ConfigureEnum.TimeEnum.CHECKPOINT_TIMEOUT.getTime().longValue());
        checkpointConfig.setCheckpointStorage(CHECKPOINT_STORAGE);
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // The restart strategy is deliberately not set here; configureRestartStrategy() owns it.
    }

    /** Applies a fixed-delay restart strategy using the configured attempt count and delay. */
    private void configureRestartStrategy() {
        int restartAttempts = ConfigureEnum.NumEnum.RESTART_COUNT.getNum().intValue();
        long delayBetweenAttempts = ConfigureEnum.TimeEnum.ATTEMPT_TO_RESTART_DELAY.getTime().longValue();
        this.env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, delayBetweenAttempts));
    }
}
