分类: Python

  • 股票Garch模型python实现代码-计算结果

    这份输出是一个GARCH(1,1)模型对股票收益率的拟合结果,下面是对关键部分的解释:

    模型概述

    • Dep. Variable: 因变量是log_returns(对数收益率)
    • Mean Model: 使用了常数均值模型(Constant Mean),即假设收益率的均值为常数
    • Vol Model: 使用GARCH(1,1)模型拟合波动率
    • Distribution: 假设收益率服从正态分布
    • No. Observations: 338个观测值

    均值模型参数

                     coef    std err          t      P>|t|  95.0% Conf. Int.
    ------------------------------------------------------------------------
    mu            -0.0335  9.835e-02     -0.340      0.734 [ -0.226,  0.159]
    • mu: 收益率的均值估计值为-0.0335
    • t统计量p值显示该参数在统计上不显著(p=0.734 > 0.05)
    • 置信区间包含0,进一步验证了不显著性

    波动率模型参数

                     coef    std err          t      P>|t|     95.0% Conf. Int.
    ---------------------------------------------------------------------------
    omega          0.5262      0.926      0.569      0.570    [ -1.288,  2.340]
    alpha[1]       0.0877  6.055e-02      1.449      0.147 [-3.095e-02,  0.206]
    beta[1]        0.7641      0.283      2.703  6.873e-03    [  0.210,  1.318]
    • omega: 长期波动率水平参数,估计值0.5262,但不显著(p=0.570)
    • alpha[1]: ARCH项系数,反映过去收益率对当前波动率的影响,估计值0.0877(p=0.147)
    • beta[1]: GARCH项系数,反映过去波动率对当前波动率的影响,估计值0.7641(高度显著)

    模型诊断

    • ARCH+GARCH系数之和: α+β = 0.0877 + 0.7641 = 0.8518,接近1,说明波动率具有较强的持续性
    • 信息准则: AIC=1377.55,BIC=1392.84,可用于模型比较
    • 对数似然值: -684.774,可用于模型比较

    实际意义

    • 该GARCH模型表明,股票收益率的波动率具有明显的聚类效应(β显著)
    • 过去的波动对当前波动有较强的预测能力
    • 虽然ARCH项系数不显著,但仍保留在模型中可能是因为理论上的需要或样本量限制
    • 常数均值模型的不显著性可能暗示股票收益率存在时间序列相关性,需要考虑更复杂的均值模型

    建议

    • 考虑使用t分布替代正态分布,更适合金融收益率的尖峰厚尾特性
    • 检查残差的自相关性和ARCH效应,验证模型拟合效果
    • 考虑加入外部变量或使用EGARCH等非对称模型捕捉杠杆效应

    GARCH Model python代码

    #volatility forcast波动性预测
    #实现DARCH模型的python代码:
    import akshare as ak
    import pandas as pd
    from arch import arch_model
    import matplotlib.pyplot as plt
    
    # 获取股票数据
    def get_stock_data(stock_code, start_date, end_date):
        stock_df = ak.stock_zh_a_hist(symbol=stock_code, period="daily", start_date=start_date, end_date=end_date)
        stock_df['日期'] = pd.to_datetime(stock_df['日期'])
        stock_df.set_index('日期', inplace=True)
        return stock_df
    
    # 计算对数收益率
    def calculate_log_returns(stock_data):
        stock_data['log_returns'] = 100 * stock_data['收盘'].pct_change().dropna()
        return stock_data.dropna()
    
    # 拟合 GARCH 模型
    def fit_garch_model(returns):
        model = arch_model(returns, vol='Garch', p=1, q=1)
        results = model.fit(disp='off')
        return results
    
    # 主函数
    def main():
        stock_code = "000937"  # 股票代码,这里以平安银行为例
        start_date = "20240101"
        end_date = "20250530"
    
        # 获取数据
        stock_data = get_stock_data(stock_code, start_date, end_date)
        # 计算对数收益率
        stock_data = calculate_log_returns(stock_data)
        # 提取对数收益率
        returns = stock_data['log_returns']
    
        # 拟合 GARCH 模型
        results = fit_garch_model(returns)
    
        # 输出模型结果
        print(results.summary())
    
        # 计算历史波动率
        historical_volatility = results.conditional_volatility
    
        # 绘制波动率预测图
        forecasts = results.forecast(horizon=1)
        # 提取最后一个预测方差并转换为标量
        last_forecast_value = forecasts.variance.iloc[-1].values[0]
        # 获取最后一个观测日期的下一天作为预测日期
        forecast_date = returns.index[-1] + pd.DateOffset(days=1)
    
        plt.figure(figsize=(12, 6))
        # 绘制历史波动率
        plt.plot(historical_volatility, label='Historical Volatility')
        # 绘制预测波动率
        plt.plot([forecast_date], [last_forecast_value], marker='o', color='red', label='Forecast Volatility')
        plt.title('GARCH(1,1) Volatility Forecast')
        plt.xlabel('Date')
        plt.ylabel('Volatility')
        plt.grid(True)
        plt.legend()
        plt.show()
    
    if __name__ == "__main__":
        main()
    

    C:\Users\czliu> & “C:/Program Files/Python313/python.exe” c:/Users/czliu/Documents/python/stock_GARCH_model.py

    Constant Mean – GARCH Model Results

    Dep. Variable: log_returns R-squared: 0.000
    Mean Model: Constant Mean Adj. R-squared: 0.000
    Vol Model: GARCH Log-Likelihood: -684.774
    Distribution: Normal AIC: 1377.55
    Method: Maximum Likelihood BIC: 1392.84
    No. Observations: 338
    Date: Fri, May 30 2025 Df Residuals: 337
    Time: 23:15:55 Df Model: 1

    Mean Model

    coef std err t P>|t| 95.0% Conf. Int.

    mu -0.0335 9.835e-02 -0.340 0.734 [ -0.226, 0.159]

    Volatility Model波动性模型

    coef std err t P>|t| 95.0% Conf. Int.

    omega 0.5262 0.926 0.569 0.570 [ -1.288, 2.340]
    alpha[1] 0.0877 6.055e-02 1.449 0.147 [-3.095e-02, 0.206]

    beta[1] 0.7641 0.283 2.703 6.873e-03 [ 0.210, 1.318]

    Covariance estimator: robust

  • 股票组合VaR计算器-蒙特卡洛模拟法

    akshare获取数据

    #股票组合VaR计算器,蒙特卡洛模拟法,akshare获取数据
    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt
    import akshare as ak
    from datetime import datetime, timedelta
    import seaborn as sns
    
    # 设置中文显示
    plt.rcParams["font.family"] = ['SimHei', 'Microsoft YaHei', 'SimSun', 'KaiTi']
    plt.rcParams['axes.unicode_minus'] = False  # 解决负号显示问题
    
    class StockVaRCalculator:
        """A股股票组合VaR计算器,使用蒙特卡洛模拟法和akshare获取数据"""
    
        def __init__(self, stock_tickers, weights=None, investment_value=1000000,
                     simulation_days=10, num_simulations=10000, confidence_level=0.99):
            """
            初始化股票组合VaR计算器
    
            参数:
                stock_tickers: 股票代码列表,使用akshare格式(例如:'sh600000' 表示浦发银行)
                weights: 各股票权重,默认为等权重
                investment_value: 投资组合总价值(元)
                simulation_days: 模拟的天数
                num_simulations: 蒙特卡洛模拟次数
                confidence_level: VaR计算的置信水平
            """
            self.stock_tickers = stock_tickers
            self.num_stocks = len(stock_tickers)
    
            # 处理权重
            if weights is None:
                self.weights = np.ones(self.num_stocks) / self.num_stocks
            else:
                self.weights = np.array(weights)
                # 归一化权重
                self.weights = self.weights / np.sum(self.weights)
    
            self.investment_value = investment_value
            self.simulation_days = simulation_days
            self.num_simulations = num_simulations
            self.confidence_level = confidence_level
    
            self.stock_data = None
            self.returns = None
            self.mean_returns = None
            self.cov_matrix = None
            self.simulation_results = None
    
        def fetch_stock_data(self, start_date=None, end_date=None):
            """使用akshare获取股票历史数据"""
            if end_date is None:
                end_date = datetime.now().strftime('%Y%m%d')
            if start_date is None:
                end_date_obj = datetime.strptime(end_date, '%Y%m%d')
                start_date_obj = end_date_obj - timedelta(days=365)
                start_date = start_date_obj.strftime('%Y%m%d')
    
            print(f"正在获取{self.stock_tickers}的历史数据,时间范围:{start_date}至{end_date}")
    
            all_data = pd.DataFrame()
    
            for ticker in self.stock_tickers:
                try:
                    # 使用akshare获取股票日线数据
                    stock_data = ak.stock_zh_a_hist_tx(symbol=ticker,
                                                  start_date=start_date, end_date=end_date,
                                                  adjust="")  # 使用前复权价格
                    '''
                    在使用 akshare 获取 A 股历史行情数据时,可以通过设置adjust参数来获取除权后数据。这个参数支持三种取值:
                    "qfq":前复权(最常用,保持当前价格不变,调整历史价格)
                    "hfq":后复权(保持历史价格不变,调整当前价格)
                    "":不复权(原始价格)
                    '''
    
                    # 重命名列并添加股票代码
                    stock_data = stock_data.rename(columns={'日期': 'date', '收盘': 'close'})
                    stock_data['ticker'] = ticker
                    stock_data = stock_data[['date', 'ticker', 'close']]
    
                    # 转换日期格式
                    stock_data['date'] = pd.to_datetime(stock_data['date'])
    
                    # 添加到总数据中
                    if all_data.empty:
                        all_data = stock_data
                    else:
                        all_data = pd.concat([all_data, stock_data], ignore_index=True)
    
                    print(f"成功获取{ticker}的{len(stock_data)}天数据")
    
                except Exception as e:
                    print(f"获取{ticker}数据失败: {e}")
    
            # 重塑数据为宽格式
            self.stock_data = all_data.pivot(index='date', columns='ticker', values='close')
    
            # 检查数据完整性
            if self.stock_data.isnull().any().any():
                print("警告:数据包含缺失值,正在进行前向填充")
                self.stock_data = self.stock_data.fillna(method='ffill')
            # 打印最后一天的股票价格(确认是否合理)
            print("最后一天股票价格:")
            print(var_calculator.stock_data.iloc[-1])
            print(f"数据处理完成,最终数据集包含{len(self.stock_data)}天数据")
    
    
            # 验证价格合理性(示例:假设A股价格通常<500元)
            if (self.stock_data > 500).any().any():
                print("警告:检测到异常高价,可能数据单位错误,尝试除以10000")
                self.stock_data = self.stock_data / 10000  # 假设单位为"万元"
                print(f"**修正后最后一天价格: {self.stock_data.iloc[-1]}")
    
            return self.stock_data
    
        def calculate_returns(self):
            """计算股票收益率"""
            if self.stock_data is None:
                raise ValueError("请先获取股票数据")
    
            # 计算对数收益率
            self.returns = np.log(self.stock_data / self.stock_data.shift(1)).dropna()
    
            # 新增:打印收益率极值(正常应在±10%以内)
            print("收益率极值检测:")
            print(f"最小日收益率: {self.returns.min().min():.2%}")
            print(f"最大日收益率: {self.returns.max().max():.2%}")
            # 计算均值和协方差矩阵
            self.mean_returns = self.returns.mean()
            self.cov_matrix = self.returns.cov()
    
            return self.returns
    
        def run_monte_carlo_simulation(self):
            """运行蒙特卡洛模拟"""
            if self.returns is None:
                raise ValueError("请先计算收益率")
    
            print(f"正在运行蒙特卡洛模拟,模拟次数:{self.num_simulations},预测天数:{self.simulation_days}")
    
            # 计算日均值和协方差矩阵
            daily_mean = self.mean_returns
            daily_cov = self.cov_matrix
    
            # 调整portfolio_sims的维度顺序为 [股票数, 模拟次数, 天数]
            portfolio_sims = np.full(shape=(self.num_stocks, self.num_simulations, self.simulation_days),
                                    fill_value=0.0)
    
            initial_prices = self.stock_data.iloc[-1].values  # 使用最后一天的价格作为初始价格
    
            for s in range(self.num_simulations):
                # 生成相关的随机收益率
                daily_returns = np.random.multivariate_normal(
                    daily_mean,
                    daily_cov,
                    self.simulation_days
                ).T  # [股票数, 天数]
    
                # 计算累积收益率(注意:这里使用指数函数将对数收益率转换回价格比率)
                cumulative_returns = np.exp(np.cumsum(daily_returns, axis=1))
    
                # 计算模拟价格路径
                portfolio_sims[:, s, :] = initial_prices.reshape(-1, 1) * cumulative_returns
    
            self.simulation_results = portfolio_sims
            return portfolio_sims
    
        def calculate_portfolio_var(self):
            """计算投资组合的VaR"""
            if self.simulation_results is None:
                raise ValueError("请先运行蒙特卡洛模拟")
    
            # 计算模拟结束时的投资组合价值
            final_values = np.zeros(self.num_simulations)
    
            for s in range(self.num_simulations):
                # 获取所有股票在最后一天的价格 [num_stocks]
                stock_values = self.simulation_results[:, s, -1]
    
                # 计算每只股票的价值 = 权重 * 初始投资 * 模拟价格/初始价格
                portfolio_value = np.sum(self.weights * self.investment_value * stock_values /
                                        self.stock_data.iloc[-1].values)
                final_values[s] = portfolio_value
    
            # 计算投资组合价值变化
            portfolio_changes = final_values - self.investment_value
    
            # 计算VaR
            var_percentile = 100 * (1 - self.confidence_level)
            var = -np.percentile(portfolio_changes, var_percentile)
    
            # 计算CVaR (条件VaR)
            cvar_mask = portfolio_changes <= -var
            cvar = -np.mean(portfolio_changes[cvar_mask]) if np.sum(cvar_mask) > 0 else 0
    
            # 新增:打印模拟结果统计
            print("\n模拟结果统计:")
            print(f"初始投资组合价值: {self.investment_value:,}元")
            print(f"模拟后组合价值范围: 最小值={np.min(final_values):,.2f}元, 最大值={np.max(final_values):,.2f}元")
            print(f"组合价值变化范围: 最小值={np.min(portfolio_changes):,.2f}元, 最大值={np.max(portfolio_changes):,.2f}元")
    
            return {
                'VaR': var,
                'CVaR': cvar,
                'VaR_percentage': var / self.investment_value * 100,
                'CVaR_percentage': cvar / self.investment_value * 100,
                'portfolio_changes': portfolio_changes,
                'final_values': final_values
            }
    
        def plot_simulation_results(self, var_results):
            """可视化模拟结果"""
            plt.figure(figsize=(16, 10))
    
            # 1. 绘制模拟价格路径 - 修正维度顺序
            plt.subplot(2, 2, 1)
            for i, ticker in enumerate(self.stock_tickers):
                plt.plot(self.simulation_results[i, :, :].T, alpha=0.1)  # 转置以正确显示时间轴
                plt.title(f"{ticker} 价格模拟路径")
                plt.xlabel("天数")
                plt.ylabel("价格 (元)")
    
            # 2. 绘制投资组合价值分布
            plt.subplot(2, 2, 2)
            sns.histplot(var_results['final_values'], bins=50, kde=True)
            plt.axvline(x=self.investment_value, color='r', linestyle='--', label='初始价值')
            plt.axvline(x=self.investment_value - var_results['VaR'], color='g', linestyle='--', label=f'VaR@{self.confidence_level}')
            plt.title('投资组合价值分布')
            plt.xlabel('组合价值 (元)')
            plt.ylabel('频率')
            plt.legend()
    
            # 3. 绘制投资组合收益分布
            plt.subplot(2, 2, 3)
            sns.histplot(var_results['portfolio_changes'], bins=50, kde=True)
            var_percentile = 100 * (1 - self.confidence_level)
            var_value = -var_results['VaR']
            plt.axvline(x=0, color='r', linestyle='--', label='无收益')
            plt.axvline(x=var_value, color='g', linestyle='--', label=f'VaR@{self.confidence_level}')
            plt.title('投资组合收益分布')
            plt.xlabel('收益 (元)')
            plt.ylabel('频率')
            plt.legend()
    
            # 4. 绘制风险因子热图
            plt.subplot(2, 2, 4)
            sns.heatmap(self.cov_matrix, annot=True, cmap='coolwarm', fmt='g')
            plt.title('股票收益率协方差矩阵')
    
            plt.tight_layout()
            plt.show()
    
        def calculate_risk_contribution(self, var_results):
            """计算各股票对投资组合风险的贡献"""
            if self.simulation_results is None:
                raise ValueError("请先运行蒙特卡洛模拟")
    
            # 计算每只股票的VaR贡献
            risk_contributions = {}
    
            for i, ticker in enumerate(self.stock_tickers):
                # 计算该股票的权重和波动率
                weight = self.weights[i]
                volatility = np.sqrt(self.cov_matrix.iloc[i, i])
    
                # 计算边际VaR (假设正态分布)
                portfolio_volatility = np.sqrt(np.dot(self.weights, np.dot(self.cov_matrix, self.weights)))
                marginal_var = (weight * volatility * self.cov_matrix.iloc[i, :].dot(self.weights)) / (portfolio_volatility ** 2)
    
                # 计算成分VaR
                component_var = marginal_var * var_results['VaR']
    
                # 计算百分比贡献
                pct_contribution = (component_var / var_results['VaR']) * 100
    
                risk_contributions[ticker] = {
                    'weight': weight,
                    'volatility': volatility,
                    'marginal_var': marginal_var,
                    'component_var': component_var,
                    'pct_contribution': pct_contribution
                }
    
            return risk_contributions
    
    # 使用示例
    if __name__ == "__main__":
        # 设置要分析的股票代码(使用akshare格式)
        #stock_tickers = ['sh600000', 'sz000001', 'sh601318', 'sz000858', 'sh600519']  # 浦发银行、平安银行、中国平安、五粮液、贵州茅台
        stock_tickers = ['sh600938', f'sz000937', 'sh601288', 'sz002555', f'sz000538']  # 浦发银行、平安银行、中国平安、五粮液、贵州茅台
    
        # 设置权重(如果不设置,默认为等权重)
        #weights = [0.2, 0.2, 0.2, 0.2, 0.2]
        weights = [0.4,0.2,0.2,0.1,0.1]
        '''
        股票	    代码	    持仓	    持股比例
        中国海油	600938	 ¥89,775.00 	0.399
        农业银行	601288	 ¥48,276.00 	0.215
        云南白药	000538	 ¥34,020.00 	0.151
        冀中能源	000937	 ¥52,806.00 	0.235
        三七互娱	002555	 ¥14,650.00 	0.065
    
        '''
    
        # 初始化计算器
        var_calculator = StockVaRCalculator(
            stock_tickers=stock_tickers,
            weights=weights,
            investment_value=1000000,  # 100万元投资组合
            simulation_days=10,        # 预测10天
            num_simulations=10000,     # 10000次模拟
            confidence_level=0.99      # 99%置信水平
        )
    
        # 获取数据
        var_calculator.fetch_stock_data()
    
        # 计算收益率
        var_calculator.calculate_returns()
    
        # 运行蒙特卡洛模拟
        var_calculator.run_monte_carlo_simulation()
    
        # 计算VaR
        var_results = var_calculator.calculate_portfolio_var()
    
        # 计算风险贡献
        risk_contributions = var_calculator.calculate_risk_contribution(var_results)
    
        # 打印收益率统计特征(正常A股日收益率均值约0.03%,标准差约2%)
        print("收益率统计:")
        print(var_calculator.returns.describe())
        # 打印结果
        print("\n===== 风险价值计算结果 =====")
        print(f"投资组合总价值: {var_calculator.investment_value:,}元")
        print(f"{var_calculator.simulation_days}天持有期,{var_calculator.confidence_level*100}%置信水平下:")
        print(f"VaR: {var_results['VaR']:,.2f}元 ({var_results['VaR_percentage']:.2f}%)")
        print(f"CVaR: {var_results['CVaR']:,.2f}元 ({var_results['CVaR_percentage']:.2f}%)")
    
        print("\n===== 各股票风险贡献 =====")
        for ticker, contrib in risk_contributions.items():
            print(f"{ticker}: 权重={contrib['weight']:.2%}, 风险贡献={contrib['pct_contribution']:.2f}%")
    
        # 可视化结果
        var_calculator.plot_simulation_results(var_results)
    
        '''
    成功获取sh600938的243天数据
    成功获取sz000937的243天数据
    成功获取sh601288的243天数据
    成功获取sz002555的243天数据
    成功获取sz000538的243天数据
    最后一天股票价格:
    ticker
    sh600938    25.79
    sh601288     5.53
    sz000538    56.48
    sz000937     6.70
    sz002555    14.67
    Name: 2025-05-30 00:00:00, dtype: float64
    数据处理完成,最终数据集包含243天数据
    收益率极值检测:
    最小日收益率: -10.46%
    最大日收益率: 9.55%
    正在运行蒙特卡洛模拟,模拟次数:10000,预测天数:10
    
    模拟结果统计:
    初始投资组合价值: 1,000,000元
    模拟后组合价值范围: 最小值=855,834.29元, 最大值=1,143,978.61元
    组合价值变化范围: 最小值=-144,165.71元, 最大值=143,978.61元
    收益率统计:
    ticker    sh600938    sh601288    sz000538    sz000937    sz002555
    count   242.000000  242.000000  242.000000  242.000000  242.000000
    mean     -0.000572    0.000954    0.000252   -0.000769    0.000091
    std       0.019611    0.013706    0.013784    0.018998    0.025516
    min      -0.100268   -0.047677   -0.076738   -0.086876   -0.104602
    25%      -0.009177   -0.007230   -0.006349   -0.009977   -0.015922
    50%       0.000350    0.002075    0.000086   -0.001723   -0.000382
    75%       0.008776    0.008864    0.006127    0.008332    0.013346
    max       0.078906    0.040574    0.077164    0.074801    0.095478
    
    ===== 风险价值计算结果 =====
    投资组合总价值: 1,000,000元
    10天持有期,99.0%置信水平下:
    VaR: 87,123.09元 (8.71%)
    CVaR: 99,360.77元 (9.94%)
    
    ===== 各股票风险贡献 =====
    sh600938: 权重=40.00%, 风险贡献=1.08%
    sz000937: 权重=20.00%, 风险贡献=0.14%
    sh601288: 权重=20.00%, 风险贡献=0.20%
    sz002555: 权重=10.00%, 风险贡献=0.20%
    sz000538: 权重=10.00%, 风险贡献=0.24%
        '''
  • pythond的facker库生成测试用伪造数据

    [TOC] {.toc max_depth=2}

    faker 是一个用于生成伪造数据的 Python 库,非常适合用于测试、填充数据库、生成样例数据等场景。下面介绍它的基本用法和常见功能。

    1. 安装 Faker

    使用 pip 安装:

    pip install faker

    2. 基本使用流程

    1. 创建 Faker 实例: from faker import Faker
      fake = Faker() # 默认生成英文数据
    2. 生成伪造数据: print(fake.name()) # 随机姓名
      print(fake.address()) # 随机地址
      print(fake.email()) # 随机邮箱
      print(fake.phone_number()) # 随机电话号码

    3. 常用数据类型

    3.1 个人信息

    fake.name()          # 姓名:'Dr. Elizabeth Johnson'
    fake.first_name()    # 名字:'Jason'
    fake.last_name()     # 姓氏:'Williams'
    fake.email()         # 邮箱:'jason.williams@example.com'
    fake.phone_number()  # 电话:'(986) 337-3401'
    fake.ssn()           # 社保号:'642-91-6290'

    3.2 地址信息

    fake.address()       # 完整地址(含换行)
    fake.street_address()  # 街道地址:'9119 Michelle Tunnel'
    fake.city()          # 城市:'North Matthew'
    fake.state()         # 州:'New York'
    fake.country()       # 国家:'China'
    fake.postcode()      # 邮编:'100001'

    3.3 文本数据

    fake.text()          # 段落文本
    fake.sentence()      # 单句:'Provident cupiditate voluptatem.'
    fake.word()          # 单词:'explicabo'
    fake.paragraphs(3)   # 3 个段落的列表

    3.4 网络与金融

    fake.url()           # URL:'https://www.evans.com/'
    fake.ipv4()          # IP 地址:'192.168.1.1'
    fake.credit_card_number()  # 信用卡号:'4532 3090 8290 3028'

    3.5 日期与时间

    fake.date_of_birth(minimum_age=18, maximum_age=90)  # 生日:'1985-03-12'
    fake.date_this_decade()  # 近十年内的日期
    fake.time()              # 时间:'14:30:25'

    4. 本地化支持

    生成特定语言/地区的数据:

    fake = Faker('zh_CN')  # 中文数据
    print(fake.name())     # 姓名:'张伟'
    print(fake.address())  # 地址:'江苏省苏州市虎丘区长安路e座 215000'
    
    fake = Faker('en_US')  # 美国英语数据
    fake = Faker('ja_JP')  # 日本语数据

    5. 生成唯一数据

    使用 unique 确保生成的数据不重复:

    unique_names = [fake.unique.name() for _ in range(5)]
    print(unique_names)  # 5 个不同的姓名

    6. 生成批量数据

    结合循环生成多条数据,例如创建用户列表:

    users = []
    for _ in range(3):
        user = {
            'name': fake.name(),
            'email': fake.email(),
            'address': fake.address().replace('\n', ', '),
            'phone': fake.phone_number()
        }
        users.append(user)
    
    # 打印结果
    for user in users:
        print(f"{user['name']} | {user['email']} | {user['address']}")

    7. 自定义 Provider

    扩展 Faker 以生成特定领域的数据:

    from faker import Faker
    from faker.providers import BaseProvider
    
    fake = Faker()
    
    # 创建自定义 Provider
    class MyProvider(BaseProvider):
        def game_console(self):
            consoles = ['PlayStation 5', 'Xbox Series X', 'Nintendo Switch']
            return self.random_element(consoles)
    
    # 添加到 Faker 实例
    fake.add_provider(MyProvider)
    
    # 使用自定义 Provider
    print(fake.game_console())  # 输出:'Xbox Series X'

    8. 高级用法

    • 种子固定:使用 seed() 确保生成的随机数据可重现。 fake.seed(42)
      print(fake.name()) # 每次运行结果相同
    • 随机元素选择: fake.random_element([‘apple’, ‘banana’, ‘cherry’]) # 随机选择一个

    总结

    faker 库功能强大且易于使用,适合各种测试和数据生成场景。通过本地化和自定义 Provider,还能满足特定需求。官方文档提供了更详细的用法:Faker Documentation

    #`faker` 是一个用于生成伪造数据的 #python 库,非常适合用于测试、填充数据库、
    # 生成样例数据等场景。下面介绍它的基本用法和常见功能。
    
    -----------------------
    
    ### **1. 安装 Faker**
    # 使用 `pip` 安装:
    # bash
    # pip install faker
    #
    ### **2. 基本使用流程**
    # 1. **创建 Faker 实例**:
    #    #python
    from faker import Faker
    fake = Faker()  # 默认生成英文数据
    
    print("--------**1. 创建 Faker 实例**”----------")
    print("--------**2. 生成伪造数据**”----------")
    # 2. **生成伪造数据**:
    #    #python
    print(fake.name())       # 随机姓名
    print(fake.address())    # 随机地址
    print(fake.email())      # 随机邮箱
    print(fake.phone_number())  # 随机电话号码
    print(fake.text())       # 随机文本段落
    
    print("--------**3. 常用数据类型**”----------")
    
    ### **3. 常用数据类型**”
    #### **3.1 个人信息**
    # #python
    print(fake.name() ,         # 姓名:'Dr. Elizabeth Johnson'
    fake.first_name(),    # 名字:'Jason'
    fake.last_name(),     # 姓氏:'Williams'
    fake.email() ,        # 邮箱:'jason.williams@example.com'
    fake.phone_number(),  # 电话:'(986) 337-3401'
    fake.ssn() ,          # 社保号:'642-91-6290'
    fake.job() ,end="  ")          # 职业:'Software Engineer'
    
    print("--------3.2 地址信息*----------")
    #### **3.2 地址信息**
    
    # #python
    print(
    fake.address(),       # 完整地址(含换行)
    fake.street_address(),  # 街道地址:'9119 Michelle Tunnel'
    fake.city() ,         # 城市:'North Matthew'
    fake.state() ,        # 州:'New York'
    fake.country() ,      # 国家:'China'
    fake.postcode()  )    # 邮编:'100001'
    print("-------**3.3 文本数据**-----------")
    #### **3.3 文本数据**
    # #python
    print(fake.text() ,         # 段落文本
    fake.sentence() ,     # 单句:'Provident cupiditate voluptatem.'
    fake.word() ,         # 单词:'explicabo'
    fake.paragraphs(3)  ) # 3 个段落的列表
    
    print("-------3.4 网络与金融**-----------")
    #### **3.4 网络与金融**
    #python
    print(fake.url(),           # URL:'https://www.evans.com/'
    fake.ipv4(),          # IP 地址:'192.168.1.1'
    fake.credit_card_number() ) # 信用卡号:'4532 3090 8290 3028'
    print("---------**3.5 日期与时间**---------")
    
    #### **3.5 日期与时间**
    #python
    print(fake.date_of_birth(minimum_age=18, maximum_age=90) ,  # 生日:'1985-03-12'
    fake.date_this_decade(),  # 近十年内的日期
    fake.time()  )            # 时间:'14:30:25'
    
    print("--------### **4. 本地化支持**----------")
    ### **4. 本地化支持**
    #生成特定语言/地区的数据:
    #python
    fake = Faker('zh_CN')  # 中文数据
    print(fake.name())     # 姓名:'张伟'
    print(fake.address())  # 地址:'江苏省苏州市虎丘区长安路e座 215000'
    print("-------日语伪造信息-----------")
    
    #fake = Faker('en_US')  # 美国英语数据
    fake = Faker('ja_JP')  # 日本语数据
    
    print("------### **5. 生成唯一数据**------------")
    
    
    ### **5. 生成唯一数据**
    ##使用 `unique` 确保生成的数据不重复:
    #python
    unique_names = [fake.unique.name() for _ in range(5)]
    print(unique_names)  # 5 个不同的姓名
    
    print("-------### **6. 生成批量数据** for循环-----------")
    
    ### **6. 生成批量数据**
    #结合循环生成多条数据,例如创建用户列表:
    #python
    users = []
    for _ in range(3):
        user = {
            'name': fake.name(),
            'email': fake.email(),
            'address': fake.address().replace('\n', ', '),
            'phone': fake.phone_number()
        }
        users.append(user)
    
    # 打印结果
    for user in users:
        print(f"{user['name']} | {user['email']} | {user['address']}")
        print("------------------")
    
    ### **7. 自定义 Provider**
    #扩展 Faker 以生成特定领域的数据:
    #python
    from faker import Faker
    from faker.providers import BaseProvider
    
    fake = Faker()
    
    # 创建自定义 Provider
    class MyProvider(BaseProvider):
        def game_console(self):
            consoles = ['PlayStation 5', 'Xbox Series X', 'Nintendo Switch']
            return self.random_element(consoles)
    
    # 添加到 Faker 实例
    fake.add_provider(MyProvider)
    
    # 使用自定义 Provider
    print(fake.game_console())  # 输出:'Xbox Series X'
    print("------------------")
    
    
    ### **8. 高级用法**
    #- **种子固定**:使用 `seed()` 确保生成的随机数据可重现。
    #python
    fake.seed_instance(42)
    print(fake.name())  # 每次运行结果相同
    print("------------------")
    
    #- **随机元素选择**:
    #python
    fake.random_element(['apple', 'banana', 'cherry'])  # 随机选择一个
    
    
    
    ### **总结**
    #`faker` 库功能强大且易于使用,适合各种测试和数据生成场景。通过本地化和自定义 Provider,
    # 还能满足特定需求。官方文档提供了更详细的用法:[Faker Documentation](https://faker.readthedocs.io/)。
  • 中国大陆使用 Backtrader 分析股票

    在中国大陆使用 Backtrader 分析股票时,你可以通过 Tushare 或 Baostock 获取免费的股票数据。下面是一个完整的 Python 脚本,演示了如何使用 Backtrader 进行股票分析和回测。— tushare and skdhare 都调通了

    import backtrader as bt
    import pandas as pd
    import tushare as ts
    import datetime
    import akshare as ak
    import matplotlib.pyplot as plt
    from matplotlib.font_manager import FontProperties
    
    # Set Chinese display, use Microsoft YaHei to avoid missing glyphs
    plt.rcParams["font.family"] = ["Microsoft YaHei"]
    
    # 设置 Tushare token(需要在 Tushare 官网注册获取)
    ts.set_token('866b8c22a26277af04f65c453d61300c740217a4ee2e57b33c7cd199')
    pro = ts.pro_api()
    
    class ChinaStockAnalyzer:
        def __init__(self, stock_code, start_date, end_date, initial_cash=00000):
            """
            初始化股票分析器
    
            参数:
                stock_code: 股票代码,格式为 '000001.SZ' 或 '600000.SH'
                start_date: 开始日期,格式为 '2020-01-01'
                end_date: 结束日期,格式为 '2021-01-01'
                initial_cash: 初始资金
            """
            self.stock_code = stock_code
            self.start_date = start_date
            self.end_date = end_date
            self.initial_cash = initial_cash
            self.cerebro = bt.Cerebro()
            self.cerebro.broker.set_cash(initial_cash)
            # 设置手续费为万三
            self.cerebro.broker.setcommission(commission=0.0003)
    
        def get_data(self, source='tushare'):
            """获取股票数据"""
            if source == 'tushare':
                # 使用 Tushare 获取数据
                df = pro.daily(ts_code=self.stock_code, start_date=self.start_date.replace('-', ''),
                              end_date=self.end_date.replace('-', ''))
                # 按日期升序排列
                df = df.sort_values('trade_date')
                # 转换日期格式
                df['trade_date'] = pd.to_datetime(df['trade_date'])
                # 重命名列名以符合 Backtrader 的要求
                df = df.rename(columns={
                    'trade_date': 'datetime',
                    'open': 'open',
                    'high': 'high',
                    'low': 'low',
                    'close': 'close',
                    'vol': 'volume',
                    'amount':  'amount'
                })
                # 设置日期为索引
                df = df.set_index('datetime')
                # 选择 Backtrader 需要的列
                df = df[['open', 'high', 'low', 'close', 'volume', 'amount']]
    
            elif source == 'akshare':
                # 转换股票代码为akshare格式
                symbol = self.stock_code
                if symbol.endswith('.SH'):
                    symbol = 'sh' + symbol[:6]
                elif symbol.endswith('.SZ'):
                    symbol = 'sz' + symbol[:6]
                # Use akshare to get data
                stock_zh_a_hist_df = ak.stock_zh_a_hist_tx(
                    symbol=symbol,
                    start_date=self.start_date,
                    end_date=self.end_date,
                    adjust=""
                )
                # 按实际英文列名重命名
                df = stock_zh_a_hist_df.rename(columns={
                    'date': 'datetime',
                    'open': 'open',
                    'high': 'high',
                    'low': 'low',
                    'close': 'close',
                    'amount': 'amount'
                })
                df['datetime'] = pd.to_datetime(df['datetime'])
                df = df.set_index('datetime')
                # 若无volume列则用amount填充
                if 'volume' not in df.columns:
                    df['volume'] = df['amount']
                # 调整列顺序
                df = df[['open', 'high', 'low', 'close', 'volume', 'amount']]
    
            else:
                raise ValueError("数据源必须是 'tushare' 或 'akshare'")
    
            return df
    
        def add_data(self, source='tushare'):
            """将数据添加到 Backtrader"""
            df = self.get_data(source)
            data = bt.feeds.PandasData(dataname=df)
            self.cerebro.adddata(data)
    
        def add_strategy(self, strategy_class, **kwargs):
            """添加交易策略"""
            self.cerebro.addstrategy(strategy_class, **kwargs)
    
        def add_analyzers(self):
            """添加分析器"""
            self.cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
            self.cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
            self.cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
            self.cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trade_analyzer')
    
        def run(self):
            """运行回测"""
            self.add_analyzers()
            results = self.cerebro.run()
            self.strategy = results[0]
            return results
    
        def plot(self):
            """绘制回测结果"""
            self.cerebro.plot(style='candlestick', barup='red', bardown='green')
    
        def print_results(self):
            """打印回测结果"""
            print(f"初始资金: {self.initial_cash:.2f}")
            print(f"最终资金: {self.cerebro.broker.getvalue():.2f}")
            print(f"总收益率: {(self.cerebro.broker.getvalue() / self.initial_cash - 1) * 100:.2f}%")
            print(f"夏普比率: {self.strategy.analyzers.sharpe.get_analysis()['sharperatio']:.3f}")
            print(f"年化收益率: {self.strategy.analyzers.returns.get_analysis()['rnorm100']:.2f}%")
            print(f"最大回撤: {self.strategy.analyzers.drawdown.get_analysis()['max']['drawdown']:.2f}%")
    
            # 打印交易分析
            trade_analysis = self.strategy.analyzers.trade_analyzer.get_analysis()
            print("\n交易分析:")
            print(f"总交易次数: {trade_analysis.total.closed}")
            print(f"盈利交易次数: {trade_analysis.won.total}")
            print(f"亏损交易次数: {trade_analysis.lost.total}")
            print(f"胜率: {trade_analysis.won.total / trade_analysis.total.closed * 100:.2f}%")
            print(f"平均盈利: {trade_analysis.won.pnl.average:.2f}")
            print(f"平均亏损: {trade_analysis.lost.pnl.average:.2f}")
            print(f"盈亏比: {-trade_analysis.won.pnl.average / trade_analysis.lost.pnl.average:.2f}")
    
    # 示例策略:双均线策略
    class SmaCross(bt.Strategy):
        params = (
            ('fast', 5),  # 短期均线周期
            ('slow', 20),  # 长期均线周期
            ('printlog', False),  # 是否打印交易日志
        )
    
        def __init__(self):
            # 初始化技术指标
            self.dataclose = self.datas[0].close
            self.fast_sma = bt.indicators.SMA(self.dataclose, period=self.params.fast)
            self.slow_sma = bt.indicators.SMA(self.dataclose, period=self.params.slow)
            self.crossover = bt.indicators.CrossOver(self.fast_sma, self.slow_sma)
    
        def next(self):
            # 如果没有持仓
            if not self.position:
                # 快线上穿慢线,买入
                if self.crossover > 0:
                    size = int(self.broker.getcash() / self.dataclose[0] * 0.95)  # 使用95%的资金买入
                    self.buy(size=size)
                    if self.params.printlog:
                        print(f'买入信号: 价格={self.dataclose[0]:.2f}, 数量={size}')
            # 有持仓
            else:
                # 快线下穿慢线,卖出
                if self.crossover < 0:
                    self.close()
                    if self.params.printlog:
                        print(f'卖出信号: 价格={self.dataclose[0]:.2f}')
    
        def log(self, txt, dt=None, doprint=False):
            """日志函数"""
            if self.params.printlog or doprint:
                dt = dt or self.datas[0].datetime.date(0)
                print(f'{dt.isoformat()} {txt}')
    
        def notify_order(self, order):
            if order.status in [order.Submitted, order.Accepted]:
                # 订单提交或接受
                return
    
            # 检查订单是否完成
            if order.status in [order.Completed]:
                if order.isbuy():
                    self.log(f'买入执行: 价格={order.executed.price:.2f}, 成本={order.executed.value:.2f}, 手续费={order.executed.comm:.2f}')
                else:  # 卖出
                    self.log(f'卖出执行: 价格={order.executed.price:.2f}, 成本={order.executed.value:.2f}, 手续费={order.executed.comm:.2f}')
    
            elif order.status in [order.Canceled, order.Margin, order.Rejected]:
                self.log('订单取消/保证金不足/拒绝')
    
            # 记录无挂单
            self.order = None
    
        def notify_trade(self, trade):
            if not trade.isclosed:
                return
    
            self.log(f'交易利润, 毛利润={trade.pnl:.2f}, 净利润={trade.pnlcomm:.2f}')
    
    if __name__ == "__main__":
        # 初始化分析器
        analyzer = ChinaStockAnalyzer(
            stock_code=f'601288.SH',  # tushare格式为 '601288.SH',akshare格式为 'sh601288','sz000001'为上证指数
            start_date='20230101',
            end_date='20250523',
            initial_cash=250000
        )
    
        # 添加数据
        analyzer.add_data(source='tushare')
        #analyzer.add_data(source='akshare')
        # 添加策略
        analyzer.add_strategy(SmaCross, fast=5, slow=20)
    
        # 运行回测
        results = analyzer.run()
    
        # 打印结果
        analyzer.print_results()
    
        # 绘制结果
        analyzer.plot()

    这个脚本提供了一个完整的框架,用于分析中国 A 股市场的股票。主要功能包括:

    1. 数据获取:支持使用 Tushare 或 akshare 获取股票数据
    2. 双均线策略:实现了基于短期和长期移动平均线交叉的交易策略
    3. 绩效分析:计算夏普比率、年化收益率、最大回撤等关键指标
    4. 交易分析:统计交易次数、胜率、盈亏比等
    5. 可视化:绘制股票价格和回测结果图表

    使用前请确保安装了所需的库:pip install backtrader tushare akshare matplotlib pandas,并在 Tushare 官网注册获取 token。你可以通过修改参数来分析不同的股票和时间段,也可以扩展策略类来实现自定义的交易策略。

    c:/Users/liumt/Documents/backtrader_chinese_stocks.py
    初始资金: 100000.00
    最终资金: 131331.44
    总收益率: 31.33%
    夏普比率: 0.583
    年化收益率: 12.66%
    最大回撤: 23.12%

    交易分析:
    总交易次数: 17
    盈利交易次数: 10
    亏损交易次数: 7
    胜率: 58.82%
    平均盈利: 6853.56
    平均亏损: -5570.25
    盈亏比: 1.23

    这些输出结果是基于双均线策略(5 日和 20 日均线)对贵州茅台(600519.SH)在 2020 年 1 月 1 日至 2023 年 1 月 1 日期间的回测分析。下面是关键指标的详细解释:
    核心绩效指标
    初始资金与最终资金
    初始资金: 100,000.00 元
    最终资金: 131,331.44 元
    结论: 策略在三年间实现了 31.33% 的总收益,表现优于持有现金。
    收益率与风险指标
    总收益率: 31.33%
    三年累计收益,未考虑时间价值。
    年化收益率: 12.66%
    将总收益按复利方式换算为每年的平均收益,便于跨周期比较。
    夏普比率: 0.583
    每承担 1 单位风险可获得的超额回报(相对于无风险利率)。通常认为夏普比率 > 1 为优秀,此处表明策略风险调整后收益一般。
    最大回撤: 23.12%
    策略在测试期间的最大亏损幅度(从高点到低点的最大跌幅)。这意味着投资者可能经历过 23.12% 的账面亏损,需评估自身风险承受能力。
    交易分析指标
    交易频率与胜率
    总交易次数: 17 笔
    三年间平均每年交易约 6 次,属于中低频交易。
    胜率: 58.82%(10 胜 / 7 负)
    超过半数的交易盈利,但需结合盈亏比综合评估。
    盈亏比分析
    平均盈利: 6,853.56 元
    平均亏损: -5,570.25 元
    盈亏比: 1.23
    平均每笔盈利是亏损的 1.23 倍,表明策略在 “大赚小赔” 方面表现一般(理想情况应 > 2)。
    综合评估
    优势:策略实现了正收益,年化 12.66% 优于多数稳健理财,但需承担 23.12% 的最大回撤风险。
    改进方向:
    优化参数(如调整均线周期)提高夏普比率和盈亏比。
    增加止损机制控制单笔亏损幅度。
    考虑组合投资,分散单一股票风险。
    注意事项
    回测陷阱:历史表现不代表未来,实际交易中可能面临滑点、流动性不足等问题。
    市场适应性:双均线策略在趋势明显的市场表现较好,但在震荡市可能频繁止损。
    风险提示:23.12% 的最大回撤意味着 10 万元本金可能亏损至 7.69 万元,需谨慎使用杠杆。
    如需进一步优化策略,可以尝试调整参数、更换股票或引入多因子模型。

  • python股票股价、正态分布检验,绘制直方图和正态分布曲线

    #股价移动平均线系统,股价正态分布检验,绘制直方图和正态分布曲线
    import akshare as ak
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    import matplotlib.font_manager as fm
    from scipy import stats
    
    # 设置字体
    font_path = 'C:/Windows/Fonts/simhei.ttf'  # 黑体字体路径,根据实际情况修改
    my_font = fm.FontProperties(fname=font_path)
    plt.rcParams['font.family'] = my_font.get_name()
    # 解决负号显示问题
    plt.rcParams['axes.unicode_minus'] = False
    
    # Get Kweichow Moutai data
    stock_code = "600938"  # 贵州茅台
    start_date = "20241201"
    end_date = "20250523"
    # ma_periods=[5, 10, 20, 30]
    df = ak.stock_zh_a_hist(symbol=stock_code, period="daily", start_date=start_date, end_date=end_date, adjust="qfq")
    df['日期'] = pd.to_datetime(df['日期'])
    df.set_index('日期', inplace=True)
    # Check if data retrieval failed or the DataFrame is empty
    if df is None or df.empty:
        print("Failed to retrieve data or data is empty")
        # Exit the program early if data retrieval fails
        quit()
    
    print(df.tail())
    print(df.columns)
    print(df['收盘'].describe())
    
    # Calculate moving averages
    windows = [5, 10, 20, 60, 120]
    for w in windows:
        df[f'MA{w}'] = df['收盘'].rolling(window=w).mean()
    
    df.fillna(method='ffill', inplace=True)
    
    # Create a canvas
    plt.figure(figsize=(12, 6))
    
    # Plot the price and moving averages
    plt.plot(df.index, df['收盘'], label='收盘价', alpha=0.5)
    plt.plot(df.index, df['MA5'], label='5日均线', linestyle='--', linewidth=1)
    plt.plot(df.index, df['MA10'], label='10日均线', linestyle=':', linewidth=1.5)
    plt.plot(df.index, df['MA20'], label='20日均线', color='purple')
    plt.plot(df.index, df['MA60'], label='60日均线', color='orange')
    plt.plot(df.index, df['MA120'], label='120日均线', color='red')
    
    # Add chart elements
    plt.title(f'{stock_code} 股票价格移动平均线系统')
    plt.xlabel('日期')
    plt.ylabel('价格')
    plt.legend(loc='upper left')
    plt.grid(True, alpha=0.3)
    
    # Automatically rotate date labels
    plt.gcf().autofmt_xdate()
    
    # Display the chart
    plt.show()
    
    # 检验收盘价是否正态分布
    _, p_value = stats.normaltest(df['收盘'])
    alpha = 0.05
    if p_value < alpha:
        print("收盘价不服从正态分布,进行对数变换")
        data = np.log(df['收盘'])
    else:
        print("收盘价服从正态分布")
        data = df['收盘']
    
    # 绘制直方图叠加正态分布曲线
    plt.figure(figsize=(12, 6))
    n, bins, patches = plt.hist(data, bins=30, density=True, alpha=0.5, label='直方图')
    
    # 拟合正态分布
    mu, std = stats.norm.fit(data)
    xmin, xmax = plt.xlim()
    x = np.linspace(xmin, xmax, 100)
    p = stats.norm.pdf(x, mu, std)
    plt.plot(x, p, 'k', linewidth=2, label='正态分布曲线')
    
    # Add chart elements
    plt.title(f'{stock_code} 股票价格{"对数" if p_value < alpha else ""}收盘价分布')
    plt.xlabel(f'{"对数" if p_value < alpha else ""}股价')
    plt.ylabel('概率密度')
    plt.legend(loc='upper left')
    plt.grid(True, alpha=0.1)
    
    # Automatically rotate date labels
    plt.gcf().autofmt_xdate()
    
    # Display the chart
    plt.show()
    
  • python股票均线系统应用

    通道线、均线交叉、葛兰碧Granville买卖八法标注

    #股票均线系统应用
    import akshare as ak
    import pandas as pd
    import matplotlib.pyplot as plt
    import numpy as np
    
    # 获取股票数据
    def get_stock_data(stock_code, start_date, end_date):
        stock_df = ak.stock_zh_a_hist(symbol=stock_code, period="daily", start_date=start_date, end_date=end_date, adjust="qfq")
        stock_df['日期'] = pd.to_datetime(stock_df['日期'])
        stock_df.set_index('日期', inplace=True)
        return stock_df
    
    # 计算均线
    def calculate_ma(data, ma_periods=[5, 10, 20, 30, 60, 120]):
        for period in ma_periods:
            data[f'MA{period}'] = data['收盘'].rolling(window=period).mean()
        return data
    
    # 识别均线多头排列
    def identify_bullish_arrangement(data, ma_periods=[5, 10, 20, 30, 60, 120]):
        ma_columns = [f'MA{period}' for period in ma_periods]
        data['bullish_arrangement'] = (data[ma_columns].diff(axis=1).iloc[:, 1:] > 0).all(axis=1)
        return data
    
    # 识别均线交叉信号
    def identify_ma_cross(data, short_ma=5, long_ma=20):
        data['ma_cross'] = np.where(data[f'MA{short_ma}'] > data[f'MA{long_ma}'], 1, -1)
        data['ma_cross_signal'] = data['ma_cross'].diff()
        return data
    
    # 计算葛兰碧八大法则买卖点
    def granville_rules(data):
        close = data['收盘']
        ma = data['MA20']
        data['buy_signal'] = 0
        data['sell_signal'] = 0
    
        # 法则 1: 均线从下降逐渐走平且略向上方抬头,而股价从均线下方向上方突破,为买进信号
        data.loc[(ma.shift(1) < ma) & (close > ma) & (close.shift(1) <= ma.shift(1)), 'buy_signal'] = 1
        # 法则 2: 股价位于均线之上运行,回档时未跌破均线后又再度上升时为买进时机
        data.loc[(close > ma) & (close.shift(1) < ma) & (close > close.shift(1)), 'buy_signal'] = 1
        # 法则 3: 股价位于均线之上运行,回档时跌破均线,但短期均线继续呈上升趋势,此时为买进时机
        data.loc[(close < ma) & (close.shift(1) > ma) & (ma.shift(1) < ma), 'buy_signal'] = 1
        # 法则 4: 股价位于均线以下运行,突然暴跌,距离均线太远,极有可能向均线靠近(物极必反,下跌反弹),此时为买进时机
        data.loc[(close < ma) & ((ma - close) / ma > 0.1), 'buy_signal'] = 1
    
        # 法则 5: 股价位于均线之上运行,连续数日大涨,离均线愈来愈远,说明近期内购买股票者获利丰厚,随时都会产生获利回吐的卖压,应暂时卖出持股
        data.loc[(close > ma) & ((close - ma) / ma > 0.1), 'sell_signal'] = 1
        # 法则 6: 均线从上升逐渐走平,而股价从均线上方向下跌破均线时说明卖压渐重,应卖出所持股票
        data.loc[(ma.shift(1) > ma) & (close < ma) & (close.shift(1) >= ma.shift(1)), 'sell_signal'] = 1
        # 法则 7: 股价位于均线下方运行,反弹时未突破均线,且均线跌势减缓,趋于水平后又出现下跌趋势,此时为卖出时机
        data.loc[(close < ma) & (close.shift(1) > ma) & (ma.shift(1) > ma), 'sell_signal'] = 1
        # 法则 8: 股价反弹后在均线上方徘徊,而均线却继续下跌,宜卖出所持股票
        data.loc[(close > ma) & (ma.shift(1) > ma), 'sell_signal'] = 1
    
        return data
    
    # 计算均线通道策略
    def calculate_ma_channel(data, ma_period=20):
        ma = data[f'MA{ma_period}']
        std = data['收盘'].rolling(window=ma_period).std()
        data['upper_channel'] = ma + 2 * std
        data['lower_channel'] = ma - 2 * std
        return data
    
    # 绘制图形
    def plot_data(data, ma_periods, stock_code):  # 添加 stock_code 参数
        plt.figure(figsize=(16, 10))
        plt.plot(data['收盘'], label='Close Price')
        for period in ma_periods:
            plt.plot(data[f'MA{period}'], label=f'MA{period}')
        plt.plot(data['upper_channel'], label='Upper Channel', linestyle='--', color='r')
        plt.plot(data['lower_channel'], label='Lower Channel', linestyle='--', color='g')
    
        # 绘制均线交叉信号
        buy_cross = data[data['ma_cross_signal'] == 2].index
        sell_cross = data[data['ma_cross_signal'] == -2].index
        plt.scatter(buy_cross, data.loc[buy_cross, '收盘'], marker='^', color='g', label='MA Cross Buy Signal')
        plt.scatter(sell_cross, data.loc[sell_cross, '收盘'], marker='v', color='r', label='MA Cross Sell Signal')
    
        # 绘制葛兰碧八大法则买卖点
        buy_points = data[data['buy_signal'] == 1].index
        sell_points = data[data['sell_signal'] == 1].index
        plt.scatter(buy_points, data.loc[buy_points, '收盘'], marker='o', color='b', label='Granville Buy Signal')
        plt.scatter(sell_points, data.loc[sell_points, '收盘'], marker='x', color='m', label='Granville Sell Signal')
    
        plt.title(f'Stock Price with MA and Channel - Stock Code: {stock_code}')  # 修改标题
        plt.xlabel('Date')
        plt.ylabel('Price')
        plt.legend()
        plt.grid(True)
        plt.show()
    
    if __name__ == "__main__":
        stock_code = "600938"  # 贵州茅台
        start_date = "20241201"
        end_date = "20250523"
        ma_periods=[5, 10, 20, 30]
        # 获取数据
        stock_data = get_stock_data(stock_code, start_date, end_date)
    
        # 计算均线
        stock_data = calculate_ma(stock_data)
    
        # 识别均线多头排列
        stock_data = identify_bullish_arrangement(stock_data)
    
        # 识别均线交叉信号
        stock_data = identify_ma_cross(stock_data)
    
        # 计算葛兰碧八大法则买卖点
        stock_data = granville_rules(stock_data)
    
        # 计算均线通道策略
        stock_data = calculate_ma_channel(stock_data)
    
        # 绘制图形
        plot_data(stock_data, ma_periods, stock_code)  # 传递 stock_code 参数
  • A股配置优化:均数-方差模型应用实战

    import akshare as ak
    import pandas as pd
    import numpy as np
    from scipy.optimize import minimize
    
    # 定义获取股票数据的函数
    def get_stock_data(codes, start_date, end_date):
        """
        Fetch historical closing price data for multiple A - share stocks.
    
        Args:
            codes (list): A list containing stock codes, each code is a 6 - digit string.
            start_date (str): The start date of the data in 'YYYYMMDD' format.
            end_date (str): The end date of the data in 'YYYYMMDD' format.
    
        Returns:
            pandas.DataFrame: A DataFrame where each column represents the closing price of a stock,
                              and the index is the trading date.
        """
        # Initialize an empty dictionary to store closing price data for each stock
        data_dict = {}
        # Iterate through each stock code in the list
        for code in codes:
            try:
                # Use the akshare library to get daily historical data for the current stock
                df = ak.stock_zh_a_hist(symbol=code, period="daily", start_date=start_date, end_date=end_date)
                # Check if the DataFrame is empty, indicating no data was retrieved
                if df.empty:
                    # Print a warning message if no data is found
                    print(f"Warning: No data found for stock code {code}")
                    # Skip to the next stock code
                    continue
                # Convert the '日期' (date) column to pandas datetime format
                df['日期'] = pd.to_datetime(df['日期'])
                # Set the '日期' (date) column as the index of the DataFrame
                df.set_index('日期', inplace=True)
                # Add the closing price data of the current stock to the dictionary
                data_dict[code] = df['收盘']
            except Exception as e:
                # Print an error message if an exception occurs during data retrieval
                print(f"Error fetching data for stock code {code}: {e}")
        # Convert the dictionary to a DataFrame and return it
        return pd.DataFrame(data_dict)
    
    # 定义计算均值、协方差矩阵和年化收益率的函数
    def calculate_statistics(returns):
        """
        Calculate the annualized mean returns and the annualized covariance matrix of the given returns.
    
        Args:
            returns (pandas.DataFrame): A DataFrame containing the daily returns of multiple assets.
    
        Returns:
            tuple: A tuple containing two elements:
                - mean_returns (pandas.Series): A Series containing the annualized mean returns of each asset.
                - cov_matrix (pandas.DataFrame): A DataFrame representing the annualized covariance matrix of the assets.
        """
        # Calculate the annualized mean returns by multiplying the daily mean returns by 252,
        # assuming there are 252 trading days in a year.
        mean_returns = returns.mean() * 252  # 年化平均收益率
        # Calculate the annualized covariance matrix by multiplying the daily covariance matrix by 252,
        # assuming there are 252 trading days in a year.
        cov_matrix = returns.cov() * 252  # 年化协方差矩阵
        return mean_returns, cov_matrix
    
    # 定义计算组合收益率和风险的函数
    def portfolio_performance(weights, mean_returns, cov_matrix):
        returns = np.sum(mean_returns * weights)
        std = np.sqrt(np.dot(weights.T, np.dot(cov_matrix, weights)))
        return returns, std
    
    # 定义最小化风险的目标函数
    def minimize_risk(weights, mean_returns, cov_matrix):
        return portfolio_performance(weights, mean_returns, cov_matrix)[1]
    
    # 定义优化函数
    def optimize_portfolio(mean_returns, cov_matrix, num_assets):
        constraints = ({'type': 'eq', 'fun': lambda x: np.sum(x) - 1})  # 权重之和为 1
        bounds = tuple((0, 1) for asset in range(num_assets))  # 权重范围在 0 到 1 之间
        initial_guess = np.array(num_assets * [1. / num_assets])
        result = minimize(minimize_risk, initial_guess, args=(mean_returns, cov_matrix),
                          method='SLSQP', bounds=bounds, constraints=constraints)
        return result
    
    # 主函数
    def main():
        # 选择要分析的股票代码,去掉 sh 和 sz 前缀
        stock_codes = ['600519', '000858', '601318', '002594']
        start_date = '20240101'
        end_date = '20240523'
    
        # 获取股票数据
        stock_data = get_stock_data(stock_codes, start_date, end_date)
    
        if stock_data.empty:
            print("No valid stock data obtained. Exiting...")
            return
    
        # 计算日收益率,注意每日涨跌率和日收益率的区别!
        # 日收益率 = (当日收盘价 - 前一日收盘价) / 前一日收盘价
        # 涨跌率 = 当日收盘价 / 前一日收盘价 - 1
    
        returns = stock_data.pct_change()
        # # 手动计算日收益率并验证
        # manual_returns = pd.DataFrame(index=returns.index, columns=returns.columns)
        # for code in stock_codes:
        #     for i in range(1, len(stock_data)):
        #         prev_close = stock_data[code].iloc[i - 1]
        #         current_close = stock_data[code].iloc[i]
        #         manual_returns[code].iloc[i] = (current_close - prev_close) / prev_close
        # manual_returns = manual_returns.dropna()
    
        print("收盘价数据:")
        print(stock_data.tail())
        print("自动计算的日收益率(%):")
        print(returns.tail() * 100)
        # print("手动计算的日收益率(%):")
        # print(manual_returns.tail() * 100)
    
        returns = returns.dropna()
    
        # 计算均值和协方差矩阵
        mean_returns, cov_matrix = calculate_statistics(returns)
    
        num_assets = len(stock_codes)
    
        # 进行投资组合优化
        optimal_result = optimize_portfolio(mean_returns, cov_matrix, num_assets)
    
        # 输出最优权重
        optimal_weights = optimal_result.x
        optimal_returns, optimal_std = portfolio_performance(optimal_weights, mean_returns, cov_matrix)
    
        print("最优权重:")
        for i in range(num_assets):
            print(f"{stock_codes[i]}: {optimal_weights[i]:.4f}")
        print(f"最优组合年化收益率: {optimal_returns:.4f}")
        print(f"最优组合年化风险(标准差): {optimal_std:.4f}")
    
    if __name__ == "__main__":
        main()
    

    收盘价数据:
    600519 000858 601318 002594
    日期
    2024-05-17 1715.00 156.41 45.20 219.59
    2024-05-20 1709.00 157.30 45.46 222.87
    2024-05-21 1705.00 156.54 45.40 218.78
    2024-05-22 1697.71 155.12 45.40 216.92
    2024-05-23 1692.01 153.60 44.78 214.95
    自动计算的日收益率(%):
    600519 000858 601318 002594
    日期
    2024-05-17 0.616016 1.531970 4.994193 0.586322
    2024-05-20 -0.349854 0.569017 0.575221 1.493693
    2024-05-21 -0.234055 -0.483153 -0.131984 -1.835151
    2024-05-22 -0.427566 -0.907116 0.000000 -0.850169
    2024-05-23 -0.335746 -0.979887 -1.365639 -0.908169
    最优权重:
    600519: 0.7324
    000858: 0.0000
    601318: 0.2205
    002594: 0.0472
    最优组合年化收益率: 0.1205
    最优组合年化风险(标准差): 0.1801

  • 绘制某只股票的K线图、收益率、计算VaR并绘图

    # -*- coding: utf-8 -*-
    """
    获取某只股票的数据绘制K线图、计算日收益率、绘制收益率直方图、计算VaR
    并绘制VaR图
    Created on Sun Sep 15 13:15:50 2024
    @author: 24384
    """
    
    import matplotlib.pyplot as plt
    import matplotlib.font_manager as fm
    import akshare as ak
    import numpy as np
    import pandas as pd
    import mplfinance as mpf  # 需先 pip install mplfinance
    import seaborn as sns
    import time
    from requests.exceptions import ConnectionError, Timeout, TooManyRedirects
    
    # # 列出系统所有可用字体
    # fonts = fm.findSystemFonts()
    # font_names = [fm.FontProperties(fname=font).get_name() for font in fonts]
    # print("系统可用字体:")
    # for font_name in font_names:
    #     print(font_name)
    
    # 手动选择一个支持中文的字体
    font_name = 'Microsoft YaHei'  # 根据输出结果修改
    try:
        plt.rcParams['font.family'] = font_name
        plt.rcParams['axes.unicode_minus'] = False  # 负号正常显示
        print(f"成功使用字体: {font_name}")
    except Exception as e:
        print(f"加载字体时发生错误: {e},使用默认字体。")
    
    def get_stock_data(stock_code, start_date, end_date, max_retries=3, retry_delay=5):
        """获取股票数据,添加重试机制和错误处理"""
        for attempt in range(max_retries):
            try:
                print(f"尝试获取 {stock_code} 的数据,尝试次数: {attempt + 1}/{max_retries}")
                stock_df = ak.stock_zh_a_hist_tx(
                    symbol=stock_code,
                    start_date=start_date,
                    end_date=end_date,
                    adjust=""
                )
                if stock_df is None or stock_df.empty:
                    print(f"警告:获取到的 {stock_code} 数据为空")
                    return None
                if 'date' in stock_df.columns:
                    stock_df = stock_df.set_index('date')
                    stock_df.index = pd.to_datetime(stock_df.index)
                else:
                    print(f"错误:未找到日期列。可用列名: {stock_df.columns.tolist()}")
                    return None
                return stock_df['close']
            except (ConnectionError, Timeout, TooManyRedirects) as e:
                print(f"网络连接错误: {e}")
                if attempt < max_retries - 1:
                    print(f"将在 {retry_delay} 秒后重试...")
                    time.sleep(retry_delay)
                else:
                    print("达到最大重试次数,获取数据失败")
                    return None
            except Exception as e:
                print(f"发生未知错误: {e}")
                return None
    
    def plot_kline(stock_code, start_date, end_date):
        try:
            stock_df = ak.stock_zh_a_hist_tx(
                symbol=stock_code,
                start_date=start_date,
                end_date=end_date,
                adjust=""
            )
            if stock_df is None or stock_df.empty:
                print("K线数据为空,无法绘制K线图")
                return
            stock_df = stock_df.rename(columns={'date': 'Date', 'open': 'Open', 'high': 'High', 'low': 'Low', 'close': 'Close', 'amount': 'Volume'})
            stock_df['Date'] = pd.to_datetime(stock_df['Date'])
            stock_df.set_index('Date', inplace=True)
            title = f'{stock_code} K线图'
            mpf.plot(stock_df[['Open', 'High', 'Low', 'Close', 'Volume']], type='candle', volume=True, style='yahoo', title=title)
        except RequestException as e:
            print(f"获取数据时发生网络请求错误: {e}")
        except Exception as e:
            print(f"绘制K线图时发生错误: {e}")
    
    
    def monte_carlo_simulations(close_prices, num_simulations=10000, days=1):
        """返回蒙特卡洛模拟的所有收益率数组"""
        returns = close_prices.pct_change().dropna()
        mean_return = returns.mean()
        std_return = returns.std()
        simulations = np.zeros(num_simulations)
        for i in range(num_simulations):
            daily_returns = np.random.normal(mean_return, std_return, days)
            final_return = np.prod(1 + daily_returns) - 1
            simulations[i] = final_return
        return simulations
    
    def monte_carlo_var(simulations, confidence_level=0.95):
        """根据模拟结果计算VaR"""
        var = np.percentile(simulations, 100 * (1 - confidence_level))
        return var
    
    def plot_var_results(simulations, var_value, confidence_level=0.95, days=1):
        """绘制蒙特卡洛模拟收益率分布和VaR"""
        plt.figure(figsize=(10, 6))
        sns.histplot(simulations, bins=30, kde=True, stat="density", color='skyblue')
        plt.axvline(x=var_value, color='red', linestyle='--',
                    label=f"{int(confidence_level*100)}% VaR: {var_value:.2%}")
        plt.title(f"蒙特卡洛模拟股票收益率分布与VaR({days}天)")
        plt.xlabel("收益率")
        plt.ylabel("密度")
        plt.legend()
        plt.grid(True, linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.show()
    
    def plot_price_history(stock_prices):
        plt.figure(figsize=(10, 4))
        sns.lineplot(data=stock_prices)
        plt.title("历史股票价格走势")
        plt.xlabel("日期")
        plt.ylabel("价格")
        plt.grid(True, linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.show()
    
    if __name__ == "__main__":
        # 参数设置
        stock_code = 'sh600938'  # 中国海油
        start_date = '20240101'
        end_date = '20250521'
        num_simulations = 10000
        days = 10
        confidence_level = 0.95
    
        # 获取股票数据
        close_prices = get_stock_data(stock_code, start_date, end_date)
        if close_prices is None:
            print("无法获取股票数据,程序退出")
            exit()
    
        # 绘制历史价格
        plot_price_history(close_prices)
    
        # 蒙特卡洛模拟
        simulations = monte_carlo_simulations(close_prices, num_simulations, days)
        var = monte_carlo_var(simulations, confidence_level)
    
        print(f"单只股票 {stock_code} 在 {confidence_level * 100:.0f}% 置信水平下,{days} 天的 VaR 为: {var:.2%}")
    
        # 绘制模拟结果与VaR
        plot_var_results(simulations, var, confidence_level, days)
        # 绘制K线图
        plot_kline(stock_code, start_date, end_date)
    
  • python单只股票蒙特卡洛风险计算及绘图方法

    import akshare as ak
    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt
    import seaborn as sns
    import time
    
    # 字体设置
    plt.rcParams["font.family"] = ["simkai.ttf", "Microsoft YaHei", "SimHei"]
    try:
        import matplotlib.font_manager as fm
        font = fm.FontProperties(fname=r"C:\Windows\Fonts\msyh.ttc")
        plt.rcParams["font.family"] = font.get_name()
    except Exception:
        plt.rcParams["font.family"] = ["SimHei", "simkai.ttf"]
    
    from requests.exceptions import ConnectionError, Timeout, TooManyRedirects
    
    def get_stock_data(stock_code, start_date, end_date, max_retries=3, retry_delay=5):
        """获取股票数据,添加重试机制和错误处理"""
        for attempt in range(max_retries):
            try:
                print(f"尝试获取 {stock_code} 的数据,尝试次数: {attempt + 1}/{max_retries}")
                stock_df = ak.stock_zh_a_hist_tx(
                    symbol=stock_code,
                    start_date=start_date,
                    end_date=end_date,
                    adjust=""
                )
                if stock_df is None or stock_df.empty:
                    print(f"警告:获取到的 {stock_code} 数据为空")
                    return None
                if 'date' in stock_df.columns:
                    stock_df = stock_df.set_index('date')
                    stock_df.index = pd.to_datetime(stock_df.index)
                else:
                    print(f"错误:未找到日期列。可用列名: {stock_df.columns.tolist()}")
                    return None
                return stock_df['close']
            except (ConnectionError, Timeout, TooManyRedirects) as e:
                print(f"网络连接错误: {e}")
                if attempt < max_retries - 1:
                    print(f"将在 {retry_delay} 秒后重试...")
                    time.sleep(retry_delay)
                else:
                    print("达到最大重试次数,获取数据失败")
                    return None
            except Exception as e:
                print(f"发生未知错误: {e}")
                return None
    
    def monte_carlo_simulations(close_prices, num_simulations=10000, days=1):
        """返回蒙特卡洛模拟的所有收益率数组"""
        returns = close_prices.pct_change().dropna()
        mean_return = returns.mean()
        std_return = returns.std()
        simulations = np.zeros(num_simulations)
        for i in range(num_simulations):
            daily_returns = np.random.normal(mean_return, std_return, days)
            final_return = np.prod(1 + daily_returns) - 1
            simulations[i] = final_return
        return simulations
    
    def monte_carlo_var(simulations, confidence_level=0.95):
        """根据模拟结果计算VaR"""
        var = np.percentile(simulations, 100 * (1 - confidence_level))
        return var
    
    def plot_var_results(simulations, var_value, confidence_level=0.95, days=1):
        """绘制蒙特卡洛模拟收益率分布和VaR"""
        plt.figure(figsize=(10, 6))
        sns.histplot(simulations, bins=30, kde=True, stat="density", color='skyblue')
        plt.axvline(x=var_value, color='red', linestyle='--',
                    label=f"{int(confidence_level*100)}% VaR: {var_value:.2%}")
        plt.title(f"蒙特卡洛模拟股票收益率分布与VaR({days}天)")
        plt.xlabel("收益率")
        plt.ylabel("密度")
        plt.legend()
        plt.grid(True, linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.show()
    
    def plot_price_history(stock_prices):
        """绘制历史价格走势"""
        plt.figure(figsize=(10, 4))
        sns.lineplot(data=stock_prices)
        plt.title("历史股票价格走势")
        plt.xlabel("日期")
        plt.ylabel("价格")
        plt.grid(True, linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.show()
    
    if __name__ == "__main__":
        # 参数设置
        stock_code = 'sh600938'  # 中国海油
        start_date = '20240101'
        end_date = '20250521'
        num_simulations = 1000
        days = 1
        confidence_level = 0.95
    
        # 获取股票数据
        close_prices = get_stock_data(stock_code, start_date, end_date)
        if close_prices is None:
            print("无法获取股票数据,程序退出")
            exit()
    
        # 绘制历史价格
        plot_price_history(close_prices)
    
        # 蒙特卡洛模拟
        simulations = monte_carlo_simulations(close_prices, num_simulations, days)
        var = monte_carlo_var(simulations, confidence_level)
    
        print(f"单只股票 {stock_code} 在 {confidence_level * 100:.0f}% 置信水平下,{days} 天的 VaR 为: {var:.2%}")
    
        # 绘制模拟结果与VaR
        plot_var_results(simulations, var, confidence_level, days)
    
    
  • python代码绘制某只股票的K线图、均线、量能图

    import akshare as ak
    import matplotlib.pyplot as plt
    import pandas as pd
    import mplfinance as mpf
    import matplotlib.dates as mdates
    import numpy as np
    import os
    from datetime import datetime, timedelta
    
    # 设置中文字体
    plt.rcParams["font.family"] = ["SimHei",  "Microsoft YaHei"]
    plt.rcParams["axes.unicode_minus"] = False  # 解决负号显示问题
    
    def get_stock_data(stock_code, start_date=None, end_date=None, adjust="qfq"):
        """
        使用 AkShare 获取股票数据
    
        参数:
        stock_code (str): 股票代码,如 'sh000001' 或 '000001.SZ'
        start_date (str): 开始日期,格式 'YYYYMMDD',默认为 3 个月前
        end_date (str): 结束日期,格式 'YYYYMMDD',默认为今天
        adjust (str): 复权类型,'qfq' 为前复权,'hfq' 为后复权,None 为不复权
    
        返回:
        DataFrame: 包含股票数据的 DataFrame
        """
        # 处理默认日期
        if end_date is None:
            end_date = datetime.now().strftime('%Y%m%d')
    
        if start_date is None:
            start_date = (datetime.now() - timedelta(days=90)).strftime('%Y%m%d')
    
        # 格式化股票代码以适应 AkShare
        if not stock_code.startswith(('sh', 'sz')):
            market = 'sh' if stock_code.startswith(('6', '9')) else 'sz'
            stock_code = f'{market}{stock_code}'
    
        try:
            # 使用 AkShare 获取股票日线数据
            stock_data = ak.stock_zh_a_hist_tx(symbol=stock_code,
                                          start_date=start_date, end_date=end_date,
                                          adjust=adjust)
    
            # 重命名列以符合 mplfinance 要求
            # 检查并映射成交量列名
            volume_columns = ['成交量', '成交额', 'volume','amount']
            volume_col = next((col for col in volume_columns if col in stock_data.columns), None)
    
            if volume_col is None:
                print("警告: 数据中未找到成交量列,图表可能不完整")
                stock_data['volume'] = 0  # 添加默认成交量列
            else:
                stock_data = stock_data.rename(columns={volume_col: 'volume'})
    
            # 重命名其他必要列
            stock_data = stock_data.rename(columns={
                '日期': 'date', '开盘': 'open', '收盘': 'close',
                '最高': 'high', '最低': 'low'
            })
    
            # 转换日期格式
            stock_data['date'] = pd.to_datetime(stock_data['date'])
    
            # 确保数据按日期排序
            stock_data = stock_data.sort_values('date')
    
            # 检查是否有必要的列
            required_columns = ['date', 'open', 'high', 'low', 'close', 'volume']
            missing_columns = [col for col in required_columns if col not in stock_data.columns]
    
            if missing_columns:
                print(f"错误: 数据缺少必要的列: {', '.join(missing_columns)}")
                return None
    
            return stock_data
    
        except Exception as e:
            print(f"获取股票数据时出错: {e}")
            return None
    
    def plot_stock_daily_movement(stock_data, stock_name="股票", save_path=None, ma_periods=None):
        """
        绘制股票日变动情况,包括 K 线图和成交量及均线
    
        参数:
        stock_data (DataFrame): 包含股票数据的 DataFrame,必须包含 'date', 'open', 'high', 'low', 'close', 'volume' 列
        stock_name (str): 股票名称,用于图表标题
        save_path (str): 图表保存路径,默认为 None 不保存
        ma_periods (list): 均线周期列表,默认为 [5, 10, 20]
        """
        if stock_data is None or stock_data.empty:
            print("没有数据可绘制图表")
            return
    
        # 设置默认均线周期
        if ma_periods is None:
            ma_periods = [5, 10, 20]
    
        # 复制数据并准备 mplfinance 所需格式
        plot_data = stock_data.copy()
        plot_data = plot_data.set_index('date')
    
        # 添加均线
        for period in ma_periods:
            column_name = f'MA{period}'
            plot_data[column_name] = plot_data['close'].rolling(window=period).mean()
    
        # 设置 mplfinance 样式
        mc = mpf.make_marketcolors(up='r', down='g', inherit=True)
    
        # 自定义均线颜色
        ma_colors = ['blue', 'purple', 'orange', 'green', 'red', 'brown', 'gray']
        ma_colors = ma_colors[:len(ma_periods)]  # 确保颜色数量与均线周期数量匹配
    
        s = mpf.make_mpf_style(
            marketcolors=mc,
            gridstyle='--',
            y_on_right=False,
            rc={
                'font.family': ['SimHei', 'WenQuanYi Micro Hei', 'Heiti TC', 'Microsoft YaHei'],
                'lines.linewidth': 1.5  # 增加均线线条宽度
            }
        )
    
        # 绘制 K 线图和成交量
        fig, axes = mpf.plot(
            plot_data,
            type='candle',
            style=s,
            title=f'{stock_name} 日变动情况',
            ylabel='价格 (元)',
            volume=True,
            ylabel_lower='成交量 (手)',
            mav=tuple(ma_periods),  # 传递均线周期
            returnfig=True,
            figsize=(14, 10),
            update_width_config=dict(
                candle_linewidth=1.2,  # K线实体边框宽度
                candle_width=0.6,      # K线宽度
                volume_width=0.8,      # 成交量柱宽度
            )
        )
    
        # 调整标题位置
        fig.suptitle(f'{stock_name} 日变动情况', fontsize=16, y=0.98)
    
        # 添加均线图例
        ax = axes[0]  # K线图的轴
        for i, period in enumerate(ma_periods):
            ax.plot([], [], color=ma_colors[i], label=f'MA{period}', linewidth=1.5)
    
        ax.legend(loc='upper left')
    
        # 如果指定了保存路径,则保存图表
        if save_path:
            save_dir = os.path.dirname(save_path)
            if not os.path.exists(save_dir):
                os.makedirs(save_dir)
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"图表已保存至: {save_path}")
    
        plt.show()
    
    # 主函数
    if __name__ == "__main__":
        # 用户输入股票代码和名称
        stock_code = input("请输入股票代码(例如 000001 或 sh000001): ").strip()
        stock_name = input("请输入股票名称(例如 平安银行): ").strip()
    
        # 获取股票数据
        print("正在获取股票数据...")
        stock_data = get_stock_data(stock_code)
    
        if stock_data is not None and not stock_data.empty:
            print(f"成功获取 {len(stock_data)} 天的股票数据")
    
            # 显示数据列名,用于调试
            print(f"数据列名: {', '.join(stock_data.columns)}")
    
            # 可选:自定义均线周期
            ma_input = input("请输入均线周期(用逗号分隔,默认 5,10,20): ").strip()
            if ma_input:
                try:
                    ma_periods = [int(p.strip()) for p in ma_input.split(',')]
                except ValueError:
                    print("无效的均线周期,使用默认值")
                    ma_periods = [5, 10, 20]
            else:
                ma_periods = [5, 10, 20]
    
            # 绘制股票日变动情况
            plot_stock_daily_movement(stock_data, stock_name, ma_periods=ma_periods)
        else:
            print("未能获取股票数据,请检查股票代码是否正确")    
    000937股票最近90天K线图