# CloudFront cache behavior must be configured separately and is not covered here.
# The script below can be used as-is; remember to replace the Chinese placeholder
# values in the configuration section with your own settings.
import boto3
import requests
from concurrent.futures import ThreadPoolExecutor
from botocore.config import Config
import logging
# --- Configuration ---
# AWS credentials and region.
aws_access_key_id = '用户key_id'
aws_secret_access_key = '用户secert'
aws_default_region = '地区' # Region for the S3 client, e.g. ap-southeast-1 (Singapore; see AWS for others)
# CloudFront and S3 settings.
cloudfront_domain = '填写域名' # Your CloudFront domain, in the form *.cloudfront.net
bucket_name = '你的s3桶名称' # Bucket name only, e.g. for a bucket named "test" just write: test
max_workers = 10 # Number of concurrent requests; tune to your needs and machine capacity
# Logging setup; if you remove it, globally replace logger.info with print.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Retry policy for transient errors and timeouts.
config = Config(
retries={
'max_attempts': 10, # Maximum number of retry attempts
'mode': 'standard' # Standard retry mode
}
)
# Initialize the S3 client.
s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key,
region_name=aws_default_region, config=config)
# List every object key in an S3 bucket, following list_objects_v2 pagination.
def list_s3_files(bucket):
    """Return a list of all object keys in *bucket*.

    Follows list_objects_v2 continuation tokens so buckets with more than
    1000 objects are fully enumerated. Returns [] if any API call fails.
    """
    files = []
    continuation_token = None
    try:
        while True:
            # Only pass ContinuationToken when one exists — the API rejects
            # a None token, hence the conditional kwargs instead of two
            # near-duplicate call sites.
            kwargs = {'Bucket': bucket}
            if continuation_token:
                kwargs['ContinuationToken'] = continuation_token
            response = s3.list_objects_v2(**kwargs)
            # 'Contents' is absent for an empty bucket or empty page.
            files.extend(item['Key'] for item in response.get('Contents', []))
            # IsTruncated means more pages remain to be fetched.
            if response.get('IsTruncated'):
                continuation_token = response.get('NextContinuationToken')
            else:
                break  # No more pages — done.
    except Exception as e:
        # Log at ERROR level (was logger.info) so failures stand out.
        logger.error(f"Error listing files in bucket {bucket}: {e}")
        return []
    return files
# Warm the CloudFront cache for a single object key by requesting it once.
def preheat_resource(resource):
    """Fetch https://<cloudfront_domain>/<resource> to pull it into the edge cache.

    A 200 response means the edge now holds the object. Any other status or a
    network error is logged but never raised, so one bad object cannot stop
    the whole run.
    """
    url = f'https://{cloudfront_domain}/{resource}'
    try:
        # A timeout is essential: without one, a stalled connection would
        # hang this worker thread forever and silently shrink the pool.
        response = requests.get(url, timeout=30)
        if response.status_code == 200:
            logger.info(f'Successfully preheated {resource}')
        else:
            # Non-200 is a warning (was logger.info) so failures are visible.
            logger.warning(f'Failed to preheat {resource}. Status code: {response.status_code}')
    except Exception as e:
        logger.error(f'Error preheating {resource}: {e}')
# Warm the CloudFront cache for every listed resource concurrently.
def preheat_cloudfront(resources):
    """Dispatch preheat_resource for each key across a thread pool.

    The `with` block waits for all queued requests to finish before returning.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for key in resources:
            pool.submit(preheat_resource, key)
# Script entry point.
if __name__ == "__main__":
    # Collect every object key from the configured S3 bucket.
    s3_files = list_s3_files(bucket_name)
    if s3_files:
        logger.info(f"Total files to preheat: {len(s3_files)}")
        # Warm the CloudFront cache for every file found.
        preheat_cloudfront(s3_files)
    else:
        logger.info("No files to preheat.")