###前提・実現したいこと
csv形式の複数の時系列データをRNNによってクラス分類したいです。
下記のサイトでは、
http://docs.fabo.io/tensorflow/model_sequence/rnn_basic.html
3種類のランダムな波形を作っていますが、実現したいこととしては、
用意した3種類のcsvの時系列のデータ(それぞれのcsvの列毎に計測回数分のデータがあります。下記は1つのファイル内のイメージです)を読み込み、分類したいと考えています。
よろしくお願いいたします。
###該当のソースコード
from future import absolute_import
from future import division
from future import print_function
import random
import numpy as np
import tensorflow as tf
from tensorflow.contrib import rnn
random.seed(777)
np.random.seed(777)
tf.set_random_seed(777)
#パラメーター
N_CLASSES = 3 # クラス数
N_INPUTS = 1 # 1ステップに入力されるデータ数
N_STEPS = 200 # 学習ステップ数
LEN_SEQ = 10 # 系列長
N_NODES = 64 # ノード数
N_DATA = 1000 # 各クラスの学習用データ数
N_TEST = 1000 # テスト用データ数
BATCH_SIZE = 20 # バッチサイズ
# データの準備
def gen_non_pulse(len_seq):
"""波を持たない系列データを生成する"""
ret = np.random.rand(len_seq)
ret = np.append(ret, 0)
return ret.reshape(-1,1)
def gen_pulse(len_seq, positive=True):
"""波を持つ系列データを生成する"""
seq = np.zeros(len_seq)
i = random.randint(0, len_seq-3) # 波を立てる位置
w = random.randint(1, 4)
w = w if positive else w * (-1.)
e = 3 if positive else -3
l = 1 if positive else 2 # ラベル
seq[i], seq[i+1], seq[i+2] = w, w+e, w
noise = np.random.rand(len_seq)
ret = seq + noise
ret = np.append(ret, l) # ラベルを加える
return ret.reshape(-1,1)
def gen_dataset(len_seq, n_data):
class_01_data = [gen_non_pulse(len_seq) for _ in range(n_data)]
class_02_data = [gen_pulse(len_seq, positive=True) for _ in range(n_data)]
class_03_data = [gen_pulse(len_seq, positive=False) for _ in range(n_data)]
dataset = np.r_[class_01_data, class_02_data, class_03_data]
np.random.shuffle(dataset)
x_ = dataset[:,:10]
t_ = dataset[:,10].reshape(-1)
return x_, t_
x_train, t_train = gen_dataset(LEN_SEQ, N_DATA) # 学習用データセット
x_test, t_test = gen_dataset(LEN_SEQ, N_DATA) # テスト用データセット
# モデルの構築
x = tf.placeholder(tf.float32, [None, LEN_SEQ, N_INPUTS]) # 入力データ
t = tf.placeholder(tf.int32, [None]) # 教師データ
t_on_hot = tf.one_hot(t, depth=N_CLASSES, dtype=tf.float32) # 1-of-Kベクトル
cell = rnn.BasicRNNCell(num_units=N_NODES, activation=tf.nn.tanh) # 中間層のセル
# RNNに入力およびセル設定する
outputs, states = tf.nn.dynamic_rnn(cell=cell, inputs=x, dtype=tf.float32, time_major=False)
# [ミニバッチサイズ,系列長,出力数]→[系列長,ミニバッチサイズ,出力数]
outputs = tf.transpose(outputs, perm=[1, 0, 2])
w = tf.Variable(tf.random_normal([N_NODES, N_CLASSES], stddev=0.01))
b = tf.Variable(tf.zeros([N_CLASSES]))
logits = tf.matmul(outputs[-1], w) + b # 出力層
pred = tf.nn.softmax(logits) # ソフトマックス
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(labels=t_on_hot, logits=logits)
loss = tf.reduce_mean(cross_entropy) # 誤差関数
train_step = tf.train.AdamOptimizer().minimize(loss) # 学習アルゴリズム
correct_prediction = tf.equal(tf.argmax(pred,1), tf.argmax(t_on_hot,1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) # 精度
# 学習の実行
sess = tf.Session()
sess.run(tf.global_variables_initializer())
i = 0
for _ in range(N_STEPS):
cycle = int(N_DATA*3 / BATCH_SIZE)
begin = int(BATCH_SIZE * (i % cycle))
end = begin + BATCH_SIZE
x_batch, t_batch = x_train[begin:end], t_train[begin:end]
sess.run(train_step, feed_dict={x:x_batch, t:t_batch})
i += 1
if i % 10 == 0:
loss_, acc_ = sess.run([loss, accuracy], feed_dict={x:x_batch,t:t_batch})
loss_test_, acc_test_ = sess.run([loss, accuracy], feed_dict={x:x_test,t:t_test})
print("[TRAIN] loss : %f, accuracy : %f" %(loss_, acc_))
print("[TEST loss : %f, accuracy : %f" %(loss_test_, acc_test_))
sess.close()
あなたの回答
tips
プレビュー