torch 1.10.0+cu113を使用しています
class Image_captioning:
    """Generate and print captions for image files with a trained encoder/decoder.

    Relies on names defined elsewhere in the file: ``count_image_path``,
    ``vocab_path``, ``encoder_path``, ``decoder_path``, ``EncoderCNN``,
    ``DecoderRNN``, plus the ``Image``, ``transforms``, ``glob``, ``pickle``
    and ``torch`` imports.
    """

    def load_image(self, color_image_path, transform=None):
        """Open an image, resize it to 224x224 and optionally transform it.

        Args:
            color_image_path: Path of the image file to open.
            transform: Optional callable applied to the resized image. Its
                result gets a leading dimension added via ``unsqueeze(0)``.

        Returns:
            The resized image when ``transform`` is None, otherwise the
            transformed image with one extra leading dimension.
        """
        # The context manager closes the source file once the resized copy exists.
        with Image.open(color_image_path) as source:
            image = source.resize([224, 224], Image.LANCZOS)
        if transform is not None:
            image = transform(image).unsqueeze(0)
        return image

    def text_reading(self):
        """Caption every file matching ``count_image_path`` and print each caption.

        The transform, vocabulary and both models are created once and reused
        for all images. Nothing is loaded when no file matches the pattern.
        """
        # Device configuration
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Model parameters (should be same as parameters in train.py)
        embed_size = 256
        hidden_size = 512
        num_layers = 1

        files = sorted(glob.glob(count_image_path))
        if not files:
            # No images to caption: skip the expensive model setup entirely.
            return

        # Image preprocessing
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.485, 0.456, 0.406),
                                 (0.229, 0.224, 0.225)),
        ])

        # Load vocabulary wrapper
        # NOTE(review): pickle.load executes arbitrary code from the file;
        # only use a vocabulary file from a trusted source.
        with open(vocab_path, 'rb') as f:
            vocab = pickle.load(f)

        # Build models
        encoder = EncoderCNN(embed_size).eval()  # eval mode (batchnorm uses moving mean/variance)
        decoder = DecoderRNN(embed_size, hidden_size, len(vocab), num_layers)
        encoder = encoder.to(device)
        decoder = decoder.to(device)

        # Load the trained model parameters. map_location lets checkpoints
        # saved on a GPU load when this process falls back to the CPU.
        encoder.load_state_dict(torch.load(encoder_path, map_location=device))
        decoder.load_state_dict(torch.load(decoder_path, map_location=device))

        for image_path in files:
            # Prepare an image
            image_tensor = self.load_image(image_path, transform).to(device)

            # Generate a caption from the image; gradients are not needed
            # for inference, so skip building the autograd graph.
            with torch.no_grad():
                feature = encoder(image_tensor)
                sampled_ids = decoder.sample(feature)
            sampled_ids = sampled_ids[0].cpu().numpy()  # (1, max_seq_length) -> (max_seq_length)

            # Convert word_ids to words, stopping after the end token
            sampled_caption = []
            for word_id in sampled_ids:
                word = vocab.idx2word[word_id]
                sampled_caption.append(word)
                if word == '<end>':
                    break
            sentence = ' '.join(sampled_caption)

            # Print out the generated caption without the start/end markers
            description = sentence.replace('<start>', ' ', 1).replace('<end>', ' ', 1)
            print(description)
def main(poll_interval=0.1):
    """Start the background workers and re-launch ``r.run`` when images appear.

    ``m.process`` is submitted once. Afterwards the loop polls
    ``count_image_path``: when the path exists and no run has been submitted
    for it yet, ``r.run`` is submitted; once the path disappears the flag is
    reset so the next appearance triggers a new run. The loop never returns.

    Args:
        poll_interval: Seconds to sleep between checks, so the loop does not
            spin a CPU core at 100%.
    """
    import time  # local import: the file's import block is not visible here

    m = Main()
    r = Run_image()
    # NOTE(review): results of the submitted futures are never inspected, so
    # an exception raised inside a worker is not reported here. If the workers
    # use CUDA, confirm the process start method is compatible with it.
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=60)
    executor.submit(m.process)

    r_running = False
    while True:
        path_exists = os.path.exists(count_image_path)
        if path_exists and not r_running:
            # Per the original author's note, r.run is the function that
            # leads into class Image_captioning.
            executor.submit(r.run)
            r_running = True
            print(r_running)
        elif not path_exists:
            r_running = False
        time.sleep(poll_interval)
import torch
import torch.nn as nn
import torchvision.models as models
from torch.nn.utils.rnn import pack_padded_sequence


class EncoderCNN(nn.Module):
    """Image encoder: ResNet-152 trunk followed by a linear layer and batch norm."""

    def __init__(self, embed_size):
        """Load the pretrained ResNet-152 and swap its top fc layer for an embedding head.

        Args:
            embed_size: Output size of the linear layer and the batch norm.
        """
        super().__init__()
        backbone = models.resnet152(pretrained=True)
        # Keep every child module except the last one (the fc layer).
        trunk_layers = list(backbone.children())[:-1]
        self.resnet = nn.Sequential(*trunk_layers)
        self.linear = nn.Linear(backbone.fc.in_features, embed_size)
        self.bn = nn.BatchNorm1d(embed_size, momentum=0.01)

    def forward(self, images):
        """Return feature vectors extracted from ``images``."""
        # The ResNet trunk runs without gradient tracking; only the
        # linear + batch-norm head below is evaluated with autograd enabled.
        with torch.no_grad():
            trunk_out = self.resnet(images)
        flattened = trunk_out.reshape(trunk_out.size(0), -1)
        return self.bn(self.linear(flattened))


class DecoderRNN(nn.Module):
    """Caption decoder: embedding, LSTM and a linear layer over the vocabulary."""

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers, max_seq_length=20):
        """Store the hyper-parameters and create the layers.

        Args:
            embed_size: Embedding size, also the LSTM input size.
            hidden_size: LSTM hidden size.
            vocab_size: Number of embedding rows and of output scores.
            num_layers: Number of LSTM layers.
            max_seq_length: Number of steps ``sample`` generates.
        """
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)
        # Attribute name kept exactly as spelled in the original ("seg").
        self.max_seg_length = max_seq_length

    def forward(self, features, captions, lengths):
        """Decode image feature vectors together with captions into output scores."""
        caption_embeddings = self.embed(captions)
        # The image feature is placed in front of the caption embeddings.
        sequence = torch.cat((features.unsqueeze(1), caption_embeddings), 1)
        packed_sequence = pack_padded_sequence(sequence, lengths, batch_first=True)
        packed_hiddens, _ = self.lstm(packed_sequence)
        return self.linear(packed_hiddens[0])

    def sample(self, features, states=None):
        """Generate caption ids for the given image features using greedy search."""
        step_input = features.unsqueeze(1)
        chosen_ids = []
        for _ in range(self.max_seg_length):
            lstm_out, states = self.lstm(step_input, states)  # (batch_size, 1, hidden_size)
            scores = self.linear(lstm_out.squeeze(1))         # (batch_size, vocab_size)
            best = scores.max(1).indices                      # (batch_size)
            chosen_ids.append(best)
            # The chosen word's embedding becomes the next step's input.
            step_input = self.embed(best).unsqueeze(1)        # (batch_size, 1, embed_size)
        return torch.stack(chosen_ids, 1)                     # (batch_size, max_seq_length)