# Web服务执行流程文档 ## 概述 本文档详细描述了从 `main.py` 中调用 `uvicorn.run()` 开始,到处理HTTP请求的完整执行流程。 ## 1. 启动阶段 (Startup) ### 1.1 入口点:main.py **文件位置**: `colocation_mvp/main.py` ```python uvicorn.run( "web.app:app", host=args.host, port=args.port, reload=False, log_level=args.log_level.lower() ) ``` **执行流程**: 1. `uvicorn.run()` 启动 ASGI 服务器 2. 加载 `web.app` 模块中的 `app` 对象(FastAPI 应用实例) 3. 触发 FastAPI 应用的启动事件 --- ### 1.2 FastAPI 应用初始化:web/app.py **文件位置**: `colocation_mvp/web/app.py` **执行步骤**: #### 步骤 1: 创建 FastAPI 应用实例 ```python app = FastAPI( title="Co-location MVP Web API", description="Web interface for interactive co-location pattern mining with Stage3 preference learning", version="1.0.0" ) ``` #### 步骤 2: 执行启动事件 (`@app.on_event("startup")`) ```python @app.on_event("startup") async def startup_event(): global service # 1. 初始化 PipelineManager config_path = project_root / "config" / "config.yaml" manager = PipelineManager(config_path=str(config_path)) # 2. 创建 WebService service = WebService(manager) # 3. 创建并挂载路由 router = create_router(service) app.include_router(router) logger.info("Web service initialized successfully") ``` **关键调用链**: - `PipelineManager.__init__()` → 初始化所有系统组件 - `WebService.__init__()` → 封装业务逻辑层 - `create_router()` → 创建API路由 --- ### 1.3 PipelineManager 初始化:controller/manager.py **文件位置**: `colocation_mvp/controller/manager.py` **执行步骤**: #### 步骤 1: 加载配置文件 ```python with open(config_path, 'r', encoding='utf-8') as f: self.config = yaml.safe_load(f) ``` #### 步骤 2: 初始化核心组件 ```python # LLM客户端 self.llm_client = LLMClient(config_path) # 偏好解析器 self.preference_parser = PreferenceParser( default_params=self.config.get('mining', {}) ) # 共现模式挖掘器 self.miner = CoLocationMiner( data_path=self.config['data']['data_path'], distance_threshold=16.0 ) ``` #### 步骤 3: 初始化 Stage0 (意图理解) ```python if self.stage0_enabled: available_poi_types = list(self.miner.feature_counts.keys()) self.intent_encoder = IntentEncoder(self.llm_client, available_poi_types=available_poi_types) self.intent_mapper = IntentMapper(self.embedder) ``` #### 步骤 4: 初始化 Stage3 (偏好学习) ```python # 记忆存储 self.memory = MemoryStore() # 偏好学习器 self.learner = PreferenceLearner(decay_lambda=decay_lambda) # 模式嵌入器 self.embedder = PatternEmbedder() # 可选:对比学习训练器 if self.stage3_use_contrastive: self.trainer = PreferenceTrainer(...) self.preference_model = PreferenceEncoder(...) ``` #### 步骤 5: 初始化 Stage4 (迭代交互) ```python if self.stage4_enabled: self.iteration_manager = IterationManager( miner=self.miner, learner=self.learner, memory=self.memory, embedder=self.embedder, llm_client=self.llm_client, intent_encoder=self.intent_encoder, intent_mapper=self.intent_mapper, fusion_alpha=self.fusion_alpha ) ``` --- ### 1.4 WebService 初始化:web/services.py **文件位置**: `colocation_mvp/web/services.py` ```python def __init__(self, manager: PipelineManager): self.manager = manager self.memory = MemoryStore() self.embedder = PatternEmbedder() ``` --- ### 1.5 路由创建:web/router.py **文件位置**: `colocation_mvp/web/router.py` **执行步骤**: ```python def create_router(service: WebService) -> APIRouter: router = APIRouter(prefix="/api", tags=["api"]) # 注册所有API端点 @router.post("/query") @router.post("/feedback") @router.post("/train") @router.get("/train/history") @router.post("/iteration/start") @router.post("/iteration/next") @router.post("/iteration/finalize") return router ``` **注册的API端点**: - `POST /api/query` - 处理自然语言查询 - `POST /api/feedback` - 保存用户反馈 - `POST /api/train` - 触发模型训练 - `GET /api/train/history` - 获取训练历史 - `POST /api/iteration/start` - 启动迭代会话 (Stage4) - `POST /api/iteration/next` - 处理反馈并返回下一轮 (Stage4) - `POST /api/iteration/finalize` - 获取最终结果 (Stage4) --- ## 2. 请求处理阶段 (Request Handling) ### 2.1 HTTP 请求到达 当客户端发送HTTP请求时,uvicorn将请求路由到对应的FastAPI端点。 --- ### 2.2 示例:处理查询请求 (`POST /api/query`) **完整调用链**: ``` 客户端请求 ↓ uvicorn 接收请求 ↓ FastAPI 路由匹配: POST /api/query ↓ web/router.py: query() 函数 ↓ web/services.py: WebService.run_query() ↓ controller/manager.py: PipelineManager.process_query() ↓ [根据是否启用Stage4选择处理路径] ↓ ├─ Stage4启用: controller/manager.py: start_iteration() │ ↓ │ controller/iteration_manager.py: IterationManager.init_user_vector() │ ↓ │ [Stage0: 意图理解] │ ├─ llm/intent_encoder.py: IntentEncoder.parse() │ └─ llm/intent_mapper.py: IntentMapper.to_vector() │ ↓ │ controller/iteration_manager.py: IterationManager.run_one_iteration_step() │ ↓ │ controller/iteration_manager.py: IterationManager.run_one_round() │ ↓ │ core/miner.py: CoLocationMiner.mine_patterns() │ ↓ │ learning/learner.py: PreferenceLearner._score_with_vector() │ ↓ │ 返回结果 │ └─ Stage4未启用: controller/manager.py: process_query() ↓ [Stage0: 意图理解] (可选) ├─ llm/intent_encoder.py: IntentEncoder.parse() └─ llm/intent_mapper.py: IntentMapper.to_vector() ↓ core/miner.py: CoLocationMiner.mine_patterns() ↓ learning/learner.py: PreferenceLearner.score_patterns() ↓ preference/scorer.py: score_patterns() (可选) ↓ controller/manager.py: _generate_explanation() ↓ 返回结果 ``` --- ### 2.3 详细函数调用说明 #### 2.3.1 web/router.py: query() ```python @router.post("/query", response_model=QueryResponse) async def query(request: QueryRequest) -> QueryResponse: result = service.run_query(request.query, iteration_rounds=request.iteration_rounds) return QueryResponse(**result) ``` **职责**: - 接收HTTP请求 - 调用服务层处理查询 - 返回格式化的响应 --- #### 2.3.2 web/services.py: run_query() ```python def run_query(self, query: str, iteration_rounds: Optional[int] = None) -> Dict[str, Any]: result = self.manager.process_query(query, iteration_rounds=iteration_rounds) cleaned_result = self._clean_result(result) # 清理numpy数组等 return cleaned_result ``` **职责**: - 调用 PipelineManager 处理查询 - 清理结果(转换numpy数组为列表) - 处理数据格式转换 --- #### 2.3.3 controller/manager.py: process_query() **关键逻辑**: ```python def process_query(self, query: str, iteration_rounds: Optional[int] = None) -> Dict[str, Any]: # 如果指定了迭代轮数,使用Stage4迭代模式 if iteration_rounds is not None and self.stage4_enabled: return self.start_iteration(query, iteration_rounds) # 否则使用传统处理流程 # Step 1: 提取偏好参数 params = self._extract_preferences(query) # Step 2: 解析偏好参数 mining_params = self.preference_parser.parse_preference(...) # Step 3: 挖掘共现模式 patterns = self.miner.mine_patterns(...) # Step 4: 评分和排序 if self.stage0_enabled: # 使用融合向量评分 scores = self.learner.score_patterns(patterns, user_id=..., interaction_round=0) else: # 使用传统方法评分 scores = score_with_preference(patterns, ...) # Step 5: 生成解释 explanation = self._generate_explanation(...) return result ``` --- #### 2.3.4 core/miner.py: mine_patterns() **执行流程**: ```python def mine_patterns(self, min_participation: float = 0.6, max_pattern_size: int = 5, priority: str = "confidence") -> List[Dict[str, Any]]: # 1. 创建2阶表实例 current_patterns = self._create_table_instances_2() # 2. 迭代生成k阶模式 (k=2到max_pattern_size) k = 2 while k <= max_pattern_size and current_patterns: # 2.1 过滤满足参与率阈值的模式 valid_patterns = {} for pattern_str, instances in current_patterns.items(): participation = self._calculate_participation_index(...) if participation >= min_participation: valid_patterns[pattern_str] = instances # 2.2 生成下一阶模式 if k < max_pattern_size: current_patterns = self._generate_k_order_patterns(valid_patterns) k += 1 # 3. 根据优先级排序 all_patterns.sort(key=lambda x: x[priority], reverse=True) return all_patterns ``` **关键方法**: - `_create_table_instances_2()`: 创建2阶表实例 - `_calculate_participation_index()`: 计算参与率指数 - `_generate_k_order_patterns()`: 生成k阶模式(检查邻近关系) --- #### 2.3.5 learning/learner.py: score_patterns() **执行流程**: ```python def score_patterns(self, patterns: List[Dict[str, Any]], user_id: Optional[str] = None, interaction_round: int = 0) -> Optional[List[float]]: # 1. 尝试使用融合向量(Stage0) if user_id: fused_vector = self.get_user_vector(user_id, interaction_round) if fused_vector is not None: return self._score_with_vector(patterns, fused_vector) # 2. 回退到传统方法(正反馈 - 负反馈) positive_vec, negative_vec = self.build_user_vectors() # 编码模式 p_vecs = self.embedder.encode_patterns(pattern_lists) # 计算相似度分数 scores = similarity_to_positive - similarity_to_negative return scores ``` **关键方法**: - `get_user_vector()`: 获取融合用户向量(u_t = α_t * u_llm + (1-α_t) * u_feedback) - `_score_with_vector()`: 使用单个向量计算相似度 - `build_user_vectors()`: 从历史反馈构建正负向量 --- ## 3. Stage4 迭代交互流程 ### 3.1 启动迭代会话: POST /api/iteration/start **调用链**: ``` web/router.py: start_iteration() ↓ web/services.py: WebService.start_iteration() ↓ controller/manager.py: PipelineManager.start_iteration() ↓ controller/iteration_manager.py: IterationManager.init_user_vector() ↓ [Stage0: 意图理解] ├─ llm/intent_encoder.py: IntentEncoder.parse() └─ llm/intent_mapper.py: IntentMapper.to_vector() ↓ controller/iteration_manager.py: IterationManager.run_one_iteration_step() ↓ controller/iteration_manager.py: IterationManager.run_one_round() ↓ core/miner.py: CoLocationMiner.mine_patterns() ↓ learning/learner.py: PreferenceLearner._score_with_vector() ↓ 返回第一轮结果 ``` --- ### 3.2 处理反馈并继续迭代: POST /api/iteration/next **调用链**: ``` web/router.py: next_iteration() ↓ web/services.py: WebService.next_iteration() ↓ controller/manager.py: PipelineManager.next_iteration() ↓ controller/iteration_manager.py: IterationManager.update_and_train() ↓ ├─ controller/iteration_manager.py: IterationManager.update_user_vector() │ ├─ memory/store.py: MemoryStore.add_positive() │ └─ memory/store.py: MemoryStore.add_negative() │ ↓ │ learning/learner.py: PreferenceLearner.compute_feedback_vector() │ ↓ │ 融合向量: u_t = α * u_llm + (1-α) * u_feedback │ └─ learning/trainer.py: PreferenceTrainer.train() (可选) ↓ controller/iteration_manager.py: IterationManager.run_one_iteration_step() ↓ 返回下一轮结果 ``` --- ## 4. 关键组件说明 ### 4.1 数据流 ``` 原始数据 (beijing_poi.json) ↓ CoLocationMiner (加载数据) ↓ 2阶表实例 (457,907个实例) ↓ k阶模式挖掘 (k=2到5) ↓ 模式评分 (使用用户向量) ↓ 排序和返回 ``` ### 4.2 用户向量构建流程 ``` 用户查询 ↓ [Stage0] LLM意图理解 ↓ 提取偏好模式 ↓ 编码为向量 ↓ 生成初始向量 u_llm ↓ [用户反馈] ↓ 编码反馈模式 ↓ 构建反馈向量 u_feedback ↓ 融合: u_t = α_t * u_llm + (1-α_t) * u_feedback ↓ 用于模式评分 ``` ### 4.3 模式挖掘流程 ``` 2阶模式挖掘 ↓ 计算参与率指数 ↓ 过滤 (participation >= threshold) ↓ 生成3阶模式 ├─ 检查前k-1个特征相同 ├─ 检查前k-1个实例相同 └─ 检查最后两个实例邻近 ↓ 重复直到max_pattern_size ↓ 排序和返回 ``` --- ## 5. 错误处理 ### 5.1 异常捕获层次 1. **路由层** (`web/router.py`): 捕获所有异常,返回HTTP 500错误 2. **服务层** (`web/services.py`): 记录错误日志,重新抛出异常 3. **管理器层** (`controller/manager.py`): 业务逻辑错误处理 4. **组件层**: 各组件内部错误处理 ### 5.2 日志记录 所有关键步骤都通过 `logging` 模块记录日志,日志级别: - `INFO`: 正常流程信息 - `WARNING`: 警告信息(如组件未启用) - `ERROR`: 错误信息(包含堆栈跟踪) --- ## 6. 性能优化点 ### 6.1 缓存机制 - **实例映射**: `CoLocationMiner._instance_map` - 预构建实例字符串到对象的映射 - **2阶表实例**: 在 `mine_patterns()` 开始时创建,后续复用 ### 6.2 GPU加速 - **模式编码**: `PatternEmbedder` 使用 sentence-transformers,支持GPU - **相似度计算**: `PreferenceLearner._score_with_vector()` 使用PyTorch GPU加速 ### 6.3 批量处理 - **模式编码**: `encode_patterns()` 批量编码多个模式 - **相似度计算**: 向量化计算所有模式的相似度 --- ## 7. 总结 ### 7.1 启动流程 ``` main.py: uvicorn.run() ↓ web/app.py: FastAPI应用创建 ↓ web/app.py: startup_event() ↓ controller/manager.py: PipelineManager初始化 ↓ web/services.py: WebService初始化 ↓ web/router.py: 路由注册 ↓ 服务就绪,等待请求 ``` ### 7.2 请求处理流程 ``` HTTP请求 ↓ FastAPI路由匹配 ↓ web/router.py: 端点函数 ↓ web/services.py: 服务层方法 ↓ controller/manager.py: 管理器方法 ↓ [各组件处理] ↓ 结果返回 ↓ HTTP响应 ``` ### 7.3 关键设计模式 1. **分层架构**: 路由层 → 服务层 → 管理器层 → 组件层 2. **依赖注入**: 通过构造函数注入依赖 3. **事件驱动**: FastAPI的startup事件 4. **策略模式**: 根据配置选择不同的处理策略(Stage0/3/4) --- ## 附录:文件结构 ``` colocation_mvp/ ├── main.py # 入口点 ├── web/ │ ├── app.py # FastAPI应用 │ ├── router.py # API路由定义 │ ├── services.py # 服务层 │ └── schemas.py # 数据模型 ├── controller/ │ ├── manager.py # 管道管理器 │ └── iteration_manager.py # 迭代管理器 (Stage4) ├── core/ │ └── miner.py # 共现模式挖掘 ├── learning/ │ ├── learner.py # 偏好学习器 │ ├── embedder.py # 模式嵌入器 │ └── trainer.py # 对比学习训练器 ├── llm/ │ ├── client.py # LLM客户端 │ ├── intent_encoder.py # 意图编码器 (Stage0) │ └── intent_mapper.py # 意图映射器 (Stage0) ├── memory/ │ └── store.py # 记忆存储 └── config/ └── config.yaml # 配置文件 ```