基于FLink实现实时安全检测的示例代码

2023-05-16 0 5,055

目录

研发背景

公司安全部目前针对内部系统的网络访问日志的安全审计,大部分都是T+1时效,每日当天,启动Python编写的定时任务,完成昨日的日志审计和检测,定时任务运行完成后,统一进行企业微信告警推送。这种方案在目前的网络环境和人员规模下,呈现两个痛点,一是面对日益频繁的网络攻击、钓鱼链接,T+1的定时任务,难以及时进行告警,因此也难以有效避免如关键信息泄露等问题,二是目前以Python为主的单机定时任务,针对不同场景的处理时效,从一小时到十几小时不等,效率低下。为解决以上问题,本人协助公司安全部同时对告警采集平台进行改造,由之前的python单机任务处理,切换到基于Flink集群的并行处理,且告警推送时效,由之前的T+1天,提升到秒级实时告警。本次改造涉及网络日志审计的多个常见场景,如端口扫描、黑名单统计、异常流量、连续恶意登录等。本次以一段时间内连续登录失败20次后,下一次登录成功场景来进行介绍。

场景描述

针对一个内部系统,如邮件系统,公司员工的访问行为日志,存放于kafka,我们希望对于一个用户账号在同一个IP下,任意的3分钟时间内,连续登录邮件系统20次失败,下一次登录成功,这种场景能够及时获取并推送到企业微信某个指定的安全接口人。kafka中的数据,能够通过某个关键字,区分当前网络访问是否一次登录事件,且有访问时间(也就是事件时间)。在解析到符合需求的用户账号之后,第一时间进行企业微信告警推送,并将其这段时间内的访问行为,写入下游ElasticSearch。

组件版本

  • Flink-1.14.4
  • Java8
  • ElasticSearch-7.3.2
  • Kafka-2.12_2.8.1

日志结构

IP和账号皆为测试使用。

{
   \"user\": \"wangxm\",
   \"client_ip\": \"110.68.6.182\",
   \"source\": \"login\",
   \"loginname\": \"wangxm@test.com\",
   \"IP\": \"110.8.148.58\",
   \"timestamp\": \"17:58:12\",
   \"@timestamp\": \"2022-04-20T09:58:13.647Z\",
   \"ip\": \"110.7.231.25\",
   \"clienttype\": \"POP3\",
   \"result\": \"success\",
   \"@version\": \"1\"
 }

技术方案

上述场景,可考虑使用FlinkCEP及Flink的滑动窗口进行实现。由于本人在采用FlinkCEP的方案进行代码编写调试后,发现并不能满足,因此改用滑动窗口进行实现。

关键代码

入口

主入口类,创建了flink环境、设置了基础参数,创建了kafkaSource,接入消息后,进行了映射、过滤,并设置了水位线,进行了分组,之后设置了滑动窗口,在窗口内进行了事件统计,将复合条件的事件收集返回并写入ElasticSearch。

针对map、filter、keyBy、window等算子,都单独进行了编写,后面会一一列出来。

package com.data.dev.flink.mailTopic.main;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import com.data.dev.elasticsearch.ElasticSearchInfo;
import com.data.dev.elasticsearch.SinkToEs;
import com.data.dev.flink.FlinkEnv;
import com.data.dev.flink.mailTopic.OperationForLoginFailCheck.*;
import com.data.dev.kafka.KafkaSourceBuilder;
import com.data.dev.key.ConfigurationKey;
import com.data.dev.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.time.Duration;


/**
 * Flink处理在3分钟内连续登录失败20次后登录成功的场景
 * 采用滑动窗口来实现
 * @author wangxiaomin 2022-06-01
 */

@Slf4j
public class MailMsg extends BaseBean {

    /**
     * Flink作业名称
     */
    public static final  String JobName = \"告警采集平台——连续登录失败后登录成功告警\";
    /**
     * Kafka消息名
     */
    public static final  String KafkaSourceName = \"Kafka Source for AlarmPlatform About Mail Topic\";

    public MailMsg(){
        log.info(\"初始化滑动窗口场景告警程序\");
    }

    /**
     * 执行逻辑统计场景,实现告警推送
     */
    public static void execute(){


        //① 创建Flink执行环境并设置checkpoint等必要的参数
        StreamExecutionEnvironment env = FlinkEnv.getFlinkEnv();
        KafkaSource<String> kafkaSource = KafkaSourceBuilder.getKafkaSource(ConfigurationKey.KAFKA_MAIL_TOPIC_NAME,ConfigurationKey.KAFKA_MAIL_CONSUMER_GROUP_ID) ;
        DataStreamSource<String> kafkaMailMsg = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(10)), KafkaSourceName);


        //② 筛选登录消息,创建初始登录事件流
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginMapDs = kafkaMailMsg.map(new MsgToBeanMapper()).name(\"Map算子加工\");
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginFilterDs = loginMapDs.filter(new MailMsgForLoginFilter()).name(\"Filter算子加工\");

        //③ 设置水位线
        WatermarkStrategy<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> watermarkStrategy = WatermarkStrategy.<com.data.dev.common.javabean.kafkaMailTopic.MailMsg>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                        .withTimestampAssigner((mailMsg, timestamp) -> TimeUtils.switchUTCToBeijingTimestamp(mailMsg.getTimestamp_datetime()));
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginWmDs = loginFilterDs.assignTimestampsAndWatermarks(watermarkStrategy.withIdleness(Duration.ofMinutes(3))).name(\"增加水位线\");

        //④ 设置主键
        KeyedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String> loginKeyedDs = loginWmDs.keyBy(new LoginKeySelector());

        //⑥ 转化为滑动窗口
        WindowedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String, TimeWindow> loginWindowDs = loginKeyedDs.window(SlidingEventTimeWindows.of(Time.seconds(180L),Time.seconds(90L)));

        //⑦ 在窗口内进行逻辑统计
        SingleOutputStreamOperator<MailMsgAlarm> loginWindowsDealDs  = loginWindowDs.process(new WindowProcessFuncImpl()).name(\"窗口处理逻辑\");

        //⑧ 将结果转化为通用DataStream<String>格式
        SingleOutputStreamOperator<String> resultDs  = loginWindowsDealDs.map(new AlarmMsgToStringMapper()).name(\"窗口结果转化为标准格式\");

        //⑨ 将最终结果写入ES
        resultDs.addSink(SinkToEs.getEsSinkBuilder(ElasticSearchInfo.ES_LOGIN_FAIL_INDEX_NAME,ElasticSearchInfo.ES_INDEX_TYPE_DEFAULT).build());

        //⑩ 提交Flink集群进行执行
        FlinkEnv.envExec(env,JobName);

    }
}

mapper算子

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.alibaba.fastjson.JSON;
import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;

/**
 *  逻辑统计场景告警推送ES消息体
 *  @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public   class AlarmMsgToStringMapper extends BaseBean implements MapFunction<MailMsgAlarm, String> {

    @Override
    public String map(MailMsgAlarm mailMsgAlarm) throws Exception {
        return JSON.toJSONString(mailMsgAlarm);
    }
}

filter算子

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FilterFunction;


/**
 * ② 消费mail主题的消息,过滤其中login的事件
 * @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public class MailMsgForLoginFilter extends BaseBean implements FilterFunction<MailMsg> {
    @Override
    public boolean filter(MailMsg mailMsg) {
        if(\"login\".equals(mailMsg.getSource())) {
            log.info(\"筛选原始的login事件:【\" + mailMsg + \"】\");
        }
        return \"login\".equals(mailMsg.getSource());
    }
}

keyBy算子

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.functions.KeySelector;

/**
 * CEP 编程,需要进行key选取
 */
@Slf4j
public class LoginKeySelector extends BaseBean implements KeySelector<MailMsg, String> {
    @Override
    public String getKey(MailMsg mailMsg) {
        return mailMsg.getUser() + \"@\" + mailMsg.getClient_ip();
    }
}

窗口函数核心代码)

这里我们主要考虑使用一个事件列表,用来存储每一个窗口期内得到的连续登录,当检测到登陆失败的事件,即存入事件列表中,之后判断下一次登录失败事件,如果检测到登录成功事件,但此时登录失败的次数不足20次,则清空loginEventList,等待下一次检测。一旦符合窗口内连续登录失败超过20次且下一次登录成功这个事件,则清空此时的loginEventList并将当前登录成功的事件进行告警推送。

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import com.data.dev.utils.HttpUtils;
import com.data.dev.utils.IPUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 *  滑动窗口内复杂事件解析逻辑实现
 *  @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public   class WindowProcessFuncImpl extends  ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow> implements Serializable {
    @Override
    public void process(String key, ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow>.Context context, Iterable<MailMsg> iterable, Collector<MailMsgAlarm> collector) {

        List<MailMsg> loginEventList = new ArrayList<>();
        MailMsgAlarm mailMsgAlarm;
        for (MailMsg mailMsg : iterable) {
            log.info(\"收集到的登录事件【\" + mailMsg + \"】\");

            if (mailMsg.getResult().equals(\"fail\")) { //开始检测当前窗口内的事件,并将失败的事件收集到loginEventList
                log.info(\"开始检测当前窗口内的事件,并将失败的事件收集到loginEventList\");
                loginEventList.add(mailMsg);
            } else if (mailMsg.getResult().equals(\"success\") && loginEventList.size() < 20) {//如果检测到登录成功事件,但此时登录失败的次数不足20次,则清空loginEventList,等待下一次检测
                log.info(\"检测到登录成功事件,但此时登录失败的次数为【\" + loginEventList.size() + \"】不足20次,清空loginEventList,等待下一次检测\");
                loginEventList.clear();
            } else if (mailMsg.getResult().equals(\"success\") && loginEventList.size() >= 20) {
                mailMsgAlarm = getMailMsgAlarm(loginEventList,mailMsg);
                log.info(\"检测到登录成功的事件,此时窗口内连续登录失败的次数为【\" + mailMsgAlarm.getFailTimes() + \"】\");

                //一旦符合窗口内连续登录失败超过20次且下一次登录成功这个事件,则清空此时的loginEventList并将当前登录成功的事件进行告警推送;
                loginEventList.clear();
                doAlarmPush(mailMsgAlarm);

                collector.collect(mailMsgAlarm);//将当前登录成功的事件进行收集上报
            } else {
                log.info(mailMsg.getUser() + \"当前已连续:【\" + loginEventList.size() + \"】 次登录失败\");
            }
        }
    }


    /**
     * 2022年6月17日15:03:06
     * @param eventList:当前窗口内的事件列表
     * @param eventCurrent:当前登录成功的事件
     * @return mailMsgAlarm:告警消息体
     */
    public static MailMsgAlarm getMailMsgAlarm(List<MailMsg> eventList,MailMsg eventCurrent){

        String alarmKey = eventCurrent.getUser() + \"@\" + eventCurrent.getClient_ip();
        String loginFailStartTime = eventList.get(0).getTimestamp_datetime();
        String loginSuccessTime = eventCurrent.getTimestamp_datetime();
        int loginFailTimes = eventList.size();

        MailMsgAlarm mailMsgAlarm = new MailMsgAlarm();
        mailMsgAlarm.setMailMsg(eventCurrent);
        mailMsgAlarm.setAlarmKey(alarmKey);
        mailMsgAlarm.setStartTime(loginFailStartTime);
        mailMsgAlarm.setEndTime(loginSuccessTime);
        mailMsgAlarm.setFailTimes(loginFailTimes);

        return mailMsgAlarm;
    }

    /**
     * 2022年6月17日14:47:53
     * @param mailMsgAlarm :当前构建的需要告警的事件
     */
    public void doAlarmPush(MailMsgAlarm mailMsgAlarm){
        String userKey = mailMsgAlarm.getAlarmKey();
        String clientIp = mailMsgAlarm.mailMsg.getClient_ip();
        boolean isWhiteListIp = IPUtils.isWhiteListIp(clientIp);
        if(isWhiteListIp){//如果是白名单IP,不告警
            log.info(\"当前登录用户【\" + userKey + \"】属于白名单IP\");
        }else {
            //IP归属查询结果、企业微信推送告警
            String user = HttpUtils.getUserByClientIp(clientIp);
            HttpUtils.pushAlarmMsgToWechatWork(user,mailMsgAlarm.toString());
        }
    }
}

最后一次map算子

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.alibaba.fastjson.JSON;
import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;

/**
 *  逻辑统计场景告警推送ES消息体
 *  @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public   class AlarmMsgToStringMapper extends BaseBean implements MapFunction<MailMsgAlarm, String> {

    @Override
    public String map(MailMsgAlarm mailMsgAlarm) throws Exception {
        return JSON.toJSONString(mailMsgAlarm);
    }
}

ElasticSearch工具

package com.data.dev.elasticsearch;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.key.ConfigurationKey;
import com.data.dev.key.ElasticSearchKey;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 2022年6月17日15:15:06
 * @author wangxiaoming-ghq
 * Flink流计算结果写入ES公共方法
 */
@Slf4j
public class SinkToEs extends BaseBean {
    public static final long serialVersionUID = 2L;
    private static final HashMap<String,String> ES_PROPS_MAP = ConfigurationKey.getApplicationProps();
    private static final String HOST = ES_PROPS_MAP.get(ConfigurationKey.ES_HOST);
    private static final String PASSWORD = ES_PROPS_MAP.get(ConfigurationKey.ES_PASSWORD);
    private static final String USERNAME = ES_PROPS_MAP.get(ConfigurationKey.ES_USERNAME);
    private static final String PORT = ES_PROPS_MAP.get(ConfigurationKey.ES_PORT);

    /**
     * 2022年6月17日15:17:55
     * 获取ES连接信息
     * @return esInfoMap:ES连接信息持久化
     */
    public static HashMap<String,String > getElasticSearchInfo(){
        log.info(\"获取ES连接信息:【 \" + \"HOST=\"+HOST + \"PORT=\"+PORT+\"USERNAME=\"+USERNAME+\"PASSWORD=********\" + \" 】\");
        HashMap<String,String> esInfoMap = new HashMap<>();
        esInfoMap.put(ElasticSearchKey.HOST,HOST);
        esInfoMap.put(ElasticSearchKey.PASSWORD,PASSWORD);
        esInfoMap.put(ElasticSearchKey.USERNAME,USERNAME);
        esInfoMap.put(ElasticSearchKey.PORT,PORT);

        return esInfoMap;
    }

    /**
     * @param esIndexName:写入索引名称
     * @param esType:写入索引类型
     * @return ElasticsearchSink.Builder<String>:构建器
     */
    public static ElasticsearchSink.Builder<String> getEsSinkBuilder(String esIndexName,String esType){
        HashMap<String, String> esInfoMap = getElasticSearchInfo();
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost(String.valueOf(esInfoMap.get(ElasticSearchKey.HOST)), Integer.parseInt(esInfoMap.get(ElasticSearchKey.PORT)), \"http\"));

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<String>() {

                    public IndexRequest createIndexRequest() {
                        Map<String, String> json = new HashMap<>();
                        //log.info(\"写入ES的data:【\"+json+\"】\");
                        IndexRequest index  = Requests.indexRequest()
                                .index(esIndexName)
                                .type(esType)
                                .source(json);
                        return index;
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest());
                    }
                }
        );


        //定义es的连接配置  带用户名密码
        RestClientFactory restClientFactory = restClientBuilder -> {
            CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(
                    AuthScope.ANY,
                    new UsernamePasswordCredentials(
                            String.valueOf(esInfoMap.get(ElasticSearchKey.USERNAME)),
                            String.valueOf(esInfoMap.get(ElasticSearchKey.PASSWORD))
                    )
            );
            restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                httpAsyncClientBuilder.disableAuthCaching();
                return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            });
        };

        esSinkBuilder.setRestClientFactory(restClientFactory);
        return esSinkBuilder;
    }

}

事件实体类

package com.data.dev.common.javabean.kafkaMailTopic;

import com.data.dev.common.javabean.BaseBean;
import lombok.Data;

import java.util.Objects;


/**
 * @author wangxiaoming-ghq 2022-05-15
 * 逻辑统计场景告警事件
 */
@Data
public class MailMsgAlarm extends BaseBean {


    /**
     * 当前登录成功的事件
     */
   public  MailMsg mailMsg;

    /**
     * 当前捕获的告警主键:username@client_ip
     */
   public  String alarmKey;

    /**
     * 第一次登录失败的事件时间
     */
   public  String startTime;

    /**
     * 连续登录失败后下一次登录成功的事件时间
     */
   public  String endTime;

    /**
     * 连续登录失败的次数
     */
   public  int failTimes;

    @Override
    public String toString() {
        return \"{\" +
                \"  \'mailMsg_login_success\':\'\" + mailMsg + \"\'\" +
                \", \'alarmKey\':\'\" + alarmKey + \"\'\" +
                \", \'start_login_time_in3min\':\'\"  +startTime + \"\'\" +
                \", \'end_login_time_in3min\':\'\"  +endTime + \"\'\" +
                \", \'login_fail_times\':\'\"  +failTimes +  \"\'\" +
                \"}\";
    }

    public MailMsgAlarm() {
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MailMsgAlarm)) return false;
        MailMsgAlarm that = (MailMsgAlarm) o;
        return getFailTimes() == that.getFailTimes() && getMailMsg().equals(that.getMailMsg()) && getAlarmKey().equals(that.getAlarmKey()) && getStartTime().equals(that.getStartTime()) && getEndTime().equals(that.getEndTime());
    }

    @Override
    public int hashCode() {
        return Objects.hash(getMailMsg(), getAlarmKey(), getStartTime(), getEndTime(), getFailTimes());
    }
}

消息实体类

package com.data.dev.common.javabean.kafkaMailTopic;

import com.data.dev.common.javabean.BaseBean;
import lombok.Data;

import java.util.Objects;

/**
 * {
 *   \"user\": \"wangxm\",
 *   \"client_ip\": \"110.68.6.182\",
 *   \"source\": \"login\",
 *   \"loginname\": \"wangxm@test.com\",
 *   \"IP\": \"110.8.148.58\",
 *   \"timestamp\": \"17:58:12\",
 *   \"@timestamp\": \"2022-04-20T09:58:13.647Z\",
 *   \"ip\": \"110.7.231.25\",
 *   \"clienttype\": \"POP3\",
 *   \"result\": \"success\",
 *   \"@version\": \"1\"
 * }
 *
 * user登录用户
 * client_ip 来源ip
 * source 类型
 * loginname 登录用户邮箱地址
 * ip 目标前端ip
 * timestamp 发送时间
 * @timestamp  发送日期时间
 * IP 邮件日志发送来源IP
 * clienttype 客户端登录类型
 * result 登录状态
 */

@Data
public class MailMsg extends BaseBean {
    public String user;
    public String client_ip;
    public String source;
    public String loginName;
    public String mailSenderSourceIp;
    public String timestamp_time;
    public String timestamp_datetime;
    public String ip;
    public String clientType;
    public String result;
    public String version;

    public MailMsg() {
    }

    public MailMsg(String user, String client_ip, String source, String loginName, String mailSenderSourceIp, String timestamp_time, String timestamp_datetime, String ip, String clientType, String result, String version) {
        this.user = user;
        this.client_ip = client_ip;
        this.source = source;
        this.loginName = loginName;
        this.mailSenderSourceIp = mailSenderSourceIp;
        this.timestamp_time = timestamp_time;
        this.timestamp_datetime = timestamp_datetime;
        this.ip = ip;
        this.clientType = clientType;
        this.result = result;
        this.version = version;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MailMsg)) return false;
        MailMsg mailMsg = (MailMsg) o;
        return getUser().equals(mailMsg.getUser()) && getClient_ip().equals(mailMsg.getClient_ip()) && getSource().equals(mailMsg.getSource()) && getLoginName().equals(mailMsg.getLoginName()) && getMailSenderSourceIp().equals(mailMsg.getMailSenderSourceIp()) && getTimestamp_time().equals(mailMsg.getTimestamp_time()) && getTimestamp_datetime().equals(mailMsg.getTimestamp_datetime()) && getIp().equals(mailMsg.getIp()) && getClientType().equals(mailMsg.getClientType()) && getResult().equals(mailMsg.getResult()) && getVersion().equals(mailMsg.getVersion());
    }

    @Override
    public int hashCode() {
        return Objects.hash(getUser(), getClient_ip(), getSource(), getLoginName(), getMailSenderSourceIp(), getTimestamp_time(), getTimestamp_datetime(), getIp(), getClientType(), getResult(), getVersion());
    }

    @Override
    public String toString() {
        return \"{\" +
                \"  \'user\':\'\" + user + \"\'\" +
                \", \'client_ip\':\'\" + client_ip  + \"\'\" +
                \", \'source\':\'\" + source  + \"\'\" +
                \", \'loginName\':\'\" + loginName  + \"\'\" +
                \", \'IP\':\'\" + mailSenderSourceIp + \"\'\" +
                \", \'timestamp\':\'\" + timestamp_time + \"\'\" +
                \", \'@timestamp\':\'\" + timestamp_datetime + \"\'\" +
                \", \'ip\':\'\"  + \"\'\" +
                \", \'clientType\':\'\" + clientType  + \"\'\" +
                \", \'result\':\'\" + result  + \"\'\" +
                \", \'version\':\'\" + version + \"\'\" +
                \"}\";
    }

}

源代码已去掉敏感信息,地址:https://gitee.com/wangxm-2270/alarmCollectByFlink.git

资源下载此资源下载价格为1小猪币,终身VIP免费,请先
由于本站资源来源于互联网,以研究交流为目的,所有仅供大家参考、学习,不存在任何商业目的与商业用途,如资源存在BUG以及其他任何问题,请自行解决,本站不提供技术服务! 由于资源为虚拟可复制性,下载后不予退积分和退款,谢谢您的支持!如遇到失效或错误的下载链接请联系客服QQ:442469558

:本文采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可, 转载请附上原文出处链接。
1、本站提供的源码不保证资源的完整性以及安全性,不附带任何技术服务!
2、本站提供的模板、软件工具等其他资源,均不包含技术服务,请大家谅解!
3、本站提供的资源仅供下载者参考学习,请勿用于任何商业用途,请24小时内删除!
4、如需商用,请购买正版,由于未及时购买正版发生的侵权行为,与本站无关。
5、本站部分资源存放于百度网盘或其他网盘中,请提前注册好百度网盘账号,下载安装百度网盘客户端或其他网盘客户端进行下载;
6、本站部分资源文件是经压缩后的,请下载后安装解压软件,推荐使用WinRAR和7-Zip解压软件。
7、如果本站提供的资源侵犯到了您的权益,请邮件联系: 442469558@qq.com 进行处理!

猪小侠源码-最新源码下载平台 Java教程 基于FLink实现实时安全检测的示例代码 http://www.20zxx.cn/706101/xuexijiaocheng/javajc.html

猪小侠源码,优质资源分享网

常见问题
  • 本站所有资源版权均属于原作者所有,均只能用于参考学习,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担
查看详情
  • 最常见的情况是下载不完整: 可对比下载完压缩包的与网盘上的容量,建议提前注册好百度网盘账号,使用百度网盘客户端下载
查看详情

相关文章

官方客服团队

为您解决烦忧 - 24小时在线 专业服务