ElastAlert Monitoring Deployment


Introduction to ElastAlert

ElastAlert is a simple framework for alerting on anomalies, spikes, or other patterns of interest in data stored in Elasticsearch.

Deploying ElastAlert: common rules

Create the working directories

mkdir -p /data/elastalert/{rules,elastalert_modules,elastalert_enhancements}

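Create the main ElastAlert configuration file

The rules_folder path is the path inside the container, where the host rules directory is mounted when the container is started.

cat > /data/elastalert/elastalert.yaml <<EOF
rules_folder: /opt/elastalert/rules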
run_every:
  minutes: 1
buffer_time:
  minutes: 15
es_host: 192.168.1.xxx
es_port: 9200
#es_username: elastic
#es_password: xxx
writeback_index: elastalert_status
alert_time_limit:
  days: 2
EOF
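
To make the relationship between run_every and buffer_time concrete, here is a minimal Python sketch. It assumes the simplified model in which each scheduled run queries the window from "now minus buffer_time" to "now"; the real scheduler also tracks the end of the previous run and documents it has already seen, so treat this as an illustration only. The function name query_windows and the start time are made up for the example.

```python
from datetime import datetime, timedelta, timezone

RUN_EVERY = timedelta(minutes=1)     # run_every in elastalert.yaml
BUFFER_TIME = timedelta(minutes=15)  # buffer_time in elastalert.yaml


def query_windows(first_run, runs):
    """Return the (start, end) window queried on each scheduled run (simplified model)."""
    windows = []
    for i in range(runs):
        end = first_run + i * RUN_EVERY
        windows.append((end - BUFFER_TIME, end))
    return windows


# Hypothetical start time, chosen only for illustration.
first = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
for start, end in query_windows(first, 3):
    print(start.strftime("%H:%M"), "->", end.strftime("%H:%M"))
```

Consecutive windows overlap by 14 minutes, which is why late-arriving log lines can still be picked up on a later run.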

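Create an alert rule

cat > /data/elastalert/rules/a.yaml <<EOF
name: awen-nginx
# Rule type
type: frequency
#type: any
# Index pattern to query
index: logstash-nginx-*
# Whether the rule is enabled
is_enabled: true
# Number of events, used together with timeframe: here, alert when at least 1 event matches within 1 minute
num_events: 1
# Suppression period after an alert. With 1 minute, only one alert is sent per minute and the rest are suppressed; with 0, every match triggers an alert
#realert:
#  minutes: 1
realert:
  minutes: 0
terms_size: 50
timeframe:
  minutes: 1
# Add the time-handling enhancement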
match_enhancements:
  - "elastalert_enhancements.TimeEnhancement.TimeEnhancement"
alert_text_type: alert_text_only
alert_text: |
  - nginx request exceeded 55 s: monitoring alert --

  datetime: {}
  client_ip: {}
  domain: {}
  status: {}
  upstreamtime: {}
  responsetime: {}
  request_method: {}
  server_ip: {}
  upstreamhost: {}
  url: {}
alert_text_args:
  - local_time
  - client_ip
  - domain
  - status
  - upstreamtime
  - responsetime
  - request_method
  - server_ip
  - upstreamhost
  - url
alert:
  - "elastalert_modules.wechat_robot.WechatRobotAlerter"
# WeChat Work robot webhook URL
wechat_robot_webhook: "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxx"
wechat_robot_msgtype: "text"
EOF
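
The way alert_text and alert_text_args work together can be sketched in a few lines of Python: each {} placeholder is filled, in order, with the value of the corresponding field from the matched document. This is an approximation, not ElastAlert's own code. The helper name render_alert_text, the sample match, and the "<MISSING VALUE>" placeholder for absent fields are assumptions made for illustration, and nested field lookup is left out.

```python
MISSING = "<MISSING VALUE>"  # assumed placeholder for fields absent from the match


def render_alert_text(template, arg_names, match):
    """Fill each {} in the template with the matching field value, in order."""
    values = [match.get(name, MISSING) for name in arg_names]
    return template.format(*values)


template = "datetime: {}\nclient_ip: {}\nstatus: {}\nurl: {}"
args = ["local_time", "client_ip", "status", "url"]
# Hypothetical matched document; it deliberately has no "url" field.
match = {"local_time": "2024-01-01 20:00 CST", "client_ip": "10.0.0.8", "status": 504}

print(render_alert_text(template, args, match))
```

The number of {} placeholders must not exceed the number of entries in alert_text_args, otherwise str.format raises an IndexError.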

Create the WeChat Work robot alerter

cat > /data/elastalert/elastalert_modules/__init__.py <<EOF
EOF
cat > /data/elastalert/elastalert_modules/wechat_robot.py <<EOF
# -*- coding: utf-8 -*-


"""An ElastAlert plugin that sends notifications to a WeChat Work group robot.
@reference: https://github.com/xuyaoqiang/elastalert-dingtalk-plugin
@wechat_robot: https://work.weixin.qq.com/help?doc_id=13376
@date: 2020-04-30
@author: Zhang
@python: v3.6
@license: MIT
@comment: add time conversion (UTC to CST)
"""


import json
import requests
import datetime
from elastalert.alerts import Alerter, DateTimeEncoder
from requests.exceptions import RequestException
from elastalert.util import EAException


class WechatRobotAlerter(Alerter):
    """params
    :param wechat_robot_webhook: webhook of wechat group robot.
    :param wechat_robot_msgtype: message type of wechat group robot.
    :param wechat_robot_mentioned_list: mentioned_list of wechat group members.
    """
    required_options = frozenset(['wechat_robot_webhook', 'wechat_robot_msgtype'])

    def __init__(self, rule):
        super(WechatRobotAlerter, self).__init__(rule)
        self.wechat_robot_webhook = self.rule['wechat_robot_webhook']
        self.wechat_robot_msgtype = self.rule.get('wechat_robot_msgtype', 'text')
        self.wechat_robot_mentioned_list = self.rule.get('wechat_robot_mentioned_list', [])

    def format_body(self, body):
        return body.encode('utf8')

    def utc_to_cst(self, timestamp):
        UTC_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
        utc_time = datetime.datetime.strptime(timestamp, UTC_FORMAT)
        cst_time = utc_time + datetime.timedelta(hours=8)
        return cst_time

    def alert(self, matches):
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json;charset=utf-8"
        }

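        # Convert the first match's timestamp from UTC to CST; leave it
        # unchanged if the field is missing or not in the expected format.
        try:
            matches[0]['@timestamp'] = self.utc_to_cst(matches[0]['@timestamp'])
        except (IndexError, KeyError, TypeError, ValueError):
            pass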

        body = self.create_alert_body(matches)
        payload = {
            "msgtype": self.wechat_robot_msgtype,
            "text": {
                "content": body,
                "mentioned_list": self.wechat_robot_mentioned_list
            }
        }

        try:
            response = requests.post(self.wechat_robot_webhook,
                                     data=json.dumps(payload, cls=DateTimeEncoder),
                                     headers=headers)
            response.raise_for_status()
        except RequestException as e:
            raise EAException("Error request to wechat_robot: {0}".format(str(e)))

    def get_info(self):
        return {
            "type": "wechat_robot",
            "wechat_robot_webhook": self.wechat_robot_webhook
        }
EOF
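
The request body that the alerter sends can be inspected without a running ElastAlert instance. The sketch below rebuilds the same "text" payload and the same fixed 8-hour UTC-to-CST shift using only the standard library. The helper name build_payload, the sample content, and the "@all" mention are illustrative assumptions, and nothing is sent over the network.

```python
import json
from datetime import datetime, timedelta

UTC_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def utc_to_cst(timestamp):
    """Parse a UTC timestamp string and shift it by a fixed +8 hours."""
    return datetime.strptime(timestamp, UTC_FORMAT) + timedelta(hours=8)


def build_payload(content, mentioned_list=()):
    """Build a text message body in the shape used by the alerter above."""
    return {
        "msgtype": "text",
        "text": {"content": content, "mentioned_list": list(mentioned_list)},
    }


# Sample values, assumed for illustration only.
payload = build_payload("status: 504", ["@all"])
print(json.dumps(payload, ensure_ascii=False))
print(utc_to_cst("2020-04-30T16:30:00.000Z"))
```

Note that the timestamp format requires fractional seconds; a value such as 2020-04-30T16:30:00Z raises ValueError, which the alerter catches and then leaves the timestamp unchanged.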

Create the time-handling enhancement

cat > /data/elastalert/elastalert_enhancements/TimeEnhancement.py <<EOF
from elastalert.util import pretty_ts,ts_to_dt,dt_to_ts,lookup_es_key
from elastalert.enhancements import BaseEnhancement

class TimeEnhancement(BaseEnhancement):
    def process(self, match):
        self.local_time = self.rule.get('local_time_feild', 'local_time')
        self.local_starttime = self.rule.get('local_starttime_feild', 'local_starttime')
        self.local_endtime = self.rule.get('local_endtime_feild', 'local_endtime')
        self.ts_field = self.rule.get('timestamp_field', '@timestamp')
        # Render timestamps in the local time zone unless the rule sets use_local_time to false
        lt = self.rule.get('use_local_time', True)

        match_ts = match[self.ts_field]
        match[self.local_time] = pretty_ts(match_ts, lt)
        match[self.local_starttime] = pretty_ts(dt_to_ts(ts_to_dt(match_ts) - self.rule['timeframe']), lt)
        match[self.local_endtime] = match[self.local_time]
EOF
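
The effect of the enhancement on a single match can be approximated with the standard library alone. The sketch below adds the same three fields (local_time, local_starttime, local_endtime) to a match dictionary. It assumes a fixed UTC+8 offset and a plain "YYYY-MM-DD HH:MM:SS" output format, which may differ from what pretty_ts produces; the function name add_local_times is made up for the example.

```python
from datetime import datetime, timedelta, timezone

CST = timezone(timedelta(hours=8))  # fixed UTC+8 offset, assumed for illustration
IN_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
OUT_FORMAT = "%Y-%m-%d %H:%M:%S"


def add_local_times(match, timeframe, ts_field="@timestamp"):
    """Add local_time, local_starttime and local_endtime to a match dict."""
    utc = datetime.strptime(match[ts_field], IN_FORMAT).replace(tzinfo=timezone.utc)
    local = utc.astimezone(CST)
    match["local_time"] = local.strftime(OUT_FORMAT)
    # The window start is the match time minus the rule's timeframe.
    match["local_starttime"] = (local - timeframe).strftime(OUT_FORMAT)
    match["local_endtime"] = match["local_time"]
    return match


# Hypothetical match with a UTC timestamp close to midnight.
m = add_local_times({"@timestamp": "2020-04-30T23:59:30.000Z"}, timedelta(minutes=1))
print(m["local_starttime"], "->", m["local_endtime"])
```

The alert rule can then list local_time in alert_text_args, as the rule above does for the datetime line.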

Run the container with Docker

docker run  -d --name elastalert --restart=always \
-v /data/elastalert/elastalert.yaml:/opt/elastalert/config.yaml \
-v /data/elastalert/rules:/opt/elastalert/rules  \
-v /data/elastalert/elastalert_modules:/opt/elastalert/elastalert_modules \
-v /data/elastalert/elastalert_enhancements:/opt/elastalert/elastalert_enhancements \
-v /etc/localtime:/etc/localtime \
-v /etc/timezone:/etc/timezone \
-e "CONTAINER_TIMEZONE=Asia/Shanghai" \
-e "TZ=Asia/Shanghai" \
jertel/elastalert2 --verbose
