イタリックテキスト### 前提
LSTMの勉強のため、pythonでLSTMを用いて株価の予測を行うシステムを作成しています。
100日分のデータの内,80日分を学習データとして20日分の予測を行うシステムは完成しました。
様々なLSTMの実装を参考にしましたが、「どれも一つの時系列データの前半部分を学習に使って、後半部分を予測する」というのが趣旨でした。
しかし、僕は「A社,B社,C社...と複数の時系列データを100日分学習させて、ではZ社はどういう株価の推移をするのか,説明変数を与えて1~100日目まで全部予想する」というシステムを作成したいです。
そもそもこのような,複数の時系列データを学習させて,説明変数のみで0から予測するシステムをLSTMで作るのは可能なのでしょうか。
可能であるならば,どのような点を変更すれば良いのか。
ご教授お願いします。
初めての質問なので至らぬ点があればお願いします。
また、今回使用した入力データは80(時間)*129(説明変数),
出力データの次元は20(時間)*1(目的変数)となっています。
LSTMはpytorchで実装しました。
実現したいこと
t=1→80までを学習データ,t=81→100テストデータとするのではなく,
複数の時系列データのt=1→100を学習データとし,別の時系列データをt=1→100まで予測するモデルの作成
###参考にしたサイト
PyTorchを使ってLSTMでコロナ陽性者数を予測してみる
https://qiita.com/tsubauaaa/items/8411a22465811ec2ee11
pytorch で LSTM に入門したい...したくない?
https://hilinker.hatenablog.com/entry/2018/06/23/204910
該当のソースコード
python
ソースコード
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
%matplotlib inline
device=torch.device('cpu')
#基本情報
seq_length = 30# 1データの予測に用いる過去フレームは40とする
test_data_size = 20# 直近20frameをテストデータにする
pred_frames = 20# 予測するフレーム
epochs_num = 1#バッチを何回回すか
batch_size = 10
path="D:\最強データ\喜び_13~20\Actor_1\合体13.csv"
df=pd.read_csv(path,index_col=0,header=0)
#要らない項目の削除
df.pop("AU01_r")
df.pop("AU04_r")
df.pop("AU05_r")
df.pop("AU07_r")
df.pop("AU12_r")
df.pop("AU15_r")
df.pop("AU23_r")
data = df.values.astype(float)
data_length=data.shape[0]
data.shape
#学習データとテストデータ
train_data = data[:-test_data_size]
test_data = data[-test_data_size:]
#学習データとテストデータを最小値0と最大値1の範囲で正規化してTensor型にする
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler(feature_range=(0, 1))
train_data_normalized = scaler.fit_transform(train_data)
train_data_normalized = torch.FloatTensor(train_data_normalized)
test_data_normalized = scaler.fit_transform(test_data)
test_data_normalized = torch.FloatTensor(test_data_normalized)
#シーケンスに沿ったデータを作成する関数
def make_sequence_data(input_data, sequence_length):
data_length = len(input_data) # 全体のデータ数取得
seq_data=[]#説明変数が入った時系列データを入れるリスト
target_data=[]#seq_dataの目的変数を入れるリスト
# 全体からシーケンス分引いた数までループする
for i in range(data_length - sequence_length):
# 1個ずらして、シーケンス分のデータを取得していく
seq = input_data[i:i+sequence_length]
target = input_data[:,128][i+sequence_length:i+sequence_length+1]
seq_data.append(seq)
target_data.append(target)
return seq_data, target_data
seq,labels = make_sequence_data(train_data_normalized, seq_length)
#LSTMmodelを作成してインスタンス生成&損失関数と最適化関数を定義
class LSTM(nn.Module):
def init(self, input_size=129, hidden_layer_size=100, output_size=1):
super().init()
self.hidden_layer_size = hidden_layer_size
self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_layer_size, batch_first=True)
self.linear = nn.Linear(in_features=hidden_layer_size, out_features=output_size)
def forward(self, x): # LSTMのinputは(batch_size, seq_len, input_size)にする # LSTMのoutputは(batch_size, seq_len, hidden_layer_size)となる # hidden stateとcell stateにはNoneを渡して0ベクトルを渡す lstm_out, (hn, cn) = self.lstm(x, None) # Linearのinputは(N,∗,in_features)にする prediction = self.linear(lstm_out[:, -1, :]) #[:,-1,:]で各バッチの最終時刻の特徴量の行が取れる return prediction
model = LSTM()
model.to(device)
#損失関数と最適化関数を定義
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
#バッチを作成する関数
losses = []
training_size=data.shape[0]-seq_length-test_data_size#学習データの数
def mkRandomBatch(train_x,train_y, batch_size,training_size,seq_length):
batch_x=torch.zeros(training_size, seq_length, 129, dtype=torch.float)
batch_y=torch.zeros(training_size, 1, dtype=torch.float)
batch_x_list=[]
batch_y_list=[]
ramdom_list=[]#ランダムな数がどう発生したのか記録するリスト
for i in range(batch_size): idx = np.random.randint(0, len(train_x) - 1) batch_x_list.append(train_x[idx]) batch_y_list.append(train_y[idx]) batch_x[i]=batch_x_list[i] batch_y[i]=batch_y_list[i] ramdom_list.append(idx) return batch_x,batch_y
batch_seq, batch_labels = mkRandomBatch(seq,labels,batch_size,training_size,seq_length)
#エポックを回して学習させてる
for epoch in range(epochs_num):
for i in range(int(training_size / batch_size)):
optimizer.zero_grad()#勾配初期化
batch_seq, batch_labels = mkRandomBatch(seq,labels,batch_size,training_size,seq_length)
y_pred = model(batch_seq)
single_loss = criterion(y_pred, batch_labels)
single_loss.backward()
optimizer.step()
losses.append(single_loss.item())
print(f'epoch: {i}, loss : {single_loss.item()}')
test_inputs = train_data_normalized[-seq_length:].tolist()
#予測するフェーズ
model.eval()# モデルを評価モードとする
test_outputs = []# 予測値を入れるリスト
for i in range(pred_frames):
seq = torch.FloatTensor(test_inputs[-seq_length:])
seq = torch.unsqueeze(seq, 0)
seq = seq.to(device)
with torch.no_grad():
test_inputs.append(test_data_normalized.tolist()[i])#test_inputにどんどん追加
test_outputs.append(model(seq).item())
#予測結果の整形
#列方向に同じ値を追加して(20, 129)にする
np_test_outputs = np.array(test_outputs).reshape(-1,1)
np_test_outputs2 = np.hstack((np_test_outputs, np_test_outputs))
np_test_outputs3 = np.hstack((np_test_outputs2, np_test_outputs))
np_test_outputs4 = np.hstack((np_test_outputs3, np_test_outputs))
np_test_outputs5 = np.hstack((np_test_outputs4, np_test_outputs))
#....長いので省略
np_test_outputs128 = np.hstack((np_test_outputs127, np_test_outputs))
np_test_outputs129 = np.hstack((np_test_outputs128, np_test_outputs))
#予測値test_outputsを正規化したデータから元のデータに戻した配列
actual_predictions = scaler.inverse_transform(np_test_outputs129)
#予測結果グラフ表示のための準備-------------------------
#testdataの時間を1ずつ並べた配列
x = np.arange(data_length-pred_frames, data_length, 1)
#予測結果表示ーーーーーーーーーーーーーーーーーーーーー
fig_size = plt.rcParams['figure.figsize']#グラフのサイズを決める変数
fig_size[0] = 5
fig_size[1] = 5
plt.rcParams['figure.figsize'] = fig_size
plt.title('Number of AU6')#グラフタイトル
plt.ylabel('AU6')#y軸の値
plt.grid(True)#グラフにグリッド線を引く
plt.autoscale(axis='x', tight=True)#グラフの自動スケーリング
a=df['AU06_r']
b=a.values
plt.plot(b,label='jissokuti')#実測値の値を持ってくる
plt.plot(x, actual_predictions[:,-1], label='yosokuti')#xの範囲にactual_predictions[:,-1]の要素を配置する
plt.legend()#凡例を表示してくれる
plt.show()
試したこと
一つの時系列データを学習データとテストデータに分けて,予想を行うLSTMの実装

回答1件
あなたの回答
tips
プレビュー
バッドをするには、ログインかつ
こちらの条件を満たす必要があります。
2022/12/08 12:17
2022/12/08 13:14
2022/12/09 01:40 編集
2022/12/09 04:01