import torch
import copy
from typing import List
class EnsembleKGE:
    """Ensemble of knowledge-graph-embedding models, one replica per CUDA device.

    Either deep-copies ``seed_model`` onto every available GPU, or adopts an
    existing list of ``pretrained_models`` (moving model *i* onto ``cuda:i``).
    In training mode, :meth:`__call__` zeroes all optimizers, scatters the
    batch to each replica's device and averages the predictions; :meth:`step`
    advances every replica's optimizer. In eval mode, predictions of all
    replicas are simply averaged.
    """

    def __init__(self, seed_model=None, pretrained_models: List = None):
        """Build the ensemble.

        :param seed_model: model to replicate (deep-copy) onto every CUDA device;
            each copy gets its own optimizer via ``configure_optimizers()``.
        :param pretrained_models: already-trained models to wrap instead of a
            seed model; required (asserted) when ``seed_model`` is ``None``.
        """
        self.models = []
        self.optimizers = []
        self.loss_history = []
        if seed_model is not None:
            for i in range(torch.cuda.device_count()):
                i_model = copy.deepcopy(seed_model)
                # TODO: Why can't we send the compiled model to cpu?
                # i_model = torch.compile(i_model)
                i_model.to(torch.device(f"cuda:{i}"))
                self.optimizers.append(i_model.configure_optimizers())
                self.models.append(i_model)
        else:
            assert pretrained_models is not None
            self.models = pretrained_models
            for i in range(torch.cuda.device_count()):
                self.models[i].to(torch.device(f"cuda:{i}"))
                self.optimizers.append(self.models[i].configure_optimizers())
        # Maybe use the original model's name?
        self.name = self.models[0].name
        self.train_mode = True

    def named_children(self):
        """Delegate to the first replica (all replicas share one architecture)."""
        return self.models[0].named_children()

    @property
    def example_input_array(self):
        # All replicas are copies of one model; the first is representative.
        return self.models[0].example_input_array

    @property
    def _trainer(self):
        return self.models[0]._trainer

    def parameters(self):
        """Return the parameters of every replica as one flat list."""
        return [p for model in self.models for p in model.parameters()]

    def modules(self):
        """Return the modules of every replica as one flat list."""
        return [m for model in self.models for m in model.modules()]

    def __iter__(self):
        """Iterate over the underlying replicas."""
        return iter(self.models)

    def __len__(self):
        """Number of replicas in the ensemble."""
        return len(self.models)

    def eval(self):
        """Switch every replica — and the ensemble itself — to evaluation mode."""
        for model in self.models:
            model.eval()
        self.train_mode = False

    def to(self, device):
        """Move all replicas to ``device``.

        Only ``"cpu"`` is supported; any other target raises
        ``NotImplementedError``.
        """
        for model in self.models:
            if device == "cpu":
                model.cpu()
            else:
                raise NotImplementedError

    def mem_of_model(self):
        """Sum the ``EstimatedSizeMB``/``NumParam`` statistics over all replicas."""
        totals = {'EstimatedSizeMB': 0, 'NumParam': 0}
        for model in self.models:
            for key, value in model.mem_of_model().items():
                totals[key] += value
        return totals

    def __call__(self, x_batch):
        """Forward ``x_batch`` through the ensemble and average the predictions.

        Training mode additionally zeroes every optimizer's gradients and
        moves the batch onto each replica's GPU before the forward pass.
        """
        if self.train_mode is False:
            # NOTE(review): this assumes all replicas currently live on the
            # same device as ``x_batch`` (e.g. after ``to("cpu")``) — confirm,
            # since the sum below does not move outputs across devices.
            yhat = 0
            for model in self.models:
                yhat += model(x_batch)
            return yhat / len(self.models)

        for opt in self.optimizers:
            opt.zero_grad()
        yhat = None
        for gpu_id, model in enumerate(self.models):
            # Move the batch onto the GPU where the i-th replica resides.
            if isinstance(x_batch, tuple):
                x_batch = (x_batch[0].to(f"cuda:{gpu_id}"),
                           x_batch[1].to(f"cuda:{gpu_id}"))
            else:
                x_batch = x_batch.to(f"cuda:{gpu_id}")
            if yhat is None:
                yhat = model(x_batch)
            else:
                # Accumulate all predictions on the first GPU.
                yhat += model(x_batch).to("cuda:0")
        return yhat / len(self.models)

    def step(self):
        """Advance every replica's optimizer by one step."""
        for opt in self.optimizers:
            opt.step()

    def get_embeddings(self):
        """Concatenate the replicas' embeddings horizontally.

        :return: ``(entity_embeddings, relation_embeddings)`` where the second
            item is ``None`` when no replica provides relation embeddings.
        """
        entity_embeddings = []
        relation_embeddings = []
        # () Iterate over the trained replicas.
        for trained_model in self.models:
            entity_emb, relation_emb = trained_model.get_embeddings()
            entity_embeddings.append(entity_emb)
            if relation_emb is not None:
                relation_embeddings.append(relation_emb)
        # () Concat the embedding vectors horizontally.
        entity_embeddings = torch.cat(entity_embeddings, dim=1)
        if relation_embeddings:
            relation_embeddings = torch.cat(relation_embeddings, dim=1)
        else:
            relation_embeddings = None
        return entity_embeddings, relation_embeddings

    def __str__(self):
        return f"EnsembleKGE of {len(self.models)} {self.models[0]}"