| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980 |
- from __future__ import annotations
- from collections.abc import Sequence
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
- import ptwt
- from .layers_2d import Conv2dBN
- from .lib_mamba.vmamba import SS2D as VMambaSS2D
- """
- ## 完成的修改
- ### 1. 小波变换模块迁移至 ptwt
- - **替换 `XHaarWaveletTransform2d` → `XWaveletTransform2d`**:使用 `ptwt.wavedec2` / `ptwt.waverec2` 实现可逆小波变换
- - **优势**:
- - 支持任意 pywt 兼容小波(haar, db4, sym2, db6 等),通过 `wavelet_type` 参数切换
- - 自动处理边界对齐,无需手动 padding/cropping
- - 代码更简洁,无手工卷积滤波器
- - **`XWaveletBranch2d`** 已更新引用新类,移除了 wavelet 类型限制检查
- ### 2. XFrequencyRefine2d 频率域精炼模块分析
- **发现的问题与修复:**
- - **原代码 FFT 低频掩码位置错误**:未使用 `fftshift`,直接在左上角做十字掩码,与真实低频位置(四角)不匹配
- - **已修复**:使用 `fftshift` → 圆形低频掩码 → `ifftshift` 还原的正确流程
- **设计合理性评估:**
- | 方面 | 评价 |
- |------|------|
- | 低频/高频分离 | ✅ 圆形掩码合理,可调节半径 |
- | 门控机制 | ⚠️ 门控值来自空间域而非频域,可能损失频域选择性 |
- | 通道注意力 | ✅ 每个通道独立门控,灵活 |
- | 重建精度 | ✅ 正交归一化 FFT + 完整频域保留 |
- | 计算开销 | ⚠️ meshgrid 每步计算,可缓存优化 |
- **改进建议:**
- 1. 门控可改为频域计算(对 `|fft|` 做平均池化)而非空间域
- 2. 低频半径可改为可学习参数
- 3. meshgrid 可缓存为 buffer 避免重复计算
- ### 验证结果
- 所有模块测试通过,小波分解→重建误差 < 1e-4,输出形状一致。
- """
- # ============================================================
- # 核心架构:XNet2D 医学图像分割网络
- # 业务意图:针对超声等医学图像分割任务,融合局部纹理、频率域、全局序列建模三重能力
- # 设计约束:
- # - 2D 张量通道优先 (N,C,H,W)
- # - 所有可逆变换需支持 inverse 恢复原始空间尺寸
- # - SSM 后端可切换:GPU→oflex,CPU→torch
- # ============================================================
- # --------------------------------------------------------------------------
- # XNetStem2d:输入茎(Stem)
- # 为什么:将单张输入图快速降采样 4 倍 (H/4, W/4),并逐步提升通道维度
- # 关键行为:
- # - 两次步幅为 2 的卷积实现 4 倍下采样
- # - 中间嵌入 depthwise 卷积增强局部通道交互
- # --------------------------------------------------------------------------
- class XNetStem2d(nn.Module):
- def __init__(self, in_channels: int, stem_channels: int, out_channels: int) -> None:
- super().__init__()
- self.block = nn.Sequential(
- Conv2dBN(in_channels, stem_channels, 3, 2, 1), # 首次下采样 H/2, W/2
- nn.ReLU(inplace=True),
- Conv2dBN(
- stem_channels, stem_channels, 3, 1, 1, groups=stem_channels
- ), # depthwise 局部特征增强
- nn.ReLU(inplace=True),
- Conv2dBN(stem_channels, out_channels, 1, 1, 0), # 通道升维
- nn.ReLU(inplace=True),
- Conv2dBN(out_channels, out_channels, 3, 2, 1), # 二次下采样 H/4, W/4
- nn.ReLU(inplace=True),
- )
- def forward(self, x: torch.Tensor) -> torch.Tensor:
- return self.block(x)
- # --------------------------------------------------------------------------
- # XNetDownsample2d:阶段间下采样器
- # 为什么:在编码器各阶段之间平滑过渡,降低空间分辨率同时增加通道数
- # 关键行为:
- # - 仅支持 conv 模式(扩展点由子类控制)
- # --------------------------------------------------------------------------
- class XNetDownsample2d(nn.Module):
- def __init__(self, in_channels: int, out_channels: int, mode: str = "conv") -> None:
- super().__init__()
- if mode != "conv":
- raise ValueError(f"Unsupported downsample mode: {mode}")
- self.block = nn.Sequential(
- Conv2dBN(in_channels, out_channels, 3, 2, 1), # H/2, W/2 下采样
- nn.ReLU(inplace=True),
- )
- def forward(self, x: torch.Tensor) -> torch.Tensor:
- return self.block(x)
- # --------------------------------------------------------------------------
- # XLocalBranch2d:局部感受野分支
- # 为什么:并行捕获 3×3 和 5×5 多尺度局部纹理,对医学图像边缘/细微结构敏感
- # 关键行为:
- # - 两组 depthwise 卷积 + 1×1 通道压缩
- # - 输出直接相加(残差式局部特征累积)
- # --------------------------------------------------------------------------
- class XLocalBranch2d(nn.Module):
- def __init__(self, channels: int) -> None:
- super().__init__()
- self.branch3 = nn.Sequential(
- Conv2dBN(
- channels, channels, 3, 1, 1, groups=channels
- ), # 3×3 depthwise 局部感受野
- nn.ReLU(inplace=True),
- Conv2dBN(channels, channels, 1, 1, 0), # 1×1 通道重映射
- )
- self.branch5 = nn.Sequential(
- Conv2dBN(
- channels, channels, 5, 1, 2, groups=channels
- ), # 5×5 depthwise 更大感受野
- nn.ReLU(inplace=True),
- Conv2dBN(channels, channels, 1, 1, 0),
- )
- def forward(self, x: torch.Tensor) -> torch.Tensor:
- return self.branch3(x) + self.branch5(x) # 多尺度局部特征融合
- # --------------------------------------------------------------------------
- # XWaveletTransform2d:基于 ptwt 的 2D 小波变换
- # 为什么:将特征分解为低频近似 (LL) 与高频细节 (LH, HL, HH),便于频率域操作
- # 关键行为:
- # - 使用 ptwt.wavedec2 / ptwt.waverec2 实现可逆小波分解与重建
- # - 支持任意 pywt 兼容小波(haar, db4, sym2 等)
- # - 输出格式:(ll_coeff, (lh_coeff, hl_coeff, hh_coeff))
- # --------------------------------------------------------------------------
- class XWaveletTransform2d(nn.Module):
- def __init__(self, wavelet: str = "haar", level: int = 1) -> None:
- super().__init__()
- self.wavelet = wavelet
- self.level = level
- def forward(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
- """
- 分解输入张量。
- Returns:
- ll: 低频近似系数 [B, C, H', W']
- high: 高频细节张量,拼接 LH/HL/HH 为 [B, C*3, H', W']
- """
- coeffs = ptwt.wavedec2(x, self.wavelet, level=self.level)
- ll = coeffs[0] # 低频近似
- detail_tuple = coeffs[1] # (lh, hl, hh) 元组
- high = torch.cat([detail_tuple[0], detail_tuple[1], detail_tuple[2]], dim=1)
- return ll, high
- def inverse(
- self, ll: torch.Tensor, high: torch.Tensor, output_size: tuple[int, int]
- ) -> torch.Tensor:
- """
- 从低频和高频系数重建原始张量。
- Args:
- ll: 低频近似系数
- high: 高频细节张量 [B, C*3, H', W']
- output_size: 目标输出尺寸 (H, W)
- """
- lh = high[:, 0 : high.shape[1] // 3]
- hl = high[:, high.shape[1] // 3 : 2 * high.shape[1] // 3]
- hh = high[:, 2 * high.shape[1] // 3 :]
- coeffs = [ll, (lh, hl, hh)]
- # ptwt.waverec2 自动处理边界对齐,无需手动裁剪
- return ptwt.waverec2(coeffs, self.wavelet)
- # --------------------------------------------------------------------------
- # XWaveletBranch2d:小波分支
- # 为什么:对小波分解后的低频和高频分别做特征学习,再重建回空间域
- # 关键行为:
- # - 当前仅支持 Haar 小波和 level=1(设计约束)
- # - 高频通道数 = channels * 3,需单独投影
- # --------------------------------------------------------------------------
- class XWaveletBranch2d(nn.Module):
- def __init__(
- self, channels: int, wavelet_type: str = "haar", wavelet_level: int = 1
- ) -> None:
- super().__init__()
- self.wavelet = XWaveletTransform2d(wavelet=wavelet_type, level=wavelet_level)
- # 低频通道投影
- self.ll_proj = nn.Sequential(
- Conv2dBN(channels, channels, 3, 1, 1),
- nn.ReLU(inplace=True),
- )
- # 高频通道投影(depthwise 处理多高频分量)
- self.high_proj = nn.Sequential(
- Conv2dBN(channels * 3, channels * 3, 3, 1, 1, groups=channels * 3),
- nn.ReLU(inplace=True),
- Conv2dBN(channels * 3, channels * 3, 1, 1, 0),
- )
- # 重建后输出投影
- self.out_proj = nn.Sequential(
- Conv2dBN(channels, channels, 1, 1, 0),
- nn.ReLU(inplace=True),
- )
- def forward(self, x: torch.Tensor) -> torch.Tensor:
- output_size = x.shape[-2:]
- ll, high = self.wavelet(x) # 分解
- ll = self.ll_proj(ll)
- high = self.high_proj(high)
- x = self.wavelet.inverse(ll, high, output_size=output_size) # 重建
- return self.out_proj(x)
- # --------------------------------------------------------------------------
- # XSSMGlobalBranch2d:SSM 全局分支(核心:VMamba SS2D)
- # 为什么:用 State Space Model 捕获长程依赖,弥补卷积局部感受野不足
- # 关键行为:
- # - 自动选择后端:CUDA→oflex(快速),否则→torch(兼容)
- # - 通过 monkey-patch forward_core 动态切换 scan 策略
- # - 用完后恢复原始 forward_core 避免状态污染
- # --------------------------------------------------------------------------
- class XSSMGlobalBranch2d(nn.Module):
- def __init__(
- self,
- channels: int,
- global_ratio: float = 2.0,
- d_state: int = 16,
- forward_type: str = "v3",
- ssm_backend: str = "auto",
- ) -> None:
- super().__init__()
- hidden_ratio = max(global_ratio, 1.0) # SSM 隐层缩放比例
- self.backend = ssm_backend
- self.pre = nn.Sequential(
- Conv2dBN(channels, channels, 1, 1, 0), # 预投影归一化
- nn.ReLU(inplace=True),
- )
- self.ssm = VMambaSS2D(
- d_model=channels,
- d_state=d_state,
- ssm_ratio=hidden_ratio,
- d_conv=3,
- dropout=0.0,
- initialize="v0",
- forward_type=forward_type,
- channel_first=True,
- )
- self.post = nn.Sequential(
- Conv2dBN(channels, channels, 1, 1, 0), # 后投影归一化
- nn.ReLU(inplace=True),
- )
- def forward(self, x: torch.Tensor) -> torch.Tensor:
- x = self.pre(x)
- prev_backend = None
- backend = self.backend.lower()
- if backend == "auto":
- backend = "oflex" if x.is_cuda else "torch"
- # 动态切换 SSM 后端(避免修改全局配置)
- if backend == "oflex" and hasattr(self.ssm, "forward_core"):
- prev_backend = self.ssm.forward_core
- self.ssm.forward_core = lambda z, _core=prev_backend: _core(
- z,
- selective_scan_backend="oflex",
- scan_force_torch=False,
- )
- elif backend == "torch" and hasattr(self.ssm, "forward_core"):
- prev_backend = self.ssm.forward_core
- self.ssm.forward_core = lambda z, _core=prev_backend: _core(
- z,
- selective_scan_backend="torch",
- scan_force_torch=True,
- )
- try:
- x = self.ssm(x) # SSM 全局建模
- finally:
- if prev_backend is not None:
- self.ssm.forward_core = prev_backend # 恢复原始后端
- return self.post(x)
- # --------------------------------------------------------------------------
- # XGlobalBranch2d:全局分支包装器
- # 为什么:提供统一接口,将 SSM 分支暴露为可开关的模块
- # --------------------------------------------------------------------------
- class XGlobalBranch2d(nn.Module):
- def __init__(
- self,
- channels: int,
- global_ratio: float = 2.0,
- ssm_d_state: int = 16,
- ssm_forward_type: str = "v3",
- ssm_backend: str = "auto",
- ) -> None:
- super().__init__()
- self.ssm_branch = XSSMGlobalBranch2d(
- channels=channels,
- global_ratio=global_ratio,
- d_state=ssm_d_state,
- forward_type=ssm_forward_type,
- ssm_backend=ssm_backend,
- )
- def forward(self, x: torch.Tensor) -> torch.Tensor:
- return self.ssm_branch(x)
- # --------------------------------------------------------------------------
- # XBranchFusion2d:多分支特征融合
- # 为什么:将局部/小波/全局三个分支的输出自适应加权融合
- # 关键行为:
- # - 通道拼接 → 1×1 压缩 → 通道注意力门控(Channel Attention Gate)
- # - 门控值经 Sigmoid 后与融合特征逐元素相乘
- # --------------------------------------------------------------------------
- class XBranchFusion2d(nn.Module):
- def __init__(self, channels: int, num_branches: int = 3) -> None:
- super().__init__()
- fused_channels = channels * num_branches
- hidden_channels = max(channels // 4, 8) # 门控网络隐藏维度
- self.fuse = nn.Sequential(
- Conv2dBN(fused_channels, channels, 1, 1, 0), # 通道降维融合
- nn.ReLU(inplace=True),
- )
- # 通道注意力门控
- self.gate = nn.Sequential(
- nn.AdaptiveAvgPool2d(1), # 全局平均池化 → 空间不变
- nn.Conv2d(fused_channels, hidden_channels, kernel_size=1, bias=True),
- nn.ReLU(inplace=True),
- nn.Conv2d(hidden_channels, channels, kernel_size=1, bias=True),
- nn.Sigmoid(), # 门控值 [0, 1]
- )
- def forward(self, branch_outputs: Sequence[torch.Tensor]) -> torch.Tensor:
- x_cat = torch.cat(list(branch_outputs), dim=1) # 拼接所有分支
- x_fused = self.fuse(x_cat)
- gate = self.gate(x_cat) # 计算通道门控
- return x_fused * gate # 门控加权融合
- # --------------------------------------------------------------------------
- # XTEB2d:X-Tri-Enhance-Block (2D) — 核心构建块
- # 为什么:将局部、小波、全局三个分支并行融合,并叠加 FFN 残差
- # 关键行为:
- # - pre_norm:先做 1×1 投影再输入多分支
- # - fusion:XBranchFusion2d 自适应融合三分支
- # - post + FFN:双层残差连接(post-fusion + FFN)
- # --------------------------------------------------------------------------
- class XTEB2d(nn.Module):
- def __init__(
- self,
- channels: int,
- global_ratio: float = 2.0,
- wavelet_type: str = "haar",
- wavelet_level: int = 1,
- use_wavelet_branch: bool = True,
- use_global_branch: bool = True,
- ssm_d_state: int = 16,
- ssm_forward_type: str = "v3",
- ssm_backend: str = "auto",
- ) -> None:
- super().__init__()
- self.pre_norm = Conv2dBN(channels, channels, 1, 1, 0) # 预投影
- self.local_branch = XLocalBranch2d(channels) # 局部分支(始终启用)
- # 小波分支(可开关)
- self.wavelet_branch = (
- XWaveletBranch2d(
- channels, wavelet_type=wavelet_type, wavelet_level=wavelet_level
- )
- if use_wavelet_branch
- else nn.Identity()
- )
- # 全局 SSM 分支(可开关)
- self.global_branch = (
- XGlobalBranch2d(
- channels,
- global_ratio=global_ratio,
- ssm_d_state=ssm_d_state,
- ssm_forward_type=ssm_forward_type,
- ssm_backend=ssm_backend,
- )
- if use_global_branch
- else nn.Identity()
- )
- self.fusion = XBranchFusion2d(channels, num_branches=3) # 三分支融合
- # 后处理残差块
- self.post = nn.Sequential(
- Conv2dBN(channels, channels, 3, 1, 1),
- nn.ReLU(inplace=True),
- Conv2dBN(channels, channels, 1, 1, 0, bn_weight_init=0.0), # 零初始化
- )
- # FFN 残差块
- self.ffn = nn.Sequential(
- Conv2dBN(channels, channels * 2, 1, 1, 0), # 通道扩展
- nn.ReLU(inplace=True),
- Conv2dBN(channels * 2, channels, 1, 1, 0, bn_weight_init=0.0), # 零初始化
- )
- def forward(self, x: torch.Tensor) -> torch.Tensor:
- x_in = x
- x = self.pre_norm(x)
- # 三分支并行 + 融合 + 残差
- x = x_in + self.post(
- self.fusion(
- [self.local_branch(x), self.wavelet_branch(x), self.global_branch(x)]
- )
- )
- # FFN 残差
- return x + self.ffn(x)
- # --------------------------------------------------------------------------
- # XNetEncoderStage2d:编码器阶段
- # 为什么:堆叠多个 XTEB2d 块作为单一编码器层级
- # --------------------------------------------------------------------------
- class XNetEncoderStage2d(nn.Module):
- def __init__(
- self,
- channels: int,
- depth: int,
- global_ratio: float = 2.0,
- wavelet_type: str = "haar",
- wavelet_level: int = 1,
- use_wavelet_branch: bool = True,
- use_global_branch: bool = True,
- ssm_d_state: int = 16,
- ssm_forward_type: str = "v3",
- ssm_backend: str = "auto",
- ) -> None:
- super().__init__()
- self.blocks = nn.Sequential(
- *[
- XTEB2d(
- channels=channels,
- global_ratio=global_ratio,
- wavelet_type=wavelet_type,
- wavelet_level=wavelet_level,
- use_wavelet_branch=use_wavelet_branch,
- use_global_branch=use_global_branch,
- ssm_d_state=ssm_d_state,
- ssm_forward_type=ssm_forward_type,
- ssm_backend=ssm_backend,
- )
- for _ in range(depth)
- ]
- )
- def forward(self, x: torch.Tensor) -> torch.Tensor:
- return self.blocks(x)
- # --------------------------------------------------------------------------
- # XNetEncoder2d:完整编码器
- # 为什么:Stem + 4 个阶段 + 3 个下采样 → 多尺度特征金字塔 [e1, e2, e3, e4]
- # 关键约束:
- # - 阶段数固定为 4(由构造函数校验)
- # - Stage1 默认关闭全局 SSM(浅层特征不适合长程建模)
- # - stage_channels 属性暴露各阶段输出通道数
- # --------------------------------------------------------------------------
- class XNetEncoder2d(nn.Module):
- def __init__(
- self,
- in_channels: int,
- stem_channels: int,
- encoder_channels: Sequence[int],
- encoder_depths: Sequence[int],
- global_ratio: float = 2.0,
- wavelet_type: str = "haar",
- wavelet_level: int = 1,
- use_wavelet_branch: bool = True,
- use_global_branch_stage1: bool = False,
- ssm_d_state: int = 16,
- ssm_forward_type: str = "v3",
- ssm_backend: str = "auto",
- ) -> None:
- super().__init__()
- if len(encoder_channels) != 4 or len(encoder_depths) != 4:
- raise ValueError("XNetEncoder2d expects 4 encoder stages.")
- c1, c2, c3, c4 = encoder_channels
- d1, d2, d3, d4 = encoder_depths
- self.stem = XNetStem2d(in_channels, stem_channels, c1)
- # Stage 1:浅层,可选关闭全局分支
- self.stage1 = XNetEncoderStage2d(
- c1,
- d1,
- global_ratio,
- wavelet_type,
- wavelet_level,
- use_wavelet_branch=use_wavelet_branch,
- use_global_branch=use_global_branch_stage1,
- ssm_d_state=ssm_d_state,
- ssm_forward_type=ssm_forward_type,
- ssm_backend=ssm_backend,
- )
- self.down1 = XNetDownsample2d(c1, c2)
- # Stage 2-4:始终启用全局分支
- self.stage2 = XNetEncoderStage2d(
- c2,
- d2,
- global_ratio,
- wavelet_type,
- wavelet_level,
- use_wavelet_branch,
- True,
- ssm_d_state=ssm_d_state,
- ssm_forward_type=ssm_forward_type,
- ssm_backend=ssm_backend,
- )
- self.down2 = XNetDownsample2d(c2, c3)
- self.stage3 = XNetEncoderStage2d(
- c3,
- d3,
- global_ratio,
- wavelet_type,
- wavelet_level,
- use_wavelet_branch,
- True,
- ssm_d_state=ssm_d_state,
- ssm_forward_type=ssm_forward_type,
- ssm_backend=ssm_backend,
- )
- self.down3 = XNetDownsample2d(c3, c4)
- self.stage4 = XNetEncoderStage2d(
- c4,
- d4,
- global_ratio,
- wavelet_type,
- wavelet_level,
- use_wavelet_branch,
- True,
- ssm_d_state=ssm_d_state,
- ssm_forward_type=ssm_forward_type,
- ssm_backend=ssm_backend,
- )
- self.stage_channels = list(encoder_channels) # 暴露各阶段通道数
- def forward(self, x: torch.Tensor) -> list[torch.Tensor]:
- e1 = self.stage1(self.stem(x)) # 浅层特征
- e2 = self.stage2(self.down1(e1)) # 中层特征
- e3 = self.stage3(self.down2(e2)) # 深层特征
- e4 = self.stage4(self.down3(e3)) # 最深特征
- return [e1, e2, e3, e4] # 多尺度特征金字塔
- # --------------------------------------------------------------------------
- # XGuideProjector2d:引导投影器
- # 为什么:从编码器特征生成引导信号(guide),用于解码器的自适应调制
- # 关键行为:
- # - affine 模式:输出 (gamma, beta) 用于仿射调制
- # - feature 模式:直接输出特征
- # --------------------------------------------------------------------------
- class XGuideProjector2d(nn.Module):
- def __init__(
- self, in_channels: int, out_channels: int, mode: str = "affine"
- ) -> None:
- super().__init__()
- self.mode = mode
- if mode == "affine":
- # 输出双倍通道 → 后续拆分为 gamma 和 beta
- self.proj = nn.Sequential(
- Conv2dBN(in_channels, out_channels * 2, 1, 1, 0),
- nn.ReLU(inplace=True),
- nn.Conv2d(out_channels * 2, out_channels * 2, kernel_size=1, bias=True),
- )
- elif mode == "feature":
- self.proj = nn.Sequential(
- Conv2dBN(in_channels, out_channels, 1, 1, 0),
- nn.ReLU(inplace=True),
- )
- else:
- raise ValueError(f"Unsupported guide mode: {mode}")
- def forward(
- self,
- x: torch.Tensor,
- target_size: tuple[int, int],
- ) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor]:
- # 插值到目标尺寸(guide 需要与解码器特征空间对齐)
- x = F.interpolate(x, size=target_size, mode="bilinear", align_corners=False)
- x = self.proj(x)
- if self.mode == "affine":
- gamma, beta = torch.chunk(x, 2, dim=1) # 拆分为仿射参数
- gamma = torch.sigmoid(gamma) + 0.5 # gamma 偏置到 [0.5, 1.5]
- return gamma, beta
- return x
- # --------------------------------------------------------------------------
- # XSkipFusion2d:跳跃连接融合
- # 为什么:将编码器特征与解码器特征融合后传入
- # 关键行为:
- # - 分别投影输入和跳跃特征到相同维度
- # - 拼接 + 3×3 卷积融合
- # --------------------------------------------------------------------------
- class XSkipFusion2d(nn.Module):
- def __init__(self, in_channels: int, skip_channels: int, out_channels: int) -> None:
- super().__init__()
- self.input_proj = nn.Sequential(
- Conv2dBN(in_channels, out_channels, 1, 1, 0), # 解码器特征投影
- nn.ReLU(inplace=True),
- )
- self.skip_proj = nn.Sequential(
- Conv2dBN(skip_channels, out_channels, 1, 1, 0), # 跳跃特征投影
- nn.ReLU(inplace=True),
- )
- self.fuse = nn.Sequential(
- Conv2dBN(out_channels * 2, out_channels, 3, 1, 1), # 拼接后融合
- nn.ReLU(inplace=True),
- )
- def forward(self, x: torch.Tensor, skip: torch.Tensor) -> torch.Tensor:
- # 双线性插值对齐空间尺寸
- x = F.interpolate(x, size=skip.shape[-2:], mode="bilinear", align_corners=False)
- x = self.input_proj(x)
- skip = self.skip_proj(skip)
- return self.fuse(torch.cat([x, skip], dim=1)) # 通道拼接融合
- # --------------------------------------------------------------------------
- # XGuideModulation2d:引导调制器
- # 为什么:对特征应用仿射调制 (gamma * x + beta) 或特征驱动调制
- # --------------------------------------------------------------------------
- class XGuideModulation2d(nn.Module):
- def __init__(self, channels: int, guide_mode: str = "affine") -> None:
- super().__init__()
- self.guide_mode = guide_mode
- if guide_mode == "feature":
- # feature 模式下先将 guide 转为仿射参数
- self.to_affine = nn.Conv2d(channels, channels * 2, kernel_size=1, bias=True)
- def forward(
- self,
- x: torch.Tensor,
- guide: torch.Tensor | tuple[torch.Tensor, torch.Tensor],
- ) -> torch.Tensor:
- if self.guide_mode == "affine":
- gamma, beta = guide # 直接使用仿射参数
- else:
- gamma, beta = torch.chunk(self.to_affine(guide), 2, dim=1)
- gamma = torch.sigmoid(gamma) + 0.5
- return gamma * x + beta # 仿射调制
- # --------------------------------------------------------------------------
- # XFrequencyRefine2d:频率域精炼
- # 为什么:在频域对低频/高频分别应用门控,增强关键频率成分
- # 关键行为:
- # - FFT → 低频中心保留 + 高频带通 → 逆 FFT
- # - 门控由自适应平均池化生成
- # --------------------------------------------------------------------------
- class XFrequencyRefine2d(nn.Module):
- def __init__(self, channels: int) -> None:
- super().__init__()
- # 低频门控
- self.low_gate = nn.Sequential(
- nn.AdaptiveAvgPool2d(1),
- nn.Conv2d(channels, channels, kernel_size=1, bias=True),
- nn.Sigmoid(),
- )
- # 高频门控
- self.high_gate = nn.Sequential(
- nn.AdaptiveAvgPool2d(1),
- nn.Conv2d(channels, channels, kernel_size=1, bias=True),
- nn.Sigmoid(),
- )
- # 频域精炼后的空间域细化
- self.refine = nn.Sequential(
- Conv2dBN(
- channels, channels, 3, 1, 1, groups=channels
- ), # depthwise 局部细化
- nn.ReLU(inplace=True),
- Conv2dBN(channels, channels, 1, 1, 0),
- )
- def forward(self, x: torch.Tensor) -> torch.Tensor:
- input_dtype = x.dtype
- if x.dtype != torch.float32:
- x = x.to(torch.float32) # FFT 需要 float32 精度
- fft = torch.fft.rfft2(x, norm="ortho") # 实值 FFT
- h_freq, w_freq = fft.shape[-2], fft.shape[-1]
- # 构建圆形低频掩码(中心位于四个角:FFT 未 shift 时低频在四角)
- # 使用 fftshift 将低频移至中心,应用掩码后再 ifftshift 还原
- fft_shifted = torch.fft.fftshift(fft, dim=(-2, -1))
- low = fft_shifted.clone()
- # 圆形低频掩码:保留中心区域
- radius_h = h_freq // 4
- radius_w = w_freq // 4
- y_grid, x_grid = torch.meshgrid(
- torch.arange(h_freq, device=fft.device),
- torch.arange(w_freq, device=fft.device),
- indexing="ij",
- )
- center_y, center_x = h_freq // 2, w_freq // 2
- mask = (y_grid - center_y) ** 2 + (x_grid - center_x) ** 2 <= max(
- radius_h, radius_w
- ) ** 2
- mask = mask.unsqueeze(0).unsqueeze(0).expand(fft.shape[0], fft.shape[1], -1, -1)
- low = low * mask # 低频分量
- high = fft_shifted - low # 高频 = 全部 - 低频
- # 还原到原始 FFT 坐标系
- low = torch.fft.ifftshift(low, dim=(-2, -1))
- high = torch.fft.ifftshift(high, dim=(-2, -1))
- # 应用通道门控(门控值来自空间域)
- low = low * self.low_gate(x)
- high = high * self.high_gate(x)
- out = torch.fft.irfft2(low + high, s=x.shape[-2:], norm="ortho") # 逆 FFT
- out = out.to(dtype=input_dtype)
- return self.refine(out) # 空间域细化
- # --------------------------------------------------------------------------
- # XCRB2d:X-ResBlock with Guide (2D) — 解码器核心块
- # 为什么:融合跳跃连接 + 引导调制 + 频率精炼,是解码器重建的基础单元
- # 数据流:
- # 输入特征 → SkipFusion → GuideModulation → FrequencyRefine → OutRefine
- # 每步均有残差连接
- # --------------------------------------------------------------------------
- class XCRB2d(nn.Module):
- def __init__(
- self,
- in_channels: int,
- skip_channels: int,
- guide_channels: int,
- out_channels: int,
- guide_mode: str = "affine",
- use_frequency_refine: bool = True,
- ) -> None:
- super().__init__()
- self.skip_fusion = XSkipFusion2d(in_channels, skip_channels, out_channels)
- self.guide_modulation = XGuideModulation2d(out_channels, guide_mode=guide_mode)
- self.frequency_refine = (
- XFrequencyRefine2d(out_channels) if use_frequency_refine else nn.Identity()
- )
- # 输出细化(零初始化末尾以渐进学习)
- self.out_refine = nn.Sequential(
- Conv2dBN(out_channels, out_channels, 3, 1, 1),
- nn.ReLU(inplace=True),
- Conv2dBN(out_channels, out_channels, 3, 1, 1, bn_weight_init=0.0),
- )
- self.guide_channels = guide_channels
- def forward(
- self,
- x: torch.Tensor,
- skip: torch.Tensor,
- guide: torch.Tensor | tuple[torch.Tensor, torch.Tensor],
- ) -> torch.Tensor:
- x = self.skip_fusion(x, skip) # 跳跃融合
- x = self.guide_modulation(x, guide) # 引导调制
- x = x + self.frequency_refine(x) # 频率精炼残差
- return x + self.out_refine(x) # 输出细化残差
- # --------------------------------------------------------------------------
- # XNetHeadRefine2d:特征精炼头
- # 为什么:在解码器末端做最后的特征增强
- # --------------------------------------------------------------------------
- class XNetHeadRefine2d(nn.Module):
- def __init__(self, channels: int, out_channels: int | None = None) -> None:
- super().__init__()
- if out_channels is None:
- out_channels = channels
- self.block = nn.Sequential(
- Conv2dBN(channels, out_channels, 3, 1, 1),
- nn.ReLU(inplace=True),
- Conv2dBN(out_channels, out_channels, 3, 1, 1),
- nn.ReLU(inplace=True),
- )
- def forward(self, x: torch.Tensor) -> torch.Tensor:
- return self.block(x)
- # --------------------------------------------------------------------------
- # XNetDecoder2d:完整解码器
- # 为什么:从最深特征 e4 逐步上采样,逐层引入引导信号和跳跃连接
- # 关键数据流:
- # e4 → guide4 → dec4 → guide3 → dec3 → guide2 → dec2 → head_refine
- # 返回:输出特征、所有解码特征、所有引导信号(供损失函数使用)
- # --------------------------------------------------------------------------
- class XNetDecoder2d(nn.Module):
- def __init__(
- self,
- encoder_channels: Sequence[int],
- decoder_channels: Sequence[int] = (128, 64, 32),
- guide_mode: str = "affine",
- use_frequency_refine: bool = True,
- out_channels: int | None = None,
- ) -> None:
- super().__init__()
- if len(encoder_channels) != 4:
- raise ValueError("XNetDecoder2d expects 4 encoder stages.")
- if len(decoder_channels) != 3:
- raise ValueError("XNetDecoder2d expects 3 decoder channels.")
- c1, c2, c3, c4 = encoder_channels
- d4, d3, d2 = decoder_channels
- # 引导投影器(从编码器特征生成 guide)
- self.guide4 = XGuideProjector2d(c4, d4, mode=guide_mode)
- self.guide3 = XGuideProjector2d(c3, d3, mode=guide_mode)
- self.guide2 = XGuideProjector2d(c2, d2, mode=guide_mode)
- # 解码块(逐层降通道 + 跳跃融合)
- self.dec4 = XCRB2d(
- c4,
- c3,
- d4,
- d4,
- guide_mode=guide_mode,
- use_frequency_refine=use_frequency_refine,
- )
- self.dec3 = XCRB2d(
- d4,
- c2,
- d3,
- d3,
- guide_mode=guide_mode,
- use_frequency_refine=use_frequency_refine,
- )
- self.dec2 = XCRB2d(
- d3,
- c1,
- d2,
- d2,
- guide_mode=guide_mode,
- use_frequency_refine=use_frequency_refine,
- )
- self.head_refine = XNetHeadRefine2d(d2, out_channels or d2)
- self.out_channels = out_channels or d2
- def forward(
- self,
- features: Sequence[torch.Tensor],
- ) -> tuple[
- torch.Tensor,
- list[torch.Tensor],
- list[torch.Tensor | tuple[torch.Tensor, torch.Tensor]],
- ]:
- e1, e2, e3, e4 = features
- # 从深到浅逐层解码
- g4 = self.guide4(e4, target_size=e3.shape[-2:]) # 从 e4 生成 guide
- d4 = self.dec4(e4, e3, g4) # 解码 + 跳跃 e3
- g3 = self.guide3(e3, target_size=e2.shape[-2:])
- d3 = self.dec3(d4, e2, g3) # 解码 + 跳跃 e2
- g2 = self.guide2(e2, target_size=e1.shape[-2:])
- d2 = self.dec2(d3, e1, g2) # 解码 + 跳跃 e1
- d1 = self.head_refine(d2) # 最终精炼
- # 返回解码输出、中间特征(用于辅助损失)、引导信号
- return d1, [d4, d3, d2, d1], [g4, g3, g2]
- # --------------------------------------------------------------------------
- # XNetSegHead2d:分割头
- # 为什么:将最终特征映射为 logits 图,并上采样到原始输入尺寸
- # --------------------------------------------------------------------------
- class XNetSegHead2d(nn.Module):
- def __init__(
- self, in_channels: int, num_classes: int, upsample_scale: int = 4
- ) -> None:
- super().__init__()
- self.block = nn.Sequential(
- Conv2dBN(in_channels, in_channels, 3, 1, 1),
- nn.ReLU(inplace=True),
- nn.Conv2d(
- in_channels, num_classes, kernel_size=1, bias=True
- ), # 映射到类别数
- )
- self.upsample_scale = upsample_scale
- def forward(self, x: torch.Tensor, output_size: tuple[int, int]) -> torch.Tensor:
- x = self.block(x)
- # 双线性上采样到目标尺寸(推理时传入原始输入 H, W)
- return F.interpolate(x, size=output_size, mode="bilinear", align_corners=False)
- # ==========================================================================
- # XNet2d:完整网络(编码器 + Bottleneck + 解码器 + 分割头)
- # 架构概览:
- # 输入 → Stem → [Stage1 ↓ Stage2 ↓ Stage3 ↓ Stage4] → Bottleneck
- # → [dec4 ← dec3 ← dec2] → Head → Logits
- # 业务特点:
- # - 编码器浅层(Stage1)默认关闭 SSM 以降低计算开销
- # - 解码器逐层注入 guide 信号,实现自适应特征调制
- # - 每个解码块支持频率精炼,增强医学图像细节保留
- # ==========================================================================
- class XNet2d(nn.Module):
- def __init__(
- self,
- in_channels: int,
- num_classes: int,
- encoder_channels: Sequence[int] = (32, 64, 128, 192),
- encoder_depths: Sequence[int] = (2, 2, 2, 2),
- decoder_channels: Sequence[int] = (128, 64, 32),
- stem_channels: int = 24,
- bottleneck_depth: int = 1,
- global_ratio: float = 2.0,
- wavelet_type: str = "haar",
- wavelet_level: int = 1,
- use_wavelet_branch: bool = True,
- use_global_branch_stage1: bool = False,
- ssm_d_state: int = 16,
- ssm_forward_type: str = "v3",
- ssm_backend: str = "auto",
- use_frequency_refine: bool = True,
- guide_mode: str = "affine",
- out_channels: int | None = None,
- ) -> None:
- super().__init__()
- # 编码器:多尺度特征金字塔
- self.encoder = XNetEncoder2d(
- in_channels=in_channels,
- stem_channels=stem_channels,
- encoder_channels=encoder_channels,
- encoder_depths=encoder_depths,
- global_ratio=global_ratio,
- wavelet_type=wavelet_type,
- wavelet_level=wavelet_level,
- use_wavelet_branch=use_wavelet_branch,
- use_global_branch_stage1=use_global_branch_stage1,
- ssm_d_state=ssm_d_state,
- ssm_forward_type=ssm_forward_type,
- ssm_backend=ssm_backend,
- )
- # Bottleneck:最深特征进一步建模
- bottleneck_channels = encoder_channels[-1]
- self.bottleneck = nn.Sequential(
- *[
- XTEB2d(
- channels=bottleneck_channels,
- global_ratio=global_ratio,
- wavelet_type=wavelet_type,
- wavelet_level=wavelet_level,
- use_wavelet_branch=use_wavelet_branch,
- use_global_branch=True, # bottleneck 始终启用全局分支
- ssm_d_state=ssm_d_state,
- ssm_forward_type=ssm_forward_type,
- ssm_backend=ssm_backend,
- )
- for _ in range(bottleneck_depth)
- ]
- )
- # 解码器
- self.decoder = XNetDecoder2d(
- encoder_channels=encoder_channels,
- decoder_channels=decoder_channels,
- guide_mode=guide_mode,
- use_frequency_refine=use_frequency_refine,
- out_channels=out_channels,
- )
- # 分割头
- head_in_channels = self.decoder.out_channels
- self.segmentation_head = XNetSegHead2d(head_in_channels, num_classes)
- def forward(
- self, x: torch.Tensor
- ) -> dict[
- str, torch.Tensor | list[torch.Tensor] | list[tuple[torch.Tensor, torch.Tensor]]
- ]:
- encoder_features = self.encoder(x) # 多尺度特征 [e1, e2, e3, e4]
- encoder_features[-1] = self.bottleneck(encoder_features[-1]) # bottleneck
- decoder_out, decoder_features, guides = self.decoder(encoder_features) # 解码
- output_size = x.shape[-2:]
- logits = self.segmentation_head(
- decoder_out, output_size=output_size
- ) # 分割 logits
- # 返回字典:包含 logits、中间特征(用于辅助损失)、引导信号
- outputs: dict[
- str,
- torch.Tensor | list[torch.Tensor] | list[tuple[torch.Tensor, torch.Tensor]],
- ] = {
- "logits": logits,
- "seg_logits": logits,
- "encoder_features": encoder_features,
- "decoder_features": decoder_features,
- "guides": guides,
- }
- return outputs
|