/*
 * Decompiled with CFR 0.152.
 */
package com.af.v4.system.common.cdc.connector.source;

import com.af.v4.system.common.cdc.enums.ConfigureEnum;
import com.af.v4.system.common.cdc.utils.JsonDebeziumDeserializationSchema;
import com.af.v4.system.common.datasource.DynamicDataSource;
import com.af.v4.system.common.datasource.MyDruidDataSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import java.util.Map;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class Source {
    private static final Logger LOGGER = LoggerFactory.getLogger(Source.class);
    protected StreamExecutionEnvironment env;
    protected JsonDebeziumDeserializationSchema schema = new JsonDebeziumDeserializationSchema();

    protected abstract DebeziumSourceFunction buildDataSource(JSONObject var1, JSONObject var2, MyDruidDataSource var3);

    public void start(JSONObject config) throws Exception {
        LOGGER.info("\u51c6\u5907\u6355\u83b7\u53d8\u66f4\u6570\u636e......");
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.env.setParallelism(ConfigureEnum.NumEnum.PARALLELISM.getNum().intValue());
        this.configureCheckpointing();
        this.configureRestartStrategy();
        Map dataSourceMap = DynamicDataSource.getDataSourceList();
        JSONObject sourceConfig = config.getJSONObject("source");
        JSONObject sinkConfig = config.getJSONObject("sink");
        String sourceName = sourceConfig.getString("name");
        String sinkName = sinkConfig.getString("name");
        MyDruidDataSource sourceDataSource = (MyDruidDataSource)dataSourceMap.get(sourceName);
        DebeziumSourceFunction debeziumSourceFunction = this.buildDataSource(config, sourceConfig, sourceDataSource);
        this.env.addSource((SourceFunction)debeziumSourceFunction, sourceName).addSink((SinkFunction)Class.forName(sinkConfig.getString("className")).getDeclaredConstructor(String.class).newInstance(config.toString())).name(sinkName);
        this.env.execute(config.getString("jobName"));
    }

    private void configureCheckpointing() {
        this.env.enableCheckpointing(ConfigureEnum.TimeEnum.CHECKPOINT_INTERVAL.getTime().longValue(), CheckpointingMode.EXACTLY_ONCE);
        this.env.getCheckpointConfig().setCheckpointTimeout(ConfigureEnum.TimeEnum.CHECKPOINT_TIMEOUT.getTime().longValue());
        this.env.setStateBackend((StateBackend)new EmbeddedRocksDBStateBackend());
        this.env.getCheckpointConfig().setCheckpointStorage("file:///D:/flink-checkpoint");
        this.env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        this.env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)3, (long)2000L));
    }

    private void configureRestartStrategy() {
        int restartCount = ConfigureEnum.NumEnum.RESTART_COUNT.getNum();
        long attemptToRestartDelay = ConfigureEnum.TimeEnum.ATTEMPT_TO_RESTART_DELAY.getTime();
        this.env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)restartCount, (long)attemptToRestartDelay));
    }
}

