RNN 을 사용하여 시계열 데이터인 주식을 예측 해보자
import pandas as pd
import matplotlib.pyplot as plt
# 테스트 데이터와 트레인 데이터 홀드 아웃 할 때 사용하는 함수
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras import models
from tensorflow.keras import layers
from tensorflow.keras.models import Model
import sklearn as sns
# 사용할 변수 정의
batch_size = 100
path = "/content/drive/MyDrive/TensorFlow/dataset/stock_daily.csv"
df = pd.read_csv(path,header=1)
불러온 데이터의 값은 다음과 같다.
df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 732 entries, 0 to 731
Data columns (total 5 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 # Open 732 non-null float64
1 High 732 non-null float64
2 Low 732 non-null float64
3 Volume 732 non-null int64
4 Close 732 non-null float64
dtypes: float64(4), int64(1)
memory usage: 28.7 KB
- open : 시작가
- high : 제일 높았을 때
- low: 제일 낮았을 때
- Volume : 거래량
- close : 마감가
이 값중 예측 하려는 값은 Close 이므로 따로 저장한다.
dataset=df.copy()
ori_y=dataset.pop("Close")
ori_x=dataset.copy()
# x_dim = 4 (속성 개수 )
# 시계열 데이터이므로 shuffle= False
# train_test_split을 이용하여 train, test,validation 데이터를 분리 해준다.
train_x ,test_x ,train_y,test_y = train_test_split(ori_x ,ori_y , test_size = 0.2 , shuffle= False)
train_x ,val_x ,train_y,val_y = train_test_split(train_x ,train_y , test_size = 0.1 , shuffle=False)
train 데이터를 확인하면 다음과 같이 Volume 값과 그 외의 값들이 크기의 차이가 많이 나는 것을 알 수있다.
이 데이터를 그냥 쓰면 Volume 값에 영향을 많이 받게 되므로 min_max 정규화를 실시한다.
# train_x의 평균 , 표준 편차, 최대, 최소값 등을 df형태로 저장한다.
train_x_des = train_x.describe()
# train_x_des를 tranpose(행,렬 변환) 해준다.
train_x_des = train_x_des.T
print(train_x_des)
count mean std min 25% \
# Open 526.0 6.856973e+02 94.555220 507.252283 6.103425e+02
High 526.0 6.912186e+02 94.682919 511.092313 6.165950e+02
Low 526.0 6.797701e+02 94.269379 501.202274 6.000700e+02
Volume 526.0 1.911786e+06 998970.828437 527200.000000 1.317550e+06
50% 75% max
# Open 7.162900e+02 7.665200e+02 8.378100e+02
High 7.215700e+02 7.699375e+02 8.419500e+02
Low 7.081200e+02 7.580900e+02 8.283500e+02
Volume 1.664050e+06 2.134750e+06 1.116490e+07
min_max 정규화 및 TimeseriesGenerator 생성
# min_max 정규화
def min_max_norm(x):
return ((x - dataset_des['min'])/(dataset_des['max'] - dataset_des['min']))
# 기본 정규화
def standarc_norm(x):
return (x - dataset_des['mean']) / dataset_des['std']
# https://www.tensorflow.org/api_docs/python/tf/keras/preprocessing/sequence/TimeseriesGenerator 참고
# tf.keras.preprocessing.sequence.TimeseriesGenerator(data, targets, length, sampling_rate=1, stride=1, start_index=0, end_index=None, shuffle=False, reverse=False, batch_size=128)
# tf.keras.preprocessing.sequence.TimeseriesGenerator 에 넣기 위해 데이터를 list 또는 Numpy로 변환 , 변환하는 과정에서 정규화 실시
# min_max 정규화
min_max_norm_train_x = min_max_norm(train_x)
min_max_norm_test_x = min_max_norm(test_x)
min_max_norm_val_x = min_max_norm(val_x)
# list로 변환
min_max_norm_train_x = min_max_norm_train_x.values.tolist()
min_max_norm_test_x = min_max_norm_test_x.values.tolist()
min_max_norm_val_x = min_max_norm_val_x.values.tolist()
data_gen_train = \
tf.keras.preprocessing.sequence.TimeseriesGenerator(min_max_norm_train_x ,
train_y.values.tolist() ,
length = 4 ,
batch_size = batch_size)
data_gen_test = \
tf.keras.preprocessing.sequence.TimeseriesGenerator(min_max_norm_test_x ,
test_y.values.tolist() ,
length = 4 ,
batch_size = batch_size)
data_gen_val = \
tf.keras.preprocessing.sequence.TimeseriesGenerator(min_max_norm_val_x ,
val_y.values.tolist() ,
length = 4 ,
batch_size = batch_size)
이제 모델을 생성하는 데 activation_function 을 2가지를 사용할 것 이다.
하나는 tanh 다른 하나는 relu이다.
# pytorch 와 유사한 방법인 Subclassing API 로 구현
class RnnModel(Model):
def __init__(self , rnn_activation_function):
# 부모 클래스의 생성자 실행
super(RnnModel , self).__init__()
# SimpleRNN(units)
self.rnn_layer = layers.SimpleRNN(128 , activation = rnn_activation_function)
self.dense_list = [layers.Dense(int(128/ 2**i ),activation = 'relu') if i != 4 else layers.Dense(1,activation = None) for i in range(1,5) ]
# Functional 모델 처럼 정의
def call(self , x):
x = self.rnn_layer(x)
for dense in self.dense_list:
x = dense(x)
return x
# 모델 요약을 출력
def summary(self):
inputs = layers.Input((5,4))
Model(inputs,self.call(inputs)).summary()
rnn_model_tanh = RnnModel('tanh')
rnn_model_relu = RnnModel('relu')
rnn_model_tanh.build(input_shape = (None,5,4))
rnn_model_relu.build(input_shape = (None,5,4))
# 모델 요약
rnn_model_tanh.summary()
rnn_model_relu.summary()
# 손실 함수 mse
loss_function=tf.keras.losses.mean_squared_error
#정규화기(최적화 알고리즘) Adam
optimizer=tf.keras.optimizers.Adam(learning_rate=0.001)
# 평가 지표 mae
metrics=tf.keras.metrics.mean_absolute_error
rnn_model_tanh.compile(loss=loss_function,
optimizer=optimizer,
metrics=[metrics])
rnn_model_relu.compile(loss=loss_function,
optimizer=optimizer,
metrics=[metrics])
생성된 2개의 모델은 다음과 같다.
Model: "model_6"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
input_7 (InputLayer) [(None, 5, 4)] 0
simple_rnn_6 (SimpleRNN) (None, 128) 17024
dense_24 (Dense) (None, 64) 8256
dense_25 (Dense) (None, 32) 2080
dense_26 (Dense) (None, 16) 528
dense_27 (Dense) (None, 1) 17
=================================================================
Total params: 27,905
Trainable params: 27,905
Non-trainable params: 0
_________________________________________________________________
Model: "model_7"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
input_8 (InputLayer) [(None, 5, 4)] 0
simple_rnn_7 (SimpleRNN) (None, 128) 17024
dense_28 (Dense) (None, 64) 8256
dense_29 (Dense) (None, 32) 2080
dense_30 (Dense) (None, 16) 528
dense_31 (Dense) (None, 1) 17
=================================================================
Total params: 27,905
Trainable params: 27,905
Non-trainable params: 0
_________________________________________________________________
모델을 위에서 정의한 TimeseriesGenerator 를 이용하여 훈련시킨다.
# 인수 참고 https://keras.io/ko/models/sequential/
# 모델 훈련
history_tanh = rnn_model_tanh.fit(
data_gen_train, # 데이터 셋
validation_data=data_gen_val,
steps_per_epoch=len(train_x)/batch_size, # 한 세대의 종료를 선언하고 다음 세대를 시작하기까지 단계(샘플 배치)의 총 개수
epochs=100, # 반복 횟수
validation_freq = 1
)
# 모델 훈련
history_relu = rnn_model_relu.fit(
data_gen_train,
validation_data=data_gen_val,
steps_per_epoch=len(train_x)/batch_size,
epochs=100,
validation_freq = 1
)
만들어진 모델을 평가한다.
# data_gen_test에서 test x,y 데이터를 가지고 온다.
test_data_x, test_data_y=data_gen_test[0]
# 예상한 값을 test_data_y와 비교해 주시기 위해 평탄화(1차원)해준다.
prediction_tanh_y=rnn_model_tanh.predict(test_data_x).flatten()
prediction_relu_y=rnn_model_relu.predict(test_data_x).flatten()
test_y=test_data_y.flatten()
# plot.으로 나타내기 위해 다음과 같이한다.
time = range(1, len(test_y) + 1)
plt.plot(time, test_y, 'r', label='ture')
plt.plot(time, prediction_tanh_y, 'b', label='tanh')
plt.title('stock prediction')
plt.xlabel('time')
plt.ylabel('value')
plt.legend()
plt.show()
plt.plot(time, test_y, 'r', label='ture')
plt.plot(time, prediction_relu_y, 'g', label='relu')
plt.title('stock prediction')
plt.xlabel('time')
plt.ylabel('value')
plt.legend()
plt.show()
loss_tanh = history_tanh.history['loss']
loss_relu = history_relu.history['loss']
epochs = range(1, len(loss_tanh) + 1)
plt.plot(epochs, loss_tanh, 'bo', label='loss_tanh')
plt.plot(epochs, loss_relu, 'ro', label='loss_relu')
plt.title('relu and tanh loss')
plt.legend()
plt.show()
mae_tanh = history_tanh.history['mean_absolute_error']
mae_relu = history_relu.history['mean_absolute_error']
plt.plot(epochs, mae_tanh, 'bo', label='mae_tanh')
plt.plot(epochs, mae_relu, 'ro', label='mae_relu')
plt.title('relu and tanh mae')
plt.legend()
plt.show()
역시 tanh의 gradient vanishing 문제점을 해결 하려고 나온 relu 답게 더 빠른 훈련 속도를 보인다. 하지만 실제 값과 차이는 tanh보다 더 크다. 다음에는 Leaky ReLU를 사용해 봐야겠다.