Administrator
Published on 2024-10-24 / 65 Visits
0
0

aws cloudfront文件预热(python SDK实现)

cloudfront需要自己配置缓存方式,此处不做说明,下方直接贴代码,照抄即可,记得修改脚本中的中文标注

import boto3
import requests
from concurrent.futures import ThreadPoolExecutor
from botocore.config import Config
import logging

# 配置信息
# 用户以及地区配置
aws_access_key_id = '用户key_id'
aws_secret_access_key = '用户secert'
aws_default_region = '地区' # 主要是给s3指定地区,例如:ap-southeast-1(新加坡,其余的在aws查看)

# cloudfront与s3配置
cloudfront_domain = '填写域名'  # 在自己的cloudfront 查看域名格式是: *.cloudfront.net
bucket_name = '你的s3桶名称'    # 直接填写bucket桶,例如bucket名字是: test 就直接填写:test
max_workers = 10  # 并发请求数,根据自己需求以及脚本机器性能调整

# 设定输出模式,不喜欢用可移除,移除记得全局匹配logger.info  替换成 print
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# 设置错误或超时重试
config = Config(
    retries={
        'max_attempts': 10,  # 最大重试次数
        'mode': 'standard'  # 标准重试模式
    }
)

# 初始化 S3 客户端
s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key,
                  region_name=aws_default_region, config=config)


# 分页列出 S3 Bucket 中的所有文件
def list_s3_files(bucket):
    files = []
    continuation_token = None
    try:
        while True:
            # 调用 S3 list_objects_v2 API,处理分页情况
            if continuation_token:
                response = s3.list_objects_v2(Bucket=bucket, ContinuationToken=continuation_token)
            else:
                response = s3.list_objects_v2(Bucket=bucket)

            # 检查是否有文件
            if 'Contents' in response:
                files.extend([item['Key'] for item in response['Contents']])

            # 判断是否有更多文件需要分页获取
            if response.get('IsTruncated'):  # 如果返回被截断了,说明还有更多文件
                continuation_token = response.get('NextContinuationToken')
            else:
                break  # 没有更多文件时,退出循环
    except Exception as e:
        logger.info(f"Error listing files in bucket {bucket}: {e}")
        return []

    return files


# 预热单个资源
def preheat_resource(resource):
    url = f'https://{cloudfront_domain}/{resource}'
    try:
        response = requests.get(url)
        if response.status_code == 200:
            logger.info(f'Successfully preheated {resource}')
        else:
            logger.info(f'Failed to preheat {resource}. Status code: {response.status_code}')
    except Exception as e:
        logger.info(f'Error preheating {resource}: {e}')


# 预热 CloudFront 中的所有资源
def preheat_cloudfront(resources):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        executor.map(preheat_resource, resources)


# 主函数
if __name__ == "__main__":
    # 获取 S3 中所有的文件路径
    s3_files = list_s3_files(bucket_name)
    if s3_files:
        logger.info(f"Total files to preheat: {len(s3_files)}")
        # 预热 CloudFront 中的所有文件
        preheat_cloudfront(s3_files)
    else:
        logger.info("No files to preheat.")

Comment