앞 게시글에서 일주일 전 종가와 현재가를 비교하여 엑셀에 출력하는 프로그램을 만들었다.
이번 게시글에서는 1분단위로 거래량을 조회하여 거래량 급등종목을 찾을 예정이다.
다만, 업비트에서는 캔들 조회의 경우 초당 10회로 제한하고 있기 때문에 위 내용도 ChatGPT에게 전달하여 코드를 작성한다.
먼저, 업비트에서 KRW마켓에 상장된 종목들의 거래금액을 1분단위로 가져오는 코드를 ChatGPT에게 물었다.
업비트에서 모든 종목의 거래금액을 1분 단위로 가져오려면 pyupbit 라이브러리의 get_minutes_candles 함수를 사용할 수 있습니다. 이 함수는 특정 종목에 대한 1분봉 데이터를 제공합니다.
아래는 모든 KRW 마켓 종목에 대한 1분봉 데이터를 가져오는 간단한 예제 코드입니다. 코드를 실행하면 모든 KRW 마켓 종목에 대한 1분 단위로 현재까지의 거래금액이 출력됩니다.
import pyupbit
import pandas as pd
def get_all_krw_markets():
krw_markets = pyupbit.get_tickers("KRW")
return krw_markets
def get_minute_candles(symbol):
candles = pyupbit.get_minutes_candles(symbol, interval=1)
return candles
def main():
krw_markets = get_all_krw_markets()
for symbol in krw_markets:
try:
candles = get_minute_candles(symbol)
if candles:
df = pd.DataFrame(candles)
total_volume = df['volume'].sum()
print(f"{symbol}: {total_volume:.2f} KRW")
else:
print(f"No data available for {symbol}.")
except Exception as e:
print(f"Error for {symbol}: {e}")
if __name__ == "__main__":
main()
여기에 1초에 10개 종목만 조회하게 수정해달라고 요청하였으며 결과는 아래와 같다.
import pyupbit
import pandas as pd
import time
def get_all_krw_markets():
krw_markets = pyupbit.get_tickers("KRW")
return krw_markets
def get_minute_candles(symbol):
candles = pyupbit.get_minutes_candles(symbol, interval=1)
return candles
def main():
krw_markets = get_all_krw_markets()
# 10개씩 조회
batch_size = 10
for i in range(0, len(krw_markets), batch_size):
batch_symbols = krw_markets[i:i+batch_size]
for symbol in batch_symbols:
try:
candles = get_minute_candles(symbol)
if candles:
df = pd.DataFrame(candles)
total_volume = df['volume'].sum()
print(f"{symbol}: {total_volume:.2f} KRW")
else:
print(f"No data available for {symbol}.")
except Exception as e:
print(f"Error for {symbol}: {e}")
# 1초 간격으로 조회
time.sleep(1)
if __name__ == "__main__":
main()
나는 이제 이 결과를 내 코드에 맞게 수정해야 한다.
나는 이 데이터를 조회해서 어떻게 보여줄 것인지, 어떻게 활용할 것인지에 대해 생각해야 한다.
역시 처음엔 눈으로 보는게 가장 좋을 것이라고 생각하고 엑셀에 데이터를 통으로 내보내기로 한다.
먼저 get_all_krw_markets()함수와 get_minute_candles(symbol) 함수를 main.py 가장 아래에 추가해준다.
def get_all_krw_markets(self):
krw_markets = pyupbit.get_tickers("KRW")
return krw_markets
def get_minute_candles(self, symbol):
candles = pyupbit.get_minutes_candles(symbol, interval=1)
return candles
(비전공자로서 self가 의미하는 바가 뭔지 잘 모르겠으나 대충 전역변수를 의미하는 것 같아서 사용하고 있다. main.py에서 함수를 선언할 때는 항상 self를 넣어주고 있다.)
ChatGPT가 알려준 main함수의 내용을 get_minute_volume(self) 함수를 만들어 데이터를 추가해줬다.
기존 프로젝트에는 time 라이브러리가 없어 GPT가 알려준 코드대로 import도 해주었다.
def get_minute_volume(self):
krw_markets = self.get_all_krw_markets()
# 10개씩 조회
batch_size = 10
for i in range(0, len(krw_markets), batch_size):
batch_symbols = krw_markets[i:i + batch_size]
for symbol in batch_symbols:
try:
candles = self.get_minute_candles(symbol)
if candles:
df = pd.DataFrame(candles)
total_volume = df['volume'].sum()
print(f"{symbol}: {total_volume:.2f} KRW")
else:
print(f"No data available for {symbol}.")
except Exception as e:
print(f"Error for {symbol}: {e}")
# 1초 간격으로 조회
time.sleep(1)
여기까지 하고나니 뭔가 빠졌다. 데이터를 엑셀로 뽑고싶은데 print함수를 통해 콘솔에서 제공하고 있었다.
냅다 ChatGPT에게 이 데이터를 오늘날짜/조회시간/종목명/거래금액 으로 구분해서 바탕화면에 result.xlsx 파일에 입력해달라고 부탁했다.
import os
import pyupbit
import pandas as pd
from datetime import datetime
import time
def get_all_krw_markets():
krw_markets = pyupbit.get_tickers("KRW")
return krw_markets
def get_minute_candles(symbol):
candles = pyupbit.get_minutes_candles(symbol, interval=1)
return candles
def append_to_excel(file_path, data):
if not os.path.isfile(file_path):
df = pd.DataFrame(data)
df.to_excel(file_path, index=False, header=True)
else:
df = pd.read_excel(file_path)
df = df.append(data, ignore_index=True)
df.to_excel(file_path, index=False, header=True)
def main():
result_file_path = os.path.join(os.path.expanduser("~"), "Desktop", "result.xlsx")
krw_markets = get_all_krw_markets()
# 10개씩 조회
batch_size = 10
for i in range(0, len(krw_markets), batch_size):
batch_symbols = krw_markets[i:i+batch_size]
for symbol in batch_symbols:
try:
candles = get_minute_candles(symbol)
if candles:
df = pd.DataFrame(candles)
total_volume = df['volume'].sum()
# 현재 날짜와 시간
now = datetime.now()
date_time = now.strftime("%Y-%m-%d %H:%M:%S")
# 데이터 구성
data = {
'Date': [now.strftime("%Y-%m-%d")],
'Time': [now.strftime("%H:%M:%S")],
'Symbol': [symbol],
'TotalVolume': [total_volume],
}
# 결과 파일에 추가
append_to_excel(result_file_path, data)
print(f"{symbol}: {total_volume:.2f} KRW - Data added to result.xlsx")
else:
print(f"No data available for {symbol}.")
except Exception as e:
print(f"Error for {symbol}: {e}")
# 1초 간격으로 조회
time.sleep(1)
if __name__ == "__main__":
main()
기껏 내 입맛에 맞게 수정했더니 다시 수정할 코드가 생겼다. 하지만 여러분들은 걱정하지 마시라. 어차피 글 초반부에 오늘 목표가 적혀있고 결과는 항상 마지막에 있을 것이다. 가운데는 과정일 뿐. 급한 사람은 마지막 코드들을 보면서 뭐가 바뀌었는지만 확인해도 부족하지 않을 것이다.
아무튼 나는 내 입맛에 맞게 수정을 해야한다.
append_to_excel 함수를 일부 수정하며, file_path의 경우 main.py 상단에 전역변수로 result.xlsx 경로가 self.test_path 변수에 적혀있기 때문에 파라미터에서 제거하였다.
다 수정하고 기존에 있던 main.py를 다듬고 나니 문제가 생겼다. 난 거래 금액을 비교해서 거래량 급등종목을 찾으려던 것인데 1분전 캔들과 현재의 캔들에서 거래금액을 비교하면 아무리 급등종목이어도 새 캔들에서 비교하면 현재 거래량이 0에 가까울 수 있다. 그렇기 때문에 2분전 캔들과 1분전 캔들의 거래금액을 비교하는게 맞다는 생각이 들었다.
열심히 코드를 수정했지만 어쩔수 없다. 이런 과정 하나하나가 소중하다. 새로 ChatGPT에게 질문을 보낸다.
위 코드는 1분전과 현재의 거래금액을 비교하는 코드이니 2분전과 1분전의 거래금액을 비교하는 코드를 만들어 달라고 했다. 그런데 글을 쓰다보니 이상하다. 처음부터 나는 거래금액을 비교하는 코드를 달라고 요청한 적이 없었다. 아니 무슨 이런 황당한 경우가,,,
아무튼 새로 2분전과 1분전 거래금액을 비교하는 코드를 요청했고(아래) 내 입맛에 맞춰 최종 수정된 오늘의 코드는 마지막과 같다.
import os
import pyupbit
import pandas as pd
from datetime import datetime, timedelta
import time
def get_all_krw_markets():
krw_markets = pyupbit.get_tickers("KRW")
return krw_markets
def get_minute_candles(symbol, interval, count):
candles = pyupbit.get_minutes_candles(symbol, interval=interval, count=count)
return candles
def append_to_excel(file_path, data):
if not os.path.isfile(file_path):
df = pd.DataFrame(data)
df.to_excel(file_path, index=False, header=True)
else:
df = pd.read_excel(file_path)
df = df.append(data, ignore_index=True)
df.to_excel(file_path, index=False, header=True)
def main():
result_file_path = os.path.join(os.path.expanduser("~"), "Desktop", "result.xlsx")
krw_markets = get_all_krw_markets()
# 10개씩 조회
batch_size = 10
for i in range(0, len(krw_markets), batch_size):
batch_symbols = krw_markets[i:i+batch_size]
for symbol in batch_symbols:
try:
# 2분 전 데이터 조회
candles_2min = get_minute_candles(symbol, interval=1, count=2)
if len(candles_2min) >= 2:
volume_2min = candles_2min.iloc[0]['volume']
# 1분 전 데이터 조회
candles_1min = get_minute_candles(symbol, interval=1, count=1)
if len(candles_1min) == 1:
volume_1min = candles_1min.iloc[0]['volume']
# 현재 날짜와 시간
now = datetime.now()
date_time = now.strftime("%Y-%m-%d %H:%M:%S")
# 데이터 구성
data = {
'Date': [now.strftime("%Y-%m-%d")],
'Time': [now.strftime("%H:%M:%S")],
'Symbol': [symbol],
'Volume_2min': [volume_2min],
'Volume_1min': [volume_1min],
}
# 결과 파일에 추가
append_to_excel(result_file_path, data)
print(f"{symbol}: 2min: {volume_2min:.2f}, 1min: {volume_1min:.2f} - Data added to result.xlsx")
else:
print(f"No data available for {symbol} (1min data).")
else:
print(f"No data available for {symbol} (2min data).")
except Exception as e:
print(f"Error for {symbol}: {e}")
# 1초 간격으로 조회
time.sleep(1)
if __name__ == "__main__":
main()
위 코드를 실행하니 "Error for KRW-BTC: study.get_minute_candles() got an unexpected keyword argument 'interval'"와 같이 에러메시지가 발생하였으나 위 에러를 그대로 복사해 ChatGPT에게 해결방법을 물어보니 interval 대신 unit을 사용해야 한다며 수정해주었다. (그럴거면 처음부터 제대로 알려주지)
근데 또 에러나서 다시 물어보니 OHLCV를 통해 확인해야 한다고 한다. (아니 이놈이)
이 과정에서 3~4번의 에러가 나왔고 나올때마다 에러를 복사해 이번엔 이런 에러가 나왔다고 얘기해주니 하나씩 수정해주었다. (한번에좀..) 아무튼 최종적으로 이런 엑셀 파일을 얻을 수 있었다.
이번 게시글에서 내가 추가한 코드는 os와 time 라이브러리를 임포트 하였으며, 기존 __init__()함수 내에 있던 1주일 전 가격비교를 새 함수를 만들어 보기 좋게 수정하였고, 모든 종목들의 2분전, 1분전 캔들에서 거래금액을 엑셀로 추출하여 사용자가 엑셀을 통해 거래량 급등 종목을 찾을 수 있게 하였다.
지금은 엑셀을 통해 보고있지만 추후에는 모든 항목들 중 거래량 급등 종목만 추려내 슬랙으로 보내거나 바로 매매에 뛰어들도록 프로그램을 수정해야겠다.
최종코드(main.py)
import openpyxl
from pathlib import Path
from datetime import datetime, timedelta
import atexit
import pandas as pd
import pyupbit
import time
import os
import sys
############### 변수 선언부 #################
# 셀에 데이터 입력---------------------
class study():
def __init__(self):
super().__init__()
# 변수 선언
# 엑셀 오픈
self.path_to_excel_file = str(Path.home() / "Desktop" / "test.xlsx")
self.path_to_excel_file = self.path_to_excel_file.replace("\\", "/")
self.workbook = openpyxl.load_workbook(self.path_to_excel_file)
self.sheet = self.workbook.active
self.last_row = self.sheet.max_row
self.data_count = 0 # rcount를 세기 위한 변수
self.rcount = 0 # 최종 rcount(데이터가 입력될 row)
self.test_path = str(Path.home() / "Desktop" / "result.xlsx")
# 프로그램 오류 발생시 엑셀 종료
print("실행")
# 초기 실행 함수들 설정
# 여기부터 하나씩 실행
# self.get_difference_week()
# 1주일 전과 현재가 비교
self.get_minute_volume_difference()
# 1분간 거래금액 비교
# self.close_excel()
# 마지막에 항상 close
# atexit.register(self.close_excel())
print("프로그램 작동 완료")
def close_excel(self):
# 엑셀 파일 저장
self.workbook.save(self.path_to_excel_file)
# 엑셀 파일 닫기
self.workbook.close()
def input_df(self):
##--------------------셀에 데이터 입력
for row in range(3, self.last_row + 1):
cell_value = self.sheet.cell(row=row, column=1).value
if cell_value is not None:
self.data_count += 1
# 마지막 행에 데이터 추가
# new_data = "새로운 데이터"
# sheet.cell(row=data_count + 3, column=1, value=new_data)
self.rcount = self.data_count + 3
# An 셀에 오늘 날짜 입력
today_date = datetime.now().date()
self.sheet[f'A{self.rcount}'] = today_date
# Bn 셀에 현재 시간 입력
current_time = datetime.now().time()
self.sheet[f'B{self.rcount}'] = current_time
def get_price_difference(self, symbol):
# 현재 가격 조회
# print("called")
print(symbol)
try:
# 현재 가격 조회
current_price = pyupbit.get_current_price(symbol)
# 1주일 전 가격 조회
historical_data = pyupbit.get_ohlcv(symbol, interval="day", count=7)
if historical_data is None:
raise ValueError(f"Error: No data available for {symbol}.")
# week_ago_price 계산
week_ago_price = historical_data['close'].iloc[0]
# 가격 차이 및 등락률 계산
price_difference = current_price - week_ago_price
price_change_rate = (price_difference / week_ago_price) * 100
return current_price, week_ago_price, price_difference, price_change_rate
except TypeError as e:
print(f"Error: {e}")
return None
def get_difference_week(self):
self.all_coins = self.get_all_krw_markets()
# 데이터프레임 초기화
self.data = {'종목명': [], '현재가': [], '1주일전가격': [], '등락률': []}
for symbol in self.all_coins:
current_price, week_ago_price, price_difference, price_change_rate = self.get_price_difference(symbol)
# 데이터프레임에 데이터 추가
self.data['종목명'].append(symbol)
self.data['현재가'].append(current_price)
self.data['1주일전가격'].append(week_ago_price)
self.data['등락률'].append(price_change_rate)
# 데이터프레임 생성
df = pd.DataFrame(self.data)
# 등락률을 기준으로 오름차순 정렬
df_sorted = df.sort_values(by='등락률', ascending=True)
# 엑셀 파일로 출력
df_sorted.to_excel(self.test_path, index=False)
print("lastone")
def get_all_krw_markets(self):
# krw 마켓에 있는 모든 종목리스트 반환
krw_markets = pyupbit.get_tickers("KRW")
return krw_markets
def get_minute_candles(self, symbol, unit, count):
# 종목의 분단위 캔들 데이터 반환
candles = pyupbit.get_ohlcv(symbol, interval=f"{unit}m", count=count)
return candles
def get_minute_volume_difference(self):
# 종목의 분단위 캔들 데이터에서 거래금액 추출
krw_markets = self.get_all_krw_markets()
# 10개씩 조회
batch_size = 10
for i in range(0, len(krw_markets), batch_size):
batch_symbols = krw_markets[i:i + batch_size]
for symbol in batch_symbols:
try:
# 2분 전 데이터 조회
candles_2min = self.get_minute_candles(symbol, unit=1, count=2)
if len(candles_2min) >= 2:
volume_2min = candles_2min.iloc[0]['volume']
# 1분 전 데이터 조회
candles_1min = self.get_minute_candles(symbol, unit=1, count=1)
if len(candles_1min) == 1:
volume_1min = candles_1min.iloc[0]['volume']
# 현재 날짜와 시간
now = datetime.now()
date_time = now.strftime("%Y-%m-%d %H:%M:%S")
# 데이터 구성
data = {
'Date': [now.strftime("%Y-%m-%d")],
'Time': [now.strftime("%H:%M:%S")],
'Symbol': [symbol],
'Volume_2min': [volume_2min],
'Volume_1min': [volume_1min],
}
# 결과 파일에 추가
self.append_to_excel(data)
print(
f"{symbol}: 2min: {volume_2min:.2f}, 1min: {volume_1min:.2f} - Data added to result.xlsx")
else:
print(f"No data available for {symbol} (1min data).")
else:
print(f"No data available for {symbol} (2min data).")
except Exception as e:
print(f"Error for {symbol}: {e}")
# 1초 간격으로 조회
time.sleep(1)
def append_to_excel(self, data):
# 분단위로 추출된 거래금액을 날짜/시간/종목명/거래금액 으로 구분하여 엑셀에 입력
if not os.path.isfile(self.test_path):
df = pd.DataFrame(data)
df.to_excel(self.test_path, index=False, header=True)
else:
df = pd.read_excel(self.test_path)
df = pd.concat([df, pd.DataFrame(data)], ignore_index=True)
df.to_excel(self.test_path, index=False, header=True)
'프로그램' 카테고리의 다른 글
비트코인 프로젝트_타점찾기1 (2) | 2024.03.24 |
---|---|
비트코인 프로젝트_1분단위 가격 조회 (1) | 2024.03.17 |
새로운 프로젝트. 비트코인. (0) | 2024.03.03 |