"""Report software names found in Elasticsearch but missing from a local list.

The script reads ``config.ini``, fetches one field from every document of an
Elasticsearch index, compares the values with the names stored in
``white_software.txt``, sends the difference to the configured DingTalk and/or
Feishu webhooks, and finally appends the new names to the local file.
"""

import base64
import configparser
import hashlib
import hmac
import json
import logging
import os
import urllib.parse
from datetime import datetime

import requests
from elasticsearch import Elasticsearch

# Path of the configuration file.
CONFIG_FILE = 'config.ini'

# Path of the local software list file.
LOCAL_SOFTWARE_FILE = 'white_software.txt'

# Seconds to wait for a webhook endpoint; without a timeout requests can
# block forever on an unresponsive server.
WEBHOOK_TIMEOUT = 10

# Logging configuration.
logging.basicConfig(
    filename='software_es_search.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)


def load_config():
    """Read and return the configuration file.

    Returns:
        The populated ``configparser.ConfigParser``.

    Raises:
        FileNotFoundError: ``CONFIG_FILE`` does not exist.
        IOError: ``CONFIG_FILE`` exists but could not be read.
        configparser.Error: the file content is not valid INI syntax.
    """
    config = configparser.ConfigParser()
    if not os.path.exists(CONFIG_FILE):
        raise FileNotFoundError(f"配置文件 {CONFIG_FILE} 不存在。")
    # ConfigParser.read() returns the list of files it managed to parse.
    if not config.read(CONFIG_FILE, encoding='utf-8'):
        raise IOError(f"无法读取配置文件 {CONFIG_FILE}。")
    return config


def get_es_client(config):
    """Create an Elasticsearch client from the ``[elasticsearch]`` section.

    Args:
        config: Parsed configuration; ``host``, ``port``, ``user`` and
            ``password`` are read from its ``[elasticsearch]`` section.

    Returns:
        A connected client, or ``None`` when the section is missing or the
        cluster cannot be reached (the reason is logged).
    """
    if 'elasticsearch' not in config:
        logging.error("配置文件中缺少 [elasticsearch] 配置段。")
        return None

    es_config = config['elasticsearch']
    try:
        client_kwargs = {
            'hosts': [{
                'host': es_config.get('host', '127.0.0.1'),
                'port': int(es_config.get('port', 9200)),
                'scheme': 'http',
            }],
        }
        # Only send credentials when a user is configured; a (None, None)
        # tuple is not a usable basic_auth value.
        user = es_config.get('user')
        if user:
            client_kwargs['basic_auth'] = (user, es_config.get('password', ''))

        es_client = Elasticsearch(**client_kwargs)
        if not es_client.ping():
            raise ConnectionError("无法连接到Elasticsearch,请检查配置。")
        logging.info("成功连接到Elasticsearch。")
        return es_client
    except Exception as e:
        logging.error(f"连接Elasticsearch时出错: {e}")
        return None


def get_es_software_list(es_client, index, field, size=1000):
    """Fetch the set of software names stored in Elasticsearch.

    Args:
        es_client: Client returned by ``get_es_client``.
        index: Index to search.
        field: Document field that holds the software name.
        size: Maximum number of hits requested in the single search call.

    Returns:
        The set of first values of ``field`` across the returned hits, or an
        empty set when the query fails (the error is logged).
    """
    try:
        res = es_client.search(
            index=index,
            body={
                "size": size,
                "query": {"match_all": {}},
                "fields": [field],
                "_source": False
            }
        )
        software_list = set()
        for hit in res['hits']['hits']:
            # Hits whose document lacks the field carry no entry for it;
            # skip them instead of failing the whole query.
            values = hit.get('fields', {}).get(field)
            if values:
                software_list.add(values[0])
        logging.info(f"从Elasticsearch获取到 {len(software_list)} 条软件信息。")
        return software_list
    except Exception as e:
        logging.error(f"从Elasticsearch查询数据时出错: {e}")
        return set()


def get_local_software_list():
    """Read the software names stored in the local list file.

    The file is created empty when it does not exist yet.

    Returns:
        The set of non-blank, stripped lines of ``LOCAL_SOFTWARE_FILE``, or
        an empty set when the file was just created or could not be read.
    """
    try:
        if not os.path.exists(LOCAL_SOFTWARE_FILE):
            # Opening in 'w' mode is enough to create the empty file.
            with open(LOCAL_SOFTWARE_FILE, 'w', encoding='utf-8'):
                pass
            logging.warning(f"本地文件 {LOCAL_SOFTWARE_FILE} 不存在,已创建新文件。")
            return set()

        with open(LOCAL_SOFTWARE_FILE, 'r', encoding='utf-8') as f:
            software_list = {line.strip() for line in f if line.strip()}
        logging.info(f"从本地文件获取到 {len(software_list)} 条软件信息。")
        return software_list
    except Exception as e:
        logging.error(f"读取本地文件时出错: {e}")
        return set()


def update_local_software_list(software_list):
    """Append software names to the local list file, one per line.

    Args:
        software_list: Iterable of names to append; written in sorted order.
    """
    try:
        # If the file was edited by hand and lacks a trailing newline, the
        # first appended name would be glued onto the last existing line.
        needs_newline = False
        if (os.path.exists(LOCAL_SOFTWARE_FILE)
                and os.path.getsize(LOCAL_SOFTWARE_FILE) > 0):
            with open(LOCAL_SOFTWARE_FILE, 'rb') as f:
                f.seek(-1, os.SEEK_END)
                needs_newline = f.read(1) != b'\n'

        with open(LOCAL_SOFTWARE_FILE, 'a', encoding='utf-8') as f:
            if needs_newline:
                f.write('\n')
            f.writelines(f"{software}\n" for software in sorted(software_list))
        logging.info("已更新本地软件列表文件。")
    except Exception as e:
        logging.error(f"更新本地文件时出错: {e}")


def _post_webhook(url, payload, channel):
    """POST ``payload`` as JSON to ``url`` and log the outcome.

    ``channel`` is the display name used in the log messages.
    """
    headers = {'Content-Type': 'application/json;charset=utf-8'}
    try:
        response = requests.post(
            url,
            headers=headers,
            data=json.dumps(payload),
            timeout=WEBHOOK_TIMEOUT
        )
        response.raise_for_status()
        logging.info(f"{channel}消息发送成功。")
    except requests.exceptions.RequestException as e:
        logging.error(f"发送{channel}消息时出错: {e}")
        # A Response with a 4xx/5xx status is falsy, so compare with None.
        failed = getattr(e, 'response', None)
        logging.error(f"响应内容: {failed.text if failed is not None else '无'}")


def _build_dingtalk_url(webhook_url, secret):
    """Return ``webhook_url`` with the signature parameters appended.

    The signature is the URL-quoted base64 of HMAC-SHA256 over
    ``"{timestamp}\\n{secret}"`` keyed by ``secret``. Without a secret the
    URL is returned unchanged.
    """
    if not secret:
        return webhook_url

    timestamp = str(round(datetime.now().timestamp() * 1000))
    string_to_sign = f'{timestamp}\n{secret}'
    digest = hmac.new(
        secret.encode('utf-8'),
        string_to_sign.encode('utf-8'),
        digestmod=hashlib.sha256
    ).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(digest))
    separator = '&' if '?' in webhook_url else '?'
    return f"{webhook_url}{separator}timestamp={timestamp}&sign={sign}"


def send_dingtalk_notification(webhook_url, secret, message):
    """Send a markdown message to a DingTalk webhook.

    Args:
        webhook_url: Webhook URL of the robot.
        secret: Signing secret of the robot; when empty or ``None`` the
            request is sent unsigned.
        message: Markdown text of the message.
    """
    if not webhook_url:
        logging.error("钉钉 webhook_url 未配置,跳过发送。")
        return

    data = {
        "msgtype": "markdown",
        "markdown": {
            "title": "新软件发现通知",
            "text": message
        }
    }
    _post_webhook(_build_dingtalk_url(webhook_url, secret), data, "钉钉")


def send_feishu_notification(webhook_url, message):
    """Send a rich-text ("post") message to a Feishu webhook.

    Args:
        webhook_url: Webhook URL of the bot.
        message: Text placed in the single text element of the post.
    """
    if not webhook_url:
        logging.error("飞书 webhook_url 未配置,跳过发送。")
        return

    data = {
        "msg_type": "post",
        "content": {
            "post": {
                "zh_cn": {
                    "title": "新软件发现通知",
                    "content": [
                        [{
                            "tag": "text",
                            "text": message
                        }]
                    ]
                }
            }
        }
    }
    _post_webhook(webhook_url, data, "飞书")


def _send_notifications(config, message):
    """Send ``message`` through every channel enabled in ``[notifications]``.

    ``type`` is a comma-separated list; each ``dingtalk_*`` / ``feishu_*``
    section of the configuration describes one webhook of that channel.
    """
    raw_types = config.get('notifications', 'type', fallback='')
    enabled = {t.strip() for t in raw_types.split(',') if t.strip()}

    if 'dingtalk' in enabled:
        for section in config.sections():
            if section.startswith('dingtalk_'):
                dingtalk_config = config[section]
                send_dingtalk_notification(
                    dingtalk_config.get('webhook_url'),
                    dingtalk_config.get('secret'),
                    message
                )

    if 'feishu' in enabled:
        for section in config.sections():
            if section.startswith('feishu_'):
                # Feishu webhooks are called without a secret here.
                send_feishu_notification(
                    config[section].get('webhook_url'),
                    message
                )


def main():
    """Compare Elasticsearch with the local list and report new software."""
    try:
        config = load_config()
    except (FileNotFoundError, IOError, configparser.Error) as e:
        logging.error(e)
        return

    es_client = get_es_client(config)
    if not es_client:
        return

    es_config = config['elasticsearch']
    index = es_config.get('index')
    field = es_config.get('field')
    if not index or not field:
        logging.error("[elasticsearch] 配置段缺少 index 或 field。")
        return
    try:
        size = int(es_config.get('size', 1000))
    except ValueError:
        logging.error("[elasticsearch] 配置段中的 size 不是有效整数。")
        return

    es_software = get_es_software_list(es_client, index, field, size=size)
    local_software = get_local_software_list()

    # Names present in Elasticsearch but not yet recorded locally.
    new_software = es_software - local_software
    if not new_software:
        logging.info("未发现新软件,无需通知。")
        return

    logging.info(f"发现 {len(new_software)} 个新软件,正在发送通知...")
    message = (
        f"### : 新软件发现通知\n\n"
        f"以下是Elasticsearch中新发现的软件列表:\n\n"
        f"数量:**{len(new_software)}**\n\n"
        f"----\n\n"
        + "\n".join([f"- **{s}**" for s in sorted(new_software)])
    )
    _send_notifications(config, message)

    # Record the new names so they are not reported again on the next run.
    update_local_software_list(new_software)


if __name__ == "__main__":
    main()