from .base_model import BaseKGE
import torch
import numpy as np
class FMult(BaseKGE):
""" Learning Knowledge Neural Graphs"""
""" Learning Neural Networks for Knowledge Graphs"""
def __init__(self, args):
super().__init__(args)
self.name = 'FMult'
self.entity_embeddings = torch.nn.Embedding(self.num_entities, self.embedding_dim)
self.relation_embeddings = torch.nn.Embedding(self.num_relations, self.embedding_dim)
        self.param_init(self.entity_embeddings.weight.data)
        self.param_init(self.relation_embeddings.weight.data)
self.k = int(np.sqrt(self.embedding_dim // 2))
self.num_sample = 50
        # self.gamma = torch.rand(self.k, self.num_sample)  # uniform on [0, 1) => worse results
        self.gamma = torch.randn(self.k, self.num_sample)  # N(0, 1)
# Lazy import
from scipy.special import roots_legendre
roots, weights = roots_legendre(self.num_sample)
        self.roots = torch.from_numpy(roots).repeat(self.k, 1).float()  # shape: (self.k, self.num_sample)
        self.weights = torch.from_numpy(weights).reshape(1, -1).float()  # shape: (1, self.num_sample)
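
    # A note on the quadrature setup above (illustrative sketch, values hypothetical):
    # scipy's roots_legendre(n) returns the n Gauss-Legendre nodes on [-1, 1] and their
    # weights, so that sum_i weights[i] * f(roots[i]) approximates \int_{-1}^{1} f(x) dx.
    # For example:
    #
    #   roots, weights = roots_legendre(3)
    #   approx = sum(w * r ** 2 for r, w in zip(roots, weights))
    #   # approx ~ 2/3, the exact value of \int_{-1}^{1} x^2 dx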
def compute_func(self, weights: torch.FloatTensor, x) -> torch.FloatTensor:
n = len(weights)
# Weights for two linear layers.
w1, w2 = torch.hsplit(weights, 2)
# (1) Construct two-layered neural network
w1 = w1.view(n, self.k, self.k)
w2 = w2.view(n, self.k, self.k)
# (2) Forward Pass
out1 = torch.tanh(w1 @ x) # torch.sigmoid => worse results
out2 = w2 @ out1
return out2 # no non-linearity => better results
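
    # Shape sketch for compute_func (hypothetical values, for illustration only):
    # with embedding_dim = 2 * k * k, each row of `weights` encodes one two-layer network.
    # For k = 4, num_sample = 50 and a batch of 8:
    #
    #   weights: (8, 32) --hsplit--> w1, w2: (8, 16) --view--> (8, 4, 4)
    #   x = self.gamma: (4, 50), so w1 @ x broadcasts to (8, 4, 50) and
    #   out2 = w2 @ tanh(w1 @ x) has shape (8, 4, 50):
    #   k function values per sample point, per triple in the batch.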
def chain_func(self, weights, x: torch.FloatTensor):
n = len(weights)
# Weights for two linear layers.
w1, w2 = torch.hsplit(weights, 2)
# (1) Construct two-layered neural network
w1 = w1.view(n, self.k, self.k)
w2 = w2.view(n, self.k, self.k)
# (2) Perform the forward pass
out1 = torch.tanh(torch.bmm(w1, x))
out2 = torch.bmm(w2, out1)
return out2
def forward_triples(self, idx_triple: torch.Tensor) -> torch.Tensor:
# (1) Retrieve embeddings: batch, \mathbb R^d
head_ent_emb, rel_ent_emb, tail_ent_emb = self.get_triple_representation(idx_triple)
        # Alternative (kept for reference): logits via an FDistMult-style score.
        # h_x = self.compute_func(head_ent_emb, x=self.gamma)  # batch, \mathbb{R}^k, |\Gamma|
        # r_x = self.compute_func(rel_ent_emb, x=self.gamma)   # batch, \mathbb{R}^k, |\Gamma|
        # t_x = self.compute_func(tail_ent_emb, x=self.gamma)  # batch, \mathbb{R}^k, |\Gamma|
        # out = h_x * r_x * t_x                                # batch, \mathbb{R}^k, |\Gamma|
        # (2) Compute NNs on \Gamma.
        self.gamma = self.gamma.to(head_ent_emb.device)
h_x = self.compute_func(head_ent_emb, x=self.gamma) # batch, \mathbb{R}^k, |\Gamma|
t_x = self.compute_func(tail_ent_emb, x=self.gamma) # batch, \mathbb{R}^k, |\Gamma|
r_h_x = self.chain_func(weights=rel_ent_emb, x=h_x) # batch, \mathbb{R}^k, |\Gamma|
# (3) Compute |\Gamma| predictions
        out = torch.sum(r_h_x * t_x, dim=1)  # batch, |\Gamma|
# (4) Average (3) over \Gamma
out = torch.mean(out, dim=1) # batch
return out
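
# A minimal, self-contained sketch (hypothetical helper, not part of the library):
# given the function values produced by compute_func and chain_func, the FMult score
# is a Monte Carlo estimate of \int <(r o h)(x), t(x)> dx over the sampled points.
def _fmult_score_sketch():
    batch, k, n = 8, 4, 50
    r_h_x = torch.randn(batch, k, n)  # stands in for chain_func(rel, compute_func(head))
    t_x = torch.randn(batch, k, n)    # stands in for compute_func(tail)
    score = (r_h_x * t_x).sum(dim=1).mean(dim=1)  # inner product per sample, then average
    assert score.shape == (batch,)
    return score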
class GFMult(BaseKGE):
""" Learning Knowledge Neural Graphs"""
""" Learning Neural Networks for Knowledge Graphs"""
def __init__(self, args):
super().__init__(args)
self.name = 'GFMult'
self.entity_embeddings = torch.nn.Embedding(self.num_entities, self.embedding_dim)
self.relation_embeddings = torch.nn.Embedding(self.num_relations, self.embedding_dim)
        self.param_init(self.entity_embeddings.weight.data)
        self.param_init(self.relation_embeddings.weight.data)
self.k = int(np.sqrt(self.embedding_dim // 2))
self.num_sample = 250
# Lazy import
from scipy.special import roots_legendre
roots, weights = roots_legendre(self.num_sample)
        self.roots = torch.from_numpy(roots).repeat(self.k, 1).float()  # shape: (self.k, self.num_sample)
        self.weights = torch.from_numpy(weights).reshape(1, -1).float()  # shape: (1, self.num_sample)
def compute_func(self, weights: torch.FloatTensor, x) -> torch.FloatTensor:
n = len(weights)
# Weights for two linear layers.
w1, w2 = torch.hsplit(weights, 2)
# (1) Construct two-layered neural network
w1 = w1.view(n, self.k, self.k)
w2 = w2.view(n, self.k, self.k)
# (2) Forward Pass
out1 = torch.tanh(w1 @ x) # torch.sigmoid => worse results
out2 = w2 @ out1
return out2 # no non-linearity => better results
def chain_func(self, weights, x: torch.FloatTensor):
n = len(weights)
# Weights for two linear layers.
w1, w2 = torch.hsplit(weights, 2)
# (1) Construct two-layered neural network
w1 = w1.view(n, self.k, self.k)
w2 = w2.view(n, self.k, self.k)
# (2) Perform the forward pass
out1 = torch.tanh(torch.bmm(w1, x))
out2 = torch.bmm(w2, out1)
return out2
def forward_triples(self, idx_triple: torch.Tensor) -> torch.Tensor:
# (1) Retrieve embeddings: batch, \mathbb R^d
head_ent_emb, rel_ent_emb, tail_ent_emb = self.get_triple_representation(idx_triple)
# (2) Compute NNs on \Gamma
        self.roots = self.roots.to(head_ent_emb.device)
        self.weights = self.weights.to(head_ent_emb.device)
h_x = self.compute_func(head_ent_emb, x=self.roots) # batch, \mathbb{R}^k, |\Gamma|
t_x = self.compute_func(tail_ent_emb, x=self.roots) # batch, \mathbb{R}^k, |\Gamma|
r_h_x = self.chain_func(weights=rel_ent_emb, x=h_x) # batch, \mathbb{R}^k, |\Gamma|
# (3) Compute |\Gamma| predictions.
        out = torch.sum(r_h_x * t_x, dim=1) * self.weights  # batch, |\Gamma|
# (4) Average (3) over \Gamma
out = torch.mean(out, dim=1) # batch
return out
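
# A minimal sketch (hypothetical helper, not part of the library): GFMult replaces
# FMult's random samples with Gauss-Legendre nodes, so the weighted sum
# sum_i w_i f(x_i) approximates \int_{-1}^{1} f(x) dx. Checking on f(x) = x^2:
def _gauss_legendre_sketch():
    from scipy.special import roots_legendre
    roots, weights = roots_legendre(5)
    approx = float((torch.from_numpy(weights) * torch.from_numpy(roots) ** 2).sum())
    assert abs(approx - 2.0 / 3.0) < 1e-8  # exact: \int_{-1}^{1} x^2 dx = 2/3
    return approx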
class FMult2(BaseKGE):
""" Learning Knowledge Neural Graphs"""
""" Learning Neural Networks for Knowledge Graphs"""
def __init__(self, args):
super().__init__(args)
self.name = 'FMult2'
self.n_layers = 3
tuned_embedding_dim = False
while int(np.sqrt((self.embedding_dim - 1) / self.n_layers)) != np.sqrt(
(self.embedding_dim - 1) / self.n_layers):
self.embedding_dim += 1
tuned_embedding_dim = True
if tuned_embedding_dim:
print(f"\n\n*****Embedding dimension reset to {self.embedding_dim} to fit model architecture!*****\n")
self.k = int(np.sqrt((self.embedding_dim - 1) // self.n_layers))
self.n = 50
self.a, self.b = -1.0, 1.0
# self.score_func = "vtp" # "vector triple product"
# self.score_func = "trilinear"
self.score_func = "compositional"
# self.score_func = "full-compositional"
# self.discrete_points = torch.linspace(self.a, self.b, steps=self.n)
self.discrete_points = torch.linspace(self.a, self.b, steps=self.n).repeat(self.k, 1)
self.entity_embeddings = torch.nn.Embedding(self.num_entities, self.embedding_dim)
self.relation_embeddings = torch.nn.Embedding(self.num_relations, self.embedding_dim)
        self.param_init(self.entity_embeddings.weight.data)
        self.param_init(self.relation_embeddings.weight.data)
def build_func(self, Vec):
n = len(Vec)
# (1) Construct self.n_layers layered neural network
W = list(torch.hsplit(Vec[:, :-1], self.n_layers))
# (2) Reshape weights of the layers
for i, w in enumerate(W):
W[i] = w.reshape(n, self.k, self.k)
return W, Vec[:, -1]
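
    # Layout sketch for build_func (hypothetical values, for illustration only):
    # an embedding vector of length n_layers * k * k + 1 is read as n_layers weight
    # matrices plus one scalar bias. E.g. with n_layers = 3 and k = 2, a row of
    # length 13 splits into three (2, 2) matrices (first 12 entries) and a bias (last entry).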
def build_chain_funcs(self, list_Vec):
list_W = []
list_b = []
for Vec in list_Vec:
W_, b = self.build_func(Vec)
list_W.append(W_)
list_b.append(b)
W = list_W[-1][1:]
for i in range(len(list_W) - 1):
for j, w in enumerate(list_W[i]):
if i == 0 and j == 0:
W_temp = w
else:
W_temp = w @ W_temp
W_temp = W_temp + list_b[i].reshape(-1, 1, 1)
W_temp = list_W[-1][0] @ W_temp / ((len(list_Vec) - 1) * w.shape[1])
W.insert(0, W_temp)
return W, list_b[-1]
def compute_func(self, W, b, x) -> torch.FloatTensor:
out = W[0] @ x
for i, w in enumerate(W[1:]):
if i % 2 == 0: # no non-linearity => better results
out = out + torch.tanh(w @ out)
else:
out = out + w @ out
return out + b.reshape(-1, 1, 1)
def function(self, list_W, list_b):
def f(x):
if len(list_W) == 1:
return self.compute_func(list_W[0], list_b[0], x)
score = self.compute_func(list_W[0], list_b[0], x)
for W, b in zip(list_W[1:], list_b[1:]):
score = score * self.compute_func(W, b, x)
return score
return f
def trapezoid(self, list_W, list_b):
        return torch.trapezoid(self.function(list_W, list_b)(self.discrete_points),
                               x=self.discrete_points, dim=-1).sum(dim=-1)
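
    # torch.trapezoid integrates sampled function values with the trapezoidal rule.
    # Quick sanity check (sketch, not executed here):
    #
    #   x = torch.linspace(0, 1, 100)
    #   torch.trapezoid(x ** 2, x=x)  # ~ 0.3333 = \int_0^1 x^2 dx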
def forward_triples(self, idx_triple: torch.Tensor) -> torch.Tensor:
# (1) Retrieve embeddings: batch, \mathbb R^d
head_ent_emb, rel_emb, tail_ent_emb = self.get_triple_representation(idx_triple)
if self.discrete_points.device != head_ent_emb.device:
self.discrete_points = self.discrete_points.to(head_ent_emb.device)
if self.score_func == "vtp":
h_W, h_b = self.build_func(head_ent_emb)
r_W, r_b = self.build_func(rel_emb)
t_W, t_b = self.build_func(tail_ent_emb)
            out = -self.trapezoid([t_W], [t_b]) * self.trapezoid([h_W, r_W], [h_b, r_b]) \
                  + self.trapezoid([r_W], [r_b]) * self.trapezoid([t_W, h_W], [t_b, h_b])
elif self.score_func == "compositional":
t_W, t_b = self.build_func(tail_ent_emb)
chain_W, chain_b = self.build_chain_funcs([head_ent_emb, rel_emb])
out = self.trapezoid([chain_W, t_W], [chain_b, t_b])
elif self.score_func == "full-compositional":
chain_W, chain_b = self.build_chain_funcs([head_ent_emb, rel_emb, tail_ent_emb])
out = self.trapezoid([chain_W], [chain_b])
elif self.score_func == "trilinear":
h_W, h_b = self.build_func(head_ent_emb)
r_W, r_b = self.build_func(rel_emb)
t_W, t_b = self.build_func(tail_ent_emb)
out = self.trapezoid([h_W, r_W, t_W], [h_b, r_b, t_b])
return out
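
# Summary of the FMult2 scoring modes, as read from forward_triples above
# (h, r, t denote the functions built from the head, relation and tail embeddings):
#   "trilinear":          \int h(x) r(x) t(x) dx
#   "compositional":      \int (r o h)(x) t(x) dx
#   "full-compositional": \int (t o r o h)(x) dx
#   "vtp":                -\int t dx * \int h r dx + \int r dx * \int t h dx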
class LFMult1(BaseKGE):
    r'''Embedding with trigonometric functions. We represent all entities and relations in the complex space as
    f(x) = \sum_{k=0}^{d-1} w_k e^{kix} and use the three different scoring functions from the paper to evaluate the score.'''
def __init__(self,args):
super().__init__(args)
self.name = 'LFMult1'
self.entity_embeddings = torch.nn.Embedding(self.num_entities, self.embedding_dim)
self.relation_embeddings = torch.nn.Embedding(self.num_relations, self.embedding_dim)
    def forward_triples(self, idx_triple):  # idx_triple = (h_idx, r_idx, t_idx)
head_ent_emb, rel_emb, tail_ent_emb = self.get_triple_representation(idx_triple)
score = self.vtp_score(head_ent_emb,rel_emb,tail_ent_emb)
return score
    def tri_score(self, h, r, t):
        i_range, j_range, k_range = torch.meshgrid(torch.arange(self.embedding_dim), torch.arange(self.embedding_dim),
                                                   torch.arange(self.embedding_dim), indexing='ij')
        eps = 10 ** -6  # for numerical stability
        cond = i_range + j_range == k_range
        s1 = torch.sum(torch.where(~cond, torch.zeros_like(~cond), h[:, i_range] * r[:, j_range] * t[:, k_range]),
                       dim=(-3, -2, -1))  # sum over i + j == k
        s2 = torch.sum(torch.where(cond, torch.zeros_like(cond), torch.sin(i_range + j_range - k_range)
                                   * h[:, i_range] * r[:, j_range] * t[:, k_range] / (eps + i_range + j_range - k_range)),
                       dim=(-3, -2, -1))  # sum over i + j != k
s = s1 + s2 # combine the two sums.
return s
    def vtp_score(self, h, r, t):
        i_range, j_range = torch.meshgrid(torch.arange(self.embedding_dim), torch.arange(self.embedding_dim),
                                          indexing='ij')
        eps = 10 ** -6  # for numerical stability
        cond = i_range == j_range
        p1 = torch.sum(torch.where(cond, torch.zeros_like(cond), torch.sin(i_range - j_range)
                                   * h[:, i_range] * t[:, j_range] / (eps + i_range - j_range)), dim=(-2, -1)) \
             + torch.sum(h[:, i_range] * t[:, i_range], dim=(-2, -1))  # sum over i != j; keep the batch dimension
        i_1 = torch.arange(1, self.embedding_dim)
        p2 = torch.sum(r[:, i_1] * torch.sin(i_1) / i_1, dim=-1) + r[:, 0]
        s1 = p1 * p2
        p3 = torch.sum(torch.where(cond, torch.zeros_like(cond), torch.sin(i_range - j_range)
                                   * r[:, i_range] * t[:, j_range] / (eps + i_range - j_range)), dim=(-2, -1)) \
             + torch.sum(r[:, i_range] * t[:, i_range], dim=(-2, -1))  # sum over i != j; keep the batch dimension
        p4 = torch.sum(h[:, i_1] * torch.sin(i_1) / i_1, dim=-1) + h[:, 0]
        s2 = p3 * p4
s = s1 - s2 # combine the two sums.
return s
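
# A minimal sketch (hypothetical helper, not part of the library): the sin(m)/m
# factors in tri_score and vtp_score come from \int_0^1 cos(m x) dx = sin(m) / m,
# the real part of \int_0^1 e^{imx} dx. Numerical check with the trapezoidal rule:
def _lfmult1_integral_sketch():
    m = 3.0
    x = torch.linspace(0.0, 1.0, 10_000)
    approx = torch.trapezoid(torch.cos(m * x), x=x)
    assert abs(float(approx) - np.sin(m) / m) < 1e-6
    return float(approx)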
class LFMult(BaseKGE):
    r'''Embedding with polynomial functions. We represent all entities and relations in the polynomial space as
    f(x) = \sum_{i=0}^{d-1} a_i x^{i \% d} and use the three different scoring functions from the paper to evaluate the score.
    We also consider combining with neural networks.'''
def __init__(self,args):
super().__init__(args)
self.name = 'LFMult'
self.entity_embeddings = torch.nn.Embedding(self.num_entities, self.embedding_dim)
self.relation_embeddings = torch.nn.Embedding(self.num_relations, self.embedding_dim)
        self.degree = self.args.get("degree", 0)
        self.m = self.embedding_dim // (1 + self.degree)
        self.x_values = torch.linspace(0, 1, 100)
    def forward_triples(self, idx_triple):  # idx_triple = (h_idx, r_idx, t_idx)
head_ent_emb, rel_emb, tail_ent_emb = self.get_triple_representation(idx_triple)
coeff_head, coeff_rel, coeff_tail = self.construct_multi_coeff(head_ent_emb), self.construct_multi_coeff(rel_emb), self.construct_multi_coeff(tail_ent_emb)
        # Polynomial score with trilinear scoring:
        # score = self.tri_score(coeff_head, coeff_rel, coeff_tail)
        # score = score.reshape(-1, self.m).sum(dim=1)
        # Polynomial score with the NN representation:
        score = torch.trapezoid(self.poly_NN(self.x_values, coeff_head, coeff_rel, coeff_tail), self.x_values)
return score
def construct_multi_coeff(self, x):
coeffs = torch.hsplit(x,self.degree + 1)
coeffs = torch.stack(coeffs,dim=1)
return coeffs.transpose(1,2)
def poly_NN(self, x, coefh, coefr, coeft):
        r'''Construct a two-layer NN to represent the embeddings:
        h = \sigma(w_h^T x + b_h), r = \sigma(w_r^T x + b_r), t = \sigma(w_t^T x + b_t)'''
wh, bh = coefh[:, :self.m,0], coefh[:, :self.m,1]
wr, br = coefr[:, :self.m,0], coefr[:, :self.m,1]
wt, bt = coeft[:, :self.m,0], coeft[:, :self.m,1]
h_emb = self.linear(x,wh,bh).reshape(-1,self.m,x.size(0))
r_emb = self.linear(x,wr,br).reshape(-1,self.m,x.size(0))
t_emb = self.linear(x,wt,bt).reshape(-1,self.m,x.size(0))
        return self.scalar_batch_NN(h_emb, r_emb, t_emb)  # i.e. linear(x, wh, bh) * linear(x, wr, br) * linear(x, wt, bt)
    def linear(self, x, w, b):
        return torch.tanh(w.reshape(-1, 1) * x.unsqueeze(0) + b.reshape(-1, 1))
def scalar_batch_NN(self, a, b, c):
        '''Element-wise multiplication of a, b and c.
        Inputs: a, b, c tensors of size batch_size x m x d
        Output: a tensor of size batch_size x d'''
a_reshaped = a.transpose(1, 2).reshape(-1, a.size(-1), a.size(-2))
b_reshaped = b.transpose(1, 2).reshape(-1, b.size(-1), b.size(-2))
c_reshaped = c.transpose(1, 2).reshape(-1, c.size(-1), c.size(-2))
mul_result = a_reshaped * b_reshaped * c_reshaped
return mul_result.sum(dim=-1)
def tri_score(self, coeff_h, coeff_r, coeff_t):
        r'''Implements the trilinear scoring technique:
        score(h, r, t) = \int_0^1 h(x) r(x) t(x) dx = \sum_{i,j,k=0}^{d-1} \dfrac{a_i b_j c_k}{1 + (i+j+k) \% d}
        1. Generate the ranges for i, j and k over [0, d-1].
        2. Compute \dfrac{a_i b_j c_k}{1 + (i+j+k) \% d} in parallel for every batch.
        3. Take the sum over each batch.
        '''
        i_range, j_range, k_range = torch.meshgrid(torch.arange(self.degree + 1), torch.arange(self.degree + 1),
                                                   torch.arange(self.degree + 1), indexing='ij')
        if self.degree == 0:
            terms = 1 / (1 + i_range + j_range + k_range)
        else:
            terms = 1 / (1 + (i_range + j_range + k_range) % self.degree)
        weighted_terms = terms.unsqueeze(0) * coeff_h.reshape(-1, 1, self.degree + 1, 1) \
                         * coeff_r.reshape(-1, self.degree + 1, 1, 1) * coeff_t.reshape(-1, 1, 1, self.degree + 1)
result = torch.sum(weighted_terms, dim=[-3,-2,-1])
return result
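
    # The closed form above follows from \int_0^1 x^{i+j+k} dx = 1 / (1 + i + j + k)
    # (before the modulo wrap-around of the exponents). Quick check (sketch, not executed here):
    #
    #   x = torch.linspace(0, 1, 10_000)
    #   torch.trapezoid(x ** 1 * x ** 2 * x ** 0, x=x)  # ~ 0.25 = 1 / (1 + 1 + 2 + 0)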
def vtp_score(self, h, r, t):
        r'''Implements the vector triple product scoring technique:
        score(h, r, t) = \int_0^1 h(x) r(x) t(x) dx = \sum_{i,j,k=0}^{d-1} \dfrac{a_i c_j b_k - b_i c_j a_k}{(1 + (i+j) \% d)(1 + k)}
        1. Generate the ranges for i, j and k over [0, d-1].
        2. Compute the first and second terms of the sum.
        3. Multiply by the denominator and take the sum.
        4. Take the sum over each batch.
        '''
        i_range, j_range, k_range = torch.meshgrid(torch.arange(self.embedding_dim), torch.arange(self.embedding_dim),
                                                   torch.arange(self.embedding_dim), indexing='ij')
        # terms = 1 / (1 + (i_range + j_range) % self.embedding_dim) / (1 + k_range)  # with the modulo
        terms = 1 / (1 + i_range + j_range) / (1 + k_range)  # without the modulo
terms1 = h.view(-1, 1, self.embedding_dim, 1) * t.view(-1, self.embedding_dim, 1, 1) * r.view(-1, 1, 1,self.embedding_dim)
terms2 = r.view(-1, 1, self.embedding_dim, 1) * t.view(-1, self.embedding_dim, 1, 1) * h.view(-1, 1, 1,self.embedding_dim)
weighted_terms = terms * (terms1-terms2)
result = torch.sum(weighted_terms, dim=[-3,-2,-1])
return result
    def comp_func(self, h, r, t):
        '''Implements the function-composition scoring technique, i.e. score = <h o r, t>.'''
degree = torch.arange(self.embedding_dim, dtype=torch.float32)
r_emb = self.polynomial(r,self.x_values,degree)
t_emb = self.polynomial(t,self.x_values,degree)
hor = self.pop(h,r_emb,degree)
        score = torch.trapezoid(hor * t_emb, self.x_values)  # score via the trapezoidal rule
return score
    def polynomial(self, coeff, x, degree):
        '''Takes a tensor of coefficients (coeff), a vector of points x and the integer range [0, 1, ..., d],
        and returns the vector
        (coeff[0][0] + coeff[0][1] x + ... + coeff[0][d] x^d,
         coeff[1][0] + coeff[1][1] x + ... + coeff[1][d] x^d,
         ...)'''
x_powers = x.unsqueeze(1) ** degree
vect = torch.matmul(coeff,x_powers.T)
return vect
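
    # Evaluation sketch for polynomial (hypothetical shapes, for illustration only):
    # with coeff of shape (batch, d + 1), x of shape (n,) and degree = [0, 1, ..., d],
    # x_powers has shape (n, d + 1) and coeff @ x_powers.T has shape (batch, n);
    # row b holds coeff[b][0] + coeff[b][1] * x + ... + coeff[b][d] * x ** d.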
    def pop(self, coeff, x, degree):
        '''Evaluates the composition of two polynomials without for loops.
        Takes a tensor of coefficients (coeff), a matrix of points x and the integer range [0, 1, ..., d],
        and returns the tensor of composed values
        (coeff[0][0] + coeff[0][1] x + ... + coeff[0][d] x^d,
         coeff[1][0] + coeff[1][1] x + ... + coeff[1][d] x^d,
         ...)'''
x_powers = x.unsqueeze(2) ** degree
Mat = (coeff.unsqueeze(1)*x_powers).sum(dim=-1)
return Mat
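
# A minimal sketch (hypothetical helper, not part of the library): `pop` evaluates
# the composition h(r(x)) pointwise. With h(y) = 1 + 2y and r(x) = x^2, the
# composition is 1 + 2x^2; the two steps below mirror `polynomial` and `pop`.
def _composition_sketch():
    h = torch.tensor([[1.0, 2.0, 0.0]])  # coefficients of h(y) = 1 + 2y
    r = torch.tensor([[0.0, 0.0, 1.0]])  # coefficients of r(x) = x^2
    degree = torch.arange(3, dtype=torch.float32)
    x = torch.linspace(0.0, 1.0, 5)
    r_x = torch.matmul(r, (x.unsqueeze(1) ** degree).T)                  # r(x), shape (1, 5)
    h_r_x = (h.unsqueeze(1) * (r_x.unsqueeze(2) ** degree)).sum(dim=-1)  # h(r(x)), shape (1, 5)
    assert torch.allclose(h_r_x, 1.0 + 2.0 * x ** 2)
    return h_r_x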