package com.af.v4.system.common.cdc.connector.mssqltock.sink;

import com.af.v4.system.common.cdc.config.CdcConfig;
import com.af.v4.system.common.datasource.DynamicDataSource;
import com.af.v4.system.common.jpa.service.EntityService;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.json.JSONObject;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/af/v4/system/common/cdc/connector/mssqltock/sink/ClickhouseSink.class */
public class ClickhouseSink extends RichSinkFunction<String> {
    private final CdcConfig cdcConfig;
    private final EntityService entityService;

    public ClickhouseSink(CdcConfig cdcConfig, EntityService entityService) {
        this.cdcConfig = cdcConfig;
        this.entityService = entityService;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        DynamicDataSource.setDataSource(this.cdcConfig.getSink().getName());
    }

    public void invoke(String str, SinkFunction.Context context) throws Exception {
        System.out.println("收到变更原始数据：" + str);
        JSONObject jSONObject = new JSONObject(str);
        String obj = jSONObject.get("type").toString();
        boolean z = -1;
        switch (obj.hashCode()) {
            case -1335458389:
                if (obj.equals("delete")) {
                    z = 3;
                    break;
                }
                break;
            case -1183792455:
                if (obj.equals("insert")) {
                    z = true;
                    break;
                }
                break;
            case -838846263:
                if (obj.equals("update")) {
                    z = 2;
                    break;
                }
                break;
            case 3496342:
                if (obj.equals("read")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
            case true:
                this.entityService.partialSave("t_sink", jSONObject.getJSONObject("data"));
                return;
            case true:
                JSONObject jSONObject2 = new JSONObject();
                this.entityService.partialSave("t_sink", jSONObject2);
                this.entityService.partialSave("t_sink", jSONObject2);
                return;
            case true:
                this.entityService.delete("t_sink", jSONObject.getJSONObject("data").get("id"));
                return;
            default:
                throw new Exception("type的值是无效的");
        }
    }
}
