Source code for gammagl.layers.conv.hardgat_conv

import tensorlayerx as tlx
from gammagl.layers.conv import MessagePassing
from gammagl.utils import segment_softmax
import math
import numpy as np


class HardGATConv(MessagePassing):
    r"""The graph hard attentional operator from the `"Graph Representation
    Learning via Hard and Channel-Wise Attention Networks"
    <https://dl.acm.org/doi/pdf/10.1145/3292500.3330897>`_ paper

    .. math::
        \begin{aligned}
        &y=\frac{\left|X^T p\right|}{\|p\|}\\
        &\text { for } i=1,2, \cdots, N \text { do }\\
        &\quad\quad idx_i=\text { Ranking }_k\left(A_{: i} \circ y\right)
        \quad \in \mathbb{R}^k\\
        &\quad\quad\hat{X}_i=X\left(:, idx_i\right)
        \quad \in \mathbb{R}^{d \times k}\\
        &\quad\quad\tilde{y}_i=\operatorname{sigmoid}\left(y\left(idx_i\right)\right)
        \quad \in \mathbb{R}^k\\
        &\quad\quad\tilde{X}_i=\hat{X}_i \operatorname{diag}\left(\tilde{y}_i\right)
        \quad \in \mathbb{R}^{d \times k}\\
        &\quad\quad z_i=\operatorname{attn}\left(x_i, \tilde{X}_i, \tilde{X}_i\right)
        \quad \in \mathbb{R}^d\\
        &Z=\left[z_1, z_2, \ldots, z_N\right]\in \mathbb{R}^{d \times N}
        \end{aligned}

    where the attn operation is the same as GAT, and the process is as follows.

    .. math::
        \mathbf{x}^{\prime}_i = \alpha_{i,i}\mathbf{\Theta}\mathbf{x}_{i} +
        \sum_{j \in \mathcal{N}(i)} \alpha_{i,j}\mathbf{\Theta}\mathbf{x}_{j},

    where the attention coefficients :math:`\alpha_{i,j}` are computed as

    .. math::
        \alpha_{i,j} =
        \frac{
        \exp\left(\mathrm{LeakyReLU}\left(\mathbf{a}^{\top}
        [\mathbf{\Theta}\mathbf{x}_i \, \Vert \, \mathbf{\Theta}\mathbf{x}_j]
        \right)\right)}
        {\sum_{k \in \mathcal{N}(i) \cup \{ i \}}
        \exp\left(\mathrm{LeakyReLU}\left(\mathbf{a}^{\top}
        [\mathbf{\Theta}\mathbf{x}_i \, \Vert \, \mathbf{\Theta}\mathbf{x}_k]
        \right)\right)}.

    Parameters
    ----------
    in_channels: int
        Size of each input sample. It is used directly as a dimension of the
        projection vector and of the linear layer, so it must be a concrete
        integer.
    out_channels: int
        Size of each output sample (per head).
    k: int, optional
        Maximum number of incoming edges kept per destination node
        (default: :obj:`8`)
    heads: int, optional
        Number of multi-head-attentions.
        (default: :obj:`1`)
    concat: bool, optional
        If set to :obj:`False`, the multi-head
        attentions are averaged instead of concatenated.
        (default: :obj:`True`)
    negative_slope: float, optional
        LeakyReLU angle of the negative slope.
        (default: :obj:`0.2`)
    dropout_rate: float, optional
        Dropout probability of the normalized
        attention coefficients which exposes each node to a stochastically
        sampled neighborhood during training. (default: :obj:`0`)
    add_bias: bool, optional
        If set to :obj:`False`, the layer will not learn
        an additive bias. (default: :obj:`True`)

    Notes
    -----
    This layer does not add self-loops; the ``edge_index`` given to
    :meth:`forward` is used as provided (after top-k selection).

    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 k=8,
                 heads=1,
                 concat=True,
                 negative_slope=0.2,
                 dropout_rate=0.,
                 add_bias=True):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.heads = heads
        self.concat = concat
        self.negative_slope = negative_slope
        # Historical misspelling of the attribute; kept so that external code
        # reading ``negetive_slop`` continues to work.
        self.negetive_slop = negative_slope
        self.dropout_rate = dropout_rate
        self.k = k
        self.add_bias = add_bias

        # Shared linear map producing all heads at once; the bias is handled
        # separately at the end of ``forward``.
        self.linear = tlx.layers.Linear(out_features=self.out_channels * self.heads,
                                        in_features=self.in_channels,
                                        b_init=None)

        # Projection vector p used to score node importance.
        initor = tlx.initializers.XavierNormal(gain=math.sqrt(2.0))
        self.p = self._get_weights("proj", shape=(1, self.in_channels), init=initor, order=True)

        # Per-head attention vectors for the source and destination side.
        initor = tlx.initializers.TruncatedNormal()
        self.att_src = self._get_weights("att_src", shape=(1, self.heads, self.out_channels),
                                         init=initor, order=True)
        self.att_dst = self._get_weights("att_dst", shape=(1, self.heads, self.out_channels),
                                         init=initor, order=True)

        self.leaky_relu = tlx.layers.LeakyReLU(negative_slope)
        self.dropout = tlx.layers.Dropout(self.dropout_rate)

        if self.add_bias:
            # Concatenated heads need one bias entry per (head, channel) pair;
            # averaged heads only need one per channel.
            bias_dim = self.heads * self.out_channels if concat else self.out_channels
            self.bias = self._get_weights("bias", shape=(bias_dim,), init=initor)

    def select_topk(self, edge_index, value):
        """Keep, for every destination node, at most ``self.k`` incoming edges
        whose source nodes have the largest ``value``.

        Parameters
        ----------
        edge_index: tensor
            Edge list with two rows: sources in row 0, destinations in row 1.
        value: tensor
            One importance score per node, indexed by node id.

        Returns
        -------
        tensor
            The reduced edge list (two rows), ordered by destination ascending
            and, within one destination, by source score descending.

        """
        src, dst = tlx.convert_to_numpy(edge_index)
        score = tlx.convert_to_numpy(value)[src]

        # One stable sort with dst as primary key (ascending) and score as
        # secondary key (descending). Sorting by score and then re-sorting by
        # dst with a non-stable sort would not reliably keep the score order
        # inside each destination group.
        order = np.lexsort((-score, dst))
        src = src[order]
        dst = dst[order]

        # Offset of the first edge of every destination group in the sorted
        # arrays, then the rank of each edge inside its own group.
        group_start = np.concatenate(([0], np.cumsum(np.bincount(dst))))[:-1]
        rank_in_group = np.arange(dst.shape[0]) - group_start[dst]
        keep = rank_in_group < self.k

        return tlx.convert_to_tensor(np.stack([src[keep], dst[keep]]))

    def message(self, x, edge_index, edge_weight=None, num_nodes=None):
        """Compute attention-weighted messages for every edge.

        Parameters
        ----------
        x: tensor
            Node features reshaped to ``(num_nodes, heads, out_channels)``
            by :meth:`forward`.
        edge_index: tensor
            Edge list with two rows: sources in row 0, destinations in row 1.
        edge_weight: tensor, optional
            If given, the messages are multiplied by it; it must broadcast
            against the per-edge message tensor.
        num_nodes: int, optional
            Number of nodes, forwarded to ``segment_softmax``.

        Returns
        -------
        tensor
            One message per edge.

        """
        node_src = edge_index[0, :]
        node_dst = edge_index[1, :]

        # Per-node, per-head attention logits gathered onto the edges.
        weight_src = tlx.gather(tlx.reduce_sum(x * self.att_src, -1), node_src)
        weight_dst = tlx.gather(tlx.reduce_sum(x * self.att_dst, -1), node_dst)
        weight = self.leaky_relu(weight_src + weight_dst)

        # Normalise over the incoming edges of each destination node.
        alpha = self.dropout(segment_softmax(weight, node_dst, num_nodes))
        x = tlx.gather(x, node_src) * tlx.expand_dims(alpha, -1)

        # Compare against None explicitly: the truth value of a multi-element
        # tensor is ambiguous.
        if edge_weight is None:
            return x
        return x * edge_weight

    def forward(self, x, edge_index, num_nodes):
        """Run hard attention: score nodes, keep top-k edges, then apply
        GAT-style attention on the reduced graph.

        Parameters
        ----------
        x: tensor
            Node feature matrix of shape ``(num_nodes, in_channels)``.
        edge_index: tensor
            Edge list with two rows: sources in row 0, destinations in row 1.
        num_nodes: int
            Number of nodes in the graph.

        Returns
        -------
        tensor
            ``(num_nodes, heads * out_channels)`` if ``concat`` is true,
            otherwise ``(num_nodes, out_channels)``.

        """
        # Projection process to get the importance vector y = |X p^T| / ||p||.
        # The norm is computed with tensor ops so it stays on the backend and
        # remains part of the computation graph.
        p_norm = tlx.sqrt(tlx.reduce_sum(self.p ** 2))
        y = tlx.abs(tlx.squeeze(tlx.matmul(self.p, tlx.transpose(x)), axis=0)) / p_norm

        edge_index = self.select_topk(edge_index, y)

        # Sigmoid as information threshold.
        y = tlx.sigmoid(y)
        # Scale every node's features by its own gate; element-wise
        # multiplication is used instead of a diagonal matrix product.
        x = tlx.reshape(y, shape=(-1, 1)) * x

        # Attention operation, same as in 'gat_conv.py'.
        x = tlx.reshape(self.linear(x), shape=(-1, self.heads, self.out_channels))
        x = self.propagate(x, edge_index, num_nodes=num_nodes)

        if self.concat:
            x = tlx.reshape(x, (-1, self.heads * self.out_channels))
        else:
            x = tlx.reduce_mean(x, axis=1)

        if self.add_bias:
            x += self.bias
        return x