解决复杂材料处理流程(上传→材料处理→组卷→编目)的代码高耦合问题,实现:
if-else 分支;| 技术选型/模式 | 应用场景 | 核心价值 |
|---|---|---|
| 责任链模式 | 4个主流程阶段(上传/处理/组卷/编目) | 阶段解耦,新增/删除阶段无需修改核心逻辑 |
| 策略模式 | 材料处理阶段内的分支逻辑 | 消除 if-else,新增分支只需扩展策略类 |
| Redis | 进度状态存储 | 支持分布式环境下的进度实时查询 |
| JPA(Spring Data) | 材料批次/单材料状态持久化 | 存储上传处理后的状态,支持手动触发后续流程 |
| SpringBoot 3.5 | 整体工程架构 | 适配Java 17+,利用Spring生态简化开发 |
com.yourcompany ├── MaterialProcessApplication.java // 启动类 ├── config/ // 配置类 │ └── RedisConfig.java // Redis序列化配置 ├── context/ // 流程上下文 │ └── ProcessContext.java // 统一流程上下文 ├── entity/ // 实体类 │ ├── Material.java // 单材料实体(JPA) │ ├── MaterialBatch.java // 材料批次实体(JPA) │ └── ProcessProgress.java // 进度实体 ├── handler/ // 阶段处理器(责任链) │ ├── StageHandler.java // 抽象阶段处理器 │ ├── MaterialProcessStageHandler.java // 材料处理处理器 │ ├── AssemblyStageHandler.java // 组卷处理器 │ └── CatalogStageHandler.java // 编目处理器 ├── manager/ // 核心管理器 │ ├── ProgressManager.java // 进度管理器接口 │ ├── RedisProgressManager.java // Redis进度管理器实现 │ └── MaterialProcessManager.java // 流程总管理器(手动触发) ├── repository/ // JPA仓库 │ ├── MaterialBatchRepository.java │ └── MaterialRepository.java ├── controller/ // 前端接口 │ └── MaterialProcessController.java └── strategy/ // 策略模式(材料处理分支) ├── MaterialProcessStrategy.java ├── NormalMaterialStrategy.java ├── AbnormalMaterialStrategy.java └── MaterialProcessStrategyFactory.java
xml<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.5.0</version>
<relativePath/>
</parent>
<groupId>com.yourcompany</groupId>
<artifactId>material-process</artifactId>
<version>1.0.0</version>
<name>MaterialProcess</name>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<!-- SpringBoot核心 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<!-- Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- JPA + MySQL -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- 测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
yamlspring:
# Redis配置
redis:
host: localhost
port: 6379
password: # 按需填写
database: 0
timeout: 3000ms
# 数据库配置
datasource:
url: jdbc:mysql://localhost:3306/material_process?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai
username: root
password: root # 替换为你的密码
driver-class-name: com.mysql.cj.jdbc.Driver
jpa:
hibernate:
ddl-auto: update # 生产环境改为none
show-sql: true
properties:
hibernate:
format_sql: true
dialect: org.hibernate.dialect.MySQL8Dialect
server:
port: 8080
servlet:
context-path: /api
javapackage com.yourcompany;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
@SpringBootApplication
@EnableJpaRepositories(basePackages = "com.yourcompany.repository")
public class MaterialProcessApplication {
public static void main(String[] args) {
SpringApplication.run(MaterialProcessApplication.class, args);
}
}
javapackage com.yourcompany.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
// Key序列化
StringRedisSerializer stringSerializer = new StringRedisSerializer();
template.setKeySerializer(stringSerializer);
template.setHashKeySerializer(stringSerializer);
// Value序列化(适配LocalDateTime和多态)
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
BasicPolymorphicTypeValidator ptv = BasicPolymorphicTypeValidator.builder()
.allowIfBaseType(Object.class)
.build();
objectMapper.activateDefaultTyping(ptv, ObjectMapper.DefaultTyping.NON_FINAL);
Jackson2JsonRedisSerializer<Object> jacksonSerializer = new Jackson2JsonRedisSerializer<>(objectMapper, Object.class);
template.setValueSerializer(jacksonSerializer);
template.setHashValueSerializer(jacksonSerializer);
template.afterPropertiesSet();
return template;
}
}
javapackage com.yourcompany.context;
import lombok.Builder;
import lombok.Data;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
/**
* 统一流程上下文:承载所有阶段的输入/输出/状态
*/
@Data
@Builder
public class ProcessContext {
private String processId; // 流程ID(复用材料ID)
private File uploadFile; // 上传文件
private String fileType; // 文件类型
private Map<String, Object> structuredData = new HashMap<>(); // 结构化数据
private Map<String, Object> stageResult = new HashMap<>(); // 阶段结果
// 分支判断标记
private boolean hasDirectory; // 是否有目录
private boolean pageNumberLost; // 页码是否丢失
// 流程状态
private boolean processSuccess; // 处理是否成功
private String errorMsg; // 错误信息
}
java// 进度实体
package com.yourcompany.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ProcessProgress {
private String processId;
private String stageName;
private int progressPercent;
private ProcessStatus status;
private String statusDesc;
private LocalDateTime updateTime;
public enum ProcessStatus {
WAIT("待处理"), PROCESSING("处理中"), SUCCESS("成功"), FAIL("失败");
private final String desc;
ProcessStatus(String desc) { this.desc = desc; }
public String getDesc() { return desc; }
}
}
// 材料批次实体(JPA)
package com.yourcompany.entity;
import jakarta.persistence.*;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.List;
@Data
@Entity
@Table(name = "material_batch")
public class MaterialBatch {
@Id
@Column(length = 64)
private String batchId;
@Column(length = 64, nullable = false)
private String userId;
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private BatchStatus status;
@OneToMany(mappedBy = "batchId", cascade = CascadeType.ALL, fetch = FetchType.EAGER)
private List<Material> materials;
@Column(nullable = false)
private LocalDateTime createTime;
private LocalDateTime updateTime;
public enum BatchStatus {
UPLOADING("上传中"), PROCESSED("处理完成"), ASSEMBLING("组卷中"), CATALOGING("编目中"), COMPLETED("已完成");
private final String desc;
BatchStatus(String desc) { this.desc = desc; }
public String getDesc() { return desc; }
}
}
// 单材料实体(JPA)
package com.yourcompany.entity;
import jakarta.persistence.*;
import lombok.Data;
import java.time.LocalDateTime;
@Data
@Entity
@Table(name = "material")
public class Material {
@Id
@Column(length = 64)
private String materialId;
@Column(length = 64, nullable = false)
private String batchId;
@Column(length = 255, nullable = false)
private String fileName;
@Column(length = 32)
private String fileType;
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private MaterialStatus status;
@Column(columnDefinition = "json")
private String processResult;
private LocalDateTime uploadTime;
private LocalDateTime processTime;
private LocalDateTime assembleTime;
private LocalDateTime catalogTime;
public enum MaterialStatus {
UPLOADED("已上传"), PROCESSED("已处理"), ASSEMBLED("已组卷"), CATALOGED("已编目");
private final String desc;
MaterialStatus(String desc) { this.desc = desc; }
public String getDesc() { return desc; }
}
}
java// 进度管理器接口
package com.yourcompany.manager;
import com.yourcompany.entity.ProcessProgress;
import java.util.List;
import java.util.Map;
public interface ProgressManager {
void initProgress(String processId, List<String> stageNames);
void updateProgress(String processId, String stageName, int percent, ProcessProgress.ProcessStatus status, String desc);
Map<String, ProcessProgress> queryProgress(String processId);
}
// Redis进度管理器实现
package com.yourcompany.manager;
import com.yourcompany.entity.ProcessProgress;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class RedisProgressManager implements ProgressManager {
private static final String PROGRESS_KEY_PREFIX = "process:progress:";
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Override
public void initProgress(String processId, List<String> stageNames) {
String redisKey = PROGRESS_KEY_PREFIX + processId;
Map<String, ProcessProgress> progressMap = new HashMap<>();
for (String stage : stageNames) {
progressMap.put(stage, new ProcessProgress(
processId, stage, 0, ProcessProgress.ProcessStatus.WAIT,
"等待处理", LocalDateTime.now()
));
}
redisTemplate.opsForHash().putAll(redisKey, progressMap);
}
@Override
public void updateProgress(String processId, String stageName, int percent, ProcessProgress.ProcessStatus status, String desc) {
String redisKey = PROGRESS_KEY_PREFIX + processId;
ProcessProgress progress = (ProcessProgress) redisTemplate.opsForHash().get(redisKey, stageName);
if (progress == null) throw new RuntimeException("进度未初始化");
progress.setProgressPercent(percent);
progress.setStatus(status);
progress.setStatusDesc(desc);
progress.setUpdateTime(LocalDateTime.now());
redisTemplate.opsForHash().put(redisKey, stageName, progress);
}
@Override
public Map<String, ProcessProgress> queryProgress(String processId) {
String redisKey = PROGRESS_KEY_PREFIX + processId;
Map<Object, Object> hashMap = redisTemplate.opsForHash().entries(redisKey);
Map<String, ProcessProgress> result = new HashMap<>();
hashMap.forEach((k, v) -> result.put((String) k, (ProcessProgress) v));
return result;
}
}
java// 抽象阶段处理器
package com.yourcompany.handler;
import com.yourcompany.context.ProcessContext;
import com.yourcompany.entity.ProcessProgress;
import com.yourcompany.manager.ProgressManager;
import jakarta.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public abstract class StageHandler {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
protected StageHandler nextHandler;
@Resource
protected ProgressManager progressManager;
public void setNextHandler(StageHandler nextHandler) {
this.nextHandler = nextHandler;
}
public final void process(ProcessContext context) {
String processId = context.getProcessId();
String stageName = getStageName();
// 初始化进度为处理中
progressManager.updateProgress(processId, stageName, 0, ProcessProgress.ProcessStatus.PROCESSING, "开始处理");
if (!preCheck(context)) {
progressManager.updateProgress(processId, stageName, 0, ProcessProgress.ProcessStatus.FAIL, context.getErrorMsg());
logger.error("阶段[{}]检查失败:{}", stageName, context.getErrorMsg());
context.setProcessSuccess(false);
return;
}
try {
doProcess(context);
progressManager.updateProgress(processId, stageName, 100, ProcessProgress.ProcessStatus.SUCCESS, "处理完成");
context.setProcessSuccess(true);
} catch (Exception e) {
String errorMsg = "处理异常:" + e.getMessage();
progressManager.updateProgress(processId, stageName, 0, ProcessProgress.ProcessStatus.FAIL, errorMsg);
logger.error("阶段[{}]处理异常", stageName, e);
context.setProcessSuccess(false);
context.setErrorMsg(errorMsg);
}
if (nextHandler != null) nextHandler.process(context);
}
protected abstract String getStageName();
protected abstract boolean preCheck(ProcessContext context);
protected abstract void doProcess(ProcessContext context);
}
// 材料处理处理器(具体实现)
package com.yourcompany.handler;
import com.yourcompany.context.ProcessContext;
import com.yourcompany.entity.ProcessProgress;
import com.yourcompany.strategy.MaterialProcessStrategy;
import com.yourcompany.strategy.MaterialProcessStrategyFactory;
import org.springframework.stereotype.Component;
@Component
public class MaterialProcessStageHandler extends StageHandler {
@Override
protected String getStageName() {
return "材料处理";
}
@Override
protected boolean preCheck(ProcessContext context) {
if (context.getProcessId() == null || context.getUploadFile() == null) {
context.setErrorMsg("流程ID/上传文件为空");
return false;
}
return true;
}
@Override
protected void doProcess(ProcessContext context) {
// 分步更新进度
progressManager.updateProgress(context.getProcessId(), getStageName(), 20, ProcessProgress.ProcessStatus.PROCESSING, "OCR识别中...");
String ocrResult = "模拟OCR结果:目录\n第一章 概述"; // 实际替换为OCR调用
context.getStructuredData().put("ocrContent", ocrResult);
progressManager.updateProgress(context.getProcessId(), getStageName(), 50, ProcessProgress.ProcessStatus.PROCESSING, "解析目录/页码...");
context.setHasDirectory(ocrResult.contains("目录"));
context.setPageNumberLost(!ocrResult.contains("页码"));
progressManager.updateProgress(context.getProcessId(), getStageName(), 80, ProcessProgress.ProcessStatus.PROCESSING, "执行处理策略...");
MaterialProcessStrategy strategy = MaterialProcessStrategyFactory.getStrategy(context);
strategy.execute(context);
context.getStageResult().put("materialProcess", "处理完成,是否有目录:" + context.isHasDirectory());
}
}
// 组卷处理器
package com.yourcompany.handler;
import com.yourcompany.context.ProcessContext;
import org.springframework.stereotype.Component;
@Component
public class AssemblyStageHandler extends StageHandler {
@Override
protected String getStageName() {
return "组卷";
}
@Override
protected boolean preCheck(ProcessContext context) {
if (context.getStructuredData().isEmpty()) {
context.setErrorMsg("材料处理结果为空,无法组卷");
return false;
}
return true;
}
@Override
protected void doProcess(ProcessContext context) {
// 组卷核心逻辑(示例)
context.getStageResult().put("assembly", "组卷完成:按" + (context.isHasDirectory() ? "目录" : "标题") + "分组");
}
}
// 编目处理器
package com.yourcompany.handler;
import com.yourcompany.context.ProcessContext;
import org.springframework.stereotype.Component;
@Component
public class CatalogStageHandler extends StageHandler {
@Override
protected String getStageName() {
return "编目";
}
@Override
protected boolean preCheck(ProcessContext context) {
if (context.getStageResult().get("assembly") == null) {
context.setErrorMsg("组卷未完成,无法编目");
return false;
}
return true;
}
@Override
protected void doProcess(ProcessContext context) {
// 编目核心逻辑(示例)
context.getStageResult().put("catalog", "编目完成:生成材料索引ID-" + context.getProcessId());
}
}
java// 策略接口
package com.yourcompany.strategy;
import com.yourcompany.context.ProcessContext;
public interface MaterialProcessStrategy {
void execute(ProcessContext context);
}
// 正常材料策略(有目录+页码完整)
package com.yourcompany.strategy;
import com.yourcompany.context.ProcessContext;
public class NormalMaterialStrategy implements MaterialProcessStrategy {
@Override
public void execute(ProcessContext context) {
context.getStageResult().put("strategy", "正常材料策略:按目录解析章节");
}
}
// 异常材料策略(无目录/页码丢失)
package com.yourcompany.strategy;
import com.yourcompany.context.ProcessContext;
public class AbnormalMaterialStrategy implements MaterialProcessStrategy {
@Override
public void execute(ProcessContext context) {
context.getStageResult().put("strategy", "异常材料策略:按标题拆分+人工校验提示");
}
}
// 策略工厂
package com.yourcompany.strategy;
import com.yourcompany.context.ProcessContext;
public class MaterialProcessStrategyFactory {
public static MaterialProcessStrategy getStrategy(ProcessContext context) {
if (context.isHasDirectory() && !context.isPageNumberLost()) {
return new NormalMaterialStrategy();
} else {
return new AbnormalMaterialStrategy();
}
}
}
javapackage com.yourcompany.repository;
import com.yourcompany.entity.Material;
import com.yourcompany.entity.MaterialBatch;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface MaterialBatchRepository extends JpaRepository<MaterialBatch, String> {
}
@Repository
public interface MaterialRepository extends JpaRepository<Material, String> {
}
javapackage com.yourcompany.manager;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yourcompany.context.ProcessContext;
import com.yourcompany.entity.Material;
import com.yourcompany.entity.MaterialBatch;
import com.yourcompany.handler.AssemblyStageHandler;
import com.yourcompany.handler.CatalogStageHandler;
import com.yourcompany.handler.MaterialProcessStageHandler;
import com.yourcompany.repository.MaterialBatchRepository;
import com.yourcompany.repository.MaterialRepository;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Service;
import java.io.File;
import java.time.LocalDateTime;
import java.util.List;
@Service
public class MaterialProcessManager {
@Resource
private MaterialBatchRepository batchRepository;
@Resource
private MaterialRepository materialRepository;
@Resource
private ProgressManager progressManager;
@Resource
private MaterialProcessStageHandler materialProcessHandler;
@Resource
private AssemblyStageHandler assemblyHandler;
@Resource
private CatalogStageHandler catalogHandler;
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* 批量上传+自动处理材料
*/
public String batchUploadAndProcess(MaterialBatch batch) {
// 初始化批次状态
batch.setStatus(MaterialBatch.BatchStatus.UPLOADING);
batch.setCreateTime(LocalDateTime.now());
batchRepository.save(batch);
// 处理每个材料
List<String> stageNames = List.of("材料处理");
for (Material material : batch.getMaterials()) {
String materialId = material.getMaterialId();
// 初始化进度
progressManager.initProgress(materialId, stageNames);
// 构建上下文
ProcessContext context = ProcessContext.builder()
.processId(materialId)
.uploadFile(new File(material.getFileName()))
.fileType(material.getFileType())
.build();
// 执行材料处理
materialProcessHandler.process(context);
// 更新材料状态
if (context.isProcessSuccess()) {
material.setStatus(Material.MaterialStatus.PROCESSED);
material.setProcessTime(LocalDateTime.now());
try {
material.setProcessResult(objectMapper.writeValueAsString(context.getStageResult()));
} catch (Exception e) {
material.setProcessResult("{\"error\":\"结果序列化失败\"}");
}
} else {
material.setStatus(Material.MaterialStatus.UPLOADED);
material.setProcessResult("{\"error\":\"" + context.getErrorMsg() + "\"}");
}
materialRepository.save(material);
}
// 更新批次状态为已处理
batch.setStatus(MaterialBatch.BatchStatus.PROCESSED);
batch.setUpdateTime(LocalDateTime.now());
batchRepository.save(batch);
return batch.getBatchId();
}
/**
* 手动触发组卷
*/
public void triggerAssembly(String batchId) {
MaterialBatch batch = batchRepository.findById(batchId)
.orElseThrow(() -> new RuntimeException("批次不存在:" + batchId));
if (!batch.getStatus().equals(MaterialBatch.BatchStatus.PROCESSED)) {
throw new RuntimeException("批次未处理完成,无法组卷");
}
batch.setStatus(MaterialBatch.BatchStatus.ASSEMBLING);
batchRepository.save(batch);
// 处理每个材料的组卷
for (Material material : batch.getMaterials()) {
try {
ProcessContext context = ProcessContext.builder()
.processId(material.getMaterialId())
.structuredData(objectMapper.readValue(material.getProcessResult(), Map.class))
.build();
assemblyHandler.process(context);
if (context.isProcessSuccess()) {
material.setStatus(Material.MaterialStatus.ASSEMBLED);
material.setAssembleTime(LocalDateTime.now());
}
materialRepository.save(material);
} catch (Exception e) {
throw new RuntimeException("材料[" + material.getMaterialId() + "]组卷失败:" + e.getMessage());
}
}
// 恢复批次状态(等待编目)
batch.setStatus(MaterialBatch.BatchStatus.PROCESSED);
batchRepository.save(batch);
}
/**
* 手动触发编目
*/
public void triggerCatalog(String batchId) {
MaterialBatch batch = batchRepository.findById(batchId)
.orElseThrow(() -> new RuntimeException("批次不存在:" + batchId));
if (!batch.getStatus().equals(MaterialBatch.BatchStatus.PROCESSED)) {
throw new RuntimeException("批次未处理完成,无法编目");
}
batch.setStatus(MaterialBatch.BatchStatus.CATALOGING);
batchRepository.save(batch);
// 处理每个材料的编目
for (Material material : batch.getMaterials()) {
try {
ProcessContext context = ProcessContext.builder()
.processId(material.getMaterialId())
.structuredData(objectMapper.readValue(material.getProcessResult(), Map.class))
.build();
catalogHandler.process(context);
if (context.isProcessSuccess()) {
material.setStatus(Material.MaterialStatus.CATALOGED);
material.setCatalogTime(LocalDateTime.now());
}
materialRepository.save(material);
} catch (Exception e) {
throw new RuntimeException("材料[" + material.getMaterialId() + "]编目失败:" + e.getMessage());
}
}
// 编目完成,批次状态改为已完成
batch.setStatus(MaterialBatch.BatchStatus.COMPLETED);
batch.setUpdateTime(LocalDateTime.now());
batchRepository.save(batch);
}
// 辅助方法:获取批次仓库(供控制器调用)
public MaterialBatchRepository getBatchRepository() {
return batchRepository;
}
}
javapackage com.yourcompany.controller;
import com.yourcompany.entity.MaterialBatch;
import com.yourcompany.entity.ProcessProgress;
import com.yourcompany.manager.MaterialProcessManager;
import com.yourcompany.manager.ProgressManager;
import jakarta.annotation.Resource;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping("/material")
public class MaterialProcessController {
@Resource
private MaterialProcessManager processManager;
@Resource
private ProgressManager progressManager;
/**
* 批量上传+自动处理材料
*/
@PostMapping("/batch/upload")
public ResponseEntity<String> batchUploadAndProcess(@RequestBody MaterialBatch batch) {
String batchId = processManager.batchUploadAndProcess(batch);
return ResponseEntity.ok(batchId);
}
/**
* 手动触发组卷
*/
@PostMapping("/batch/{batchId}/assembly")
public ResponseEntity<Void> triggerAssembly(@PathVariable String batchId) {
processManager.triggerAssembly(batchId);
return ResponseEntity.ok().build();
}
/**
* 手动触发编目
*/
@PostMapping("/batch/{batchId}/catalog")
public ResponseEntity<Void> triggerCatalog(@PathVariable String batchId) {
processManager.triggerCatalog(batchId);
return ResponseEntity.ok().build();
}
/**
* 查询批次进度(前端核心接口)
*/
@GetMapping("/batch/{batchId}/progress")
public ResponseEntity<Map<String, ProcessProgress>> queryBatchProgress(@PathVariable String batchId) {
MaterialBatch batch = processManager.getBatchRepository().findById(batchId)
.orElseThrow(() -> new RuntimeException("批次不存在:" + batchId));
Map<String, ProcessProgress> progressMap = new HashMap<>();
for (Material material : batch.getMaterials()) {
progressMap.putAll(progressManager.queryProgress(material.getMaterialId()));
}
return ResponseEntity.ok(progressMap);
}
}
StageHandler实现自定义处理器,无需修改原有流程;MaterialProcessStrategy接口,更新策略工厂即可;ProgressManager实现(如内存→数据库),业务逻辑无感知;application.yml中的Redis/MySQL地址;POST /api/material/batch/upload(传入批次信息);POST /api/material/batch/{batchId}/assembly;POST /api/material/batch/{batchId}/catalog;GET /api/material/batch/{batchId}/progress。该方案完整覆盖了你的所有需求,代码可直接复制到SpringBoot 3.5项目中运行,仅需根据实际业务补充外部服务(如OCR)的调用逻辑即可。
com.yourcompany ├── MaterialProcessApplication.java // 应用启动类(全注释) ├── config/ // 配置类(全注释) │ └── RedisConfig.java ├── context/ │ └── ProcessContext.java // 流程上下文(字段+类注释) ├── entity/ // 实体类(枚举+字段+业务含义注释) │ ├── Material.java │ ├── MaterialBatch.java │ └── ProcessProgress.java ├── handler/ // 责任链处理器(模板方法+阶段逻辑注释) │ ├── StageHandler.java │ ├── MaterialProcessStageHandler.java │ ├── AssemblyStageHandler.java │ └── CatalogStageHandler.java ├── manager/ // 核心管理器(业务逻辑+异常注释) │ ├── ProgressManager.java │ ├── RedisProgressManager.java │ └── MaterialProcessManager.java ├── repository/ // JPA仓库(简洁注释) │ ├── MaterialBatchRepository.java │ └── MaterialRepository.java ├── controller/ // 接口控制器(请求参数+返回值+业务注释) │ └── MaterialProcessController.java └── strategy/ // 策略模式(策略场景+分支逻辑注释) ├── MaterialProcessStrategy.java ├── NormalMaterialStrategy.java ├── AbnormalMaterialStrategy.java └── MaterialProcessStrategyFactory.java
javapackage com.yourcompany;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
/**
* 材料处理流程应用启动类
* <p>核心功能:批量材料上传处理、手动组卷/编目、进度实时查询</p>
* <p>技术栈:SpringBoot 3.5 + Redis + JPA + 责任链模式 + 策略模式</p>
*
* @author 开发团队
* @version 1.0.0
* @since 2026-01-19
*/
@SpringBootApplication
@EnableJpaRepositories(basePackages = "com.yourcompany.repository") // 扫描JPA仓库接口
public class MaterialProcessApplication {
/**
* 应用入口方法
*
* @param args 启动参数(无自定义参数)
*/
public static void main(String[] args) {
SpringApplication.run(MaterialProcessApplication.class, args);
}
}
javapackage com.yourcompany.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* Redis配置类(适配SpringBoot 3.5 + Java 17)
* <p>核心作用:</p>
* <li>1. 配置RedisTemplate的序列化方式,解决String/Object的序列化问题</li>
* <li>2. 适配LocalDateTime等JSR310时间类型的序列化</li>
* <li>3. 支持多态类型(如枚举、自定义实体)的序列化/反序列化</li>
*
* @author 开发团队
* @since 2026-01-19
*/
@Configuration
public class RedisConfig {
/**
* 自定义RedisTemplate Bean
* <p>替换默认的JdkSerializationRedisSerializer,使用JSON序列化提升可读性</p>
*
* @param redisConnectionFactory Redis连接工厂(Spring自动注入)
* @return 配置完成的RedisTemplate
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
// ========== Key序列化配置 ==========
// String序列化:保证Key的可读性,避免乱码
StringRedisSerializer stringSerializer = new StringRedisSerializer();
template.setKeySerializer(stringSerializer);
template.setHashKeySerializer(stringSerializer);
// ========== Value序列化配置 ==========
// JSON序列化:支持对象/枚举/时间类型,且可读性强
ObjectMapper objectMapper = new ObjectMapper();
// 注册JavaTimeModule:解决LocalDateTime序列化失败问题
objectMapper.registerModule(new JavaTimeModule());
// 配置多态类型校验:允许序列化Object及其子类(如ProcessProgress、枚举)
BasicPolymorphicTypeValidator ptv = BasicPolymorphicTypeValidator.builder()
.allowIfBaseType(Object.class)
.build();
objectMapper.activateDefaultTyping(ptv, ObjectMapper.DefaultTyping.NON_FINAL);
Jackson2JsonRedisSerializer<Object> jacksonSerializer = new Jackson2JsonRedisSerializer<>(objectMapper, Object.class);
template.setValueSerializer(jacksonSerializer);
template.setHashValueSerializer(jacksonSerializer);
// 初始化配置
template.afterPropertiesSet();
return template;
}
}
javapackage com.yourcompany.context;
import lombok.Builder;
import lombok.Data;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
/**
* 流程上下文实体
* <p>核心作用:承载整个材料处理流程中所有阶段的输入、输出、状态信息</p>
* <p>设计原则:仅存储数据,不包含任何业务逻辑,保证单一职责</p>
*
* @author 开发团队
* @since 2026-01-19
*/
@Data
@Builder // 建造者模式:简化上下文对象的创建
public class ProcessContext {
/**
* 流程唯一标识
* <p>复用材料ID,用于进度追踪、日志定位</p>
*/
private String processId;
/**
* 上传的原始文件
* <p>材料处理阶段的核心输入</p>
*/
private File uploadFile;
/**
* 文件类型(如pdf、jpg、doc)
* <p>用于区分不同类型材料的处理策略</p>
*/
private String fileType;
/**
* 结构化数据存储
* <p>存储各阶段的中间结构化结果(如OCR识别内容、目录解析结果)</p>
*/
private Map<String, Object> structuredData = new HashMap<>();
/**
* 阶段结果存储
* <p>存储各阶段的最终处理结果,用于后续阶段/前端展示</p>
*/
private Map<String, Object> stageResult = new HashMap<>();
/**
* 是否包含目录(材料处理阶段的判断标记)
* <p>用于策略模式的分支选择:有目录/无目录的材料处理逻辑不同</p>
*/
private boolean hasDirectory;
/**
* 页码是否丢失(材料处理阶段的判断标记)
* <p>用于策略模式的分支选择:页码完整/丢失的处理逻辑不同</p>
*/
private boolean pageNumberLost;
/**
* 流程处理是否成功
* <p>全局状态标记:true=成功,false=失败</p>
*/
private boolean processSuccess;
/**
* 错误信息
* <p>流程失败时存储具体原因,用于前端展示/日志排查</p>
*/
private String errorMsg;
}
javapackage com.yourcompany.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* 流程进度实体
* <p>核心作用:记录每个材料每个阶段的处理进度,供前端实时查询</p>
* <p>存储介质:Redis(hash结构),Key=process:progress:{processId}</p>
*
* @author 开发团队
* @since 2026-01-19
*/
@Data
@NoArgsConstructor // 反序列化必备
@AllArgsConstructor // 便捷创建对象
public class ProcessProgress {
/**
* 流程ID(关联材料ID)
*/
private String processId;
/**
* 阶段名称
* <p>如:材料处理、组卷、编目</p>
*/
private String stageName;
/**
* 进度百分比
* <p>范围:0-100,支持分步更新(如OCR识别20%→解析50%→完成100%)</p>
*/
private int progressPercent;
/**
* 处理状态
* <p>枚举值:WAIT(待处理)、PROCESSING(处理中)、SUCCESS(成功)、FAIL(失败)</p>
*/
private ProcessStatus status;
/**
* 状态描述
* <p>供前端展示的人性化描述(如:OCR识别中...、页码校验失败)</p>
*/
private String statusDesc;
/**
* 进度更新时间
* <p>用于前端展示进度的最后更新时间</p>
*/
private LocalDateTime updateTime;
/**
* 进度状态枚举
* <p>每个枚举值对应业务含义,desc字段用于前端展示</p>
*/
public enum ProcessStatus {
WAIT("待处理"), // 阶段未开始
PROCESSING("处理中"), // 阶段执行中
SUCCESS("成功"), // 阶段执行完成
FAIL("失败"); // 阶段执行失败
/** 枚举值的业务描述 */
private final String desc;
ProcessStatus(String desc) {
this.desc = desc;
}
/**
* 获取枚举值的业务描述
* @return 描述字符串(如:待处理)
*/
public String getDesc() {
return desc;
}
}
}
javapackage com.yourcompany.entity;
import jakarta.persistence.*;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.List;
/**
* 材料批次实体(JPA映射)
* <p>核心作用:管理一批上传的材料,用于手动触发组卷/编目</p>
* <p>数据库表:material_batch</p>
*
* @author 开发团队
* @since 2026-01-19
*/
@Data
@Entity
@Table(name = "material_batch") // 映射数据库表名
public class MaterialBatch {
/**
* 批次唯一标识(主键)
* <p>生成规则:BATCH-{yyyyMMdd}-{6位随机数}(如:BATCH-20260119-123456)</p>
*/
@Id
@Column(length = 64, nullable = false) // 字段长度+非空约束
private String batchId;
/**
* 上传用户ID
* <p>关联用户表,用于权限控制、数据隔离</p>
*/
@Column(length = 64, nullable = false)
private String userId;
/**
* 批次整体状态
* <p>枚举值:UPLOADING(上传中)、PROCESSED(已处理完成)、ASSEMBLING(组卷中)、CATALOGING(编目中)、COMPLETED(已完成)</p>
*/
@Enumerated(EnumType.STRING) // 枚举存储为字符串(而非数字,提升可读性)
@Column(nullable = false)
private BatchStatus status;
/**
* 批次下的所有材料
* <p>一对多关联:一个批次包含多个材料</p>
* <p>级联操作:保存批次时自动保存材料,查询批次时自动加载材料</p>
*/
@OneToMany(mappedBy = "batchId", cascade = CascadeType.ALL, fetch = FetchType.EAGER)
private List<Material> materials;
/**
* 批次创建时间
* <p>记录用户上传批次的时间</p>
*/
@Column(nullable = false)
private LocalDateTime createTime;
/**
* 批次更新时间
* <p>记录批次状态最后变更的时间</p>
*/
private LocalDateTime updateTime;
/**
* 批次状态枚举
*/
public enum BatchStatus {
UPLOADING("上传中"), // 批次正在上传/处理材料
PROCESSED("处理完成"), // 材料处理完成,等待手动触发组卷/编目
ASSEMBLING("组卷中"), // 正在执行组卷操作
CATALOGING("编目中"), // 正在执行编目操作
COMPLETED("已完成"); // 组卷+编目全部完成
private final String desc;
BatchStatus(String desc) {
this.desc = desc;
}
public String getDesc() {
return desc;
}
}
}
javapackage com.yourcompany.entity;
import jakarta.persistence.*;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 单材料实体(JPA映射)
* <p>核心作用:记录每个材料的详细状态和处理结果</p>
* <p>数据库表:material</p>
*
* @author 开发团队
* @since 2026-01-19
*/
@Data
@Entity
@Table(name = "material")
public class Material {
/**
* 材料唯一标识(主键)
* <p>生成规则:MAT-{yyyyMMdd}-{6位随机数}(如:MAT-20260119-123456)</p>
*/
@Id
@Column(length = 64, nullable = false)
private String materialId;
/**
* 所属批次ID
* <p>关联material_batch表的batchId字段</p>
*/
@Column(length = 64, nullable = false)
private String batchId;
/**
* 原始文件名
* <p>如:合同.pdf、说明书.jpg</p>
*/
@Column(length = 255, nullable = false)
private String fileName;
/**
* 文件类型
* <p>如:pdf、jpg、doc、docx</p>
*/
@Column(length = 32)
private String fileType;
/**
* 材料状态
* <p>枚举值:UPLOADED(已上传)、PROCESSED(已处理)、ASSEMBLED(已组卷)、CATALOGED(已编目)</p>
*/
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private MaterialStatus status;
/**
* 材料处理结果
* <p>JSON格式存储,包含OCR结果、结构化数据等</p>
* <p>数据库字段类型:json(MySQL 5.7+支持)</p>
*/
@Column(columnDefinition = "json")
private String processResult;
/**
* 上传时间
*/
private LocalDateTime uploadTime;
/**
* 处理完成时间
*/
private LocalDateTime processTime;
/**
* 组卷完成时间
*/
private LocalDateTime assembleTime;
/**
* 编目完成时间
*/
private LocalDateTime catalogTime;
/**
* 材料状态枚举
*/
public enum MaterialStatus {
UPLOADED("已上传"), // 仅完成上传,未处理
PROCESSED("已处理"), // 材料处理完成(OCR+解析)
ASSEMBLED("已组卷"), // 组卷完成
CATALOGED("已编目"); // 编目完成
private final String desc;
MaterialStatus(String desc) {
this.desc = desc;
}
public String getDesc() {
return desc;
}
}
}
javapackage com.yourcompany.manager;
import com.yourcompany.entity.ProcessProgress;
import java.util.List;
import java.util.Map;
/**
* 进度管理器接口
* <p>核心职责:定义进度的初始化、更新、查询规范,实现存储层与业务层解耦</p>
* <p>实现类:RedisProgressManager(生产)、InMemoryProgressManager(测试)</p>
*
* @author 开发团队
* @since 2026-01-19
*/
public interface ProgressManager {
/**
* 初始化流程所有阶段的进度
* <p>流程开始时调用,将所有阶段初始化为"待处理"状态</p>
*
* @param processId 流程ID(材料ID)
* @param stageNames 所有主阶段名称(如:["材料处理","组卷","编目"])
*/
void initProgress(String processId, List<String> stageNames);
/**
* 更新指定阶段的进度
* <p>阶段执行过程中/完成后调用,实时更新进度状态</p>
*
* @param processId 流程ID
* @param stageName 阶段名称
* @param percent 进度百分比(0-100)
* @param status 处理状态(WAIT/PROCESSING/SUCCESS/FAIL)
* @param statusDesc 状态描述(供前端展示)
* @throws RuntimeException 流程ID/阶段名称未初始化时抛出
*/
void updateProgress(String processId, String stageName, int percent, ProcessProgress.ProcessStatus status, String statusDesc);
/**
* 查询指定流程的所有阶段进度
* <p>前端查询进度时调用,返回该流程下所有阶段的进度信息</p>
*
* @param processId 流程ID
* @return Key=阶段名称,Value=进度信息;无数据时返回空Map(而非null)
*/
Map<String, ProcessProgress> queryProgress(String processId);
}
javapackage com.yourcompany.manager;
import com.yourcompany.entity.ProcessProgress;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Redis进度管理器(生产环境实现)
* <p>核心特性:支持分布式环境、进度数据持久化(Redis重启不丢失)</p>
* <p>存储结构:Redis Hash,Key=process:progress:{processId},Field=stageName,Value=ProcessProgress对象</p>
*
* @author 开发团队
* @since 2026-01-19
*/
@Component // 注册为Spring Bean,供其他类注入使用
public class RedisProgressManager implements ProgressManager {
/**
* Redis Key前缀
* <p>避免与其他业务的Key冲突,规范命名:业务:模块:标识</p>
*/
private static final String PROGRESS_KEY_PREFIX = "process:progress:";
/**
* Redis模板(Spring自动注入,使用自定义的JSON序列化配置)
*/
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Override
public void initProgress(String processId, List<String> stageNames) {
// 拼接Redis Key
String redisKey = PROGRESS_KEY_PREFIX + processId;
// 初始化所有阶段的进度为"待处理"
Map<String, ProcessProgress> stageProgressMap = new HashMap<>();
for (String stageName : stageNames) {
ProcessProgress progress = new ProcessProgress(
processId,
stageName,
0, // 初始进度0%
ProcessProgress.ProcessStatus.WAIT, // 初始状态:待处理
"等待处理", // 初始描述
LocalDateTime.now() // 更新时间
);
stageProgressMap.put(stageName, progress);
}
// 批量写入Redis Hash
redisTemplate.opsForHash().putAll(redisKey, stageProgressMap);
}
@Override
public void updateProgress(String processId, String stageName, int percent, ProcessProgress.ProcessStatus status, String statusDesc) {
String redisKey = PROGRESS_KEY_PREFIX + processId;
// 查询当前阶段的进度(必须先初始化)
ProcessProgress progress = (ProcessProgress) redisTemplate.opsForHash().get(redisKey, stageName);
if (progress == null) {
throw new RuntimeException("流程ID[" + processId + "]的阶段[" + stageName + "]未初始化进度");
}
// 更新进度信息
progress.setProgressPercent(percent);
progress.setStatus(status);
progress.setStatusDesc(statusDesc);
progress.setUpdateTime(LocalDateTime.now());
// 写回Redis
redisTemplate.opsForHash().put(redisKey, stageName, progress);
}
@Override
public Map<String, ProcessProgress> queryProgress(String processId) {
String redisKey = PROGRESS_KEY_PREFIX + processId;
// 查询该流程下所有阶段的进度
Map<Object, Object> hashMap = redisTemplate.opsForHash().entries(redisKey);
// 转换为前端需要的Map结构(避免返回Object类型)
Map<String, ProcessProgress> result = new HashMap<>();
for (Map.Entry<Object, Object> entry : hashMap.entrySet()) {
String stageName = (String) entry.getKey();
ProcessProgress progress = (ProcessProgress) entry.getValue();
result.put(stageName, progress);
}
return result;
}
}
javapackage com.yourcompany.handler;
import com.yourcompany.context.ProcessContext;
import com.yourcompany.entity.ProcessProgress;
import com.yourcompany.manager.ProgressManager;
import jakarta.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* 抽象阶段处理器(责任链模式核心)
* <p>模板方法模式:定义流程的骨架,将具体业务逻辑延迟到子类实现</p>
* <p>核心流程:前置检查 → 执行业务 → 进度更新 → 异常处理 → 传递到下一个处理器</p>
*
* @author 开发团队
* @since 2026-01-19
*/
@Component // 抽象类也注册为Bean,供子类继承
public abstract class StageHandler {
/**
* 日志对象(子类可直接使用)
*/
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* 下一个处理器(责任链节点)
*/
protected StageHandler nextHandler;
/**
* 进度管理器(Spring自动注入Redis实现)
*/
@Resource
protected ProgressManager progressManager;
/**
* 设置下一个处理器(组装责任链)
*
* @param nextHandler 下一个阶段的处理器
*/
public void setNextHandler(StageHandler nextHandler) {
this.nextHandler = nextHandler;
}
/**
* 模板方法:定义阶段处理的核心流程
* <p>子类不可重写,保证流程的一致性</p>
*
* @param context 流程上下文(包含输入/输出/状态)
*/
public final void process(ProcessContext context) {
String processId = context.getProcessId();
String stageName = getStageName();
// ========== 步骤1:初始化阶段进度为"处理中" ==========
progressManager.updateProgress(processId, stageName, 0, ProcessProgress.ProcessStatus.PROCESSING, "开始处理");
// ========== 步骤2:前置检查 ==========
if (!preCheck(context)) {
// 检查失败:更新进度为"失败"
progressManager.updateProgress(processId, stageName, 0, ProcessProgress.ProcessStatus.FAIL, context.getErrorMsg());
logger.error("阶段[{}]前置检查失败,流程ID:{},原因:{}", stageName, processId, context.getErrorMsg());
context.setProcessSuccess(false);
return;
}
try {
// ========== 步骤3:执行核心业务逻辑 ==========
logger.info("开始处理阶段[{}],流程ID:{}", stageName, processId);
doProcess(context);
// ========== 步骤4:处理成功,更新进度 ==========
progressManager.updateProgress(processId, stageName, 100, ProcessProgress.ProcessStatus.SUCCESS, "处理完成");
context.setProcessSuccess(true);
logger.info("阶段[{}]处理完成,流程ID:{}", stageName, processId);
} catch (Exception e) {
// ========== 步骤5:处理异常,更新进度 ==========
String errorMsg = "处理异常:" + e.getMessage();
progressManager.updateProgress(processId, stageName, 0, ProcessProgress.ProcessStatus.FAIL, errorMsg);
logger.error("阶段[{}]处理异常,流程ID:{}", stageName, processId, e); // 记录完整异常栈
context.setProcessSuccess(false);
context.setErrorMsg(errorMsg);
return; // 异常时终止责任链传递
}
// ========== 步骤6:传递到下一个阶段 ==========
if (nextHandler != null) {
nextHandler.process(context);
}
}
/**
* 获取阶段名称(子类实现)
* <p>如:材料处理、组卷、编目</p>
*
* @return 阶段名称
*/
protected abstract String getStageName();
/**
* 前置检查(子类实现)
* <p>检查当前阶段的输入是否合法,如:文件是否存在、参数是否完整</p>
*
* @param context 流程上下文
* @return true=检查通过,false=检查失败
*/
protected abstract boolean preCheck(ProcessContext context);
/**
* 核心业务逻辑(子类实现)
* <p>当前阶段的具体处理逻辑,如:OCR识别、组卷规则执行</p>
*
* @param context 流程上下文
* @throws Exception 业务执行过程中抛出的异常(由父类统一处理)
*/
protected abstract void doProcess(ProcessContext context) throws Exception;
}
javapackage com.yourcompany.handler;
import com.yourcompany.context.ProcessContext;
import com.yourcompany.entity.ProcessProgress;
import com.yourcompany.strategy.MaterialProcessStrategy;
import com.yourcompany.strategy.MaterialProcessStrategyFactory;
import org.springframework.stereotype.Component;
/**
* 材料处理阶段处理器
* <p>核心职责:完成材料的OCR识别、目录/页码解析、处理策略执行</p>
* <p>进度更新:支持分步更新(20%→50%→80%→100%),提升前端体验</p>
*
* @author 开发团队
* @since 2026-01-19
*/
@Component
public class MaterialProcessStageHandler extends StageHandler {
@Override
protected String getStageName() {
return "材料处理";
}
@Override
protected boolean preCheck(ProcessContext context) {
// 检查流程ID是否为空
if (context.getProcessId() == null || context.getProcessId().trim().isEmpty()) {
context.setErrorMsg("流程ID不能为空");
return false;
}
// 检查上传文件是否存在
if (context.getUploadFile() == null || !context.getUploadFile().exists()) {
context.setErrorMsg("上传文件不存在或路径错误");
return false;
}
// 检查文件类型是否支持
if (context.getFileType() == null || !List.of("pdf", "jpg", "png").contains(context.getFileType().toLowerCase())) {
context.setErrorMsg("不支持的文件类型:" + context.getFileType() + ",仅支持pdf/jpg/png");
return false;
}
return true;
}
@Override
protected void doProcess(ProcessContext context) throws Exception {
String processId = context.getProcessId();
String stageName = getStageName();
// ========== 步骤1:OCR识别(进度20%) ==========
progressManager.updateProgress(processId, stageName, 20, ProcessProgress.ProcessStatus.PROCESSING, "OCR识别中...");
// 模拟调用外部OCR服务(实际项目中替换为真实接口)
String ocrResult = "模拟OCR识别结果:\n目录\n第一章 概述(P1)\n第二章 功能(P5)";
context.getStructuredData().put("ocrContent", ocrResult); // 存储OCR结果
// ========== 步骤2:目录/页码解析(进度50%) ==========
progressManager.updateProgress(processId, stageName, 50, ProcessProgress.ProcessStatus.PROCESSING, "解析目录和页码...");
// 判断是否包含目录
context.setHasDirectory(ocrResult.contains("目录"));
// 判断页码是否丢失(简单判断:包含"P数字"则认为页码完整)
context.setPageNumberLost(!ocrResult.matches(".*P\\d+.*"));
// ========== 步骤3:执行处理策略(进度80%) ==========
progressManager.updateProgress(processId, stageName, 80, ProcessProgress.ProcessStatus.PROCESSING, "执行材料处理策略...");
// 策略模式:根据材料特征选择不同的处理策略
MaterialProcessStrategy strategy = MaterialProcessStrategyFactory.getStrategy(context);
strategy.execute(context);
// ========== 步骤4:存储处理结果 ==========
// 父类模板方法会自动将进度更新为100%
context.getStageResult().put("materialProcess", "材料处理完成,是否有目录:" + context.isHasDirectory() + ",页码是否丢失:" + context.isPageNumberLost());
}
}
javapackage com.yourcompany.handler;
import com.yourcompany.context.ProcessContext;
import org.springframework.stereotype.Component;
/**
* 组卷阶段处理器
* <p>核心职责:根据材料处理结果,按照业务规则完成组卷</p>
* <p>前置条件:材料处理完成,结构化数据非空</p>
*
* @author 开发团队
* @since 2026-01-19
*/
@Component
public class AssemblyStageHandler extends StageHandler {
@Override
protected String getStageName() {
return "组卷";
}
@Override
protected boolean preCheck(ProcessContext context) {
// 检查材料处理结果是否为空
if (context.getStructuredData().isEmpty()) {
context.setErrorMsg("材料处理结果为空,无法执行组卷");
return false;
}
// 检查是否包含核心组卷数据(OCR内容)
if (context.getStructuredData().get("ocrContent") == null) {
context.setErrorMsg("OCR识别结果为空,无法执行组卷");
return false;
}
return true;
}
@Override
protected void doProcess(ProcessContext context) throws Exception {
// 模拟组卷逻辑:根据是否有目录选择不同的组卷规则
String ocrContent = (String) context.getStructuredData().get("ocrContent");
String assemblyResult;
if (context.isHasDirectory()) {
assemblyResult = "按目录组卷完成:共识别到" + ocrContent.split("\n").length + "个章节";
} else {
assemblyResult = "按标题组卷完成:无目录,按段落拆分";
}
// 存储组卷结果
context.getStageResult().put("assembly", assemblyResult);
}
}
javapackage com.yourcompany.handler;
import com.yourcompany.context.ProcessContext;
import org.springframework.stereotype.Component;
/**
* 编目阶段处理器
* <p>核心职责:根据组卷结果,生成材料的编目索引</p>
* <p>前置条件:组卷完成,stageResult中包含assembly字段</p>
*
* @author 开发团队
* @since 2026-01-19
*/
@Component
public class CatalogStageHandler extends StageHandler {
@Override
protected String getStageName() {
return "编目";
}
@Override
protected boolean preCheck(ProcessContext context) {
// 检查组卷结果是否存在
if (context.getStageResult().get("assembly") == null) {
context.setErrorMsg("组卷结果为空,无法执行编目");
return false;
}
return true;
}
@Override
protected void doProcess(ProcessContext context) throws Exception {
// 模拟编目逻辑:生成唯一编目ID
String processId = context.getProcessId();
String catalogId = "CAT-" + processId.substring(4); // 截取材料ID后拼接编目前缀
String catalogResult = "编目完成,编目ID:" + catalogId + ",组卷规则:" + context.getStageResult().get("assembly");
// 存储编目结果
context.getStageResult().put("catalog", catalogResult);
}
}
javapackage com.yourcompany.strategy;
import com.yourcompany.context.ProcessContext;
/**
* 材料处理策略接口
* <p>核心职责:定义不同类型材料的处理逻辑规范</p>
* <p>实现类:NormalMaterialStrategy(正常材料)、AbnormalMaterialStrategy(异常材料)</p>
*
* @author 开发团队
* @since 2026-01-19
*/
public interface MaterialProcessStrategy {
/**
* 执行材料处理策略
*
* @param context 流程上下文
*/
void execute(ProcessContext context);
}
javapackage com.yourcompany.strategy;
import com.yourcompany.context.ProcessContext;
/**
* 正常材料处理策略
* <p>适用场景:有目录 + 页码完整的材料</p>
* <p>处理逻辑:按目录解析章节,无需额外校验</p>
*
* @author 开发团队
* @since 2026-01-19
*/
public class NormalMaterialStrategy implements MaterialProcessStrategy {
@Override
public void execute(ProcessContext context) {
// 正常材料处理逻辑:按目录解析章节
context.getStageResult().put("strategy", "正常材料策略:按目录解析章节,页码校验通过");
}
}
javapackage com.yourcompany.strategy;
import com.yourcompany.context.ProcessContext;
/**
* 异常材料处理策略
* <p>适用场景:无目录 或 页码丢失的材料</p>
* <p>处理逻辑:按段落拆分,标记需要人工校验</p>
*
* @author 开发团队
* @since 2026-01-19
*/
public class AbnormalMaterialStrategy implements MaterialProcessStrategy {
@Override
public void execute(ProcessContext context) {
// 异常材料处理逻辑:标记需要人工校验
String reason = context.isHasDirectory() ? "页码丢失" : "无目录";
context.getStageResult().put("strategy", "异常材料策略:" + reason + ",按段落拆分,需人工校验");
}
}
javapackage com.yourcompany.strategy;
import com.yourcompany.context.ProcessContext;
/**
* 材料处理策略工厂
* <p>核心职责:根据材料特征(是否有目录、页码是否丢失)选择对应的处理策略</p>
* <p>设计原则:封装策略选择逻辑,避免业务层出现大量if-else</p>
*
* @author 开发团队
* @since 2026-01-19
*/
public class MaterialProcessStrategyFactory {
/**
* 获取材料处理策略
*
* @param context 流程上下文(包含材料特征)
* @return 对应的处理策略
*/
public static MaterialProcessStrategy getStrategy(ProcessContext context) {
// 正常材料:有目录 + 页码完整
if (context.isHasDirectory() && !context.isPageNumberLost()) {
return new NormalMaterialStrategy();
}
// 异常材料:无目录 或 页码丢失
else {
return new AbnormalMaterialStrategy();
}
}
}
javapackage com.yourcompany.repository;
import com.yourcompany.entity.Material;
import com.yourcompany.entity.MaterialBatch;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
/**
* 材料批次JPA仓库
* <p>核心职责:提供材料批次的CRUD操作</p>
* <p>继承JpaRepository:无需手动实现基本CRUD,Spring自动生成代理类</p>
*
* @author 开发团队
* @since 2026-01-19
*/
@Repository
public interface MaterialBatchRepository extends JpaRepository<MaterialBatch, String> {
}
/**
* 材料JPA仓库
* <p>核心职责:提供单材料的CRUD操作</p>
*
* @author 开发团队
* @since 2026-01-19
*/
@Repository
public interface MaterialRepository extends JpaRepository<Material, String> {
}
javapackage com.yourcompany.manager;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yourcompany.context.ProcessContext;
import com.yourcompany.entity.Material;
import com.yourcompany.entity.MaterialBatch;
import com.yourcompany.handler.AssemblyStageHandler;
import com.yourcompany.handler.CatalogStageHandler;
import com.yourcompany.handler.MaterialProcessStageHandler;
import com.yourcompany.repository.MaterialBatchRepository;
import com.yourcompany.repository.MaterialRepository;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Service;
import java.io.File;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
/**
* 材料处理流程总管理器
* <p>核心职责:</p>
* <li>1. 编排材料处理流程(上传→处理→组卷→编目)</li>
* <li>2. 提供手动触发组卷/编目的入口</li>
* <li>3. 管理批次和材料的状态持久化</li>
* <p>设计原则:聚合核心逻辑,对外提供简洁接口,隔离底层细节</p>
*
* @author 开发团队
* @since 2026-01-19
*/
@Service // 标记为业务层Bean,支持事务管理(如需可添加@Transactional)
public class MaterialProcessManager {
/**
* 材料批次仓库(JPA)
*/
@Resource
private MaterialBatchRepository batchRepository;
/**
* 材料仓库(JPA)
*/
@Resource
private MaterialRepository materialRepository;
/**
* 进度管理器(Redis)
*/
@Resource
private ProgressManager progressManager;
/**
* 材料处理处理器(责任链节点)
*/
@Resource
private MaterialProcessStageHandler materialProcessHandler;
/**
* 组卷处理器(责任链节点)
*/
@Resource
private AssemblyStageHandler assemblyHandler;
/**
* 编目处理器(责任链节点)
*/
@Resource
private CatalogStageHandler catalogHandler;
/**
* JSON工具类(序列化/反序列化)
*/
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* 批量上传并处理材料
* <p>流程:初始化批次状态 → 逐个处理材料 → 更新批次状态为"已处理完成"</p>
*
* @param batch 前端传入的批次信息(包含用户ID、材料列表)
* @return 批次ID(供前端后续调用组卷/编目接口)
* @throws RuntimeException 批次ID/用户ID为空、材料列表为空时抛出
*/
public String batchUploadAndProcess(MaterialBatch batch) {
// 前置校验
if (batch.getBatchId() == null || batch.getBatchId().trim().isEmpty()) {
throw new RuntimeException("批次ID不能为空");
}
if (batch.getUserId() == null || batch.getUserId().trim().isEmpty()) {
throw new RuntimeException("用户ID不能为空");
}
if (batch.getMaterials() == null || batch.getMaterials().isEmpty()) {
throw new RuntimeException("材料列表不能为空");
}
// ========== 步骤1:初始化批次状态 ==========
batch.setStatus(MaterialBatch.BatchStatus.UPLOADING);
batch.setCreateTime(LocalDateTime.now());
batchRepository.save(batch); // 保存批次到数据库
// ========== 步骤2:逐个处理材料 ==========
List<String> stageNames = List.of("材料处理"); // 材料处理阶段名称
for (Material material : batch.getMaterials()) {
String materialId = material.getMaterialId();
// 校验材料ID
if (materialId == null || materialId.trim().isEmpty()) {
throw new RuntimeException("材料ID不能为空,批次ID:" + batch.getBatchId());
}
// 初始化该材料的进度
progressManager.initProgress(materialId, stageNames);
// 构建流程上下文
ProcessContext context = ProcessContext.builder()
.processId(materialId)
.uploadFile(new File(material.getFileName())) // 实际项目中需替换为真实文件路径
.fileType(material.getFileType())
.build();
// 执行材料处理
materialProcessHandler.process(context);
// ========== 步骤3:更新材料状态 ==========
if (context.isProcessSuccess()) {
material.setStatus(Material.MaterialStatus.PROCESSED);
material.setProcessTime(LocalDateTime.now());
// 序列化处理结果为JSON字符串(存储到数据库)
try {
material.setProcessResult(objectMapper.writeValueAsString(context.getStageResult()));
} catch (JsonProcessingException e) {
material.setProcessResult("{\"error\":\"处理结果序列化失败:" + e.getMessage() + "\"}");
}
} else {
material.setStatus(Material.MaterialStatus.UPLOADED);
material.setProcessResult("{\"error\":\"" + context.getErrorMsg() + "\"}");
}
materialRepository.save(material); // 保存材料到数据库
}
// ========== 步骤4:更新批次状态为"已处理完成" ==========
batch.setStatus(MaterialBatch.BatchStatus.PROCESSED);
batch.setUpdateTime(LocalDateTime.now());
batchRepository.save(batch);
return batch.getBatchId();
}
/**
* 手动触发组卷
* <p>前置条件:批次状态为"已处理完成"</p>
*
* @param batchId 批次ID
* @throws RuntimeException 批次不存在、批次状态不合法、组卷失败时抛出
*/
public void triggerAssembly(String batchId) {
// 查询批次(不存在则抛出异常)
MaterialBatch batch = batchRepository.findById(batchId)
.orElseThrow(() -> new RuntimeException("批次不存在:" + batchId));
// 校验批次状态(仅允许已处理完成的批次触发组卷)
if (!batch.getStatus().equals(MaterialBatch.BatchStatus.PROCESSED)) {
throw new RuntimeException("批次[" + batchId + "]状态为" + batch.getStatus().getDesc() + ",仅允许已处理完成的批次触发组卷");
}
// 更新批次状态为"组卷中"
batch.setStatus(MaterialBatch.BatchStatus.ASSEMBLING);
batchRepository.save(batch);
// ========== 逐个处理材料的组卷 ==========
for (Material material : batch.getMaterials()) {
try {
// 反序列化处理结果为Map(供组卷使用)
Map<String, Object> processResult = objectMapper.readValue(material.getProcessResult(), Map.class);
// 构建流程上下文
ProcessContext context = ProcessContext.builder()
.processId(material.getMaterialId())
.structuredData(processResult)
.build();
// 执行组卷
assemblyHandler.process(context);
// 更新材料状态
if (context.isProcessSuccess()) {
material.setStatus(Material.MaterialStatus.ASSEMBLED);
material.setAssembleTime(LocalDateTime.now());
}
materialRepository.save(material);
} catch (Exception e) {
throw new RuntimeException("材料[" + material.getMaterialId() + "]组卷失败:" + e.getMessage(), e);
}
}
// 组卷完成后,恢复批次状态为"已处理完成"(等待编目)
batch.setStatus(MaterialBatch.BatchStatus.PROCESSED);
batchRepository.save(batch);
}
/**
* 手动触发编目
* <p>前置条件:批次状态为"已处理完成"</p>
*
* @param batchId 批次ID
* @throws RuntimeException 批次不存在、批次状态不合法、编目失败时抛出
*/
public void triggerCatalog(String batchId) {
// 查询批次
MaterialBatch batch = batchRepository.findById(batchId)
.orElseThrow(() -> new RuntimeException("批次不存在:" + batchId));
// 校验批次状态
if (!batch.getStatus().equals(MaterialBatch.BatchStatus.PROCESSED)) {
throw new RuntimeException("批次[" + batchId + "]状态为" + batch.getStatus().getDesc() + ",仅允许已处理完成的批次触发编目");
}
// 更新批次状态为"编目中"
batch.setStatus(MaterialBatch.BatchStatus.CATALOGING);
batchRepository.save(batch);
// ========== 逐个处理材料的编目 ==========
for (Material material : batch.getMaterials()) {
try {
// 反序列化处理结果
Map<String, Object> processResult = objectMapper.readValue(material.getProcessResult(), Map.class);
// 构建流程上下文
ProcessContext context = ProcessContext.builder()
.processId(material.getMaterialId())
.structuredData(processResult)
.build();
// 执行编目
catalogHandler.process(context);
// 更新材料状态
if (context.isProcessSuccess()) {
material.setStatus(Material.MaterialStatus.CATALOGED);
material.setCatalogTime(LocalDateTime.now());
}
materialRepository.save(material);
} catch (Exception e) {
throw new RuntimeException("材料[" + material.getMaterialId() + "]编目失败:" + e.getMessage(), e);
}
}
// 编目完成后,更新批次状态为"已完成"
batch.setStatus(MaterialBatch.BatchStatus.COMPLETED);
batch.setUpdateTime(LocalDateTime.now());
batchRepository.save(batch);
}
/**
* 获取批次仓库(供控制器调用)
*
* @return 材料批次仓库
*/
public MaterialBatchRepository getBatchRepository() {
return batchRepository;
}
}
javapackage com.yourcompany.controller;
import com.yourcompany.entity.Material;
import com.yourcompany.entity.MaterialBatch;
import com.yourcompany.entity.ProcessProgress;
import com.yourcompany.manager.MaterialProcessManager;
import com.yourcompany.manager.ProgressManager;
import jakarta.annotation.Resource;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.Map;
/**
* 材料处理流程控制器
* <p>核心职责:提供前端交互的RESTful接口</p>
* <p>接口列表:</p>
* <li>1. 批量上传并处理材料:POST /material/batch/upload</li>
* <li>2. 手动触发组卷:POST /material/batch/{batchId}/assembly</li>
* <li>3. 手动触发编目:POST /material/batch/{batchId}/catalog</li>
* <li>4. 查询批次进度:GET /material/batch/{batchId}/progress</li>
*
* @author 开发团队
* @since 2026-01-19
*/
@RestController
@RequestMapping("/material") // 接口前缀
public class MaterialProcessController {
/**
* 流程总管理器
*/
@Resource
private MaterialProcessManager processManager;
/**
* 进度管理器
*/
@Resource
private ProgressManager progressManager;
/**
* 批量上传并处理材料接口
* <p>前端上传批次信息后调用,返回批次ID供后续操作</p>
*
* @param batch 批次信息(包含用户ID、材料列表)
* @return 响应体:批次ID(String)
*/
@PostMapping("/batch/upload")
public ResponseEntity<String> batchUploadAndProcess(@RequestBody MaterialBatch batch) {
String batchId = processManager.batchUploadAndProcess(batch);
return ResponseEntity.ok(batchId); // 200 OK + 批次ID
}
/**
* 手动触发组卷接口
* <p>前端点击"组卷"按钮后调用</p>
*
* @param batchId 批次ID(路径参数)
* @return 响应体:无(200 OK)
*/
@PostMapping("/batch/{batchId}/assembly")
public ResponseEntity<Void> triggerAssembly(@PathVariable String batchId) {
processManager.triggerAssembly(batchId);
return ResponseEntity.ok().build(); // 200 OK 无返回体
}
/**
* 手动触发编目接口
* <p>前端点击"编目"按钮后调用</p>
*
* @param batchId 批次ID(路径参数)
* @return 响应体:无(200 OK)
*/
@PostMapping("/batch/{batchId}/catalog")
public ResponseEntity<Void> triggerCatalog(@PathVariable String batchId) {
processManager.triggerCatalog(batchId);
return ResponseEntity.ok().build();
}
/**
* 查询批次进度接口
* <p>前端轮询调用,展示每个材料的各阶段进度</p>
*
* @param batchId 批次ID(路径参数)
* @return 响应体:Map<String, ProcessProgress>(Key=阶段名称,Value=进度信息)
*/
@GetMapping("/batch/{batchId}/progress")
public ResponseEntity<Map<String, ProcessProgress>> queryBatchProgress(@PathVariable String batchId) {
// 查询批次
MaterialBatch batch = processManager.getBatchRepository().findById(batchId)
.orElseThrow(() -> new RuntimeException("批次不存在:" + batchId));
// 汇总该批次下所有材料的进度
Map<String, ProcessProgress> progressMap = new HashMap<>();
for (Material material : batch.getMaterials()) {
progressMap.putAll(progressManager.queryProgress(material.getMaterialId()));
}
return ResponseEntity.ok(progressMap); // 200 OK + 进度信息
}
}
| 注释类型 | 核心要求 |
|---|---|
| 类注释 | 说明类的核心职责、设计原则、适用场景、版本/作者信息 |
| 方法注释 | 说明方法功能、入参/出参含义、异常场景、业务规则 |
| 字段注释 | 说明字段的业务含义、生成规则、约束条件(如非空、长度) |
| 枚举注释 | 说明每个枚举值的业务含义,配套desc字段供前端展示 |
| 关键逻辑注释 | 说明“为什么这么做”(而非“做了什么”),如:策略选择、进度分步更新的原因 |
这套带完整注释的代码可直接用于生产环境,后续维护/扩展时,开发者只需阅读注释即可快速理解代码逻辑,大幅降低维护成本。
在你这套基于SpringBoot 3.5的材料处理流程代码中,可维护性(易理解、易修改、易排查问题)和可扩展性(低成本新增功能/适配变化)的保障需要从设计原则落地、代码结构规范、扩展机制固化、工程化实践四个维度系统性设计,以下是具体可落地的方案:
所有代码设计严格遵循 SOLID原则,这是可维护和可扩展的基础,也是你这套代码的核心骨架:
StageHandler 及其子类:仅负责单个阶段的业务逻辑(材料处理/组卷/编目),不掺杂进度存储、批次管理等逻辑;ProgressManager:仅负责进度的增删改查,不涉及业务流程;MaterialProcessManager:仅负责流程编排(触发上传/组卷/编目),不做具体的阶段业务逻辑;ProcessContext:仅承载流程数据/状态,不包含任何业务逻辑。StageHandler 实现 getStageName()/preCheck()/doProcess(),无需修改原有处理器和管理器;MaterialProcessStrategy 接口,修改 MaterialProcessStrategyFactory(仅工厂类少量修改,符合“最小修改原则”);ProgressManager 接口(如从Redis改为数据库),通过Spring Bean替换即可,业务代码无感知。ProgressManager 接口,而非 RedisProgressManager 具体实现;MaterialProcessManager 依赖 StageHandler 抽象类,而非具体的 MaterialProcessStageHandler;@Resource private ProgressManager progressManager),而非具体类。ProgressManager 仅定义进度相关的3个方法(初始化/更新/查询),不掺杂批次管理、材料处理等无关方法;MaterialProcessStrategy 仅定义 execute() 方法,不同策略只需实现核心逻辑,无需冗余代码。你的代码已按“分层思想”设计,需固化这套结构,所有新增代码必须遵循:
com.yourcompany ├── controller :仅接收前端请求,参数校验,调用管理器,返回结果(无业务逻辑) ├── manager :流程编排/核心逻辑聚合(如批次管理、手动触发),不做具体业务 ├── handler :单个阶段的业务逻辑(责任链节点) ├── strategy :阶段内的分支逻辑(策略模式) ├── entity :数据模型(JPA实体/进度实体),无业务逻辑 ├── context :流程上下文(仅存数据) ├── repository :数据持久化(JPA仓库),仅做CRUD ├── config :框架配置(Redis/JPA),无业务逻辑
MaterialProcessStageHandler 而非 StageHandler1);batchUploadAndProcess 而非 upload);PROGRESS_KEY_PREFIX 而非 progressKey)。triggerAssembly 注释“仅允许已处理完成的批次调用”);StageHandler 的 process() 模板方法复用了“进度更新+异常处理+流程传递”逻辑,所有子类无需重复写这些代码;JsonUtils),避免多处重复代码。针对你这套代码的核心扩展场景,制定无侵入式扩展流程,所有开发人员必须遵循:
步骤1:新建类 `AuditStageHandler` 继承 `StageHandler`,实现3个抽象方法(阶段名/前置检查/核心逻辑); 步骤2:在Spring中注册为Bean(加 `@Component`); 步骤3:在 `MaterialProcessManager` 中注入该处理器,按需调整流程编排逻辑(如上传处理后→审核→组卷); 步骤4:更新 `ProgressManager` 的阶段名称初始化逻辑(新增“材料审核”阶段);
MaterialProcessStageHandler/AssemblyStageHandler,仅新增类+少量编排调整。步骤1:新建类 `SpecialMaterialStrategy` 实现 `MaterialProcessStrategy`; 步骤2:在 `MaterialProcessStrategyFactory` 中新增分支判断(如 `else if (context.isSpecialMaterial())`); 步骤3:测试该分支逻辑,不影响原有策略;
步骤1:新建类 `DbProgressManager` 实现 `ProgressManager` 接口; 步骤2:在Spring配置中替换Bean(如通过 `@Primary` 注解优先注入Db实现); 步骤3:测试进度的初始化/更新/查询逻辑,业务代码无需修改;
步骤1:封装外部服务门面类 `OcrServiceFacade`,定义统一接口(如 `String doOcr(File file)`); 步骤2:为不同服务商实现适配器(如 `BaiduOcrAdapter`/`AliOcrAdapter`); 步骤3:在 `MaterialProcessStageHandler` 中调用门面类,而非直接调用服务商API;
application.yml 中,通过 @Value 或 @ConfigurationProperties 注入;application-dev.yml/application-prod.yml),切换环境无需改代码。新增全局异常处理器,统一返回格式,避免零散的try-catch:
java@RestControllerAdvice
public class GlobalExceptionHandler {
@ExceptionHandler(RuntimeException.class)
public ResponseEntity<Map<String, Object>> handleRuntimeException(RuntimeException e) {
Map<String, Object> result = new HashMap<>();
result.put("code", 500);
result.put("msg", e.getMessage());
result.put("timestamp", LocalDateTime.now());
return ResponseEntity.status(500).body(result);
}
// 新增其他异常处理(参数校验异常、IO异常等)
}
processId/batchId 等关键标识;javalogger.info("批次[{}]开始处理,包含{}个材料", batchId, batch.getMaterials().size());
logger.error("材料[{}]处理失败,原因:{}", materialId, e.getMessage(), e);
master(生产)、dev(开发)、feature/*(功能分支),新增功能在feature分支开发,通过PR合并到dev,代码审查通过后再合并到master;| 维度 | 核心保障措施 |
|---|---|
| 设计层面 | 严格遵循SOLID原则,依赖抽象而非具体实现 |
| 代码层面 | 分层清晰、单一职责、命名规范、注释完善、复用通用逻辑 |
| 扩展层面 | 基于接口/抽象类扩展,新增功能仅新增类/修改工厂,不改动核心业务逻辑 |
| 工程层面 | 配置外置、统一异常处理、完善日志、测试覆盖、版本控制+代码审查 |
| 团队层面 | 固化扩展流程(新增阶段/策略/存储的标准步骤),形成开发手册 |
你这套代码的架构本身已具备良好的可维护性和可扩展性基础(责任链+策略模式+接口抽象),后续只需:
通过以上措施,即使后续新增多个主阶段、数十种材料处理分支,代码依然能保持清晰的结构,修改和扩展的成本始终可控。
你在处理PDF拆页OCR、每页调用dify流判断文件类型等长耗时、高频次操作时引入了多线程和队列,核心要解决的是 ProcessContext 在并发修改、跨线程传递、队列消费 场景下的线程安全、数据一致性、状态可靠 三大问题。以下是可落地的完整保障方案,结合你的业务场景(PDF拆页/OCR/dify调用)逐一拆解。
你面临的 ProcessContext 风险点:
ProcessContext 的 structuredData/stageResult 等Map结构,导致 ConcurrentModificationException;processSuccess/hasDirectory 等状态,导致最终状态错乱;ProcessContext 对象时,可能因序列化/反序列化丢失数据,或消费线程获取到过期上下文;ProcessContext,导致数据串扰。核心原则:让上下文“不可变”或“线程封闭”,避免多线程直接修改同一个对象。
将原有可变的Map替换为线程安全的 ConcurrentHashMap,核心状态用原子类管理,同时提供“副本创建”方法(子线程操作副本,主线程汇总):
javapackage com.yourcompany.context;
import lombok.Builder;
import lombok.Data;
import lombok.With;
import java.io.File;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 线程安全的流程上下文(适配多线程+队列场景)
* 核心改造:
* 1. 核心Map用ConcurrentHashMap避免并发修改异常;
* 2. 状态字段用AtomicBoolean保证原子更新;
* 3. @With注解生成副本方法(子线程操作副本,不影响主线程);
* 4. 禁止直接修改核心字段,仅通过方法更新。
*/
@Data
@Builder
public class ProcessContext {
/** 全局唯一标识:关联主任务(如PDF文件)+ 子任务(如页码),格式:{processId}-page-{pageNum} */
private final String uniqueId;
/** 主流程ID(材料ID),关联原始材料 */
private final String mainProcessId;
/** 子任务标识(如PDF页码),用于多页处理 */
private final String subTaskId;
/** 上传的原始文件(主线程唯一持有,子线程只读) */
private final File uploadFile;
/** 文件类型(只读,避免并发修改) */
private final String fileType;
/** 线程安全的结构化数据存储(支持多线程读写) */
private final Map<String, Object> structuredData = new ConcurrentHashMap<>();
/** 线程安全的阶段结果存储 */
private final Map<String, Object> stageResult = new ConcurrentHashMap<>();
/** 原子布尔值:是否存在目录(避免并发覆盖) */
private final AtomicBoolean hasDirectory = new AtomicBoolean(false);
/** 原子布尔值:页码是否丢失 */
private final AtomicBoolean pageNumberLost = new AtomicBoolean(false);
/** 原子布尔值:子任务处理是否成功 */
private final AtomicBoolean subTaskSuccess = new AtomicBoolean(true);
/** 原子布尔值:主流程处理是否成功 */
private final AtomicBoolean mainProcessSuccess = new AtomicBoolean(true);
/** 错误信息(线程安全的字符串拼接) */
private final StringBuilder errorMsg = new StringBuilder();
// ========== 安全更新方法:封装原子操作,禁止外部直接set ==========
/**
* 安全更新“是否有目录”状态
* @param value 新值
*/
public void setHasDirectory(boolean value) {
this.hasDirectory.set(value);
}
/**
* 安全追加错误信息(多线程下避免字符串覆盖)
* @param msg 错误信息
*/
public void appendErrorMsg(String msg) {
synchronized (errorMsg) { // 同步块保证字符串拼接原子性
errorMsg.append(msg).append("; ");
}
}
/**
* 创建子任务上下文副本(核心:线程封闭,子线程操作副本)
* @param subTaskId 子任务ID(如PDF页码)
* @return 独立的子上下文
*/
public ProcessContext createSubContext(String subTaskId) {
return ProcessContext.builder()
.uniqueId(this.mainProcessId + "-" + subTaskId)
.mainProcessId(this.mainProcessId)
.subTaskId(subTaskId)
.uploadFile(this.uploadFile)
.fileType(this.fileType)
.build();
}
/**
* 汇总子上下文结果到主上下文(主线程执行,避免并发)
* @param subContext 子任务上下文
*/
public void mergeSubContext(ProcessContext subContext) {
// 合并结构化数据(ConcurrentHashMap支持并发putAll)
this.structuredData.putAll(subContext.getStructuredData());
this.stageResult.putAll(subContext.getStageResult());
// 原子更新失败状态:只要有一个子任务失败,主任务标记为失败
if (!subContext.getSubTaskSuccess().get()) {
this.mainProcessSuccess.set(false);
this.appendErrorMsg(subContext.getErrorMsg().toString());
}
}
}
| 改造项 | 解决的问题 |
|---|---|
ConcurrentHashMap | 避免多线程修改 structuredData/stageResult 时抛出 ConcurrentModificationException |
AtomicBoolean | 保证 hasDirectory/processSuccess 等状态的原子更新,避免并发覆盖 |
createSubContext() | 每个子任务(如PDF单页)生成独立上下文,线程封闭,互不干扰 |
mergeSubContext() | 主线程统一汇总子任务结果,避免多线程直接修改主上下文 |
uniqueId | 全局唯一标识,关联主任务+子任务,便于队列/线程中追踪上下文 |
以“PDF拆页后逐页OCR + 逐页调用dify判断类型”为例,实现多线程安全处理:
java@Component
public class MaterialProcessStageHandler extends StageHandler {
// 自定义线程池(适配PDF拆页场景,核心线程数=CPU核心数*2,避免线程过多)
@Resource
private ExecutorService pdfProcessExecutor;
// 线程安全的上下文缓存(用于队列/线程间共享上下文)
@Resource
private ContextCacheManager contextCacheManager;
@Override
protected void doProcess(ProcessContext mainContext) throws Exception {
String mainProcessId = mainContext.getMainProcessId();
// 步骤1:将主上下文存入缓存(队列消费时可通过ID获取)
contextCacheManager.put(mainProcessId, mainContext);
// 步骤2:PDF拆页(假设拆出N页)
List<File> pdfPages = pdfSplitUtil.splitPdf(mainContext.getUploadFile());
List<CompletableFuture<Void>> futures = new ArrayList<>();
// 步骤3:多线程处理每一页(OCR + dify判断)
for (int i = 0; i < pdfPages.size(); i++) {
int pageNum = i + 1;
File pageFile = pdfPages.get(i);
// 创建子上下文(线程封闭,每个页面对应独立上下文)
ProcessContext subContext = mainContext.createSubContext("page-" + pageNum);
// 提交子任务到线程池
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
// 子任务1:调用OCR提取当前页内容
String ocrContent = externalServiceFacade.callOcrService(pageFile);
subContext.getStructuredData().put("page-" + pageNum + "-ocr", ocrContent);
// 子任务2:调用dify判断当前页文件类型
String fileType = externalServiceFacade.callDifyMultimodalFlow(pageFile);
subContext.getStructuredData().put("page-" + pageNum + "-type", fileType);
// 标记子任务成功
subContext.getSubTaskSuccess().set(true);
} catch (Exception e) {
// 标记子任务失败,记录错误信息
subContext.getSubTaskSuccess().set(false);
subContext.appendErrorMsg("页码" + pageNum + "处理失败:" + e.getMessage());
logger.error("子任务{}处理失败", subContext.getUniqueId(), e);
} finally {
// 子任务完成后,将子上下文存入缓存(供主线程汇总)
contextCacheManager.put(subContext.getUniqueId(), subContext);
}
}, pdfProcessExecutor);
futures.add(future);
}
// 步骤4:等待所有子任务完成,汇总结果
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
for (int i = 0; i < pdfPages.size(); i++) {
int pageNum = i + 1;
String subContextId = mainProcessId + "-page-" + pageNum;
// 从缓存获取子上下文,汇总到主上下文
ProcessContext subContext = contextCacheManager.get(subContextId);
if (subContext != null) {
mainContext.mergeSubContext(subContext);
// 汇总后移除子上下文,释放内存
contextCacheManager.remove(subContextId);
}
}
// 步骤5:更新主上下文状态(主线程操作,无并发问题)
if (mainContext.getMainProcessSuccess().get()) {
mainContext.setHasDirectory(checkHasDirectory(mainContext.getStructuredData()));
progressManager.updateProgress(mainProcessId, getStageName(), 100, SUCCESS, "所有页码处理完成");
} else {
progressManager.updateProgress(mainProcessId, getStageName(), 0, FAIL, mainContext.getErrorMsg().toString());
}
}
}
队列中不直接传递ProcessContext对象(避免序列化/反序列化问题、并发修改),而是传递上下文唯一ID,消费线程从线程安全的缓存中获取上下文:
java@Component
public class ContextCacheManager {
// 并发HashMap作为缓存,过期时间1小时(避免内存泄漏)
private final LoadingCache<String, ProcessContext> contextCache = CacheBuilder.newBuilder()
.expireAfterWrite(1, TimeUnit.HOURS)
.concurrencyLevel(Runtime.getRuntime().availableProcessors()) // 并发级别=CPU核心数
.build(new CacheLoader<String, ProcessContext>() {
@Override
public ProcessContext load(String key) throws Exception {
// 缓存未命中时返回空,避免创建空对象
return null;
}
});
/**
* 存入上下文(线程安全)
* @param contextId 上下文唯一ID
* @param context 上下文对象
*/
public void put(String contextId, ProcessContext context) {
try {
contextCache.put(contextId, context);
} catch (Exception e) {
logger.error("存入上下文缓存失败,ID:{}", contextId, e);
}
}
/**
* 获取上下文(线程安全)
* @param contextId 上下文唯一ID
* @return 上下文对象(null=未找到/已过期)
*/
public ProcessContext get(String contextId) {
try {
return contextCache.get(contextId);
} catch (Exception e) {
logger.error("获取上下文缓存失败,ID:{}", contextId, e);
return null;
}
}
/**
* 移除上下文(释放内存)
* @param contextId 上下文唯一ID
*/
public void remove(String contextId) {
contextCache.invalidate(contextId);
}
}
java@Component
public class DifyTypeJudgeProducer {
@Resource
private RabbitTemplate rabbitTemplate;
// 生产:发送上下文ID到队列,而非上下文对象
public void sendDifyJudgeTask(String contextId) {
rabbitTemplate.convertAndSend("dify-type-judge-queue", contextId);
}
}
@Component
public class DifyTypeJudgeConsumer {
@Resource
private ContextCacheManager contextCacheManager;
@Resource
private ExternalServiceFacade externalServiceFacade;
@RabbitListener(queues = "dify-type-judge-queue")
public void consume(String contextId) {
// 消费:通过ID从缓存获取上下文(线程安全)
ProcessContext context = contextCacheManager.get(contextId);
if (context == null) {
logger.error("上下文{}已过期或不存在,跳过处理", contextId);
return;
}
try {
// 处理dify判断逻辑(单线程消费,无并发修改)
String fileType = externalServiceFacade.callDifyMultimodalFlow(context.getUploadFile());
context.getStructuredData().put("dify-type", fileType);
context.getSubTaskSuccess().set(true);
} catch (Exception e) {
context.getSubTaskSuccess().set(false);
context.appendErrorMsg("dify判断失败:" + e.getMessage());
} finally {
// 更新缓存中的上下文
contextCacheManager.put(contextId, context);
}
}
}
所有对 ProcessContext 状态的修改,必须通过封装的原子方法(如 setHasDirectory()、appendErrorMsg()),禁止外部直接修改:
java// 错误示例:直接修改原子变量(可能导致并发覆盖)
context.getHasDirectory().set(true);
// 正确示例:通过封装方法更新
context.setHasDirectory(true);
队列消费/多线程处理失败时,标记上下文状态并支持重试,避免上下文状态丢失:
java// 重试逻辑示例(结合Spring Retry)
@Retryable(
value = {Exception.class},
maxAttempts = 3,
backoff = @Backoff(delay = 1000, multiplier = 2)
)
public void processDifyTask(ProcessContext context) {
try {
String fileType = externalServiceFacade.callDifyMultimodalFlow(context.getUploadFile());
context.getStructuredData().put("dify-type", fileType);
} catch (Exception e) {
// 重试失败后,标记上下文状态
context.getSubTaskSuccess().set(false);
context.appendErrorMsg("dify调用失败(重试3次):" + e.getMessage());
throw e; // 触发重试
}
}
// 重试耗尽后的降级处理
@Recover
public void recoverDifyTask(Exception e, ProcessContext context) {
logger.error("dify任务重试耗尽,执行降级逻辑", e);
// 降级:使用本地规则判断文件类型
String fallbackType = fileTypeUtil.judgeBySuffix(context.getFileType());
context.getStructuredData().put("dify-type", fallbackType + "-降级");
}
避免线程池耗尽、队列阻塞导致上下文处理异常:
java// 线程池配置类
@Configuration
public class ExecutorConfig {
@Bean(name = "pdfProcessExecutor")
public ExecutorService pdfProcessExecutor() {
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
int maxPoolSize = corePoolSize * 2;
return new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), // 队列大小适配PDF拆页数量
new ThreadFactory() {
private final AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "pdf-process-thread-" + count.incrementAndGet());
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:主线程执行,避免任务丢失
);
}
}
| 保障维度 | 核心措施 |
|---|---|
| 线程安全 | 1. ProcessContext使用ConcurrentHashMap/AtomicBoolean; 2. 子任务创建独立上下文,线程封闭; 3. 主线程统一汇总结果 |
| 队列传递安全 | 1. 队列传递上下文ID而非对象; 2. 线程安全的缓存管理上下文; 3. 消费线程单线程处理,避免并发修改 |
| 数据一致性 | 1. 原子方法更新状态,禁止直接修改; 2. 汇总子上下文时主线程操作; 3. 错误信息同步块追加 |
| 异常可靠性 | 1. 子任务失败标记状态,不影响主上下文; 2. 队列/多线程任务支持重试; 3. 重试耗尽后降级处理 |
| 内存安全 | 1. 上下文缓存设置过期时间; 2. 子任务完成后移除缓存; 3. 线程池拒绝策略避免任务丢失 |
通过以上方案,你可以在多线程+队列场景下,完全保证 ProcessContext 的线程安全、数据一致、状态可靠,同时适配PDF拆页OCR、逐页dify判断等核心业务场景。如果需要针对具体队列(如Kafka/RocketMQ)或线程池配置做细化调整,可以告诉我,我会补充对应的代码示例。
本文作者:wucc
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-SA 许可协议。转载请注明出处!