質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

ただいまの
回答率

88.36%

GAN 損失関数の定義について

受付中

回答 0

投稿

  • 評価
  • クリップ 0
  • VIEW 1,394

uekin_

score 11

GANの解説ページを見ると次のような数式が有り,
プログラム上で何らかの計算を定義しないといけないのかと考えました。

イメージ説明

しかしながら,参考にした次のページのコードには,
それらしき物が確認できませんでした。
http://cedro3.com/ai/keras-anogan-anomaly/
(anoGANで異常検知を解説しているページです)
(コードは後ろに記載)

質問は
①参考ページのコードは,上記の数式を実現しているのでしょうか?
②それとも上記の数式を使用せずに動かしているのでしょうか?
③ここで出力されるloss(d_loss,g_loss)が小さくなることが学習が進んでいる事になるのでしょうか?

ご教授いただけると幸いです
よろしくお願いします

ちなみに
LOSSに関係する部分は次のとおり・・・かな?
【モデルの作成】
self.d = Discriminator(image_shape).get_model()
self.g = Generator(input_dim, image_shape).get_model()
【コンパイル】 
self.dcgan = Sequential([self.g, self.d])
self.dcgan.compile(loss='binary_crossentropy', optimizer=g_optim)
self.d.compile(loss='binary_crossentropy', optimizer=d_optim)
【勾配更新】
d_loss = self.d.train_on_batch(X, y)
g_loss = self.dcgan.train_on_batch(noise, np.array([1] * batch_size))

(参考)
g_optim = Adam(lr=0.0001, beta_1=0.5, beta_2=0.999)
d_optim = Adam(lr=0.0001, beta_1=0.5, beta_2=0.999)

(字数制限でコードは一部)

# Generator
class Generator(object):
    def __init__(self, input_dim, image_shape):
        INITIAL_CHANNELS = 128
        INITIAL_SIZE = 7

        inputs = Input((input_dim,))
        fc1 = Dense(input_dim=input_dim, units=INITIAL_CHANNELS * INITIAL_SIZE * INITIAL_SIZE)(inputs)
        fc1 = BatchNormalization()(fc1)
        fc1 = LeakyReLU(0.2)(fc1)
        fc2 = Reshape((INITIAL_SIZE, INITIAL_SIZE, INITIAL_CHANNELS), input_shape=(INITIAL_CHANNELS * INITIAL_SIZE * INITIAL_SIZE,))(fc1)
        up1 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(fc2)
        conv1 = Conv2D(64, (3, 3), padding='same')(up1)
        conv1 = BatchNormalization()(conv1)
        conv1 = Activation('relu')(conv1)
        up2 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(conv1)
        conv2 = Conv2D(image_shape[2], (5, 5), padding='same')(up2)
        outputs = Activation('tanh')(conv2)

        self.model = Model(inputs=[inputs], outputs=[outputs])

    def get_model(self):
        return self.model

# Discriminator
class Discriminator(object):
    def __init__(self, input_shape):
        inputs = Input(input_shape)
        conv1 = Conv2D(64, (5, 5), padding='same')(inputs)
        conv1 = LeakyReLU(0.2)(conv1)
        pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
        conv2 = Conv2D(128, (5, 5), padding='same')(pool1)
        conv2 = LeakyReLU(0.2)(conv2)
        pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
        fc1 = Flatten()(pool2)
        fc1 = Dense(1)(fc1)
        outputs = Activation('sigmoid')(fc1)

        self.model = Model(inputs=[inputs], outputs=[outputs])

    def get_model(self):
        return self.model

# DCGAN
class DCGAN(object):
    def __init__(self, input_dim, image_shape):
        self.input_dim = input_dim
        self.d = Discriminator(image_shape).get_model()
        self.g = Generator(input_dim, image_shape).get_model()

    def compile(self, g_optim, d_optim):
        self.d.trainable = False
        self.dcgan = Sequential([self.g, self.d])
        self.dcgan.compile(loss='binary_crossentropy', optimizer=g_optim)
        self.d.trainable = True
        self.d.compile(loss='binary_crossentropy', optimizer=d_optim)

    def train(self, epochs, batch_size, X_train):
        g_losses = []
        d_losses = []
        for epoch in range(epochs):
            np.random.shuffle(X_train)
            n_iter = X_train.shape[0] // batch_size
            progress_bar = Progbar(target=n_iter)
            for index in range(n_iter):
                # create random noise -> N latent vectors
                noise = np.random.uniform(-1, 1, size=(batch_size, self.input_dim))

                # load real data & generate fake data
                image_batch = X_train[index * batch_size:(index + 1) * batch_size]
                for i in range(batch_size):
                    if np.random.random() > 0.5:
                        image_batch[i] = np.fliplr(image_batch[i])
                    if np.random.random() > 0.5:
                        image_batch[i] = np.flipud(image_batch[i])
                generated_images = self.g.predict(noise, verbose=0)

                # attach label for training discriminator
                X = np.concatenate((image_batch, generated_images))
                y = np.array([1] * batch_size + [0] * batch_size)

                # training discriminator
                d_loss = self.d.train_on_batch(X, y)

                # training generator
                g_loss = self.dcgan.train_on_batch(noise, np.array([1] * batch_size))

                progress_bar.update(index, values=[('g', g_loss), ('d', d_loss)])
            g_losses.append(g_loss)
            d_losses.append(d_loss)
        return g_losses, d_losses


# AnoGAN
def sum_of_residual(y_true, y_pred):
    return K.sum(K.abs(y_true - y_pred))

class ANOGAN(object):
    def __init__(self, input_dim, g):
        self.input_dim = input_dim
        self.g = g
        g.trainable = False
        # Input layer cann't be trained. Add new layer as same size & same distribution
        anogan_in = Input(shape=(input_dim,))
        g_in = Dense((input_dim), activation='tanh', trainable=True)(anogan_in)
        g_out = g(g_in)
        self.model = Model(inputs=anogan_in, outputs=g_out)
        self.model_weight = None

    def compile(self, optim):
        self.model.compile(loss=sum_of_residual, optimizer=optim)
        K.set_learning_phase(0)

    def compute_anomaly_score(self, x, iterations=300):
        z = np.random.uniform(-1, 1, size=(1, self.input_dim))

        # learning for changing latent
        loss = self.model.fit(z, x, batch_size=1, epochs=iterations, verbose=0)
        loss = loss.history['loss'][-1]
        similar_data = self.model.predict_on_batch(z)

        return loss, similar_data

# train
if __name__ == '__main__':
    batch_size = 16
    epochs = 100   
    input_dim = 30
    g_optim = Adam(lr=0.0001, beta_1=0.5, beta_2=0.999)
    d_optim = Adam(lr=0.0001, beta_1=0.5, beta_2=0.999)

    from keras.datasets import mnist
    #from keras.datasets import fashion_mnist

    # データセットの読み込み
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    #(x_train, y_train), (x_test, y_test) = fashion_mnist.load_data()  # Fashion MNIST 
    x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
    x_train = x_train.astype('float32') / 255
    x_test = x_test.astype('float32') / 255

    # 学習データの作成
    x_train_1 = []
    for i in range(len(x_train)):
        if y_train[i] == 1: 
           x_train_1.append(x_train[i].reshape((28, 28, 1))) 
    x_train_1 = np.array(x_train_1)
    print("train data:",len(x_train_1))

    # 評価データの作成
    cnt = 0
    x_test_9, y = [], []
    for i in range(len(x_test)):
        if y_test[i] == 1 or y_test[i] == 9:  
           x_test_9.append(x_test[i].reshape((28, 28, 1))) 
           y.append(y_test[i])
           cnt +=1

      if cnt == 100:
              break           
    x_test_9 = np.array(x_test_9)
    print("test_data:",len(x_test_9))    
    input_shape = x_train_1[0].shape   
    X_test_original = x_test_9.copy()    


    # train generator & discriminator
    dcgan = DCGAN(input_dim, input_shape)
    dcgan.compile(g_optim, d_optim)
    g_losses, d_losses = dcgan.train(epochs, batch_size, x_train_1)  
    with open('loss.csv', 'w') as f:
        for g_loss, d_loss in zip(g_losses, d_losses):
            f.write(str(g_loss) + ',' + str(d_loss) + '\n')


# test
K.set_learning_phase(1)

def denormalize(X):
    return ((X + 1.0)/2.0*255.0).astype(dtype=np.uint8)

if __name__ == '__main__':
    iterations = 100
    input_dim = 30
    anogan_optim = Adam(lr=0.001, beta_1=0.9, beta_2=0.999)

    # load weights
    dcgan = DCGAN(input_dim, input_shape)
    dcgan.load_weights('weights/generator_99.h5', 'weights/discriminator_99.h5')  #3999.99

    for i, test_img in enumerate(x_test_9):  
        test_img = test_img[np.newaxis,:,:,:]
        anogan = ANOGAN(input_dim, dcgan.g)

        anogan.compile(anogan_optim)
        anomaly_score, generated_img = anogan.compute_anomaly_score(test_img, iterations)

        generated_img = denormalize(generated_img)
        imgs = np.concatenate((denormalize(test_img[0]), generated_img[0]), axis=1)
        cv2.imwrite('predict' + os.sep + str(int(anomaly_score)) + '_' + str(i) + '.png', imgs)
        print(str(i) + ' %.2f'%anomaly_score)

        if y[i] == 1 :       
           with open('scores_1.txt', 'a') as f:
                f.write(str(anomaly_score) + '\n')
        else:
           with open('scores_9.txt', 'a') as f:
                f.write(str(anomaly_score) + '\n') 
  • 気になる質問をクリップする

    クリップした質問は、後からいつでもマイページで確認できます。

    またクリップした質問に回答があった際、通知やメールを受け取ることができます。

    クリップを取り消します

  • 良い質問の評価を上げる

    以下のような質問は評価を上げましょう

    • 質問内容が明確
    • 自分も答えを知りたい
    • 質問者以外のユーザにも役立つ

    評価が高い質問は、TOPページの「注目」タブのフィードに表示されやすくなります。

    質問の評価を上げたことを取り消します

  • 評価を下げられる数の上限に達しました

    評価を下げることができません

    • 1日5回まで評価を下げられます
    • 1日に1ユーザに対して2回まで評価を下げられます

    質問の評価を下げる

    teratailでは下記のような質問を「具体的に困っていることがない質問」、「サイトポリシーに違反する質問」と定義し、推奨していません。

    • プログラミングに関係のない質問
    • やってほしいことだけを記載した丸投げの質問
    • 問題・課題が含まれていない質問
    • 意図的に内容が抹消された質問
    • 過去に投稿した質問と同じ内容の質問
    • 広告と受け取られるような投稿

    評価が下がると、TOPページの「アクティブ」「注目」タブのフィードに表示されにくくなります。

    質問の評価を下げたことを取り消します

    この機能は開放されていません

    評価を下げる条件を満たしてません

    評価を下げる理由を選択してください

    詳細な説明はこちら

    上記に当てはまらず、質問内容が明確になっていない質問には「情報の追加・修正依頼」機能からコメントをしてください。

    質問の評価を下げる機能の利用条件

    この機能を利用するためには、以下の事項を行う必要があります。

まだ回答がついていません

15分調べてもわからないことは、teratailで質問しよう!

  • ただいまの回答率 88.36%
  • 質問をまとめることで、思考を整理して素早く解決
  • テンプレート機能で、簡単に質問をまとめられる

関連した質問

同じタグがついた質問を見る