Administrator
Published on 2024-06-24

Querying all logs with Logs Insights from Python and saving them to CSV

Use Python to run CloudWatch Logs Insights queries in AWS and save the results to a CSV file.

Logs Insights returns at most 10,000 records per query, whether you run it from the web console or through the API in Python. The script below works around this by issuing multiple queries over controlled time windows and merging the results, so that every matching log can be extracted. Note that this calls the API repeatedly, and each query incurs a charge.
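For reference, the query string uses the ordinary Logs Insights syntax. A hypothetical example that matches the fields this script expects (`@timestamp`, `@message`, plus the custom `@log_trace` field it merges into `@message`) might be:

```python
# A hypothetical Logs Insights query; adjust fields and filters to your log format.
query = "fields @timestamp, @message, @log_trace | sort @timestamp asc | limit 10000"
```

The full script follows.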

```python

import time
from datetime import datetime, timedelta, timezone
import boto3
import csv
import botocore
import os

# AWS access credentials
aws_access_key_id = 'your access key ID'
aws_secret_access_key = 'your secret access key'
aws_default_region = 'your AWS region'
log_group_name = 'the log group to query'  # target log group
query = "your CloudWatch Logs Insights query string"

# Initialize the query time range
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")  # yesterday's date
start_datetime = datetime.strptime(yesterday + 'T00:00:00Z', "%Y-%m-%dT%H:%M:%SZ")  # start time: yesterday 00:00:00
end_datetime = datetime.strptime(yesterday + 'T23:59:59Z', "%Y-%m-%dT%H:%M:%SZ")  # end time: yesterday 23:59:59

# Treat the times as UTC and convert to millisecond timestamps (as integers)
start_timestamp = int(start_datetime.replace(tzinfo=timezone.utc).timestamp() * 1000)
end_timestamp = int(end_datetime.replace(tzinfo=timezone.utc).timestamp() * 1000)
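# For example (illustrative): 2024-06-23T00:00:00Z converts to 1719100800000 ms since the Unix epoch.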


# Function that runs the AWS log extraction
def query_logs_insights(query, log_group_name, aws_access_key_id, aws_secret_access_key, aws_default_region,
                        start_timestamp, end_timestamp):
    # Create a CloudWatch Logs client
    client = boto3.client('logs', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key,
                          region_name=aws_default_region)

    # Timestamp used in the generated CSV file name
    current_date = datetime.now().strftime('%Y%m%d%H%M')

    try:
        init_mun = 1800  # window size per query, in seconds (30 minutes * 60 seconds)
        switch = 0  # controls the query-frequency branch below
        csv_info = 0  # whether the CSV header has been written yet
        # Loop over successive time windows until the whole range is covered.
        # The first window spans the full range; it shrinks whenever the result cap is hit.
        query_end_time = end_timestamp
        while start_timestamp < end_timestamp:

            # Start query
            response = client.start_query(
                logGroupName=log_group_name,
                queryString=query,
                startTime=start_timestamp,
                endTime=query_end_time,
                limit=10000  # maximum number of results a single query may return
            )

            query_id = response['queryId']
            print(f'Started query with id: {query_id}')

            # Poll until the query finishes; bail out if it fails
            while True:
                response = client.get_query_results(queryId=query_id)
                status = response['status']
                if status == 'Complete':
                    break
                if status in ('Failed', 'Cancelled', 'Timeout'):
                    raise RuntimeError(f'Query {query_id} ended with status: {status}')
                time.sleep(1)

            # Fetch the query results
            query_results = response.get('results', [])

            # Process the results and write them to the CSV file
            if query_results:
                # Reshape each result row into a dict, merging @log_trace into @message
                formatted_results = []
                for event in query_results:
                    event_dict = {field['field']: field['value'] for field in event}
                    # Fold the @log_trace field into @message
                    if '@log_trace' in event_dict:
                        event_dict['@message'] += '\n' + event_dict.pop('@log_trace')

                    formatted_results.append(event_dict)

                # If a single query returned 10,000 rows, the window is too wide: shrink it
                numb = len(formatted_results)

                if numb >= 10000 and switch == 0:
                    # Hit the result cap: re-run this slice with a narrower window
                    query_end_time = min(start_timestamp + init_mun * 1000, end_timestamp)
                    minutes = init_mun / 60
                    init_mun = int(init_mun / 2)
                    print(f'Hit the 10,000-result cap; retrying with a {minutes:g}-minute window')
                    continue
                elif numb < 10000 and switch == 0:
                    # Window fit under the cap: advance the start to the current window's end
                    start_timestamp = query_end_time
                    query_end_time = min(start_timestamp + init_mun * 1000, end_timestamp)
                    minutes = init_mun / 60
                    print(f'Query succeeded with a {minutes:g}-minute window')
                else:
                    print("Reserved for an externally supplied parameter")
                # Collect the field names to use as CSV columns (taken from the first row)
                field_names = list(formatted_results[0].keys())

                for event in formatted_results:
                    for field in field_names:
                        event.setdefault(field, '')  # default missing fields to an empty string

                file_path = "output_file-" + current_date + ".csv"  # output file name
                dir_path = os.path.dirname(file_path)
                if dir_path:
                    os.makedirs(dir_path, exist_ok=True)
                # Append the results to the CSV file (created on first write)
                with open(file_path, 'a', newline='', encoding='utf-8') as f1:
                    writer = csv.DictWriter(f1, fieldnames=field_names)
                    if not csv_info:  # write the header row only once
                        writer.writeheader()
                        csv_info = True
                    writer.writerows(formatted_results)
            else:
                # No results in this window: still advance, so the loop cannot spin forever
                start_timestamp = query_end_time
                query_end_time = min(start_timestamp + init_mun * 1000, end_timestamp)

            print(f'Logs saved; current window end timestamp: {query_end_time}')

    except botocore.exceptions.ClientError as e:
        print(f"Error starting or retrieving query: {e}")


def main():
    # Run the query
    query_logs_insights(query, log_group_name, aws_access_key_id, aws_secret_access_key, aws_default_region,
                        start_timestamp, end_timestamp)


if __name__ == '__main__':
    main()

```
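Hard-coding credentials is fine for a one-off script, but boto3 can also resolve them from its default credential chain (environment variables, the shared ~/.aws/credentials file, or an attached IAM role). A minimal sketch, with the region as a placeholder:

```python
import boto3

# With no explicit keys, boto3 falls back to its default credential chain:
# environment variables, shared config files, or an instance/role profile.
client = boto3.client('logs', region_name='us-east-1')  # example region
```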

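After the script finishes, a quick sanity check is to count the rows in the newest export. A minimal sketch that assumes the output_file-&lt;timestamp&gt;.csv naming used above:

```python
import csv
import glob

# Pick the most recent export; lexicographic sort works for the %Y%m%d%H%M name format
latest = sorted(glob.glob('output_file-*.csv'))[-1]
with open(latest, newline='', encoding='utf-8') as f:
    rows = list(csv.DictReader(f))
print(f'{latest}: {len(rows)} rows')
```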
