Querying CloudWatch Logs Insights in AWS with Python and saving the results to a CSV file
Whether you run it from the console or call the API from Python, a single Logs Insights query returns at most 10,000 results. The script below works around that limit by issuing multiple queries over smaller time windows and merging everything it retrieves, so the full set of logs for the range can be exported. Note that this calls the API repeatedly, and each query is billed.
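For reference, the `query` passed to `start_query` in the script is an ordinary Logs Insights query string. A minimal sketch might look like the one below; the `@log_trace` field is assumed only because the script merges it into `@message`, so adjust the field list to whatever your log group actually contains.

```python
# A minimal example of a Logs Insights query string (field list is illustrative);
# @log_trace is assumed here only because the script merges it into @message.
query = """
fields @timestamp, @message, @log_trace
| sort @timestamp asc
| limit 10000
"""
```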
```python
import time
from datetime import datetime, timedelta, timezone
import boto3
import csv
import botocore.exceptions  # make botocore.exceptions available for the except clause below
import os
# AWS credentials and query configuration
aws_access_key_id = 'your access key id'
aws_secret_access_key = 'your secret access key'
aws_default_region = 'your AWS region'
log_group_name = 'the log group to query'  # target log group
query = "your Logs Insights query string"

# Build the query time range: the whole of yesterday
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
start_datetime = datetime.strptime(yesterday + 'T00:00:00Z', "%Y-%m-%dT%H:%M:%SZ")  # yesterday 00:00:00
end_datetime = datetime.strptime(yesterday + 'T23:59:59Z', "%Y-%m-%dT%H:%M:%SZ")    # yesterday 23:59:59

# Treat the datetimes as UTC and convert to millisecond timestamps (integers)
start_timestamp = int(start_datetime.replace(tzinfo=timezone.utc).timestamp() * 1000)
end_timestamp = int(end_datetime.replace(tzinfo=timezone.utc).timestamp() * 1000)
# Fetch logs from CloudWatch Logs Insights and append them to a CSV file
def query_logs_insights(query, log_group_name, aws_access_key_id, aws_secret_access_key, aws_default_region,
                        start_timestamp, end_timestamp):
    # Create the CloudWatch Logs client
    client = boto3.client('logs', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key,
                          region_name=aws_default_region)
    # Timestamp used in the generated CSV file name
    current_date = datetime.now().strftime('%Y%m%d%H%M')
    try:
        window_seconds = 1800   # length of a single query window: 30 minutes * 60 seconds
        switch = 0              # window-adjustment mode, reserved for external parameters
        header_written = False  # whether the CSV header has been written yet
        # Query window by window until the whole time range is covered
        query_end_time = end_timestamp
        while start_timestamp < end_timestamp:
            # Start the query
            response = client.start_query(
                logGroupName=log_group_name,
                queryString=query,
                startTime=start_timestamp,
                endTime=query_end_time,
                limit=10000  # maximum number of results a single query may return
            )
            query_id = response['queryId']
            print(f'Started query with id: {query_id}')
            # Wait for the query to complete
            while True:
                response = client.get_query_results(queryId=query_id)
                if response['status'] == 'Complete':
                    break
                if response['status'] in ('Failed', 'Cancelled'):
                    raise RuntimeError(f"Query {query_id} ended with status {response['status']}")
                time.sleep(1)
            # Collect the results
            query_results = response.get('results', [])
            # Format the results and write them to the CSV file
            if query_results:
                # Turn each result row into a dict and merge @log_trace into @message
                formatted_results = []
                for event in query_results:
                    event_dict = {field['field']: field['value'] for field in event}
                    if '@log_trace' in event_dict:
                        event_dict['@message'] += '\n' + event_dict.pop('@log_trace')
                    formatted_results.append(event_dict)
                # 10,000 rows means the window probably hit the limit:
                # re-query the same start time and halve the window for the next retry
                numb = len(formatted_results)
                if numb >= 10000 and switch == 0:
                    query_end_time = min(start_timestamp + window_seconds * 1000, end_timestamp)
                    print(f'Hit the result limit, retrying with a {window_seconds // 60}-minute window')
                    window_seconds = int(window_seconds / 2)
                    continue
                elif numb < 10000 and switch == 0:
                    # Move the window forward: the next query starts where this one ended
                    start_timestamp = query_end_time
                    query_end_time = min(start_timestamp + window_seconds * 1000, end_timestamp)
                    print(f'{window_seconds // 60}-minute window queried successfully')
                else:
                    print("Reserved for externally supplied parameters")
                # Make sure every row has the same set of columns before writing
                field_names = list(formatted_results[0].keys())
                for event in formatted_results:
                    for field in field_names:
                        event.setdefault(field, '')  # default missing fields to an empty string
                file_path = "output_file-" + current_date + ".csv"  # output file name
                dir_path = os.path.dirname(file_path)
                if dir_path:
                    os.makedirs(dir_path, exist_ok=True)
                # Append the results to the CSV file, writing the header only once
                with open(file_path, 'a', newline='', encoding='utf-8') as f1:
                    writer = csv.DictWriter(f1, fieldnames=field_names)
                    if not header_written:
                        writer.writeheader()
                        header_written = True
                    writer.writerows(formatted_results)
                print(f'Logs saved to file; current query end timestamp: {query_end_time}')
            else:
                # No results in this window: move on to the next one so the loop
                # does not keep querying the same empty window forever
                start_timestamp = query_end_time
                query_end_time = min(start_timestamp + window_seconds * 1000, end_timestamp)
    except botocore.exceptions.ClientError as e:
        print(f"Error starting or retrieving query: {e}")


def main():
    # Run the query
    query_logs_insights(query, log_group_name, aws_access_key_id, aws_secret_access_key, aws_default_region,
                        start_timestamp, end_timestamp)


if __name__ == '__main__':
    main()
```
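Hard-coding the access key and secret is shown only to keep the example self-contained. boto3 can also resolve credentials from environment variables, shared credential files, or an attached IAM role, in which case creating the client reduces to a sketch like this (the region below is just a placeholder):

```python
import boto3

# When AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY are set in the environment,
# or an IAM role with CloudWatch Logs permissions is attached to the machine
# running the script, boto3 finds the credentials itself and no keys need to
# appear in the code.
client = boto3.client("logs", region_name="ap-northeast-1")  # example region
```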