장난감 프로젝트

[Python]주식 데이터 시각화

easysheep 2023. 10. 26. 16:56

0. 목적

https://www.data.go.kr/ 에서 주식데이터를 가지고 와서 Grafana를 이용하여 시각화를 해보자

1. Data 추출 및 저장 코드 및 도커파일 , requirements.txt

import requests
import pandas as pd
import time
from sqlalchemy import create_engine,text
from datetime import datetime, timedelta
from bs4 import BeautifulSoup as bs

def getStockCode():
    """
    현재 있는 모든 종목의 종목명 , isin Code , 종목 코드를 크롤링 하여 가지고 Json형식으로 반환하는 함수 
    request와 BeautifulSoup4를 사용하였다.
    """

    # requests를 이용하여 해당 페이지를 가지고 온다.
    page = requests.get("https://www.ktb.co.kr/trading/popup/itemPop.jspx")
    # BeautifulSoup4를 이용하여 해당 페이지의 html 내용을 전부 가지고 온다.
    soup = bs(page.text ,"html.parser")
    # 그 중 tbody.tbody_content  중 tr 태그 중 td 중 a 태그 안에 있는 데이터를 전부 가지도 온다.
    elements = soup.select("tbody.tbody_content tr td a")
    stockCodeJson = {}


    for e in elements:
        # 필요한 데이터만 추출 하여 dict 형식으로 저장한다.
        data = str(e).split(",")
        code , codeName, isincode = data[1][1:-1] , data[2][1:-1].strip(), data[-1][1:13]
        stockCodeJson[code] = (codeName,isincode)
    return stockCodeJson

def saveStockCode(stockCodeJson:dict ,db_connection):
    """
    json 변수와 sqlalchemy의 create_engine으로 생성한 객체를 인수로 받아서 Json데이터를 MySql에 저장하는 함수
    """
    # 데이터를 넣기 편하게 Pandas DataFrame 으로 변환
    stockCodeDf = pd.DataFrame(stockCodeJson).T.reset_index()
    # 컬럼명 설정
    stockCodeDf.columns = ['srtnCd','itmsNm','isinCd']
    # 과거 종목 정보 데이터 버리기
    with db_connection.connect() as conn:
        conn.execute(text('TRUNCATE TABLE stock_code;'))
        conn.commit()
    # 데이터 Mysql에 넣기
    stockCodeDf.to_sql(name='stock_code', con=db_connection, if_exists='append',index=False)  



def getStockInfo(db_connection):
    ## 날짜 가져오기
    last_day = getLastDay(db_connection)

    # request data to API
    headers = {'Content-Type': 'application/json', 'charset': 'UTF-8', 'Accept': '*/*'}
    # for school
    # key_path = "/Users/c05/Desktop/learn/web/PipeLine_Project/key.txt"
    # for home
    # key_path = "C:\\Users\\AW17R4\\.appkey\\open_stock_api_key.txt"
    # for ec2
    key_path = "./key.txt"
    url = "https://apis.data.go.kr/1160100/service/GetStockSecuritiesInfoService/getStockPriceInfo"


    with open(key_path,'r',encoding="UTF-8") as f:
        key = f.readline()

    params = {'serviceKey' : key
            , 'numOfRows' : 10000
            , 'pageNo' : 1
            , 'resultType' : "json"
            , 'beginBasDt' : last_day
            }
    # api 에 데이터 요청
    response = requests.get(url ,params=params)
    total_count = response.json()['response']['body']['totalCount']
    if total_count == 0 :
        print("Nothing to Update")
        return 0
    df = pd.DataFrame(response.json()['response']['body']['items']['item'])
    df.to_sql(name='stockinfotable', con=db_connection, if_exists='append',index=False)  
    if total_count > params['numOfRows']:
        for i in range(2,total_count//10000+1):
            params['pageNo'] = i
            errorCount = 1
            while True:
                try:
                    response = requests.get(url ,params=params,verify=True)
                    temp_df =  pd.DataFrame(response.json()['response']['body']['items']['item'])
                    temp_df.to_sql(name='stockinfotable', con=db_connection, if_exists='append',index=False)
                    break
                except Exception:
                    time.sleep(5)
                    print(f'error:{errorCount}')
                    if errorCount==10:
                        print(f"Error: {temp_df}")
                        break
                    continue
            time.sleep(0.5)



def getDatabaseConnection():
    # connect to  MySql DataBase
    # db_connection_str = 'mysql+pymysql://root:@localhost/stockDB'
    # for home
    # db_connection_str = 'mysql+pymysql://root:1234@localhost/stock_db'
    # for ec2
    db_connection_str = 'mysql+pymysql://stock_user:1234asde@172.31.13.248/stockDB'
    db_connection = create_engine(db_connection_str)
    conn = db_connection.connect()
    return db_connection ,conn


def getLastDay(db_connection):
    # databas에서 제일 마지막에 저장된 날짜를 기자고 오기
    with db_connection.connect() as conn:
        last_day = pd.read_sql_query(text("SELECT basDt FROM stockDB.stockinfotable ORDER BY basDt DESC LIMIT 1;"), conn)
    start_day = (last_day + timedelta(days=1))['basDt'][0]
    result  = str(start_day.date()).replace("-","")
    return result



if __name__ == "__main__":
    db_connection,conn = getDatabaseConnection()
    getStockInfo(db_connection)
    stockCodeJson = getStockCode()
    saveStockCode(stockCodeJson,db_connection)
    conn.close()
# docker file

#사용할 베이스 도커 이미지
FROM python:3.9.18

# 해당 디렉토리로 이동
## copy . . 과 같이 쓰면 : 이미지를 빌드한 디렉터리의 모든 파일을 컨테이너의 /usr/src/app 디렉터리로 복사
WORKDIR /usr/src/app
COPY . .

# python pip 설치 후 필수 라이브러리 설치 
RUN python -m pip install --upgrade pip 
RUN pip install -r requirements.txt



CMD python daily.py

beautifulsoup4==4.12.2
bs4==0.0.1
certifi==2023.7.22
cffi==1.16.0
charset-normalizer==3.3.0
cryptography==41.0.4
greenlet==3.0.0
idna==3.4
numpy==1.26.0
pandas==2.1.1
pycparser==2.21
PyMySQL==1.1.0
python-dateutil==2.8.2
pytz==2023.3.post1
requests==2.31.0
six==1.16.0
soupsieve==2.5
SQLAlchemy==2.0.21
typing_extensions==4.8.0
tzdata==2023.3
urllib3==2.0.6

2.주식 값 예측저장 코드 및 도커파일 , requirements.txt

from sqlalchemy import create_engine,text
import pandas as pd
import numpy as np
from xgboost import XGBRegressor
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from datetime import datetime
from sklearn.metrics import mean_absolute_error
import warnings
warnings.filterwarnings('ignore')




def getDataFromDB():
    db_connection_str = 'mysql+pymysql://stock_user:1234asde@172.31.13.248/stockDB'
    db_connection = create_engine(db_connection_str)
    with db_connection.connect() as conn:
        data_df = pd.read_sql_table("stockinfotable", conn)
    return data_df , db_connection

def getCodeList(data_df):
    return list(data_df['srtnCd'].unique())


def predictFutureStockPrice(srtnCd:str ,conn ,data_df):


    db_connection_str = 'mysql+pymysql://stock_user:1234asde@172.31.13.248/stockDB'
    db_connection = create_engine(db_connection_str, pool_pre_ping=True)

    today = datetime.today().strftime("%Y%m%d")
    x_cols = ['clpr','vs','fltRt','mkp','lopr','trqu','lstgStCnt']
    y_col = 'tmkp'

    all_data_df = data_df.loc[data_df['srtnCd']==srtnCd].loc[:,['clpr','vs','fltRt','mkp','lopr','trqu','lstgStCnt']]
    data_count = len(all_data_df)

    if data_count < 100:
        return 0

    all_data_df['tmkp'] = 0 
    all_data_df.iloc[:data_count-1,-1] = all_data_df.iloc[1:,3]

    today_data_x = pd.DataFrame(all_data_df.iloc[-1,:-1])
    all_data_df = all_data_df[:data_count-1]

    ss = StandardScaler()

    x_df = ss.fit_transform(all_data_df.loc[:,x_cols])
    today_data_x = ss.fit_transform(today_data_x)
    y_df = all_data_df.loc[:,y_col]

    x_train , x_test  = train_test_split(x_df,test_size=0.3,shuffle=False)
    y_train , y_test = train_test_split(y_df,test_size=0.3,shuffle=False)

    model = XGBRegressor()

    model.fit(x_train , y_train)
    y_pred = model.predict(x_test)

    mae = mean_absolute_error(y_test ,y_pred)

    result =model.predict(today_data_x.reshape(1,-1))

    result = np.round(result)

    pred_df = pd.DataFrame([today,srtnCd,result[0],mae]).T
    pred_df.columns = ['basDt','srtnCd','mkp','mae']


    db_connection_str = 'mysql+pymysql://stock_user:1234asde@172.31.13.248/stockDB'
    db_connection = create_engine(db_connection_str, pool_pre_ping=True)

    try:
        pred_df.to_sql(name='stockpredicttable' , con=conn , if_exists='append',index=False)
    except:
        print("Error or Already predict Check DB")
        return 1
    return 0


if __name__ == "__main__":
    data_df,db_connection = getDataFromDB()

    code_list = getCodeList(data_df)

    for code in code_list:
        if predictFutureStockPrice(code,db_connection,data_df) == 1:
            break
# docker file

#사용할 베이스 도커 이미지
FROM python:3.9.18

# 해당 디렉토리로 이동
## copy . . 과 같이 쓰면 : 이미지를 빌드한 디렉터리의 모든 파일을 컨테이너의 /usr/src/app 디렉터리로 복사
WORKDIR /usr/src/app
COPY . .

# python pip 설치 후 필수 라이브러리 설치 
RUN python -m pip install --upgrade pip 
RUN pip install -r requirements.txt



CMD python predict.py

greenlet==3.0.0
joblib==1.3.2
numpy==1.26.1
pandas==2.1.1
PyMySQL==1.1.0
python-dateutil==2.8.2
pytz==2023.3.post1
scikit-learn==1.3.1
scipy==1.11.3
six==1.16.0
SQLAlchemy==2.0.22
threadpoolctl==3.2.0
typing_extensions==4.8.0
tzdata==2023.3
xgboost==2.0.0

 

3. 결과

그라파나 연동은 다음 링크에서 https://mydevjourney.tistory.com/90