4S仓位管理算法编程,总资金按照评分高低,自动分配个股持仓权重。

发布时间:2026/6/21 22:49:18

4S仓位管理算法编程,总资金按照评分高低,自动分配个股持仓权重。 严格聚焦 4S 仓位管理算法总资金按评分高低自动分配个股持仓权重4S 仓位管理算法基于评分的个股资金分配一、实际应用场景描述在量化选股系统中选股Stock Selection 和 仓位管理Position Sizing 是两个独立问题。即使 AI 模型给出了评分如果不科学地分配资金依然会典型场景场景 问题5 只候选股评分接近 平均分配 → 好公司和一般公司拿一样多的钱评分差距很大90 分 vs 60 分 等权 → 浪费资金在弱信号上资金量不同10 万 vs 1000 万 固定手数 → 小资金过度分散新资金流入 不知道如何在现有持仓基础上增量分配回测中 仓位管理缺失 → 收益被稀释、波动被放大核心矛盾AI 模型给出了谁好谁差但没有回答好多少该给多少。二、引入痛点问题结构化层级 痛点 后果数据层 评分分布未知不做归一化 极端值主导资金分配算法层 等权分配浪费评分信息 策略区分度丧失工程层 不考虑个股市值 / 流动性约束 小盘股分到大资金 → 冲击成本爆炸风控层 单只个股无上限 → 过度集中 一只炸雷 → 全盘崩教学层 讲义只讲选股不讲分钱 学生以为选出来就结束了本质结论选股是找好标的仓位管理是把好钢用在刀刃上。两者缺一不可。三、核心逻辑讲解3.1 仓位管理的核心问题给定- 总资金 M- N 只候选股每只有一个评分 s_i ∈ [0, 1]- 每只股票的市值 cap_i可选约束求每只股票分配多少资金 w_i使得1. Σ w_i M全部分配2. w_i 与 s_i 正相关评分越高拿越多3. w_i ≤ cap_i不超过个股市值约束4. 某种公平性或集中度约束3.2 四种分配算法对比算法 公式 特点 适用场景等权Equal Weight w_i M/N 最简单忽略评分 基准对比线性加权Linear w_i M × s_i / Σs_j 评分直接映射资金 评分分布均匀时指数加权Exponential w_i M × e^(α·s_i) / Σe^(α·s_j) 放大头部差异 评分差距大时凯利改进Kelly-Modified w_i ∝ s_i² / σ_i² 高分 低波动 → 更多资金 有波动率数据时指数加权的 α 是核心超参数- α 0 → 退化为等权- α 1 → 温和放大- α 3 → 极度集中冠军拿大头3.3 工程化增强三层约束┌─────────────────────────────────────────────────────┐│ 仓位分配三大约束 │├─────────────────────────────────────────────────────┤│ ││ 约束 1单只上限 ││ ───────────────────────────────────────── ││ w_i ≤ M × max_single_pct如 30% ││ → 防止全仓一只的灾难性风险 ││ ││ 约束 2最小持仓门槛 ││ ───────────────────────────────────────── ││ w_i ≥ M × min_single_pct如 5% ││ → 防止分 0.5% 给某只的过度分散 ││ ││ 约束 3流动性约束可选 ││ ───────────────────────────────────────── ││ w_i ≤ daily_amount_i × liquidity_pct如 10% ││ → 确保分配的资金能买得进 ││ │└─────────────────────────────────────────────────────┘3.4 增量分配新资金流入当已有持仓新资金到来时不是重新分配全部资金而是只在现有持仓 新候选中做增量调整新增资金 ΔM 的分配1. 保留现有持仓不变2. 对 ΔM 执行仓位分配算法3. 叠加到现有持仓上4. 若超上限 → 溢出到次优标的四、项目结构position_manager/├── README.md├── requirements.txt├── config.yaml├── data/│ ├── scores.csv # 个股评分数据│ └── liquidity.csv # 成交额数据可选├── src/│ ├── allocator.py # ★ 核心仓位分配算法│ ├── constraint_engine.py # ★ 约束引擎上限/下限/流动性│ ├── incremental_allocator.py # 增量分配│ ├── position_backtester.py # 仓位管理回测│ └── visualizer.py # 可视化├── main.py└── output/└── position_allocation.csv五、完整代码模块化 清晰注释requirements.txtpandas1.5numpy1.21matplotlib3.5seaborn0.12scipy1.9pyyaml6.0config.yaml# 4S 仓位管理配置# ★ 分配算法allocator:method: exponential # equal / linear / exponential / kellyexponential_alpha: 2.0 # 指数加权参数0等权越大越集中# 等权模式不需要额外参数# ★ 约束constraints:max_single_pct: 0.30 # 单只最多 30%min_single_pct: 0.05 # 单只最少 5%防止过度分散enforce_min: true # 是否执行最小持仓约束liquidity_cap_pct: 0.10 # 单只持仓不超过日成交额的 10%# 回测backtest:total_capital: 1000000 # 总资金 100 万n_candidates: 10 # 每次选 10 只commission_rate: 0.0003stamp_tax_rate: 0.001# 输出output:path: output/position_allocation.csvsrc/allocator.py★ 核心模块allocator.py★ 仓位分配算法按评分分配资金支持四种算法1. equal — 等权分配2. linear — 评分线性加权3. exponential — 指数加权放大头部4. kelly — 凯利改进评分² / 波动率²import pandas as pdimport numpy as npfrom typing import Dict, List, Tuple, Optionalimport logginglogging.basicConfig(levellogging.INFO, format%(asctime)s [%(levelname)s] %(message)s)logger logging.getLogger(__name__)class PositionAllocator:★ 仓位分配器核心方法allocate(capital, scores, volatilitiesNone)→ 返回每只股票的分配权重VALID_METHODS [equal, linear, exponential, kelly]def __init__(self,method: str exponential,exponential_alpha: float 2.0):参数:method: 分配算法equal / linear / exponential / kellyexponential_alpha: 指数加权参数越大头部越集中if method not in self.VALID_METHODS:raise ValueError(f未知算法: {method}支持: {self.VALID_METHODS})self.method methodself.alpha exponential_alphalogger.info(f仓位分配器初始化: 算法{method})if method exponential:logger.info(f 指数加权 α{exponential_alpha})def allocate(self,capital: float,scores: pd.Series,volatilities: Optional[pd.Series] None) - pd.Series:★ 核心方法将总资金按评分分配到各标的参数:capital: 总资金元scores: 个股评分 Seriesindexcode, value0~1volatilities: 波动率 Series仅 kelly 模式需要返回:pd.Series: indexcode, value分配金额元if len(scores) 0:return pd.Series(dtypefloat)scores scores.dropna()if len(scores) 0:return pd.Series(dtypefloat)# 归一化评分到 [0, 1]s_min, s_max scores.min(), scores.max()if s_max - s_min 1e-10:# 所有评分相同 → 等权return self._equal(capital, scores)norm_scores (scores - s_min) / (s_max - s_min)if self.method equal:weights self._equal(capital, scores)elif self.method linear:weights self._linear(capital, norm_scores)elif self.method exponential:weights self._exponential(capital, norm_scores)elif self.method kelly:weights self._kelly(capital, norm_scores, volatilities)else:raise ValueError(f未实现算法: {self.method})# 记录分配详情self._log_allocation(scores, weights)return weightsdef _equal(self, capital: float, scores: pd.Series) - pd.Series:等权分配w_i M / Nn len(scores)return pd.Series(capital / n, indexscores.index)def _linear(self, capital: float, norm_scores: pd.Series) - pd.Series:线性加权w_i M × s_i / Σs_jtotal norm_scores.sum()if total 1e-10:return self._equal(capital, norm_scores)return capital * norm_scores / totaldef _exponential(self, capital: float, norm_scores: pd.Series) - pd.Series:★ 指数加权w_i M × e^(α·s_i) / Σe^(α·s_j)关键性质- α → 0 时退化为等权- α 越大第一名分到的资金越多exp_scores np.exp(self.alpha * norm_scores)total exp_scores.sum()if total 1e-10:return self._equal(capital, norm_scores)return capital * exp_scores / totaldef _kelly(self,capital: float,norm_scores: pd.Series,volatilities: Optional[pd.Series]) - pd.Series:★ 凯利改进w_i ∝ s_i² / σ_i²逻辑- 评分越高 → 越值得重仓- 波动率越高 → 越应该少配- 两者平衡 → 夏普比率最大化方向if volatilities is None:logger.warning(凯利模式未提供波动率退化为线性加权)return self._linear(capital, norm_scores)# 对齐 indexvol volatilities.reindex(norm_scores.index).fillna(0.2) # 缺失 → 默认 20% 波动vol vol.replace(0, 0.01) # 防止除零# w_i ∝ score² / vol²score_sq norm_scores ** 2vol_sq vol ** 2raw score_sq / vol_sqtotal raw.sum()if total 1e-10:return self._equal(capital, norm_scores)return capital * raw / totaldef _log_allocation(self, scores: pd.Series, weights: pd.Series):打印分配详情logger.debug(f\n 仓位分配明细{self.method}:)logger.debug(f {代码:8} {评分:8} {分配金额:15} {占比:8})logger.debug(f {─*50})sorted_idx weights.sort_values(ascendingFalse).indexfor code in sorted_idx:pct weights[code] / weights.sum() * 100logger.debug(f {code:8} {scores.get(code, 0):.3f} f¥{weights[code]:12,.0f} {pct:5.1f}%)# 集中度指标赫芬达尔指数 HHIshares weights / weights.sum()hhi (shares ** 2).sum()logger.debug(f 集中度 HHI: {hhi:.4f}1/N{1/len(scores):.4f})def allocate_batch(self,capital: float,scores_df: pd.DataFrame,score_col: str score) - pd.DataFrame:批量分配多日期参数:capital: 总资金scores_df: DataFrame含 date, code, score 列score_col: 评分列名返回:DataFrame: 新增 allocation 列results []for date, group in scores_df.groupby(date):scores pd.Series(group[score_col].values,indexgroup[code].values)alloc self.allocate(capital, scores)for code, amount in alloc.items():results.append({date: date,code: code,score: scores[code],allocation: amount,allocation_pct: amount / capital * 100})return pd.DataFrame(results)src/constraint_engine.py★ 约束引擎constraint_engine.py★ 仓位约束引擎上限 / 下限 / 流动性约束import pandas as pdimport numpy as npfrom typing import Dict, Optionalimport logginglogger logging.getLogger(__name__)class ConstraintEngine:★ 仓位约束引擎三层约束1. 单只上限max_single_pct2. 单只下限min_single_pct3. 流动性上限liquidity_cap_pct × 日均成交额def __init__(self,max_single_pct: float 0.30,min_single_pct: float 0.05,enforce_min: bool True,liquidity_cap_pct: Optional[float] 0.10):参数:max_single_pct: 单只最多占总资金的百分比min_single_pct: 单只最少占总资金的百分比enforce_min: 是否执行最小持仓约束liquidity_cap_pct: 持仓不超过日成交额的百分比None 不限制self.max_pct max_single_pctself.min_pct min_single_pctself.enforce_min enforce_minself.liq_pct liquidity_cap_pctlogger.info(f约束引擎初始化:)logger.info(f 单只上限: {max_single_pct*100:.0f}%)logger.info(f 单只下限: {min_single_pct*100:.0f}%{开 if enforce_min else 关})logger.info(f 流动性约束: {f{liquidity_cap_pct*100:.0f}% if liquidity_cap_pct else 关})def apply(self,capital: float,raw_allocations: pd.Series,daily_amounts: Optional[pd.Series] None) - pd.Series:★ 核心方法对原始分配施加约束参数:capital: 总资金raw_allocations: 原始分配金额daily_amounts: 日均成交额可选用于流动性约束返回:约束后的分配金额alloc raw_allocations.copy()if len(alloc) 0:return alloc# 约束 1单只上限 max_amount capital * self.max_pctcapped (alloc max_amount).sum()if capped 0:logger.debug(f 约束1: {capped} 只超过上限 {self.max_pct*100:.0f}%已截断)alloc alloc.clip(uppermax_amount)# 约束 2最小持仓可选if self.enforce_min:min_amount capital * self.min_pct# 小于下限的 → 提升到下限从超额的里面匀under_min alloc min_amountif under_min.sum() 0:# 简单处理设为零不分配多余资金再分配alloc[under_min] 0logger.debug(f 约束2: {under_min.sum()} 只低于下限已归零)# 约束 3流动性上限 if self.liq_pct is not None and daily_amounts is not None:liq_cap daily_amounts * self.liq_pctliq_violations (alloc liq_cap).sum()if liq_violations 0:logger.debug(f 约束3: {liq_violations} 只超过流动性上限已截断)alloc alloc.clip(upperliq_cap.reindex(alloc.index).fillna(float(inf)))# 重新归一化确保总和 capital total alloc.sum()if total 0 and abs(total - capital) / capital 0.01:alloc alloc / total * capitallogger.debug(f 重新归一化: {total:,.0f} → {capital:,.0f})elif total 0:# 全被过滤了 → 等权兜底logger.warning( 所有标的被约束过滤退化为等权)alloc pd.Series(capital / len(raw_allocations), indexraw_allocations.index)return allocdef apply_batch(self,capital: float,allocations_df: pd.DataFrame,amounts_df: Optional[pd.DataFrame] None) - pd.DataFrame:批量施加约束results []for date, group in allocations_df.groupby(date):raw pd.Series(group[allocation].values,indexgroup[code].values)daily_amts Noneif amounts_df is not None:day_data amounts_df[amounts_df[date] date]if len(day_data) 0:daily_amts pd.Series(day_data[amount].values,indexday_data[code].values)constrained self.apply(capital, raw, daily_amts)for code, amount in constrained.items():results.append({date: date,code: code,raw_allocation: raw.get(code, 0),final_allocation: amount,final_pct: amount / capital * 100,cap_pct: min(amount / capital * 100, self.max_pct * 100)})return pd.DataFrame(results)src/incremental_allocator.pyincremental_allocator.py增量分配新资金到来时在现有持仓基础上增量调整import pandas as pdimport numpy as npfrom src.allocator import PositionAllocatorfrom src.constraint_engine import ConstraintEnginefrom typing import Dictimport logginglogger logging.getLogger(__name__)class IncrementalAllocator:★ 增量仓位分配器场景已有持仓 P1, P2, ... Pn新资金 ΔM 到来策略1. 不对原有持仓做任何变动2. 只对 ΔM 执行分配算法3. 叠加到现有持仓上4. 若叠加后超上限 → 溢出重分配def __init__(self,allocator: PositionAllocator,constraint_engine: ConstraintEngine):self.allocator allocatorself.constraint constraint_enginelogger.info(增量分配器初始化)def allocate_incremental(self,existing_positions: Dict[str, float],new_capital: float,scores: pd.Series,daily_amounts: pd.Series None) - Dict[str, float]:★ 核心方法增量分配参数:existing_positions: {code: 当前持仓金额}new_capital: 新增资金元scores: 候选股评分daily_amounts: 日均成交额可选返回:{code: 新增分配金额}if new_capital 0:return {}# 只对新增资金做分配new_alloc self.allocator.allocate(new_capital, scores, None)# 叠加到现有持仓total_positions {}for code in set(list(existing_positions.keys()) list(new_alloc.index)):existing existing_positions.get(code, 0)new new_alloc.get(code, 0)total_positions[code] existing new# 检查约束total_capital sum(existing_positions.values()) new_capitaltotal_series pd.Series(total_positions)constrained self.constraint.apply(total_capital, total_series, daily_amounts)# 计算最终增量可能被约束削减final_incremental {}for code in new_alloc.index:existing existing_positions.get(code, 0)final_total constrained.get(code, 0)incremental max(0, final_total - existing)if incremental 0:final_incremental[code] incrementallogger.info(f增量分配: ¥{new_capital:,.0f} → f{len(final_incremental)} 只标的f实际分配 ¥{sum(final_incremental.values()):,.0f})return final_incrementalsrc/position_backtester.pyposition_backtester.py仓位管理回测对比不同分配算法的资金曲线import pandas as pdimport numpy as npfrom src.allocator import PositionAllocatorfrom src.constraint_engine import ConstraintEnginefrom typing import Dict, Listdef run_position_backtest(scores_df: pd.DataFrame,price_df: pd.DataFrame,total_capital: float 1_000_000,methods: List[str] None,commission_rate: float 0.0003,stamp_tax_rate: float 0.001) - Dict:对比不同仓位分配算法的回测参数:scores_df: 含 date, code, score 列price_df: 含 date, code, close 列用于模拟持仓变化total_capital: 总资金methods: 要对比的算法列表返回:{method: {nav: 净值 Series, allocations: 分配明细}}if methods is None:methods [equal, linear, exponential]results {}for method in methods:alloc PositionAllocator(methodmethod)constraint ConstraintEngine()nav [total_capital]dates []all_allocs []for date, group in scores_df.groupby(date):scores pd.Series(group[score].values,indexgroup[code].values)# 分配weights alloc.allocate(total_capital, scores)weights constraint.apply(total_capital, weights)# 模拟按收盘价计算持仓价值day_prices price_df[price_df[date] date]portfolio_val 0for code, amount in weights.items():px day_prices[day_prices[code] code][close]if len(px) 0 and px.values[0] 0:shares int(amount / px.values[0] / 100) * 100portfolio_val shares * px.values[0]nav.append(portfolio_val)dates.append(date)for code, amount in weights.items():all_allocs.append({date: date,code: code,allocation: amount,allocation_pct: amount / total_capital * 100,method: method})results[method] {nav: pd.Series(nav, index[scores_df[date].iloc[0]] dates),allocations: pd.DataFrame(all_allocs)}return resultsdef calc_position_metrics(nav_dict: Dict) - pd.DataFrame:计算各算法的评价指标records []for method, data in nav_dict.items():nav data[nav].dropna()if len(nav) 2:continueret nav.pct_change().dropna()days (nav.index[-1] - nav.index[0]).daysyrs days / 365.25tot nav.iloc[-1] / nav.iloc[0] - 1ann (1 tot) ** (1 / yrs) - 1 if yrs 0 else 0vol ret.std() * 252 ** 0.5sharpe (ann - 0.025) / vol if vol 0 else 0dd (nav - nav.cummax()) / nav.cummax()records.append({method: method,total_ret_pct: round(tot * 100, 2),本文代码仅供学习与技术交流不构成任何投资建议股市有风险入市需谨慎利用AI解决实际问题如果你觉得这个工具好用欢迎关注长安牧笛

相关新闻