본문 바로가기
  • You find inspiration to create your own path !
업무 자동화/python & CAD

Python 학습] 주식 종목 이름 가져오기 #2

by ToolBOX01 2025. 8. 19.
반응형

▣ 데이터 베이스에 주식 종목이름, 액면가, 상장주식수 입력하기

[stock_name 테이블]

▷ 프로그램 화면

이 프로그램은 Python으로 만들어진 GUI(Graphical User Interface) 애플리케이션으로, 다음과 같은 주요 기능을 수행합니다.

  1. 코스피(KOSPI) 종목 정보 크롤링: 네이버 금융 웹사이트에서 현재 상장된 코스피 종목들의 이름, 액면가, 상장 주식수를 자동으로 수집합니다. 여러 페이지를 넘나들며 모든 데이터를 가져오는 기능을 포함하고 있습니다.
  2. PostgreSQL 데이터베이스 연동: 크롤링한 데이터를 로컬 PostgreSQL 데이터베이스에 저장하고, 저장된 데이터를 다시 읽어와서 사용자 인터페이스에 표시합니다. 데이터베이스 연결 및 오류 처리가 안정적으로 구현되어 있습니다.
  3. 데이터베이스 테이블 관리:
    • 테이블 초기화: recreate_database_table 함수를 통해 기존 stock_name 테이블을 완전히 삭제하고, 필요한 컬럼들(stock_id, stock_name, face_value, number_listed_shares, created_at)을 포함한 새로운 테이블을 깨끗하게 다시 생성합니다. 이는 데이터베이스 구조 관련 오류를 해결하는 데 유용합니다.
    • 데이터 저장: save_to_database 함수를 통해 스크래핑한 종목 데이터를 데이터베이스에 저장합니다. ON CONFLICT (stock_name) DO NOTHING 명령어를 사용하여 이미 존재하는 종목은 중복 저장되지 않도록 처리합니다.
    • 데이터 조회: show_data_from_database 함수를 통해 데이터베이스에 저장된 모든 종목 정보를 불러와 GUI 화면에 출력합니다.
  4. 사용자 인터페이스(GUI): CustomTkinter 라이브러리를 사용하여 시각적으로 보기 좋은 UI를 제공합니다.
    • 진행 상태 표시: 크롤링 및 데이터베이스 작업의 진행률을 실시간으로 보여주는 프로그레스 바와 라벨이 있습니다.
    • 버튼 기능: "데이터베이스 테이블 초기화", "종목 정보 크롤링 및 저장", "데이터베이스 종목 보기" 세 가지 버튼을 통해 사용자가 각 기능을 쉽게 실행할 수 있습니다.
    • 결과 출력: scrolledtext 위젯에 작업 결과(크롤링된 데이터, 데이터베이스 내용 등)가 표시되어 사용자가 내용을 한눈에 확인할 수 있습니다.

요약하자면, 이 프로그램은 웹 스크래핑과 데이터베이스 연동을 결합하여 특정 웹사이트의 금융 데이터를 자동으로 수집하고, 이를 구조화된 형태로 관리하며, 사용자에게 직관적인 GUI를 통해 보여주는 역할을 합니다.

▷ 프로그램 코드

import customtkinter as ctk
from tkinter import scrolledtext
import requests
from bs4 import BeautifulSoup
import time
import psycopg2
from psycopg2 import OperationalError
import threading

# Database connection details
DB_HOST = "localhost"
DB_PORT = "5432"
DB_NAME = "user_accounts"
DB_USER = "designer"
DB_PASSWORD = "7777"

def show_data_from_database(text_widget, status_label):
    """
    Connects to the PostgreSQL database and retrieves all stock names to display.
    """
    conn = None
    cur = None
    try:
        status_label.configure(text="데이터베이스에서 데이터 불러오는 중...")
        app.update()
        conn = psycopg2.connect(
            host=DB_HOST,
            port=DB_PORT,
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD
        )
        cur = conn.cursor()

        # Retrieve all stock names from the table
        cur.execute("SELECT stock_id, stock_name, face_value, number_listed_shares, created_at FROM stock_name ORDER BY stock_name;")
        all_stocks = cur.fetchall()
        
        # Display data in the text widget
        text_widget.delete(1.0, ctk.END)
        if not all_stocks:
            text_widget.insert(ctk.END, "데이터베이스에 저장된 종목이 없습니다.")
        else:
            text_widget.insert(ctk.END, "--- 데이터베이스에 저장된 모든 종목명 ---\n\n")
            for i, stock in enumerate(all_stocks, 1):
                text_widget.insert(ctk.END, f"{i}. (ID: {stock[0]}) {stock[1]} | 액면가: {stock[2]} | 상장주식수: {stock[3]} | 생성 시간: {stock[4]}\n")
        status_label.configure(text="데이터 불러오기 완료.")

    except OperationalError as e:
        print(f"데이터베이스 연결 오류: {e}")
        text_widget.insert(ctk.END, f"데이터베이스 연결 오류가 발생했습니다: {e}")
        status_label.configure(text=f"오류: {e}")
    except Exception as e:
        print(f"데이터 불러오기 중 오류 발생: {e}")
        text_widget.insert(ctk.END, f"오류: 데이터 불러오기 중 예기치 않은 오류 발생: {e}")
        status_label.configure(text=f"오류: {e}")
    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()
            print("PostgreSQL connection is closed")
        app.update()

def save_to_database(stock_data, status_label):
    """
    Connects to the PostgreSQL database and saves the scraped stock names.
    This version handles transactions more robustly.
    """
    conn = None
    cur = None
    try:
        # Establish a connection to the database
        status_label.configure(text="데이터베이스에 연결 중...")
        app.update()
        
        conn = psycopg2.connect(
            host=DB_HOST,
            port=DB_PORT,
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD
        )
        cur = conn.cursor()
        
        # Insert stock names into the table
        status_label.configure(text="데이터베이스에 종목 정보 저장 중...")
        app.update()
        
        # Use a single transaction block for the entire operation
        for stock in stock_data:
            name, raw_face_value, raw_listed_shares = stock
            
            # Clean and convert data to integers, handling potential errors
            try:
                face_value = int(raw_face_value.replace(',', ''))
            except (ValueError, TypeError):
                face_value = 0
            
            try:
                listed_shares = int(raw_listed_shares.replace(',', ''))
            except (ValueError, TypeError):
                listed_shares = 0
            
            cur.execute(
                "INSERT INTO stock_name (stock_name, face_value, number_listed_shares) VALUES (%s, %s, %s) ON CONFLICT (stock_name) DO NOTHING;", 
                (name, face_value, listed_shares)
            )
        
        # If the loop completes without error, commit the transaction
        conn.commit()
        status_label.configure(text="완료: 100% (데이터베이스 저장 완료)")

    except OperationalError as e:
        status_label.configure(text=f"데이터베이스 연결 오류: {e}")
        print(f"The error '{e}' occurred")
        if conn:
            conn.rollback() # Rollback the transaction on error
            print("Transaction rolled back due to an error.")
    except Exception as e:
        status_label.configure(text=f"오류: 데이터 저장 중 예기치 않은 오류 발생: {e}")
        print(f"Unexpected error during data saving: {e}")
        if conn:
            conn.rollback() # Rollback the transaction on error
            print("Transaction rolled back due to an unexpected error.")
    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()
            print("PostgreSQL connection is closed")
        app.update()

def recreate_database_table(status_label):
    """
    Drops the existing stock_name table and recreates it with the correct schema,
    including new columns.
    """
    conn = None
    cur = None
    try:
        status_label.configure(text="데이터베이스 테이블 초기화 중...")
        app.update()
        conn = psycopg2.connect(
            host=DB_HOST,
            port=DB_PORT,
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD
        )
        cur = conn.cursor()

        # Drop the table if it exists
        cur.execute("DROP TABLE IF EXISTS stock_name;")
        
        # Create the table with the new columns: stock_id (integer), created_at (timestamp)
        cur.execute("""
            CREATE TABLE stock_name (
                stock_id SERIAL PRIMARY KEY,
                stock_code INTEGER UNIQUE,     
                stock_name VARCHAR(255) UNIQUE NOT NULL,
                face_value INTEGER,
                number_listed_shares INTEGER,
                created_at TIMESTAMP DEFAULT NOW()
            );
        """)
        
        conn.commit()
        status_label.configure(text="완료: 데이터베이스 테이블이 성공적으로 초기화되었습니다.")
        print("Database table recreated successfully.")

    except OperationalError as e:
        status_label.configure(text=f"데이터베이스 연결 오류: {e}")
        print(f"The error '{e}' occurred")
        if conn:
            conn.rollback()
    except Exception as e:
        status_label.configure(text=f"오류: 테이블 초기화 중 예기치 않은 오류 발생: {e}")
        print(f"Unexpected error during table recreation: {e}")
        if conn:
            conn.rollback()
    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()
            print("PostgreSQL connection is closed")
        app.update()


def scrape_kospi_stock_names(progress_bar, progress_label):
    """
    Scrapes KOSPI stock names from Naver Finance.
    """
    stock_data = []
    page = 1
    base_url = "https://finance.naver.com/sise/sise_market_sum.naver?sosok=0&page={}"

    # Check max pages from the first page
    url = base_url.format(1)
    try:
        response = requests.get(url, timeout=10)
        response.encoding = 'euc-kr'
        if response.status_code != 200:
            print(f"첫 페이지 요청 실패. 상태 코드: {response.status_code}")
            return stock_data
        soup = BeautifulSoup(response.text, 'html.parser')
        pagination = soup.find('td', class_='pgRR')
        max_pages = 1
        if pagination:
            last_page_link = pagination.find('a')['href']
            max_pages = int(last_page_link.split('page=')[-1])
        print(f"최대 페이지 수: {max_pages}")
    except requests.exceptions.RequestException as e:
        print(f"첫 페이지 요청 중 오류 발생: {e}")
        progress_label.configure(text="오류: 네트워크 연결을 확인하세요.")
        return stock_data

    # Loop through all pages
    while True:
        progress = (page - 1) / max_pages if max_pages > 0 else 0
        progress_bar.set(progress)
        progress_label.configure(text=f"진행 중: {int(progress * 100)}% (페이지 {page}/{max_pages})")
        app.update()

        url = base_url.format(page)
        try:
            response = requests.get(url, timeout=10)
            response.encoding = 'euc-kr'
            if response.status_code != 200:
                print(f"페이지 {page} 요청 실패. 상태 코드: {response.status_code}")
                break

            soup = BeautifulSoup(response.text, 'html.parser')
            table = soup.find('table', class_='type_2')

            if not table:
                print(f"페이지 {page}에 테이블이 없음. 크롤링 종료.")
                break

            rows = table.find('tbody').find_all('tr')
            found_data = False
            for row in rows:
                cells = row.find_all('td')
                if len(cells) > 7:
                    stock_name = cells[1].find('a').text.strip()
                    face_value = cells[5].text.strip()
                    listed_shares = cells[7].text.strip()
                    stock_data.append((stock_name, face_value, listed_shares))
                    found_data = True

            if not found_data:
                print(f"페이지 {page}에 데이터가 없음. 크롤링 종료.")
                break

            print(f"페이지 {page} 크롤링 완료. 현재 종목 수: {len(stock_data)}")
            page += 1
            time.sleep(1)
            
            # Stop if we have reached the max pages
            if page > max_pages:
                print(f"최대 페이지 ({max_pages})에 도달하여 크롤링을 종료합니다.")
                break
        
        except requests.exceptions.RequestException as e:
            print(f"페이지 {page} 요청 중 오류 발생: {e}")
            progress_label.configure(text=f"오류: 페이지 {page} 요청 중 문제 발생.")
            break
            
    progress_bar.set(1.0)
    progress_label.configure(text="크롤링 완료: 100%")
    app.update()

    return stock_data

def start_scraping_and_saving():
    """
    Main function to orchestrate scraping and saving.
    Runs the scraping in a separate thread to prevent UI from freezing.
    """
    text_widget.delete(1.0, ctk.END)
    progress_bar.set(0.0)
    progress_label.configure(text="진행 중: 0% (크롤링 시작)")
    app.update()

    stock_data = scrape_kospi_stock_names(progress_bar, progress_label)
    
    if stock_data:
        # Display data in the text widget
        text_widget.delete(1.0, ctk.END)
        for i, (name, face_value, listed_shares) in enumerate(stock_data, 1):
            text_widget.insert(ctk.END, f"{i}. {name} | 액면가: {face_value} | 상장주식수: {listed_shares}\n")
        
        # Save to database in a separate thread
        db_thread = threading.Thread(target=save_to_database, args=(stock_data, progress_label))
        db_thread.start()
    else:
        progress_label.configure(text="크롤링된 데이터가 없습니다.")

    # After scraping, display the contents of the database
    show_db_thread = threading.Thread(target=show_data_from_database, args=(text_widget, progress_label))
    show_db_thread.start()

# GUI setup
app = ctk.CTk()
app.title("종목 정보")
app.geometry("800x600")

# Progress bar
progress_bar = ctk.CTkProgressBar(master=app, mode="determinate")
progress_bar.pack(pady=5, fill="x", padx=10)
progress_bar.set(0.0)

# Progress label
progress_label = ctk.CTkLabel(master=app, text="진행 중: 0%")
progress_label.pack(pady=5)

# Buttons container
button_frame = ctk.CTkFrame(master=app, fg_color="transparent")
button_frame.pack(pady=10)

# Recreate Table button
recreate_button = ctk.CTkButton(master=button_frame, text="데이터베이스 테이블 초기화", command=lambda: threading.Thread(target=recreate_database_table, args=(progress_label,)).start(), fg_color="#e74c3c", hover_color="#c0392b")
recreate_button.pack(side="left", padx=5)

# Scrape and Save button
scrape_button = ctk.CTkButton(master=button_frame, text="종목 정보 크롤링 및 저장", command=start_scraping_and_saving, fg_color="#2ecc71", hover_color="#27ae60")
scrape_button.pack(side="left", padx=5)

# Show Database button
show_db_button = ctk.CTkButton(master=button_frame, text="데이터베이스 종목 보기", command=lambda: threading.Thread(target=show_data_from_database, args=(text_widget, progress_label)).start(), fg_color="#3498db", hover_color="#2980b9")
show_db_button.pack(side="left", padx=5)

# Scrolled text widget
text_widget = scrolledtext.ScrolledText(master=app, width=40, height=20)
text_widget.pack(pady=10, padx=10, fill="both", expand=True)

# Run the app
app.mainloop()

위 웹 크롤링 파이썬 코드는 AI가 코딩 한것 입니다. 십여번의 시도 끝에 코딩을 완료 하였습니다.  프로그램 개발 목적이 있다면, 비-전무가도 빠르게 자신에게 필요한 프로그램을 개발할 수 있습니다. 제3자가 만든 코드를 AI에게 해설을 요청하고, 해설 내용을 수정하여 자신의 프로그램으로 변경할 수 있습니다. 

파이썬은 컴파일 과정이 필요 없습니다. 다양한 라이브러리를 연결할 수 있습니다. AI로 다양한 프로그램을 만들어 보세요. 경험이 쌓이면 좋은 프로그램을 만들수 있습니다. 당장 경험을 해보십시요.

by korealionkk@gmail.com


반응형