python3实现日志监控并发送报警信息到企业微信群

为及时响应服务中的异常和错误信息,需要对日志进行监控,并发送错误信息到企业微信群。实现主要包括两个部分:

一、监控日志信息

因为公司的生产环境部署了elasticsearch和kibana服务,可以通过调用elasticsearch的api直接进行相关查询。对于elasticsearch不了解的,可以看看中文版的es指南,其中内容有部分已过时,英文好的同学阅读英文文档为宜。本次功能主要用到其中的请求体查询部分,在python中调用,需要先安装elasticsearch模块

pip3 install elasticsearch

使用起来很简单,先连接elasticsearch服务器,然后调用search方法进行相关查询:

client = Elasticsearch(hosts=['hostname']) 默认端口是9200

results = client.search(index=index_name, body=query_body)其中index可以是数组,重点是query_body,其格式是map,根据es文档,举例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"query": {
"bool": {
"must": {
"match": {"level": "ERROR"}
},
"filter": {
"range": {
"@timestamp": {
"gt": "now-5m"
}
}
}
}
}
}

最外层称为请求体,key为”query:”,value称为查询语句,可以为单个查询语句,或者是多个查询语句的组合,常用的查询语句有以下几类:

  • match查询:在全文字段上应用时,会用合适的分析器去分析查询字符串

    {"match": {"level": "ERROR"}}表示level字段中包含ERROR字符串的匹配

    在精确值的字段上应用时,例如数字、日期、布尔等,会精确匹配给定的值

    {"match": {"age": 26}}

  • multi_match查询:相当于将多个match语句组合,可以在多个字段上查询

    1
    2
    3
    4
    5
    6
    {
    "multi_match": {
    "message": "full text search",
    "field": ["title", "body"]
    }
    }
  • range查询:找出落在指定区间的数字或者时间

    1
    2
    3
    4
    5
    6
    7
    {
    "range": {
    "@timestamp": {
    "gt": "now-5m"
    }
    }
    }

    上面查询表示当前时间到五分钟前的时间区间内,允许的操作符有gt(大于),gte(大于等于),lt(小于),lte(小于等于)

  • term查询:用于精确匹配

    {"term": {"tag": "full_text"}}

  • terms查询:顾名思义,可以指定多个查询值,组合为或操作

    {"terms": {"tags": ["search", "full_text", "nosql"]}}

如果要组合多个查询,最常用的是bool语句,接受以下参数:

  • must

  • must_not

  • should

  • filter

    其含义可以望文生义,指南中的例子如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    {
    "bool": {
    "must": { "match": { "title": "how to make millions" }},
    "must_not": { "match": { "tag": "spam" }},
    "should": [
    { "match": { "tag": "starred" }}
    ],
    "filter": {
    "bool": {
    "must": [
    { "range": { "date": { "gte": "2014-01-01" }}},
    { "range": { "price": { "lte": 29.99 }}}
    ],
    "must_not": [
    { "term": { "category": "ebooks" }}
    ]
    }
    }
    }
    }

    查询的匹配会保存在返回结果results[“hits”][“hits”]中,默认10条匹配结果,可以在设置search()方法的size参数来改变结果集大小,如果要返回所有结果,需要用到scroll相关接口(未完成的工作)。

    二、将获取的信息发送到企业微信群

    分为三步:

    1. 获取企业微信token
    2. 构建所需模式的报警信息
    3. 发送报警信息到微信群

    参见官方开发文档

    附:参考代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    #!/usr/bin/env python3
    import requests
    import json
    import logging
    from elasticsearch import Elasticsearch

    logging.basicConfig(filename='send_alert_wechat_msg.log', format='%(asctime)s %(message)s', level=logging.DEBUG)

    # 企业微信相关配置
    corpid = 'xxxxx' # 企业id
    corpsecret = 'xxxxxx'
    url = 'https://qyapi.weixin.qq.com' # 官方API接口
    chatid = 'xxxxx' # 用官方API创建的群聊ID

    # es查询语句
    query_body = {
    "query": {
    "bool": {
    "must": {
    "match": {"level": "ERROR"}
    },
    "filter": {
    "range": {
    "@timestamp": {
    "gt": "now-5m"
    }
    }
    }
    }
    }
    }


    # 获取企业微信token
    def get_token(url, corpid, corpsecret):
    token_url = '%s/cgi-bin/gettoken?corpid=%s&corpsecret=%s' % (url, corpid, corpsecret)
    token = requests.get(token_url).json()['access_token']
    return token


    # 构建告警信息json,使用文本卡片模式
    def messages(msg):
    values = {
    "chatid": chatid,
    "msgtype": 'textcard',
    "textcard": {
    "title": "Error Alert",
    "description": msg,
    "url": "https://betame.cn",
    "btntxt": "more"
    },
    "safe": 0
    }
    msges = (bytes(json.dumps(values), 'utf-8'))
    return msges


    # 发送告警信息
    def send_message(url, token, payload):
    send_url = '%s/cgi-bin/appchat/send?access_token=%s' % (url, token)
    response = requests.post(send_url, data=payload)
    rescode = response.json()['errcode']
    if rescode == 0:
    logging.info('Send wechat message successfully.')
    else:
    logging.info('Send wechat message failed.' + response.json())


    # 连接elasticsearch查询错误信息
    def query_es(query_body):
    client = Elasticsearch()
    res = client.search(index="index-*", body=query_body)
    return res['hits']['hits']


    # 函数调用
    if __name__ == '__main__':
    test_token = get_token(url, corpid, corpsecret)
    desc = '<div class="gray">%(@timestamp)s</div><div class="highlight">%(message)s</div>'

    results = query_es(query_body)
    for result in results:
    payload = desc % result['_source']
    msg_data = messages(payload)
    send_message(url, test_token, msg_data)
如果对您有帮助,可以给作者一点支持和鼓励,不胜感激!