AWS S3 事件通知 + SQS 消息队列:Java 实现文件上传自动化处理的完整实战指南

AWS S3 事件通知 + SQS 消息队列:Java 实现文件上传自动化处理的完整实战指南

摘要 :在现代云原生应用中,文件上传后的自动化处理(如缩略图生成、格式转换、内容审核等)是一个极为常见的需求。本文以真实生产项目为基础,详细讲解如何利用 AWS S3 事件通知 + SQS 消息队列构建一套事件驱动架构,并在 Java(Spring)项目中实现完整的监听 → 过滤 → 异步处理链路,涵盖防循环触发、并发控制、内存安全等生产级关键细节。

目录

1.背景与需求分析
2.架构设计与方案对比
3.AWS 侧配置详解
4.Java 代码实现详解
5.生产环境关键设计
6.监控、运维与故障排查
7.性能优化与成本控制
8.总结与最佳实践

一、背景与需求分析

1.1 业务场景

在我们的内容管理系统中,用户每天通过多种方式(Web 端直传、API 上传、后台批量导入等)向 S3 上传大量图片和视频文件。为了提升前端展示性能和用户体验,我们需要:

  • 图片上传后:自动生成 WebP 格式的缩略图,减小页面加载体积

  • 视频上传后:自动提取首帧作为封面图,供列表页展示

  • 处理结果:缩略图统一存放到同桶的约定目录下,文件名带 _thumbnail标识

1.2 技术挑战

二、架构设计与方案对比

2.1 方案一:应用层手动发布事件(传统方案)
用户上传 → 应用代码发布事件 → 消息队列 → 消费处理
缺点:
  • 需要在所有上传入口(Controller、Service、定时任务等)手动埋点发送事件
  • 容易遗漏,尤其是直接通过 AWS Console 或 SDK 上传的场景
  • 业务代码与缩略图逻辑强耦合
2.2 方案二:S3 事件通知 + SQS(推荐方案)
文件上传到 S3 → S3 自动发送事件通知 → SQS 队列 → Java 监听器消费 → 异步处理 → 结果回传 S3
优点:
  • 无需修改任何上传代码,零侵入
  • 覆盖所有上传方式(SDK、Console、CLI、跨账号复制等)完全解耦,可独立扩展
  • AWS 原生架构,成熟可靠
2.3 最终架构图
┌──────────────┐         ┌──────────────┐         ┌──────────────────────┐

│   客户端/API  │ upload  │    AWS S3    │  event  │     AWS SQS          │

│  (多种入口)   │ ──────→ │   Bucket     │ ──────→ │  标准队列             │

└──────────────┘         └──────────────┘         └──────────┬───────────┘

                                                             │ poll

                                                             ▼

                                                   ┌──────────────────┐

                                                   │ ThumbnailListener │

                                                   │ (@SqsListener)    │

                                                   └────────┬─────────┘

                                                            │ 过滤 + 构建事件

                                                            ▼

                                                   ┌──────────────────┐

                                                   │ 内部延迟队列       │

                                                   │ (SuishenQueue)    │

                                                   └────────┬─────────┘

                                                            │ 异步消费

                                                            ▼

                                                ┌───────────────────────┐

                                                │ ThumbnailEventHandler │

                                                │ (Semaphore 限流)       │

                                                └────────┬──────────────┘

                                                         │

                                          ┌──────────────┼──────────────┐

                                          ▼              ▼              ▼

                                     图片缩略图      视频首帧        不支持的类型

                                     (→ WebP)      (→ WebP)        (跳过)

                                          │              │

                                          ▼              ▼

                                   ┌────────────────────────┐

                                   │  上传到 S3 thumbnails    │

                                   │  目录                    │

                                   └────────────────────────┘

三、AWS 侧配置详解

3.1 创建 SQS 队列

登录 AWS 控制台,进入 SQS 服务:

1.点击 "创建队列"
2.队列类型:标准队列(Standard Queue)
3.队列名称:s3-upload-thumbnail-notifications
4.关键参数配置:
5.记录队列 URL(后续配置需要),格式如:

https://sqs.us-west-2.amazonaws.com/514246740424/s3-upload-thumbnail-notifications  
3.2 配置 SQS 访问策略

在队列的 "访问策略"中添加以下策略,允许 S3 服务向该队列发送消息:

{

  "Version": "2012-10-17",

  "Statement": [

    {

      "Sid": "AllowS3ToSendMessage",

      "Effect": "Allow",

      "Principal": {

        "Service": "s3.amazonaws.com"

      },

      "Action": "SQS:SendMessage",

      "Resource": "arn:aws:sqs:us-west-2:514246740424:s3-upload-thumbnail-notifications",

      "Condition": {

        "ArnLike": {

          "aws:SourceArn": "arn:aws:s3:::your-bucket-name"

        }

      }

    }

  ]

}

安全提示:Condition中的 aws:SourceArn限制了只有指定的 S3 桶才能发送消息,务必配置,防止其他桶的消息混入。

3.3 配置 S3 事件通知

进入 S3 桶 → 属性→ 事件通知→ 创建事件通知: 防循环关键点:如果你的缩略图也上传到同一个桶,强烈建议配置前缀过滤或后缀排除,从 AWS 层面就避免缩略图文件触发新事件。

3.4 S3 事件通知消息格式

当文件上传到 S3 时,AWS 自动发送如下 JSON 消息到 SQS:

{

  "Records": [

    {

      "eventVersion": "2.1",

      "eventSource": "aws:s3",

      "awsRegion": "us-west-2",

      "eventTime": "2026-02-11T10:00:00.000Z",

      "eventName": "ObjectCreated:Put",

      "s3": {

        "bucket": {

          "name": "your-bucket-name",

          "arn": "arn:aws:s3:::your-bucket-name"

        },

        "object": {

          "key": "growth/original/photo.jpg",

          "size": 1024000,

          "eTag": "abc123def456",

          "sequencer": "0A1B2C3D4E5F6789"

        }

      }

    }

  ]

}

关键字段说明:

  • eventName:事件类型,用于过滤(如只处理 ObjectCreated:*)
  • s3.bucket.name:桶名,用于后续 S3 操作
  • s3.object.key:对象键(文件路径),注意是URL 编码的,代码中需要解码
  • s3.object.size:文件大小(字节),可用于过滤超大文件

四、Java 代码实现详解

4.1 Maven 依赖配置
<!-- AWS SDK BOM(统一版本管理) -->

<dependencyManagement>

    <dependencies>

        <dependency>

            <groupId>software.amazon.awssdk</groupId>

            <artifactId>bom</artifactId>

            <version>2.20.157</version>

            <type>pom</type>

            <scope>import</scope>

        </dependency>

    </dependencies>

</dependencyManagement>


<dependencies>

    <!-- AWS S3 SDK v2 -->

    <dependency>

        <groupId>software.amazon.awssdk</groupId>

        <artifactId>s3</artifactId>

    </dependency>


    <!-- AWS SQS SDK v2 -->

    <dependency>

        <groupId>software.amazon.awssdk</groupId>

        <artifactId>sqs</artifactId>

    </dependency>


    <!-- AWS SQS SDK v1(Spring Cloud AWS 依赖) -->

    <dependency>

        <groupId>com.amazonaws</groupId>

        <artifactId>aws-java-sdk-sqs</artifactId>

        <version>1.12.529</version>

    </dependency>


    <!-- Spring Cloud AWS Messaging(提供 @SqsListener 注解) -->

    <dependency>

        <groupId>io.awspring.cloud</groupId>

        <artifactId>spring-cloud-aws-messaging</artifactId>

        <version>2.4.4</version>

    </dependency>


    <!-- 图片缩略图生成库 -->

    <dependency>

        <groupId>net.coobird</groupId>

        <artifactId>thumbnailator</artifactId>

        <version>0.4.21</version>

    </dependency>


    <!-- WebP 图片格式支持 -->

    <dependency>

        <groupId>org.sejda.imageio</groupId>

        <artifactId>webp-imageio</artifactId>

        <version>0.1.6</version>

    </dependency>

</dependencies>  

说明:项目同时使用了 AWS SDK v1(Spring Cloud AWS 依赖)和 v2(S3 操作),这是因为 spring-cloud-aws-messaging 2.x底层依赖 v1 的 SQS 客户端。如果使用 Spring Cloud AWS 3.x,则可以统一到 v2。

4.2 应用配置文件
# ============ AWS 基础配置 ============

aws.s3.accessKey=YOUR_ACCESS_KEY

aws.s3.secretKey=YOUR_SECRET_KEY

aws.s3.region=us-west-2


# ============ SQS 缩略图队列配置 ============

# 是否启用 SQS 监听(可作为总开关)

aws.sqs.thumbnail.enabled=true

# SQS 队列 URL

aws.sqs.thumbnail.queue.url=https://sqs.us-west-2.amazonaws.com/514246740424/s3-upload-thumbnail-notifications


# ============ 缩略图生成配置 ============

# 缩略图质量(0.0-1.0,值越小文件越小)

aws.s3.thumbnail.quality=0.3

# 是否保留原始尺寸(仅压缩质量)

aws.s3.thumbnail.keep.original.size=true

# 是否启用内存监控(生产环境建议开启)

aws.s3.thumbnail.memory.monitor.enabled=true  
4.3 条件化加载:SqsEnabledCondition

在非 Spring Boot 环境(如传统 Spring MVC 项目)中,我们需要自定义 Condition来控制 Bean 的创建:

@Slf4j

public class SqsEnabledCondition implements Condition {


    @Override

    public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {

        String enabled = null;


        // 方式1: 尝试从 Spring Environment 读取

        Environment env = context.getEnvironment();

        if (env != null) {

            enabled = env.getProperty("aws.sqs.thumbnail.enabled");

        }


        // 方式2: 直接读取 classpath 下的配置文件(兜底)

        if (enabled == null && context.getResourceLoader() != null) {

            Resource resource = context.getResourceLoader()

                .getResource("classpath:config.properties");

            if (resource.exists()) {

                Properties props = new Properties();

                try (InputStream is = resource.getInputStream()) {

                    props.load(is);

                    enabled = props.getProperty("aws.sqs.thumbnail.enabled");

                }

            }

        }


        boolean result = "true".equalsIgnoreCase(enabled);

        log.info("SQS 条件判断: aws.sqs.thumbnail.enabled={}, Bean创建决策: {}",

                enabled, result ? "创建" : "跳过");

        return result;

    }

}
设计意图:
  • 通过配置开关控制整个 SQS 监听链路是否启用
  • 支持多环境差异化配置(开发环境关闭,生产环境开启)
  • 双重读取策略兼容不同的 Spring 配置加载方式
4.4 SQS 配置类:AwsSqsConfig
@Slf4j

@Configuration

public class AwsSqsConfig {


    @Value("${aws.s3.accessKey}")

    private String awsAccessKey;


    @Value("${aws.s3.secretKey}")

    private String awsSecretKey;


    @Value("${aws.s3.region:us-west-2}")

    private String awsRegion;


    /**

     * 创建 SQS 异步客户端

     */

    @Bean

    @Conditional(SqsEnabledCondition.class)

    public AmazonSQSAsync amazonSQSAsync() {

        BasicAWSCredentials credentials =

            new BasicAWSCredentials(awsAccessKey, awsSecretKey);


        return AmazonSQSAsyncClientBuilder.standard()

                .withRegion(Regions.fromName(awsRegion))

                .withCredentials(new AWSStaticCredentialsProvider(credentials))

                .build();

    }


    /**

     * 创建消息监听容器

     * 这是 Spring Cloud AWS 的核心组件,负责从 SQS 拉取消息并分发给 @SqsListener 方法

     */

    @Bean

    @Conditional(SqsEnabledCondition.class)

    public SimpleMessageListenerContainer simpleMessageListenerContainer(

            AmazonSQSAsync amazonSQSAsync,

            QueueMessageHandler queueMessageHandler) {


        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

        container.setAmazonSqs(amazonSQSAsync);

        container.setMessageHandler(queueMessageHandler);

        container.setMaxNumberOfMessages(10);  // 每次最多拉取 10 条消息

        container.setWaitTimeout(20);           // 长轮询 20 秒

        return container;

    }


    /**

     * 创建消息处理器

     */

    @Bean

    @Conditional(SqsEnabledCondition.class)

    public QueueMessageHandler queueMessageHandler(AmazonSQSAsync amazonSQSAsync) {

        QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();

        factory.setAmazonSqs(amazonSQSAsync);

        return factory.createQueueMessageHandler();

    }

}
关键配置说明:

长轮询 vs 短轮询:设置 aitTimeout > 0即启用长轮询。SQS 会等待至有消息到达或超时才返回,比短轮询(立即返回空)节省大量请求费用。

4.5 S3 事件消息模型:S3EventNotification
@Data

public class S3EventNotification {


    @JsonProperty("Records")

    private List<S3EventRecord> records;


    @Data

    public static class S3EventRecord {

        @JsonProperty("eventVersion")

        private String eventVersion;


        @JsonProperty("eventSource")

        private String eventSource;


        @JsonProperty("awsRegion")

        private String awsRegion;


        @JsonProperty("eventTime")

        private String eventTime;


        @JsonProperty("eventName")

        private String eventName;


        @JsonProperty("s3")

        private S3Entity s3;

    }


    @Data

    public static class S3Entity {

        @JsonProperty("bucket")

        private S3Bucket bucket;


        @JsonProperty("object")

        private S3Object object;

    }


    @Data

    public static class S3Bucket {

        @JsonProperty("name")

        private String name;


        @JsonProperty("arn")

        private String arn;

    }


    @Data

    public static class S3Object {

        @JsonProperty("key")

        private String key;


        @JsonProperty("size")

        private Long size;


        @JsonProperty("eTag")

        private String eTag;


        @JsonProperty("sequencer")

        private String sequencer;

    }

}

注意:S3 事件通知的 JSON 字段使用 PascalCase(如 Records),而 Java 习惯camelCase,因此使用 @JsonProperty做映射。

4.6 核心监听器:ThumbnailListener

这是整个系统的入口组件,负责接收 SQS 消息、解析、过滤并分发:

@Slf4j

@Component

@Conditional(SqsEnabledCondition.class)

public class ThumbnailListener {


    @Resource

    private RedisIdService redisIdService;


    @Value("${aws.s3.thumbnail.enabled:true}")

    private boolean thumbnailEnabled;


    // 支持的图片格式

    private static final String[] IMAGE_EXTENSIONS = {

        ".jpg", ".jpeg", ".png", ".gif", ".bmp",

        ".webp", ".tiff", ".tif", ".jfif", ".ico"

    };


    // 支持的视频格式

    private static final String[] VIDEO_EXTENSIONS = {

        ".mp4", ".mov", ".avi", ".mkv", ".webm", ".flv", ".wmv",

        ".mpg", ".mpeg", ".m4v", ".3gp", ".3g2",

        ".ogv", ".vob", ".rm", ".rmvb", ".ts", ".mts", ".m2ts", ".f4v", ".qt"

    };


    @PostConstruct

    public void init() {

        log.info("ThumbnailListener 初始化成功, thumbnailEnabled={}", thumbnailEnabled);

        log.info("支持的图片格式({}个), 视频格式({}个)",

                IMAGE_EXTENSIONS.length, VIDEO_EXTENSIONS.length);

    }


    /**

     * 监听 SQS 消息

     * deletionPolicy = ON_SUCCESS:方法正常返回时自动删除消息,抛异常则不删除(可重试)

     */

    @SqsListener(

        value = "${aws.sqs.thumbnail.queue.url}",

        deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS

    )

    public void handleS3Event(String message) {

        try {

            // 【快速预检查】在 JSON 解析前,先做字符串级别的快速过滤

            if (message.contains("_thumbnail.")) {

                log.info("快速过滤:跳过缩略图消息");

                return;

            }


            // 解析 S3 事件通知

            S3EventNotification notification =

                JSON.parseObject(message, S3EventNotification.class);


            if (notification == null || notification.getRecords() == null) {

                log.info("无效的 S3 事件消息");

                return;

            }


            // 逐条处理事件记录

            for (S3EventNotification.S3EventRecord record : notification.getRecords()) {

                processS3Event(record);

            }


        } catch (Exception e) {

            log.error("处理 SQS 消息失败: {}", e.getMessage(), e);

            // 抛出异常 → 消息不被删除 → SQS 在可见性超时后重新投递

            throw new RuntimeException("处理失败", e);

        }

    }


    private void processS3Event(S3EventNotification.S3EventRecord record) {

        String eventName = record.getEventName();


        // ① 只处理对象创建事件

        if (!eventName.startsWith("ObjectCreated:")) {

            return;

        }


        S3EventNotification.S3Object s3Object = record.getS3().getObject();

        String bucketName = record.getS3().getBucket().getName();

        String objectKey = URLDecoder.decode(s3Object.getKey(), "UTF-8");


        // ② 检查功能开关

        if (!thumbnailEnabled) {

            log.info("缩略图生成已禁用,跳过: {}", objectKey);

            return;

        }


        // ③ 跳过缩略图文件(防循环)

        if (isThumbnailFile(objectKey)) {

            log.info("跳过缩略图文件: {}", objectKey);

            return;

        }


        // ④ 过滤超大文件(>500MB)

        Long fileSize = s3Object.getSize();

        if (fileSize != null && fileSize > 500 * 1024 * 1024L) {

            log.info("文件超过500MB限制,跳过: key={}", objectKey);

            return;

        }


        // ⑤ 判断文件类型

        String fileType = determineFileTypeByKey(objectKey);

        if (!"image".equals(fileType) && !"video".equals(fileType)) {

            return;

        }


        // ⑥ 构建事件并发送到内部异步队列

        ThumbnailEvent event = ThumbnailEvent.builder()

                .id(redisIdService.generate(ThumbnailEvent.class))

                .bucketName(bucketName)

                .objectKey(objectKey)

                .fileType(fileType)

                .fileSize(fileSize)

                .eventName(eventName)

                .eventTime(System.currentTimeMillis())

                .build();


        SourceEventQueueManager.add(event);

        log.info("已发送缩略图生成事件: key={}, type={}", objectKey, fileType);

    }


    // ... isThumbnailFile() 和 determineFileTypeByKey() 方法

}
@SqsListener 注解详解
@SqsListener(

    value = "${aws.sqs.thumbnail.queue.url}",   // 支持 SpEL 表达式读取配置

    deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS  // 删除策略

)
删除策略选项:

4.7 事件模型:ThumbnailEvent
@Data

@Builder

@NoArgsConstructor

@AllArgsConstructor

@Accessors(chain = true)

@SuishenQueue(

    type = SuishenQueueTypeEnum.DELAY,     // 延迟队列类型

    handler = ThumbnailEventHandler.class,  // 指定处理器

    groupCount = 3,                         // 3 个消费者组

    delayTime = 200                         // 200ms 延迟

)

public class ThumbnailEvent implements SourceEvent {

    private Long id;           // 事件 ID(Redis 生成,保证幂等性)

    private String bucketName; // S3 桶名

    private String objectKey;  // 文件路径

    private String fileType;   // image / video

    private Long fileSize;     // 文件大小(字节)

    private String eventName;  // ObjectCreated:Put 等

    private Long eventTime;    // 事件时间戳

}
设计亮点:
  • 使用 @SuishenQueue注解接入内部延迟队列框架,实现二级缓冲
  • groupCount = 3  配合 Semaphore(2)实现精细化并发控制
  • delayTime = 200ms  微延迟可有效聚合短时间内的批量上传事件
4.8 异步处理器:ThumbnailEventHandler
@Slf4j

@SuishenLog(logName = "缩略图生成")

@Service

public class ThumbnailEventHandler extends BaseSourceEventHandler<ThumbnailEvent> {


    /**

     * 并发控制信号量

     * 图像处理是内存密集型操作,限制同时处理数量防止 Native Memory OOM

     */

    private static final Semaphore IMAGE_PROCESS_SEMAPHORE = new Semaphore(2);

    private static final int SEMAPHORE_TIMEOUT_SECONDS = 60;


    @Value("${aws.s3.thumbnail.quality:0.3}")

    private double thumbnailQuality;


    @Override

    protected boolean doHandle(ThumbnailEvent event) {

        boolean acquired = false;

        try {

            // 获取信号量(带超时)

            acquired = IMAGE_PROCESS_SEMAPHORE.tryAcquire(

                SEMAPHORE_TIMEOUT_SECONDS, TimeUnit.SECONDS);


            if (!acquired) {

                log.warn("信号量获取超时,跳过处理: {}", event.getObjectKey());

                return true;

            }


            String fileUrl = "https://static.weryai.com/" + event.getObjectKey();

            int lastSlash = event.getObjectKey().lastIndexOf("/");

            String directory = event.getObjectKey().substring(0, lastSlash);


            if ("image".equalsIgnoreCase(event.getFileType())) {

                // 图片:调用远程服务生成 WebP 缩略图

                ImageCompressUtils.generateImageThumbnail(

                    fileUrl,

                    (int) Math.round(thumbnailQuality * 100),

                    directory

                );

            } else if ("video".equalsIgnoreCase(event.getFileType())) {

                // 视频:提取首帧并转为 WebP

                ImageCompressUtils.generateVideoFirstFrame(

                    fileUrl,

                    (int) Math.round(thumbnailQuality * 100),

                    directory

                );

            }


            return true;


        } catch (InterruptedException e) {

            Thread.currentThread().interrupt();

            return true;

        } catch (Exception e) {

            log.error("缩略图生成失败: {}", event.getObjectKey(), e);

            return true; // 返回 true 避免无限重试

        } finally {

            if (acquired) {

                IMAGE_PROCESS_SEMAPHORE.release();

            }

        }

    }

}

五、生产环境关键设计

5.1 防循环触发:三层防护机制

这是本方案中最重要的安全设计。缩略图生成后会上传回 S3,如果不做处理,会再次触发事件,形成无限循环。

┌─────────────────────────────────────────────────────────────────┐

│                      防循环三层防护                               │

├─────────────┬───────────────────────────────────────────────────┤

│  第一层      │  AWS 层:S3 事件通知前缀/后缀过滤                   │

│  (最外层)    │  只监听 growth/original/ 目录,缩略图写入其他目录     │

├─────────────┼───────────────────────────────────────────────────┤

│  第二层      │  应用层(快速预检查):字符串匹配                     │

│  (中间层)    │  消息内容包含 "_thumbnail." 则直接跳过               │

├─────────────┼───────────────────────────────────────────────────┤

│  第三层      │  应用层(精确检查):文件名模式匹配                   │

│  (最内层)    │  isThumbnailFile() 方法多规则判断                   │

└─────────────┴───────────────────────────────────────────────────┘

isThumbnailFile()的具体实现:

private boolean isThumbnailFile(String objectKey) {

    if (StringUtils.isBlank(objectKey)) return false;


    String lowerKey = objectKey.toLowerCase();


    // 规则1: 包含 _thumbnail. 标识

    if (lowerKey.contains("_thumbnail.")

            || lowerKey.endsWith("_thumbnail.webp")) {

        return true;

    }


    // 规则2: 文件名以 _thumbnail 结尾且扩展名为 .webp

    if (lowerKey.endsWith(".webp")) {

        String fileName = objectKey.substring(objectKey.lastIndexOf('/') + 1);

        fileName = fileName.substring(0, fileName.lastIndexOf('.'));

        if (fileName.endsWith("_thumbnail")) {

            return true;

        }

    }


    return false;

}
5.2 并发控制:Semaphore 限流

图片/视频处理是资源密集型操作,BufferedImage的像素数据存储在堆外内存(Native Memory)中,不受 JVM 堆大小限制,容易导致 OOM。

// 4C8G 服务器推荐并发数为 2

private static final Semaphore IMAGE_PROCESS_SEMAPHORE = new Semaphore(2);  
为什么使用 Semaphore 而不是线程池大小控制?

并发数推荐:

5.3 消息删除策略与重试机制
消息处理成功 → 自动删除(ON_SUCCESS 策略)

消息处理失败(抛异常) → 不删除 → 可见性超时后重新可见 → 重试

消息处理失败(返回 true) → 自动删除 → 不重试
代码中的策略选择:
  • handleS3Event()

     方法中:未知异常抛出 → 消息重试

  • ThumbnailEventHandler.doHandle()中:处理失败返回 true  → 不重试(因为是已知的处理错误,重试大概率也会失败)
5.4 消息处理的幂等性

通过 Redis 生成唯一事件 ID:

.id(redisIdService.generate(ThumbnailEvent.class))

结合内部队列框架的去重机制,确保同一文件不会被处理两次。

六、监控、运维与故障排查

6.1 运行日志关键标记
# 查看监听器初始化

grep "ThumbnailListener 初始化成功" logs/application.log


# 查看消息接收情况

grep "检测到新文件上传" logs/application.log


# 查看处理结果

grep "缩略图生成成功\|缩略图生成失败" logs/application.log


# 查看信号量等待

grep "信号量" logs/application.log  
6.2 AWS CloudWatch 监控指标

6.3 常见问题排查表

7.3 进阶优化建议

八、总结与最佳实践

核心架构回顾
S3 文件上传 → S3 事件通知 → SQS 队列 → @SqsListener 监听

→ 多层过滤(防循环/大小/类型)→ 内部延迟队列 → Semaphore 限流

→ 缩略图生成 → 回传 S3
最佳实践清单


1.使用 SQS 标准队列 而非 FIFO 队列 — 缩略图生成不需要严格顺序,标准队列吞吐量更高且更便宜


2.启用长轮询 (waitTimeout=20)— 节省 API 费用,降低空请求


3.配置消息删除策略为  ON_SUCCESS— 处理成功才删除,失败可重试


4.三层防循环机制 — AWS 层 + 快速预检 + 精确过滤,确保万无一失


5.Semaphore 并发控制 — 防止图像处理导致 Native Memory OOM


6.二级队列缓冲  — SQS → 内部延迟队列,平滑突发流量


7.幂等性设计 — Redis 唯一 ID + 队列去重,防止重复处理


8.合理的可见性超时 — 设置为处理时间的 2-3 倍


9.监控告警 — CloudWatch 监控队列深度和消息年龄


10.环境差异化配置 — 通过 SqsEnabledCondition 实现开发/测试/生产不同策略  

扩展思路

本架构不仅适用于缩略图生成,同样适用于以下场景:

  • 文件内容审核(接入 AWS Rekognition / 第三方审核 API)

  • 文档格式转换(PDF → 图片、Office → PDF 等)

  • 元数据提取(EXIF 信息、视频时长、分辨率等)

  • 全文索引(文档内容提取后写入 Elasticsearch)

  • CDN 预热(新文件上传后自动推送到 CDN 节点) 只需新增对应的 EventHandler,复用同一套 SQS 监听基础设施即可,真正实现一次配置,无限扩展。

作者介绍:

  • 贺浪 高级服务端开发工程师

微鲤技术团队

微鲤技术团队承担了中华万年历、Maybe、蘑菇语音、微鲤游戏高达3亿用户的产品研发工作,并构建了完备的大数据平台、基础研发框架、基础运维设施。践行数据驱动理念,相信技术改变世界。