Source code for UM2N.model.extractor
# Author: Chunyang Wang
# GitHub Username: acse-cw1722
import torch
import torch.nn.functional as F
from torch_geometric.nn import MessagePassing
from .transformer_model import TransformerModel  # relative import within the UM2N.model package
__all__ = ["LocalFeatExtractor", "GlobalFeatExtractor"]
class LocalFeatExtractor(MessagePassing):
"""
Custom PyTorch geometric layer that performs feature extraction
on local graph structure.
The class extends the torch_geometric.nn.MessagePassing class
and employs additive aggregation as the message-passing scheme.
Attributes:
lin_1 (torch.nn.Linear): First linear layer.
lin_2 (torch.nn.Linear): Second linear layer.
lin_3 (torch.nn.Linear): Third linear layer.
activate (torch.nn.SELU): Activation function.
"""
def __init__(self, num_feat=10, out=16):
"""
Initialize the layer.
Args:
num_feat (int): Number of input features per node.
out (int): Number of output features per node.
"""
super().__init__(aggr="add")
# edge input: 2 coord-diff + 1 distance + 2 * (num_feat - 2) node features
num_in_feat = 1 + (num_feat - 2) * 2 + 2
self.lin_1 = torch.nn.Linear(num_in_feat, 64)
self.lin_2 = torch.nn.Linear(64, 64)
# out - 1 because the edge distance is concatenated back in message()
self.lin_3 = torch.nn.Linear(64, out - 1)
self.activate = torch.nn.SELU()
def forward(self, x, edge_index):
"""
Forward pass.
Args:
x (Tensor): Node features of shape [num_nodes, num_feat];
the first two columns are node coordinates.
edge_index (Tensor): Edge indices of shape [2, num_edges].
Returns:
Tensor: Updated node features of shape [num_nodes, out].
"""
local_feat = self.propagate(edge_index, x=x)
return local_feat
def message(self, x_i, x_j):
"""
Build per-edge messages from the coordinate difference, the edge
length, and the features of both endpoint nodes.
"""
coord_idx = 2
x_i_coord = x_i[:, :coord_idx]
x_j_coord = x_j[:, :coord_idx]
x_i_feat = x_i[:, coord_idx:]
x_j_feat = x_j[:, coord_idx:]
x_coord_diff = x_j_coord - x_i_coord
x_coord_dist = torch.norm(x_coord_diff, dim=1, keepdim=True)
x_edge_feat = torch.cat(
[x_coord_diff, x_coord_dist, x_i_feat, x_j_feat], dim=1
)  # [num_edges, num_in_feat]
# print("x_i x_j ", x_i.shape, x_j.shape, "x_edge_feat ", x_edge_feat.shape)
x_edge_feat = self.lin_1(x_edge_feat)
x_edge_feat = self.activate(x_edge_feat)
x_edge_feat = self.lin_2(x_edge_feat)
x_edge_feat = self.activate(x_edge_feat)
x_edge_feat = self.lin_3(x_edge_feat)
x_edge_feat = self.activate(x_edge_feat)
x_edge_feat = torch.cat([x_edge_feat, x_coord_dist], dim=1)  # re-append edge length -> out channels
return x_edge_feat
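# A minimal usage sketch (illustrative, not part of the original module):
# the first two input columns are assumed to be node coordinates and the
# rest node features, matching the slicing in message().
#
#   lfe = LocalFeatExtractor(num_feat=10, out=16)
#   x = torch.rand(100, 10)                       # 100 nodes: 2 coords + 8 features
#   edge_index = torch.randint(0, 100, (2, 300))  # 300 directed edges
#   local_feat = lfe(x, edge_index)               # -> [100, 16]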
class GlobalFeatExtractor(torch.nn.Module):
"""
Custom PyTorch layer for global feature extraction.
The class employs multiple convolutional layers and dropout layers.
Attributes:
conv1, conv2, conv3, conv4 (torch.nn.Conv2d): Convolutional layers.
dropout (torch.nn.Dropout): Dropout layer.
final_pool (torch.nn.AdaptiveAvgPool2d): Final pooling layer.
"""
def __init__(self, in_c, out_c, drop_p=0.2, use_drop=True):
"""
Initialize the layer.
Args:
in_c (int): Number of input channels.
out_c (int): Number of output channels.
drop_p (float, optional): Dropout probability.
use_drop (bool, optional): Whether to use dropout layers.
When set to `False`, this building block is exactly the
block used in the original M2N model, without any change.
When set to `True`, it is the variant used in the MRN model.
"""
super().__init__()
self.in_c = in_c
self.out_c = out_c
self.conv1 = torch.nn.Conv2d(in_c, 32, 3, padding=1, stride=1)
self.conv2 = torch.nn.Conv2d(32, 64, 5, padding=2, stride=1)
self.conv3 = torch.nn.Conv2d(64, 32, 3, padding=2, stride=1)
self.conv4 = torch.nn.Conv2d(32, out_c, 3, padding=2, stride=1)
self.use_drop = use_drop
self.dropout = torch.nn.Dropout(drop_p) if use_drop else None
self.final_pool = torch.nn.AdaptiveAvgPool2d(1)
def forward(self, data):
"""
Forward pass.
Args:
data (Tensor): Input data.
Returns:
Tensor: Extracted global features.
"""
x = self.conv1(data)
x = F.selu(x)
x = self.dropout(x) if self.use_drop else x
x = self.conv2(x)
x = F.selu(x)
x = self.dropout(x) if self.use_drop else x
x = self.conv3(x)
x = F.selu(x)
x = self.dropout(x) if self.use_drop else x
x = self.conv4(x)
# print(f"before selu {x.shape}")
x = F.selu(x)
x = self.dropout(x) if self.use_drop else x
x = self.final_pool(x)
# print(f"after final pool {x.shape}")
x = x.reshape(-1, self.out_c)
return x
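# A minimal usage sketch (illustrative, not part of the original module):
#
#   gfe = GlobalFeatExtractor(in_c=3, out_c=16)
#   grid = torch.rand(8, 3, 20, 20)  # batch of 8 three-channel feature grids
#   feat = gfe(grid)                 # -> [8, 16] after adaptive average pooling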
class TransformerEncoder(torch.nn.Module):
"""
Transformer-based feature extractor for the Mesh Refinement Network
(MRN), replacing the convolutional global feature extractor.
Attributes:
transformer_encoder (TransformerModel): Transformer encoder backbone.
mask_in_training (bool): Whether to mask nodes during training.
key_padding_mask_in_training (bool): Use a key padding mask in training.
attention_mask_in_training (bool): Use an attention mask in training.
mask_ratio_lb (float): Lower bound of the training mask ratio.
mask_ratio_ub (float): Upper bound of the training mask ratio.
"""
def __init__(
self,
num_transformer_in=3,
num_transformer_out=16,
num_transformer_embed_dim=64,
num_transformer_heads=4,
num_transformer_layers=1,
transformer_training_mask=False,
transformer_key_padding_training_mask=False,
transformer_attention_training_mask=False,
transformer_training_mask_ratio_lower_bound=0.5,
transformer_training_mask_ratio_upper_bound=0.9,
deform_in_c=7,
deform_out_type="coord",
num_loop=3,
device="cuda",
):
"""
Initialize MRN.
Args:
gfe_in_c (int): Input channels for the global feature extractor.
lfe_in_c (int): Input channels for the local feature extractor.
deform_in_c (int): Input channels for the deformer block.
num_loop (int): Number of loops for the recurrent layer.
"""
super().__init__()
self.device = device
self.mask_in_training = transformer_training_mask
self.key_padding_mask_in_training = transformer_key_padding_training_mask
self.attention_mask_in_training = transformer_attention_training_mask
self.mask_ratio_ub = transformer_training_mask_ratio_upper_bound
self.mask_ratio_lb = transformer_training_mask_ratio_lower_bound
assert (
self.mask_ratio_ub >= self.mask_ratio_lb
), "Training mask ratio upper bound smaller than lower bound."
self.num_transformer_in = num_transformer_in
self.num_transformer_out = num_transformer_out
self.num_transformer_embed_dim = num_transformer_embed_dim
self.num_heads = num_transformer_heads
self.num_layers = num_transformer_layers
self.transformer_encoder = TransformerModel(
input_dim=self.num_transformer_in,
embed_dim=self.num_transformer_embed_dim,
output_dim=self.num_transformer_out,
num_heads=self.num_heads,
num_layers=self.num_layers,
)
def _transformer_forward(self, batch_size, input_q, input_kv, get_attens=False):
"""
Run the transformer encoder on flattened node features.
Args:
batch_size (int): Number of meshes in the batch.
input_q (Tensor): Query features, [batch_size * num_nodes, feat_dim].
input_kv (Tensor): Key/value features, [batch_size * num_nodes, feat_dim].
get_attens (bool, optional): Also return attention scores.
Returns:
Tensor: Encoded node features (and attention scores if requested).
"""
# mesh_feat layout: [coord_x, coord_y, u, hessian_norm]
transformer_input_q = input_q.view(batch_size, -1, input_q.shape[-1])
transformer_input_kv = input_kv.view(batch_size, -1, input_kv.shape[-1])
node_num = transformer_input_q.shape[1]
# print("transformer input shape ", transformer_input_q.shape, transformer_input_kv.shape)
key_padding_mask = None
attention_mask = None
# self.training (the Module flag), not self.train (a method, always truthy)
if self.training and self.mask_in_training:
mask_ratio = (self.mask_ratio_ub - self.mask_ratio_lb) * torch.rand(
1
) + self.mask_ratio_lb
masked_num = int(node_num * mask_ratio)
mask = torch.randperm(node_num)[:masked_num]
if self.key_padding_mask_in_training:
# Key padding mask: True marks nodes that attention should ignore
key_padding_mask = torch.zeros(
[batch_size, node_num], dtype=torch.bool
).to(self.device)
key_padding_mask[:, mask] = True
elif self.attention_mask_in_training:
# Attention mask: paired advanced indexing flags only the
# (masked node, masked node) self-attention entries
attention_mask = torch.zeros(
[batch_size * self.num_heads, node_num, node_num], dtype=torch.bool
).to(self.device)
attention_mask[:, mask, mask] = True
features = self.transformer_encoder(
transformer_input_q,
transformer_input_kv,
transformer_input_kv,
key_padding_mask=key_padding_mask,
attention_mask=attention_mask,
)
features = F.selu(features)
if not get_attens:
return features
else:
# TODO: adapt q k v
atten_scores = self.transformer_encoder.get_attention_scores(
x=transformer_input_q, key_padding_mask=key_padding_mask
)
return features, atten_scores
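# Mask semantics sketch (illustrative): following PyTorch's
# MultiheadAttention convention, True in key_padding_mask means "ignore
# this key". For batch_size=2, node_num=5 and masked nodes 1 and 3:
#
#   mask = torch.tensor([1, 3])
#   key_padding_mask = torch.zeros([2, 5], dtype=torch.bool)
#   key_padding_mask[:, mask] = True  # each row becomes [F, T, F, T, F]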
def forward(self, data):
"""
Forward pass.
Args:
data (Tensor): Node features, [batch_size, num_nodes, feat_dim].
Returns:
Tensor: Encoded features, [batch_size * num_nodes, out_dim].
"""
batch_size = data.shape[0]
feat_dim = data.shape[-1]
# Queries and keys/values share the same flattened node features,
# e.g. [coord_ori_x, coord_ori_y, u, hessian_norm]
input_q = data.view(-1, feat_dim)
input_kv = data.view(-1, feat_dim)
hidden = self._transformer_forward(batch_size, input_q, input_kv)
feat_dim = hidden.shape[-1]
return hidden.view(-1, feat_dim)
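# A minimal usage sketch (illustrative, not part of the original module):
# a batch of meshes with per-node features matching the default
# num_transformer_in=3, e.g. [coord_x, coord_y, u].
#
#   enc = TransformerEncoder(device="cpu")
#   data = torch.rand(4, 100, 3)  # 4 meshes, 100 nodes, 3 features per node
#   feat = enc(data)              # -> [400, 16] with num_transformer_out=16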