|
最近有同学问怎么找各平台的热门股票榜单,正好最近我在研究东财实盘跟单与雪球组合跟单。 后面我分几期介绍一下,今天先讲雪球。
行情波动时,我每天也会看雪球的热门榜。它是个不错的风向标,能看出市场在关注什么。 其实你可以用 Python 自己做一个实时更新的监控面板,操作很简单,新手也能搞定。 一、准备工具 需要2个 Python 库: - • Requests:用来向雪球请求数据。
- • Pandas:用来处理数据。
安装命令: pip install streamlit requests pandas
二、获取数据权限 直接爬雪球数据会被拦截,因为服务器能识别出是脚本访问。解决办法是模拟浏览器行为,主要靠两样东西: - 1. 请求头:包含浏览器的基本信息。
- 2. Cookie:相当于登录凭证,最关键。
具体获取步骤: - 1. 浏览器登录雪球账号。
- 2. 按 F12 打开开发者工具,切换到“网络”(Network)面板。
- 3. 刷新页面,找到对 xueqiu.com 的请求。
- 4. 在“请求头”里找到 Cookie 字段,复制其中 u= 和 xq_a_token= 后面的值备用。
三、代码实现 主要分三步: 第1步:请求数据
用 requests 库带上请求头和 Cookie,获取热门榜的原始数据。 第2步:处理数据
用 pandas 整理数据,提取股票名称、代码、价格、涨跌幅、热度等字段,并调整格式。 第3步:展示界面
完整代码已整理好,你可以直接复制使用。运行命令: streamlit run xueqiu.py
这样就能在本地浏览器看到一个实时更新的雪球热股监控面板了。 下期再介绍其他平台的数据获取方法。 界面版及PY源码阅读原文或通过网盘获取:雪球热门榜单.zip 链接: https://pan.baidu.com/s/1gaIzc5_XZ8H6WzIcxL4Dvg?pwd=zj41 提取码: zj41
import csv
import os
import threading
import time
import tkinter as tk
import webbrowser
from datetime import datetime
from tkinter import ttk, messagebox
from typing import Any, Dict, List, Optional

from xueqiu_client import (
    XueqiuClient,
    XueqiuCookies,
    XueqiuAuthError,
    extract_required_cookies,
    load_config,
    save_config,
)
# Window title; also rendered as the large heading inside the window.
APP_TITLE = '雪球热门榜单--仓鼠量化'

# JSON config file stored next to this script.
CONFIG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'xueqiu_config.json')

# Value passed as ``type_id`` when requesting the hot-stock list.
HOT_STOCK_TYPE_ID = 20

# Table layout: (item key, column heading, column width in pixels).
COLUMNS = [
    ('name', '名称', 180),
    ('symbol', '代码', 110),
    ('current', '现价', 90),
    ('percent', '涨跌幅(%)', 90),
    ('chg', '涨跌额', 90),
    ('exchange', '交易所', 80),
    ('value', '热度值', 110),
    ('rank_change', '排名变化', 80),
]
class XueqiuHotStockGUI:
    """Tkinter window that lists Xueqiu hot stocks in a sortable-by-eye table.

    The left panel holds the cookie fields, refresh controls and CSV export;
    the right panel holds the result table. Network requests run in daemon
    threads and hand their results back to the Tk event loop with
    ``root.after``.
    """

    # Bounds mirror the interval Spinbox so typed-in values cannot produce a
    # zero/negative delay (which would reschedule refreshes in a busy loop).
    _MIN_INTERVAL_SECONDS = 10
    _MAX_INTERVAL_SECONDS = 600
    _DEFAULT_INTERVAL_SECONDS = 30

    def __init__(self, root: tk.Tk):
        """Configure the root window, build the widgets and load saved config."""
        self.root = root
        self.root.title(APP_TITLE)
        self.root.geometry('1200x800')
        self._auto_refresh = False
        self._auto_after_id: Optional[str] = None
        # Held while a refresh worker is running so refreshes never overlap.
        self._fetch_lock = threading.Lock()
        # Most recent result set; used by the CSV export.
        self._last_items: List[Dict[str, Any]] = []
        self._build_ui()
        self._load_config_into_ui()
        self._set_status('就绪')

    # ------------------------------------------------------------------ UI

    def _build_ui(self) -> None:
        """Create the title/status bar and the two-panel body."""
        top = tk.Frame(self.root)
        top.pack(fill=tk.X, padx=10, pady=10)
        title = tk.Label(top, text=APP_TITLE, font=('微软雅黑', 18, 'bold'))
        title.pack(side=tk.LEFT)
        self.status_var = tk.StringVar(value='')
        status = tk.Label(top, textvariable=self.status_var, anchor=tk.E)
        status.pack(side=tk.RIGHT, fill=tk.X, expand=True)

        body = tk.Frame(self.root)
        body.pack(fill=tk.BOTH, expand=True, padx=10, pady=(0, 10))
        self._build_left_panel(body)
        self._build_table(body)

    def _build_left_panel(self, parent: tk.Frame) -> None:
        """Create the cookie, refresh and export controls."""
        left = tk.Frame(parent, width=380)
        left.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 10))
        # Keep the fixed 380px width instead of shrinking to the children.
        left.pack_propagate(False)

        cookie_frame = tk.LabelFrame(left, text='Cookie设置', padx=10, pady=10)
        cookie_frame.pack(fill=tk.X, pady=(0, 10))
        tk.Label(cookie_frame, text='Cookie整段(可选):').pack(anchor=tk.W)
        self.cookie_text = tk.Text(cookie_frame, height=5, width=40)
        self.cookie_text.pack(fill=tk.X, pady=(5, 10))

        row1 = tk.Frame(cookie_frame)
        row1.pack(fill=tk.X)
        tk.Label(row1, text='u:').pack(side=tk.LEFT)
        self.u_var = tk.StringVar(value='')
        tk.Entry(row1, textvariable=self.u_var).pack(side=tk.RIGHT, fill=tk.X, expand=True)

        row2 = tk.Frame(cookie_frame)
        row2.pack(fill=tk.X, pady=(6, 0))
        tk.Label(row2, text='xq_a_token:').pack(side=tk.LEFT)
        self.token_var = tk.StringVar(value='')
        tk.Entry(row2, textvariable=self.token_var).pack(side=tk.RIGHT, fill=tk.X, expand=True)

        btn_row = tk.Frame(cookie_frame)
        btn_row.pack(fill=tk.X, pady=(10, 0))
        tk.Button(btn_row, text='从整段Cookie解析', command=self.on_parse_cookie).pack(
            side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 6))
        tk.Button(btn_row, text='保存配置', command=self.on_save_config).pack(
            side=tk.RIGHT, fill=tk.X, expand=True, padx=(6, 0))

        action_frame = tk.LabelFrame(left, text='获取与刷新', padx=10, pady=10)
        action_frame.pack(fill=tk.X, pady=(0, 10))

        interval_row = tk.Frame(action_frame)
        interval_row.pack(fill=tk.X, pady=(10, 0))
        tk.Label(interval_row, text='自动刷新间隔(秒):').pack(side=tk.LEFT)
        self.interval_var = tk.StringVar(value=str(self._DEFAULT_INTERVAL_SECONDS))
        ttk.Spinbox(
            interval_row,
            from_=self._MIN_INTERVAL_SECONDS,
            to=self._MAX_INTERVAL_SECONDS,
            textvariable=self.interval_var,
            width=8,
        ).pack(side=tk.RIGHT)

        btn_row2 = tk.Frame(action_frame)
        btn_row2.pack(fill=tk.X, pady=(10, 0))
        tk.Button(btn_row2, text='测试Cookie', command=self.on_test_cookie).pack(
            side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 6))
        tk.Button(btn_row2, text='手动刷新', command=self.on_refresh).pack(
            side=tk.RIGHT, fill=tk.X, expand=True, padx=(6, 0))

        btn_row3 = tk.Frame(action_frame)
        btn_row3.pack(fill=tk.X, pady=(10, 0))
        self.auto_btn = tk.Button(btn_row3, text='开始自动刷新', command=self.on_toggle_auto)
        self.auto_btn.pack(fill=tk.X)

        export_frame = tk.LabelFrame(left, text='导出', padx=10, pady=10)
        export_frame.pack(fill=tk.X)
        tk.Button(export_frame, text='导出CSV', command=self.on_export_csv).pack(fill=tk.X)

    def _build_table(self, parent: tk.Frame) -> None:
        """Create the result Treeview, its scrollbars and its context menu."""
        right = tk.Frame(parent)
        right.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)
        table_frame = tk.Frame(right)
        table_frame.pack(fill=tk.BOTH, expand=True)

        self.tree = ttk.Treeview(table_frame, columns=[c[0] for c in COLUMNS], show='headings')
        vsb = ttk.Scrollbar(table_frame, orient='vertical', command=self.tree.yview)
        hsb = ttk.Scrollbar(table_frame, orient='horizontal', command=self.tree.xview)
        self.tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set)

        # The packer hands out space in packing order, so the scrollbars must
        # claim their edges before the expanding tree takes what is left;
        # packing the tree first leaves the horizontal bar almost no room.
        vsb.pack(side=tk.RIGHT, fill=tk.Y)
        hsb.pack(side=tk.BOTTOM, fill=tk.X)
        self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

        for key, title, width in COLUMNS:
            self.tree.heading(key, text=title)
            self.tree.column(key, width=width, anchor=tk.W)

        self.tree.bind('<Double-1>', self.on_open_selected)

        menu = tk.Menu(self.root, tearoff=0)
        menu.add_command(label='打开雪球链接', command=self.on_open_selected)
        menu.add_command(label='复制代码', command=self.on_copy_symbol)
        self._context_menu = menu
        self.tree.bind('<Button-3>', self._show_context_menu)

    def _show_context_menu(self, event) -> None:
        """Select the row under the pointer (if any) and pop up the menu."""
        item_id = self.tree.identify_row(event.y)
        if item_id:
            self.tree.selection_set(item_id)
        try:
            self._context_menu.tk_popup(event.x_root, event.y_root)
        finally:
            # Always release the grab taken by tk_popup.
            self._context_menu.grab_release()

    def _set_status(self, msg: str) -> None:
        """Show *msg* in the status label, prefixed with the current time."""
        ts = datetime.now().strftime('%H:%M:%S')
        self.status_var.set(f'{ts} - {msg}')

    # -------------------------------------------------------------- config

    def _get_client(self) -> XueqiuClient:
        """Build a client from the ``u`` / ``xq_a_token`` fields.

        Raises:
            ValueError: if either field is empty.
        """
        u = (self.u_var.get() or '').strip()
        token = (self.token_var.get() or '').strip()
        if not u or not token:
            raise ValueError('请先填写 u 和 xq_a_token,或粘贴整段Cookie后解析')
        return XueqiuClient(XueqiuCookies(u=u, xq_a_token=token))

    def _read_interval_seconds(self) -> int:
        """Return the refresh interval from the UI as a sane number of seconds.

        Non-numeric or empty input falls back to the default; numeric input is
        clamped to the same bounds the Spinbox advertises.
        """
        raw = (self.interval_var.get() or '').strip()
        try:
            seconds = int(raw)
        except ValueError:
            return self._DEFAULT_INTERVAL_SECONDS
        return min(max(seconds, self._MIN_INTERVAL_SECONDS), self._MAX_INTERVAL_SECONDS)

    def on_parse_cookie(self) -> None:
        """Fill the ``u`` / ``xq_a_token`` fields from the pasted cookie text."""
        cookie_string = self.cookie_text.get('1.0', tk.END).strip()
        u, token = extract_required_cookies(cookie_string)
        if u:
            self.u_var.set(u)
        if token:
            self.token_var.set(token)
        if u and token:
            self._set_status('已从整段Cookie解析成功')
        else:
            messagebox.showwarning('提示', '未能解析到 u 或 xq_a_token,请确认Cookie内容')

    def _load_config_into_ui(self) -> None:
        """Populate the input fields from ``CONFIG_FILE`` when it yields a dict."""
        cfg = load_config(CONFIG_FILE)
        if not isinstance(cfg, dict):
            return
        self.u_var.set(str(cfg.get('u', '') or ''))
        self.token_var.set(str(cfg.get('xq_a_token', '') or ''))
        self.interval_var.set(str(cfg.get('interval_seconds', '30') or '30'))

    def on_save_config(self) -> None:
        """Write the cookie fields and the refresh interval to ``CONFIG_FILE``."""
        interval = self._read_interval_seconds()
        # Reflect the value that is actually stored (after fallback/clamping).
        self.interval_var.set(str(interval))
        data = {
            'u': (self.u_var.get() or '').strip(),
            'xq_a_token': (self.token_var.get() or '').strip(),
            'interval_seconds': interval,
        }
        save_config(CONFIG_FILE, data)
        self._set_status('配置已保存')

    # ------------------------------------------------------------- network

    def on_test_cookie(self) -> None:
        """Fetch a small page in a background thread to check the cookie."""

        def worker():
            # Results are handed to the Tk loop via after(ms, func, *args).
            # Arguments are evaluated here on purpose: the name bound by
            # ``except ... as e`` is cleared when the handler exits, so a
            # lambda reading ``e`` later would raise NameError.
            try:
                client = self._get_client()
                start = time.time()
                items = client.get_hot_stocks(type_id=HOT_STOCK_TYPE_ID, page=1, size=10)
                cost = int((time.time() - start) * 1000)
                self.root.after(
                    0, self._set_status, f'Cookie有效,获取到 {len(items)} 条 (耗时 {cost}ms)')
            except XueqiuAuthError as e:
                self.root.after(0, self._on_cookie_expired, str(e))
            except Exception as e:
                self.root.after(0, messagebox.showerror, '错误', str(e))

        threading.Thread(target=worker, daemon=True).start()

    def _on_cookie_expired(self, msg: str) -> None:
        """Report an authentication failure in the status bar and a dialog."""
        self._set_status('Cookie疑似失效')
        messagebox.showwarning('Cookie可能已失效', msg)

    def on_refresh(self) -> None:
        """Fetch the hot-stock list in a background thread and update the table.

        Does nothing when a previous refresh is still running.
        """
        if not self._fetch_lock.acquire(blocking=False):
            return
        self._set_status('正在获取数据...')

        def worker():
            # See on_test_cookie for why arguments are passed eagerly.
            try:
                client = self._get_client()
                start = time.time()
                items = client.get_hot_stocks(type_id=HOT_STOCK_TYPE_ID, page=1, size=100)
                cost = int((time.time() - start) * 1000)
                self.root.after(0, self._update_table, items, cost)
            except XueqiuAuthError as e:
                self.root.after(0, self._on_cookie_expired, str(e))
            except Exception as e:
                self.root.after(0, messagebox.showerror, '错误', str(e))
                self.root.after(0, self._set_status, '获取失败')
            finally:
                self._fetch_lock.release()

        threading.Thread(target=worker, daemon=True).start()

    def _update_table(self, items: List[Dict[str, Any]], cost_ms: int) -> None:
        """Replace the table rows with *items* and report the elapsed time."""
        self._last_items = items
        self.tree.delete(*self.tree.get_children())
        for item in items:
            cells = (item.get(key) for key, _title, _width in COLUMNS)
            # Missing/None fields are shown as empty cells.
            self.tree.insert('', tk.END, values=['' if c is None else str(c) for c in cells])
        self._set_status(f'已更新 {len(items)} 条 (耗时 {cost_ms}ms)')

    # -------------------------------------------------------- auto refresh

    def on_toggle_auto(self) -> None:
        """Start or stop the periodic refresh and update the button label."""
        if not self._auto_refresh:
            self._auto_refresh = True
            self.auto_btn.config(text='停止自动刷新')
            self._schedule_next_refresh(0)
            self._set_status('自动刷新已启动')
            return

        self._auto_refresh = False
        self.auto_btn.config(text='开始自动刷新')
        if self._auto_after_id:
            self.root.after_cancel(self._auto_after_id)
            self._auto_after_id = None
        self._set_status('自动刷新已停止')

    def _schedule_next_refresh(self, delay_ms: int) -> None:
        """Arm the next auto-refresh tick after *delay_ms* if auto mode is on."""
        if not self._auto_refresh:
            return
        self._auto_after_id = self.root.after(delay_ms, self._auto_tick)

    def _auto_tick(self) -> None:
        """Run one refresh and schedule the following tick."""
        if not self._auto_refresh:
            return
        interval = self._read_interval_seconds()
        self.on_refresh()
        self._schedule_next_refresh(interval * 1000)

    # ------------------------------------------------------- row actions

    def _get_selected_symbol(self) -> Optional[str]:
        """Return the symbol of the first selected row, or ``None``."""
        sel = self.tree.selection()
        if not sel:
            return None
        values = self.tree.item(sel[0], 'values')
        if not values:
            return None
        symbol_idx = next((i for i, c in enumerate(COLUMNS) if c[0] == 'symbol'), None)
        if symbol_idx is None or symbol_idx >= len(values):
            return None
        return str(values[symbol_idx])

    def on_open_selected(self, _event=None) -> None:
        """Open the selected stock's Xueqiu page in the default browser."""
        symbol = self._get_selected_symbol()
        if not symbol:
            return
        url = f'https://xueqiu.com/S/{symbol}'
        webbrowser.open(url)

    def on_copy_symbol(self) -> None:
        """Copy the selected stock's symbol to the clipboard."""
        symbol = self._get_selected_symbol()
        if not symbol:
            return
        self.root.clipboard_clear()
        self.root.clipboard_append(symbol)
        self.root.update()
        self._set_status('已复制代码到剪贴板')

    def on_export_csv(self) -> None:
        """Write the last fetched rows, plus a link column, to a CSV file."""
        if not self._last_items:
            messagebox.showinfo('提示', '没有数据可导出,请先获取数据')
            return
        from tkinter import filedialog
        filename = filedialog.asksaveasfilename(
            defaultextension='.csv', filetypes=[('CSV文件', '*.csv')])
        if not filename:
            return
        try:
            # utf-8-sig writes a BOM at the start of the file.
            with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
                writer = csv.writer(f)
                writer.writerow([c[1] for c in COLUMNS] + ['链接'])
                for item in self._last_items:
                    row = [item.get(c[0], '') for c in COLUMNS]
                    symbol = item.get('symbol', '')
                    row.append(f'https://xueqiu.com/S/{symbol}' if symbol else '')
                    writer.writerow(row)
            self._set_status(f'已导出: {filename}')
        except Exception as e:
            # UI boundary: surface any failure to the user instead of a traceback.
            messagebox.showerror('错误', f'导出失败: {e}')
def main() -> None:
    """Create the Tk root window, attach the GUI to it and run the event loop."""
    root = tk.Tk()
    app = XueqiuHotStockGUI(root)
    root.mainloop()


if __name__ == '__main__':
    main()
复制代码
|