파이썬/Tensorflow

RNN_주식 예측

easysheep 2023. 2. 3. 22:34

RNN 을 사용하여 시계열 데이터인 주식을 예측 해보자 

import pandas as pd
import matplotlib.pyplot as plt
# 테스트 데이터와 트레인 데이터 홀드 아웃 할 때 사용하는 함수
from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow.keras import models
from tensorflow.keras import layers
from tensorflow.keras.models import Model

import sklearn as sns

# 사용할 변수 정의 
batch_size = 100
path  = "/content/drive/MyDrive/TensorFlow/dataset/stock_daily.csv"
df = pd.read_csv(path,header=1)

불러온 데이터의 값은 다음과 같다.

df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 732 entries, 0 to 731
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   # Open  732 non-null    float64
 1   High    732 non-null    float64
 2   Low     732 non-null    float64
 3   Volume  732 non-null    int64  
 4   Close   732 non-null    float64
dtypes: float64(4), int64(1)
memory usage: 28.7 KB
  • open : 시작가
  • high : 제일 높았을 때
  • low: 제일 낮았을 때
  • Volume : 거래량
  • close : 마감가

이 값중 예측 하려는 값은 Close 이므로 따로 저장한다.

dataset=df.copy()
ori_y=dataset.pop("Close")
ori_x=dataset.copy()
# x_dim = 4 (속성 개수 )
# 시계열 데이터이므로  shuffle= False
# train_test_split을 이용하여 train, test,validation 데이터를 분리 해준다.
train_x ,test_x ,train_y,test_y = train_test_split(ori_x ,ori_y , test_size = 0.2 , shuffle= False)
train_x ,val_x ,train_y,val_y = train_test_split(train_x ,train_y , test_size = 0.1 ,  shuffle=False)

train 데이터를 확인하면 다음과 같이 Volume 값과 그 외의 값들이 크기의 차이가 많이 나는 것을 알 수있다.

이 데이터를 그냥 쓰면 Volume 값에 영향을 많이 받게 되므로 min_max 정규화를 실시한다.

# train_x의 평균 , 표준 편차, 최대, 최소값 등을 df형태로 저장한다.
train_x_des = train_x.describe()
# train_x_des를 tranpose(행,렬 변환) 해준다.
train_x_des = train_x_des.T
print(train_x_des)

        count          mean            std            min           25%  \
# Open  526.0  6.856973e+02      94.555220     507.252283  6.103425e+02   
High    526.0  6.912186e+02      94.682919     511.092313  6.165950e+02   
Low     526.0  6.797701e+02      94.269379     501.202274  6.000700e+02   
Volume  526.0  1.911786e+06  998970.828437  527200.000000  1.317550e+06   

                 50%           75%           max  
# Open  7.162900e+02  7.665200e+02  8.378100e+02  
High    7.215700e+02  7.699375e+02  8.419500e+02  
Low     7.081200e+02  7.580900e+02  8.283500e+02  
Volume  1.664050e+06  2.134750e+06  1.116490e+07

min_max 정규화 및 TimeseriesGenerator 생성

# min_max 정규화
def min_max_norm(x):
  return ((x - dataset_des['min'])/(dataset_des['max'] - dataset_des['min']))
# 기본 정규화
def standarc_norm(x):
  return (x - dataset_des['mean']) / dataset_des['std']

# https://www.tensorflow.org/api_docs/python/tf/keras/preprocessing/sequence/TimeseriesGenerator 참고
# tf.keras.preprocessing.sequence.TimeseriesGenerator(data, targets, length, sampling_rate=1, stride=1, start_index=0, end_index=None, shuffle=False, reverse=False, batch_size=128)
# tf.keras.preprocessing.sequence.TimeseriesGenerator 에 넣기 위해 데이터를 list 또는 Numpy로 변환 , 변환하는 과정에서 정규화 실시 

# min_max 정규화
min_max_norm_train_x = min_max_norm(train_x)
min_max_norm_test_x = min_max_norm(test_x)
min_max_norm_val_x = min_max_norm(val_x)
# list로 변환 
min_max_norm_train_x = min_max_norm_train_x.values.tolist()
min_max_norm_test_x = min_max_norm_test_x.values.tolist()
min_max_norm_val_x = min_max_norm_val_x.values.tolist()

data_gen_train = \
tf.keras.preprocessing.sequence.TimeseriesGenerator(min_max_norm_train_x ,
                                                    train_y.values.tolist() ,
                                                    length = 4 , 
                                                    batch_size = batch_size)
data_gen_test = \
tf.keras.preprocessing.sequence.TimeseriesGenerator(min_max_norm_test_x ,
                                                    test_y.values.tolist() ,
                                                    length = 4 , 
                                                    batch_size = batch_size)
data_gen_val = \
tf.keras.preprocessing.sequence.TimeseriesGenerator(min_max_norm_val_x ,
                                                    val_y.values.tolist() ,
                                                    length = 4 , 
                                                    batch_size = batch_size)

이제 모델을 생성하는 데 activation_function 을 2가지를 사용할 것 이다.

하나는 tanh 다른 하나는 relu이다.

 

# pytorch 와 유사한 방법인 Subclassing API 로 구현

class RnnModel(Model):
  def __init__(self , rnn_activation_function):
    # 부모 클래스의 생성자 실행
    super(RnnModel , self).__init__()
    # SimpleRNN(units)
    self.rnn_layer = layers.SimpleRNN(128 , activation = rnn_activation_function)
    self.dense_list = [layers.Dense(int(128/ 2**i ),activation = 'relu') if i != 4 else layers.Dense(1,activation = None)  for i in range(1,5) ]

    

# Functional 모델 처럼 정의
  def call(self , x):
    x = self.rnn_layer(x)
    for dense in self.dense_list:
      x = dense(x)
    return x

# 모델 요약을 출력
  def summary(self):
    inputs = layers.Input((5,4))
    Model(inputs,self.call(inputs)).summary()

rnn_model_tanh  = RnnModel('tanh')
rnn_model_relu  = RnnModel('relu')
rnn_model_tanh.build(input_shape =  (None,5,4))
rnn_model_relu.build(input_shape =  (None,5,4))


# 모델 요약
rnn_model_tanh.summary()
rnn_model_relu.summary()



# 손실 함수 mse
loss_function=tf.keras.losses.mean_squared_error

#정규화기(최적화 알고리즘) Adam
optimizer=tf.keras.optimizers.Adam(learning_rate=0.001)

# 평가 지표 mae
metrics=tf.keras.metrics.mean_absolute_error


rnn_model_tanh.compile(loss=loss_function,
              optimizer=optimizer,
              metrics=[metrics])


rnn_model_relu.compile(loss=loss_function,
              optimizer=optimizer,
              metrics=[metrics])

생성된 2개의 모델은 다음과 같다.

Model: "model_6"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 input_7 (InputLayer)        [(None, 5, 4)]            0         
                                                                 
 simple_rnn_6 (SimpleRNN)    (None, 128)               17024     
                                                                 
 dense_24 (Dense)            (None, 64)                8256      
                                                                 
 dense_25 (Dense)            (None, 32)                2080      
                                                                 
 dense_26 (Dense)            (None, 16)                528       
                                                                 
 dense_27 (Dense)            (None, 1)                 17        
                                                                 
=================================================================
Total params: 27,905
Trainable params: 27,905
Non-trainable params: 0
_________________________________________________________________
Model: "model_7"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 input_8 (InputLayer)        [(None, 5, 4)]            0         
                                                                 
 simple_rnn_7 (SimpleRNN)    (None, 128)               17024     
                                                                 
 dense_28 (Dense)            (None, 64)                8256      
                                                                 
 dense_29 (Dense)            (None, 32)                2080      
                                                                 
 dense_30 (Dense)            (None, 16)                528       
                                                                 
 dense_31 (Dense)            (None, 1)                 17        
                                                                 
=================================================================
Total params: 27,905
Trainable params: 27,905
Non-trainable params: 0
_________________________________________________________________

모델을 위에서 정의한 TimeseriesGenerator 를 이용하여 훈련시킨다.

# 인수 참고 https://keras.io/ko/models/sequential/

# 모델 훈련
history_tanh = rnn_model_tanh.fit(
      data_gen_train, # 데이터 셋
      validation_data=data_gen_val,
      steps_per_epoch=len(train_x)/batch_size, # 한 세대의 종료를 선언하고 다음 세대를 시작하기까지 단계(샘플 배치)의 총 개수 
      epochs=100, # 반복 횟수
      validation_freq = 1
)
# 모델 훈련
history_relu = rnn_model_relu.fit(
      data_gen_train,
      validation_data=data_gen_val,
      steps_per_epoch=len(train_x)/batch_size,
      epochs=100,
      validation_freq = 1
)

만들어진 모델을 평가한다.

# data_gen_test에서 test x,y 데이터를 가지고 온다.
test_data_x, test_data_y=data_gen_test[0]
# 예상한 값을 test_data_y와 비교해 주시기 위해 평탄화(1차원)해준다.
prediction_tanh_y=rnn_model_tanh.predict(test_data_x).flatten()
prediction_relu_y=rnn_model_relu.predict(test_data_x).flatten()
test_y=test_data_y.flatten()

# plot.으로 나타내기 위해 다음과 같이한다.
time = range(1, len(test_y) + 1)
plt.plot(time, test_y, 'r', label='ture')
plt.plot(time, prediction_tanh_y, 'b', label='tanh')

plt.title('stock prediction')
plt.xlabel('time')
plt.ylabel('value')
plt.legend()
plt.show()

plt.plot(time, test_y, 'r', label='ture')
plt.plot(time, prediction_relu_y, 'g', label='relu')

plt.title('stock prediction')
plt.xlabel('time')
plt.ylabel('value')
plt.legend()
plt.show()

loss_tanh = history_tanh.history['loss']
loss_relu = history_relu.history['loss']


epochs = range(1, len(loss_tanh) + 1)

plt.plot(epochs, loss_tanh, 'bo', label='loss_tanh')
plt.plot(epochs, loss_relu, 'ro', label='loss_relu')
plt.title('relu and tanh loss')
plt.legend()
plt.show()

mae_tanh = history_tanh.history['mean_absolute_error']
mae_relu = history_relu.history['mean_absolute_error']


plt.plot(epochs, mae_tanh, 'bo', label='mae_tanh')
plt.plot(epochs, mae_relu, 'ro', label='mae_relu')
plt.title('relu and tanh mae')
plt.legend()
plt.show()

역시 tanh의 gradient vanishing 문제점을 해결 하려고 나온 relu 답게 더 빠른 훈련 속도를 보인다.  하지만 실제 값과 차이는 tanh보다 더 크다.  다음에는 Leaky ReLU를 사용해 봐야겠다.