프로젝트/미분류 프로젝트

[TensorFlow] 비트코인 예측 딥러닝(LSTM) 모델 PROC 1

모영이 2021. 4. 9. 01:00

TensorFlow 생활코딩 강의를 듣고나서, 주가 예측에 대해 찾아보았고 이를 비트코인(가상화폐)에 적용시키면 어떨까 싶었습니다. 사실 돈복사 기계를 만들고 싶었지만, 돈잃는 기계를 만들어버렸습니다. 기록을 정리해보겠습니다. 

 

아래의 블로그에서 거의 다 가져왔습니다.  90%가 복붙된 코드입니다.

딥러닝(LSTM) 삼성전자 주가 예측 블로그

 

구글의 Colaboratory로 개발했습니다.

 

import 하는 모듈

import requests
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, Conv1D, Lambda
from tensorflow.keras.losses import Huber
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import os
import datetime
import numpy as np

 

업비트에서 1분 단위 시가, 고가, 저가, 종가, 누적거래량, 누적거래금액를 가져왔습니다.

참고로 SC(시아코인)의 데이터를 가져왔는데 이렇게 정한 이유가, 제가 시아코인에 돈이 박혀있기 때문입니다..!

UNIT = '1'
MARKET = 'KRW-SC'

data_columns = {
    'market': 'Market',
    'candle_date_time_utc': 'Date',
    'opening_price': 'Open',
    'high_price': 'High',
    'low_price': 'Low',
    'trade_price': 'Close',
    'candle_acc_trade_volume': 'Volume',
    'candle_acc_trade_price' : 'Price',
}

# str UTC DATETIME
def get_query_datetime():
    now = datetime.datetime.now() #- datetime.timedelta(hours=9)
    now = str(now.strftime('%Y-%m-%d %H:%M:%S'))
    now = now.replace(' ', 'T') + 'Z'
    return now


# 크롤링한 데이터 전처리
def preprocessing_crawled_data(datas: str):
    array_dict_datas = []
    array_datas = datas.split('},')

    for data in array_datas:
        array_data = []
        data = data.replace('{', '')
        data = data.replace('}', '')
        array_data = data.split(',')

        dict_data = {}
        for value in array_data:
            tmp = value.split(':', 1)
            if tmp[0][1: -1] in data_columns:
                dict_data[data_columns[tmp[0][1: -1]]] = tmp[1]

        dict_data['Market'] = dict_data['Market'][1: -1]
        dict_data['Date'] = dict_data['Date'][1: -1]
        dict_data['Open'] = float(dict_data['Open'])
        dict_data['High'] = float(dict_data['High'])
        dict_data['Low'] = float(dict_data['Low'])
        dict_data['Close'] = float(dict_data['Close'])
        dict_data['Volume'] = float(dict_data['Volume'])
        dict_data['Price'] = float(dict_data['Price'])
        array_dict_datas.append(dict_data)

    return array_dict_datas


# 크롤링 후 df 리턴
def do_crawl(unit: str, market: str, time: str, count: str, loop: int):
    df = pd.DataFrame([], columns=['Date', 'Market', 'Open', 'High', 'Low', 'Close', 'Volume', 'Price'])
    url = "https://api.upbit.com/v1/candles/minutes/" + unit

    for _ in range(loop):
        querystring = {"market": market, "to": time, "count": count}
        response = requests.request("GET", url, params=querystring)
        df = df.append(preprocessing_crawled_data(response.text[1:-1]), ignore_index=True)
        time = df.iloc[-1]['Date'] + 'Z'
    df = df.loc[::-1].reset_index(drop=True)
    return df




# first 데이터 셋 만들기
df = do_crawl(UNIT, MARKET, get_query_datetime(), '200', 50)
df.to_csv("test.csv")

print(df)

 

종가의 그래프를 보여주는 부분입니다.

plt.figure(figsize=(16, 9))
sns.lineplot(y=df['Close'], x=df.index)
plt.xlabel('time')
plt.ylabel('price')

 

시가, 고가, 저가, 종가, 누적거래량, 누적거래금액 이 6가지 컬럼으로 예측을 진행할 것입니다. MinMaxScaler()로 데이터를 정규화 시켜주는 과정을 거칩니다. 정규화를 시켜주면 학습의 효율이 높아집니다.

scaler = MinMaxScaler()
# 스케일을 적용할 column을 정의합니다.
scale_cols = ['Open', 'High', 'Low', 'Close', 'Volume', 'Price']
# 스케일 후 columns
scaled = scaler.fit_transform(df[scale_cols])

df = pd.DataFrame(scaled, columns=scale_cols)
print(df)

 

학습 데이터와 테스트 데이터로 쪼개줄건데, 10000개의 레코드를 90%를 학습에 사용하고 10%를 테스트에 사용하겠습니다.

x_train, x_test, y_train, y_test = train_test_split(df.drop('Close', 1), df['Close'], test_size=0.1, random_state=0, shuffle=False)
x_train.shape, y_train.shape
print(type(x_test))
print(x_test)

 

이게 데이터를 학습 모델에 넣을 수 있게 변환해주는 과정인데, 링크 걸어놓은 블로그에 유튜브를 보면 이해가 잘 됐습니다. 핵심은 100개의 데이터가 있다면, 몇개의 과거 데이터로 다음것을 예측할 것이냐. 그리고 batch_size를 뭐로 정할 것이냐. 입니다. 

def windowed_dataset(x, y, window_size, batch_size, shuffle):
    # X값 window dataset 구성
    ds_x = tf.data.Dataset.from_tensor_slices(x)
    ds_x = ds_x.window(window_size, shift=1, stride=1, drop_remainder=True)
    ds_x = ds_x.flat_map(lambda x: x.batch(window_size))
    # y값 추가
    ds_y = tf.data.Dataset.from_tensor_slices(y[window_size:])
    ds = tf.data.Dataset.zip((ds_x, ds_y))
    if shuffle:
        ds = ds.shuffle(1000)
    return ds.batch(batch_size).prefetch(1)

 

WINDOW_SIZE가 몇개의 과거 데이터를 가지고 다음 것을 예측할 것이냐이고, 

BATCH_SIZE가 한번에 통째로 넣긴 힘드니깐, 몇 개로 쪼개서 학습시킬거냐 이정도로 이해했습니다. 보통 32아니면 64를 넣는다고 합니다.

WINDOW_SIZE=5
BATCH_SIZE=32

 

이제 windowed_dataset함수를 통과시켜 데이터를 전처리 해줍니다.

train_data = windowed_dataset(x_train, y_train, WINDOW_SIZE, BATCH_SIZE, True)
test_data = windowed_dataset(x_test, y_test, WINDOW_SIZE, BATCH_SIZE, False)

 

모델을 선언하는 부분인데, 여기서 LSTM모델을 사용합니다. LSTM은 저도 아직 이해를 못해서, 설명을 못드리겠습니다. 하지만 구글에 검색해보면 이해할 수 있을 것입니다. 제가 이해한 정도는 RNN보다 LSTM을 쓰는게 주가 예측에선 좋다.

model = Sequential([
    # 1차원 feature map 생성
    Conv1D(filters=32, kernel_size=5,
           padding="causal",
           activation="relu",
           input_shape=[WINDOW_SIZE, 5]),
    # LSTM
    LSTM(16, activation='tanh'),
    Dense(16, activation="relu"),
    Dense(1),
])

# Sequence 학습에 비교적 좋은 퍼포먼스를 내는 Huber()를 사용합니다.
loss = Huber()
optimizer = Adam(0.0005)
model.compile(loss=Huber(), optimizer=optimizer, metrics=['mse'])

 

학습을 계속 시켜도 val_loss가 줄어들지 않는 경우가 발생하는데 35번동안 개선이 없으면 학습을 멈추는 부분입니다.

그리고 1000번을 학습시켜주었습니다. 학습은 179번하고 멈췄습니다.

# earlystopping은 35번 epoch통안 val_loss 개선이 없다면 학습을 멈춥니다.
earlystopping = EarlyStopping(monitor='val_loss', patience=35)
# val_loss 기준 체크포인터도 생성합니다.
filename = os.path.join('tmp', 'ckeckpointer.ckpt')
checkpoint = ModelCheckpoint(filename, 
                             save_weights_only=True, 
                             save_best_only=True, 
                             monitor='val_loss', 
                             verbose=1)
                             
history = model.fit(train_data, 
                    validation_data=(test_data), 
                    epochs=1000, 
                    callbacks=[checkpoint, earlystopping])

 

학습된 모델을 사용하는 부분입니다. predict를 사용하면 예측된 값을 반환합니다.

print("%.7f" % (float(min(history.history['val_loss']))))
model.load_weights(filename)
pred = model.predict(test_data)
actual = np.asarray(y_test)[WINDOW_SIZE:]
actual = np.reshape(actual, (len(actual), 1))
print(pred.shape)
print(actual.shape)

 

언뜻보면 잘 예측되는것 같아서 진짜 돈복사 기계의 탄생인 줄 알았습니다. 하지만 돈은 그렇게 쉽게 버는게 아니였습니다.

plt.figure(figsize=(10,10))
plt.plot(actual, label='actual')
plt.plot(pred, label='prediction')
plt.legend()
plt.show()

 

이제 실제 다음 1분뒤 결과를 예측을 해봅니다. 그리고 1000개의 데이터로 실제 투자를 진행했다면 얼마에 손익이 났는지 계산을 해봤습니다. 투자 매매 알고리즘은 무지성 알고리즘을 사용했는데, 무지성 알고리즘은 제가 그냥 만든겁니다. 단순히 예측한 증가 퍼센트가 평균 보다 크면 사서 1분뒤에 파는 알고리즘입니다. 그냥 1분뒤에 오를것이라고 예측하면 매매를 진행하는 알고리즘이라고 생각하면 됩니다.

live_df = do_crawl(UNIT, MARKET, get_query_datetime(), '200', 5)
print(datetime.datetime.now())
print(get_query_datetime())

scale_cols = ['Open', 'High', 'Low', 'Close', 'Volume', 'Price']
scaled = scaler.fit_transform(live_df[scale_cols])
live_scaled_df = pd.DataFrame(scaled, columns=scale_cols)
live_x, live_y = live_scaled_df.drop('Close', 1), live_scaled_df['Close']
live_data = windowed_dataset(live_x, live_y, WINDOW_SIZE, BATCH_SIZE, False)

pred = model.predict(live_data)

# create empty table with 12 fields
trainPredict_dataset_like = np.zeros(shape=(len(pred), 6) )
# put the predicted values in the right field
trainPredict_dataset_like[:,0] = pred[:,0]
# inverse transform and then select the right field
pred = scaler.inverse_transform(trainPredict_dataset_like)[:,0]

def muzisung_buy(pred_percent: float, live_percent: float, money: float):
    global array_pred_percents
    array_pred_percents.sort()
    if pred_percent > np.mean(array_pred_percents):
        money = money + (money * live_percent / 100)
    return money

 

1000개의 데이터의 예측한 그래프입니다. 이렇게보면 돈을 진짜 많이 벌 수 있겠다 싶었습니다.

plt.figure(figsize=(20,10))
plt.plot(live_df['Close'][WINDOW_SIZE:], label='actual')
plt.plot(pred, label='prediction')
plt.legend()
plt.show()

 

10분뒤가 아니라 1분 뒤입니다. 보면 알겠듯이 이걸로 돈을 벌기 힘듭니다. 이번 테스트에서는 돈을 번 것으로 나왔지만, 다른 테스트를 많이 돌려봤는데 돈을 잃는 경우도 많았습니다. 제 생각에는 예측 모델을 좀 더 업그레이드 시키고, 매매 알고리즘을 좀 더 신경 써서 만든다면, 좀 더 좋은 결과를 낼 수 있지 않을까 싶습니다.

cnt_minus = 0
cnt_fault = 0
cnt_total = 0
seed_money = 1000000
print(len(live_df['Close'][WINDOW_SIZE:]))
print(len(pred))
origin_money = seed_money
origin_percent = (float(live_df['Close'][len(live_df['Close'])-1]) - float(live_df['Close'][WINDOW_SIZE])) / float(live_df['Close'][WINDOW_SIZE]) * 100
array_pred_percents = []

for indx in range(len(pred)):
    live_close = float(live_df['Close'][indx+WINDOW_SIZE])
    pred_close = float(pred[indx])
    pred_percent = (pred_close - live_close) / live_close * 100
    if pred_percent > 0:
        array_pred_percents.append(pred_percent)

    print("%d - (현재)%.2f -> (10분 뒤)%.2f" % (indx, live_close, pred_close), end='')
    print(" ==> (예측)%.2f%%" % (pred_percent), end='')
    
    if indx + WINDOW_SIZE + 1 < len(live_df['Close']):
        live_percent = (float(live_df['Close'][indx+WINDOW_SIZE+1]) - live_close) / live_close * 100
        print(" | (실제)%.2f%%  | " % (live_percent), end='')
        prev_money = seed_money
        next_money = muzisung_buy(pred_percent, live_percent, seed_money)
        print("[%d] -> [%d] == (%d)" % (int(seed_money), int(next_money), int(next_money - seed_money)))
        seed_money = next_money
    else:
        print()
print("(존버시) %d" % (int(origin_money + (origin_money * origin_percent / 100))))
print("(무지성) %d" % (int(seed_money)))
print(cnt_fault)

 

사실 그래프를 크게 봐서 예측이 되는 것 처럼 보였던 것 같습니다. 생각 날 때마다 계속 공부하면서 업데이트 해보겠습니다. 아직 원리도 하나도 모르고, 아무것도 아는게 없지만, 우선 따라하면서 배워보자라는 마인드로 시작했고 학습시켜보니깐 재미있었습니다.