Source code for gammagl.utils.negative_sampling

import random

import numpy as np
import tensorlayerx as tlx

from gammagl.utils import coalesce, degree, remove_self_loops
from .num_nodes import maybe_num_nodes

[docs] def negative_sampling(edge_index, num_nodes = None, num_neg_samples = None, method = 'sparse', force_undirected = False): r"""Samples random negative edges of a graph given by :attr:`edge_index`. parameters ---------- edge_index: tensor The edge indices. num_nodes: int, tuple[int, int], optional The number of nodes, *i.e.* :obj:`max_val + 1` of :attr:`edge_index`. If given as a tuple, then :obj:`edge_index` is interpreted as a bipartite graph with shape :obj:`(num_src_nodes, num_dst_nodes)`. (default: :obj:`None`) num_neg_samples: int, optional The (approximate) number of negative samples to return. If set to :obj:`None`, will try to return a negative edge for every positive edge. (default: :obj:`None`) method: str, optional The method to use for negative sampling, *i.e.*, :obj:`"sparse"` or :obj:`"dense"`. This is a memory/runtime trade-off. :obj:`"sparse"` will work on any graph of any size, while :obj:`"dense"` can perform faster true-negative checks. (default: :obj:`"sparse"`) force_undirected: bool, optional If set to :obj:`True`, sampled negative edges will be undirected. (default: :obj:`False`) """ assert method in ['sparse', 'dense'] bipartite = isinstance(num_nodes, (tuple, list)) if num_nodes is None: num_nodes = maybe_num_nodes(edge_index) if bipartite: force_undirected = False else: num_nodes = (num_nodes, num_nodes) idx, population = edge_index_to_vector(edge_index, num_nodes, bipartite, force_undirected) if tlx.convert_to_numpy(idx).size >= population: return tlx.convert_to_tensor([[],[]], dtype=edge_index.dtype) if num_neg_samples is None: num_neg_samples = edge_index.shape[1] if force_undirected: num_neg_samples = num_neg_samples // 2 prob = 1. - tlx.convert_to_numpy(idx).size / population sample_size = int(1.1 * num_neg_samples / prob) neg_idx = None if method == 'dense': if tlx.BACKEND == 'paddle': mask = tlx.ones((population,), dtype=tlx.int64) mask[idx] = 0 else: mask = tlx.ones((population,), dtype=tlx.bool) mask = tlx.scatter_update(mask, idx, tlx.zeros((idx.shape[0],), dtype=tlx.bool)) for _ in range(3): rnd = sample(population, sample_size) if tlx.BACKEND == 'paddle': indices = tlx.convert_to_tensor(tlx.convert_to_numpy(tlx.gather(mask, rnd)), dtype = tlx.bool) else: indices = tlx.gather(mask, rnd) rnd = tlx.mask_select(rnd, indices) if neg_idx is None: neg_idx = rnd else: neg_idx = tlx.concat([neg_idx, rnd], axis=0) if tlx.convert_to_numpy(neg_idx).size >= num_neg_samples: neg_idx = neg_idx[:num_neg_samples] break if tlx.BACKEND == 'paddle': mask[neg_idx] = 0 else: mask = tlx.scatter_update(mask, neg_idx, tlx.zeros((neg_idx.shape[0],), dtype=tlx.bool)) else: idx = tlx.to_device(idx, 'cpu') for _ in range(3): rnd = sample(population, sample_size, True) mask = np.isin(tlx.convert_to_numpy(rnd), tlx.convert_to_numpy(idx)) if neg_idx is not None: mask |= np.isin(tlx.convert_to_numpy(rnd), tlx.convert_to_numpy(neg_idx)) mask = tlx.convert_to_tensor(mask, dtype=tlx.bool, device='cpu') rnd = tlx.mask_select(rnd, ~mask) if neg_idx is None: neg_idx = rnd else: neg_idx = tlx.concat([neg_idx, rnd], axis=0) if tlx.convert_to_numpy(neg_idx).size >= num_neg_samples: neg_idx = neg_idx[:num_neg_samples] break neg_idx = tlx.convert_to_tensor(tlx.convert_to_numpy(neg_idx), dtype = tlx.int64) return vector_to_edge_index(neg_idx, num_nodes, bipartite, force_undirected)
def sample(population, k, cpu=False): if population <= k: if cpu: return tlx.convert_to_tensor(np.arange(population), device='cpu') else: return tlx.convert_to_tensor(np.arange(population)) else: if cpu: return tlx.convert_to_tensor(random.sample(range(population), k), device='cpu') else: return tlx.convert_to_tensor(random.sample(range(population), k)) def edge_index_to_vector(edge_index, size, bipartite, force_undirected): row, col = edge_index if bipartite: return row * size[1] + col, size[0] * size[1] elif force_undirected: assert size[0] == size[1] num_nodes = size[0] mask = row < col row, col = tlx.mask_select(row, mask), tlx.mask_select(col, mask) offset = tlx.gather(tlx.cumsum(tlx.arange(1, num_nodes)), row) idx = row * num_nodes + col - offset population = (num_nodes * (num_nodes + 1)) // 2 - num_nodes return idx, population else: assert size[0] == size[1] num_nodes = size[0] mask = row != col row = tlx.convert_to_numpy(tlx.mask_select(row, mask)) col = tlx.convert_to_numpy(tlx.mask_select(col, mask)) indice = tlx.mask_select(tlx.arange(0, col.shape[0]), tlx.convert_to_tensor(row < col)) indice = tlx.convert_to_numpy(indice) col[indice] = col[indice] - 1 # col = tlx.tensor_scatter_nd_update(col, indice, tlx.gather(col, indice) - 1) idx = row * (num_nodes - 1) + col population = num_nodes * (num_nodes - 1) return tlx.convert_to_tensor(idx), population def vector_to_edge_index(idx, size, bipartite, force_undirected): if bipartite: row = idx // size[1] col = idx % size[1] return tlx.stack([row, col]) else: assert size[0] == size[1] num_nodes = size[0] if force_undirected: offset = tlx.cumsum(tlx.arange(1, num_nodes)) end = tlx.arange(num_nodes, num_nodes * num_nodes, num_nodes) row = tlx.convert_to_tensor(np.digitize(idx, end - offset, right=True)) col = (tlx.gather(offset, row) + idx) % num_nodes return tlx.stack([tlx.concat([row, col]), tlx.concat([col, row])]) else: row = idx // (num_nodes - 1) col = idx % (num_nodes - 1) row = tlx.convert_to_numpy(row) col = tlx.convert_to_numpy(col) indice = tlx.mask_select(tlx.arange(0, col.shape[0]), tlx.convert_to_tensor(row <= col)) indice = tlx.convert_to_numpy(indice) col[indice] = col[indice] + 1 row = tlx.convert_to_tensor(row) col = tlx.convert_to_tensor(col) # col = tlx.scatter_update(col, indice, tlx.gather(col, indice) + 1) return tlx.stack([row, col])