package com.af.v4.system.common.cdc.connector.mssqltock.source;

import com.af.stc.utils.JsonDebeziumDeserializationSchema;
import com.af.v4.system.common.cdc.CdcDataSource;
import com.af.v4.system.common.cdc.config.CdcConfig;
import com.af.v4.system.common.cdc.connector.mssqltock.sink.ClickhouseSink;
import com.af.v4.system.common.datasource.DynamicDataSource;
import com.af.v4.system.common.datasource.MyDruidDataSource;
import com.alibaba.druid.DbType;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/af/v4/system/common/cdc/connector/mssqltock/source/SQLServerCdcSource.class */
public class SQLServerCdcSource implements ApplicationRunner, Serializable {
    private final CdcConfig cdcConfig;
    private final ClickhouseSink clickhouseSink;

    public SQLServerCdcSource(ClickhouseSink clickhouseSink, CdcConfig cdcConfig) {
        this.clickhouseSink = clickhouseSink;
        this.cdcConfig = cdcConfig;
    }

    public void run(ApplicationArguments applicationArguments) throws Exception {
        System.out.println("开始启动Flink CDC获取ERP变更数据......");
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(this.cdcConfig.getParallelism().intValue());
        executionEnvironment.enableCheckpointing(this.cdcConfig.getCheckpointInterval().longValue(), "exactly".equals(this.cdcConfig.getCheckpointMode()) ? CheckpointingMode.EXACTLY_ONCE : CheckpointingMode.AT_LEAST_ONCE);
        executionEnvironment.getCheckpointConfig().setCheckpointTimeout(this.cdcConfig.getCheckpointTimeout().longValue());
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(this.cdcConfig.getRestartNumber().intValue(), this.cdcConfig.getAttempToRestartDelay().longValue()));
        executionEnvironment.getCheckpointConfig().setCheckpointStorage(this.cdcConfig.getCheckpointDataUri());
        executionEnvironment.addSource(buildDataChangeSource(), "SQLServer-source").addSink(this.clickhouseSink);
        executionEnvironment.execute("SQLServer-stream-cdc");
    }

    private DebeziumSourceFunction buildDataChangeSource() throws MalformedURLException {
        Map dataSourceList = DynamicDataSource.getDataSourceList();
        CdcDataSource source = this.cdcConfig.getSource();
        MyDruidDataSource myDruidDataSource = (MyDruidDataSource) dataSourceList.get(source.getName());
        String str = "127.0.0.1";
        int i = 1433;
        if (DbType.of(myDruidDataSource.getDbType()) == DbType.sqlserver) {
            URL url = new URL(myDruidDataSource.getRawJdbcUrl());
            str = url.getHost();
            i = url.getPort();
        }
        return SqlServerSource.builder().hostname(str).port(i).database(myDruidDataSource.getName()).tableList((String[]) source.getTableList().toArray(new String[0])).username(myDruidDataSource.getUsername()).password(myDruidDataSource.getPassword()).startupOptions(this.cdcConfig.getTotalQuantity().booleanValue() ? StartupOptions.initial() : StartupOptions.latest()).deserializer(new JsonDebeziumDeserializationSchema()).build();
    }
}
