分类: 股票

  • 股票分析长短期时间序列模型LSTMPython代码

    我需要查看stock_forcast.py文件的完整内容,了解其中LSTM模型的实现和使用情况,然后再对其进行解释。

    filePath: c:\Users\czliu\Documents\python\stock_forcast.py

    现在我已经查看了完整的stock_forcast.py文件,我将对其中的LSTM模型实现进行详细解释,包括模型定义、结构、参数、数据准备和训练过程等方面。# LSTM模型在股票预测程序中的实现分析

    1. LSTM模型定义

    stock_forcast.py文件中,LSTM模型通过LSTMStockPredictor类实现:

    class LSTMStockPredictor(nn.Module):
        def __init__(self, input_size=1, hidden_size=64, num_layers=2, output_size=1, dropout=0.2):
            super(LSTMStockPredictor, self).__init__()
            self.hidden_size = hidden_size
            self.num_layers = num_layers
    
            # LSTM层
            self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout)
    
            # 全连接层
            self.fc = nn.Linear(hidden_size, output_size)
    
            # Sigmoid激活函数,用于分类
            self.sigmoid = nn.Sigmoid()
    
        def forward(self, x):
            # 初始化隐藏状态和细胞状态
            h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
            c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
    
            # 前向传播LSTM
            out, _ = self.lstm(x, (h0, c0))
    
            # 取最后一个时间步的输出
            out = self.fc(out[:, -1, :])
    
            # 使用sigmoid激活函数得到概率
            out = self.sigmoid(out)
    
            return out

    2. 模型结构详解

    2.1 模型参数

    • input_size=1:输入特征维度,这里只使用股票收盘价作为输入
    • hidden_size=64:LSTM隐藏层大小,决定模型的学习能力
    • num_layers=2:LSTM堆叠层数,增加模型深度
    • output_size=1:输出维度,这里输出一个概率值
    • dropout=0.2: dropout比率,防止过拟合

    2.2 网络结构

    1. LSTM层:使用PyTorch内置的nn.LSTM实现,采用批量优先(batch_first=True)的输入格式
    2. 全连接层:将LSTM的输出映射到最终的预测值
    3. Sigmoid激活函数:将输出转换为0-1之间的概率值,表示上涨的可能性

    2.3 前向传播过程

    1. 初始化隐藏状态(h0)和细胞状态(c0)为全零张量
    2. 将输入数据通过LSTM层,得到输出序列和最终的隐藏状态
    3. 只取LSTM输出序列的最后一个时间步的输出,因为我们只需要预测下一天的趋势
    4. 将LSTM的输出通过全连接层和Sigmoid激活函数,得到最终的预测概率

    3. 数据准备与预处理

    模型使用prepare_data_for_lstm函数处理数据:

    def prepare_data_for_lstm(data, look_back=60, test_size=0.2):
        # 使用收盘价作为特征
        close_prices = data['Close'].values.reshape(-1, 1)
    
        # 数据归一化
        scaler = MinMaxScaler(feature_range=(0, 1))
        scaled_data = scaler.fit_transform(close_prices)
    
        # 创建数据集
        X, y = [], []
        for i in range(len(scaled_data) - look_back):
            X.append(scaled_data[i:(i + look_back), 0])
            # 预测下一天是涨(1)还是跌(0)
            y.append(1 if scaled_data[i + look_back, 0] > scaled_data[i + look_back - 1, 0] else 0)
    
        X, y = np.array(X), np.array(y)
    
        # 转换为LSTM需要的形状 [样本数, 时间步, 特征数]
        X = np.reshape(X, (X.shape[0], X.shape[1], 1))
    
        # 划分训练集和测试集
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, shuffle=False)
    
        # 转换为PyTorch张量
        X_train = torch.tensor(X_train, dtype=torch.float32)
        X_test = torch.tensor(X_test, dtype=torch.float32)
        y_train = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
        y_test = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1)
    
        return X_train, X_test, y_train, y_test, scaler, len(X_train)

    关键处理步骤:

    1. 使用收盘价作为特征:从股票数据中提取收盘价
    2. 数据归一化:使用MinMaxScaler将数据缩放到0-1之间,提高模型训练稳定性
    3. 构建时间序列数据集
    • 使用look_back=60个连续的收盘价作为输入
    • 预测第61天的涨跌情况(1表示涨,0表示跌)
    1. 调整数据形状:转换为LSTM需要的三维格式 [样本数, 时间步, 特征数]
    2. 划分训练集和测试集:按时间顺序划分,保持时间序列特性
    3. 转换为PyTorch张量:适配PyTorch框架的输入格式

    4. 模型训练过程

    模型训练通过train_lstm_model函数实现:

    def train_lstm_model(model, X_train, y_train, X_test, y_test, epochs=100, batch_size=32, learning_rate=0.001):
        # 定义损失函数和优化器
        criterion = nn.BCELoss()  # 二元交叉熵损失
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    
        # 创建DataLoader
        train_dataset = TensorDataset(X_train, y_train)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    
        # 训练模型
        train_losses = []
        test_losses = []
    
        for epoch in range(epochs):
            model.train()
            train_loss = 0.0
    
            for batch_X, batch_y in train_loader:
                # 前向传播
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)
    
                # 反向传播和优化
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
    
                train_loss += loss.item()
    
            # 计算平均训练损失
            train_loss /= len(train_loader)
            train_losses.append(train_loss)
    
            # 在测试集上评估
            model.eval()
            with torch.no_grad():
                test_outputs = model(X_test)
                test_loss = criterion(test_outputs, y_test).item()
                test_losses.append(test_loss)
    
            # 打印训练进度
            if (epoch + 1) % 10 == 0:
                print(f'Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}')
    
        return model, train_losses, test_losses

    训练关键设置:

    1. 损失函数:使用二元交叉熵损失(BCELoss),适合二分类问题
    2. 优化器:使用Adam优化器,学习率0.001
    3. 批次训练:使用batch_size=32进行小批量训练
    4. 训练轮次:训练100个epoch
    5. 训练监控:每10个epoch打印一次训练损失和测试损失

    5. 模型预测与评估

    5.1 预测过程

    def predict_stock_trend(model, X_test, y_test):
        model.eval()
        with torch.no_grad():
            predictions = model(X_test)
            # 转换为0/1分类
            predicted_classes = (predictions > 0.5).float()
    
            # 计算准确率
            accuracy = (predicted_classes == y_test).float().mean()
            print(f'模型准确率: {accuracy:.4f}')
    
        return predictions, predicted_classes, accuracy

    预测步骤:

    1. 将模型设置为评估模式(model.eval())
    2. 关闭梯度计算(with torch.no_grad())以提高效率
    3. 使用模型进行预测,得到每个样本的上涨概率
    4. 将概率转换为0/1分类结果(概率>0.5为涨,否则为跌)
    5. 计算模型在测试集上的准确率

    5.2 结果可视化

    程序提供了plot_prediction_results函数,用于可视化预测结果,包括:

    • 股票实际收盘价和测试集分割点
    • 模型预测的涨跌趋势

    6. LSTM模型的应用特点

    1. 时间序列建模:LSTM特别适合处理时间序列数据,能够捕捉股票价格的时序特征
    2. 长期依赖捕获:通过门控机制,LSTM能够记住长期的价格模式
    3. 二分类任务:这里LSTM被用于预测股票涨跌的二分类任务,而非直接预测价格
    4. 滑动窗口输入:使用60天的历史数据预测下一天的趋势,符合股票分析的常用方法

    7. 程序中的LSTM工作流程

    1. 数据获取:从akshare获取股票历史数据
    2. 数据预处理:计算技术指标,准备LSTM输入数据
    3. 模型初始化:创建LSTM模型实例
    4. 模型训练:使用训练集训练LSTM模型
    5. 模型评估:在测试集上评估模型性能
    6. 结果可视化:绘制价格走势图和预测结果图

    这个LSTM模型实现了一个基础的股票趋势预测系统,通过深度学习的方法尝试捕捉股票价格的短期波动模式。

    开始预测 中国海油 600938

    请输入股票代码(如 000001 或 600000):600938
    开始训练LSTM模型…
    Epoch [10/100], Train Loss: 0.6936, Test Loss: 0.6958
    Epoch [20/100], Train Loss: 0.6930, Test Loss: 0.6958
    Epoch [30/100], Train Loss: 0.6936, Test Loss: 0.6907
    Epoch [40/100], Train Loss: 0.6924, Test Loss: 0.6894
    Epoch [50/100], Train Loss: 0.6922, Test Loss: 0.6881
    Epoch [60/100], Train Loss: 0.6915, Test Loss: 0.6836
    Epoch [70/100], Train Loss: 0.6917, Test Loss: 0.6800
    Epoch [80/100], Train Loss: 0.6924, Test Loss: 0.6816
    Epoch [90/100], Train Loss: 0.6882, Test Loss: 0.7051
    Epoch [100/100], Train Loss: 0.6904, Test Loss: 0.6809
    模型准确率: 0.5676

    完整代码:

              
    # 程序用途及神经网络预测结果解释
    
    ## 一、程序主要用途
    
    该Python程序是一个综合的股票分析工具,主要功能包括:
    
    1. **股票数据获取**:使用akshare库从公开数据源获取A股股票的历史交易数据(默认获取近一年的日线数据)。
    
    2. **技术指标计算**:
       - 计算指数移动平均线(EMA,默认20日)
       - 计算价格波动标准差(STD,默认20日)
    
    3. **技术分析可视化**:
       - 绘制股票收盘价与EMA指标对比图
       - 绘制股票价格波动性(STD)走势图
    
    4. **LSTM神经网络预测**:
       - 使用长短期记忆网络(LSTM)预测股票次日涨跌趋势
       - 训练模型并评估预测准确率
       - 可视化预测结果与实际价格走势对比
    
    ## 二、神经网络预测结果解释
    
    ### 1. 训练过程指标
    - **Epoch**:训练轮次,程序默认训练100轮
    - **Train Loss**:训练集损失,反映模型在训练数据上的拟合程度
    - **Test Loss**:测试集损失,反映模型在未见过数据上的泛化能力
    
    从您提供的训练日志看:
    ```
    Epoch [90/100], Train Loss: 0.6398, Test Loss: 0.8248 
    Epoch [100/100], Train Loss: 0.6300, Test Loss: 0.9487
    ```
    训练损失逐渐下降,但测试损失在后期有所上升,说明模型可能出现了一定程度的过拟合(在训练数据上表现好,但对新数据预测能力下降)。
    
    ### 2. 模型准确率
    ```
    模型准确率: 0.5135
    ```
    准确率约为51.35%,表示模型预测正确的比例。这个准确率略高于随机猜测(50%),说明模型有一定的预测能力,但效果并不理想。
    
    ### 3. 预测结果可视化
    - **上半部分图表**:显示股票实际收盘价走势,并标注测试集开始位置
    - **下半部分图表**:显示模型对测试集的预测结果(0表示跌,1表示涨)
    
    ### 4. 模型局限性与改进方向
    - **数据量限制**:仅使用了近一年的日线数据,可考虑增加更长时间周期的数据
    - **特征单一**:目前仅使用收盘价作为特征,可加入成交量、换手率等更多指标
    - **模型结构**:可尝试调整LSTM层数、隐藏单元数量或添加更多网络层
    - **训练参数**:可调整学习率、批量大小、训练轮次等参数
    - **过拟合问题**:可添加正则化技术(如L2正则化、Dropout)或使用早停法
    
    ## 三、使用建议
    
    1. **数据选择**:建议选择交易活跃、数据质量好的股票进行分析
    2. **参数调整**:根据不同股票特性调整技术指标参数(如EMA窗口大小)和LSTM模型参数
    3. **结果解读**:股票价格受多种因素影响,模型预测仅供参考,不应作为投资决策的唯一依据
    4. **持续优化**:可根据实际预测效果持续调整模型结构和参数
    
    该程序提供了一个完整的股票分析框架,您可以在此基础上根据自己的需求进一步扩展功能或优化模型。
    
     ```{python}
     import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    # 设置中文显示
    plt.rcParams["font.family"] = ["SimHei", "Microsoft YaHei", "SimSun", "KaiTi", "FangSong"]
    plt.rcParams["axes.unicode_minus"] = False
    import akshare as ak
    from datetime import datetime, timedelta
    import torch
    import torch.nn as nn
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.model_selection import train_test_split
    from torch.utils.data import DataLoader, TensorDataset
    
    
    def fetch_stock_data(symbol, period='1y'):
        """获取股票数据"""
        end_date = datetime.now().strftime('%Y%m%d')
        start_date = (datetime.now() - timedelta(days=365)).strftime('%Y%m%d')
        
        # 处理股票代码格式,确保符合akshare要求
        if not symbol.startswith(('sh', 'sz')):
            if symbol.startswith(('6', '9')):
                symbol = f"sh{symbol}"
            else:
                symbol = f"sz{symbol}"
        
        # 使用更稳定的stock_zh_a_hist函数获取股票日线数据
        data = ak.stock_zh_a_hist(symbol=symbol[2:],  # 只传股票代码部分,不含sh/sz
                                 period="daily",
                                 start_date=start_date,
                                 end_date=end_date,
                                 adjust="qfq")  # 使用前复权价格
        
        # 检查数据获取是否成功
        if data.empty:
            print(f"获取股票{symbol}数据失败,请检查股票代码和网络连接")
            return None
        
        # 先转换日期格式
        if '日期' in data.columns:
            data['日期'] = pd.to_datetime(data['日期'])
            # 设置日期为索引
            data.set_index('日期', inplace=True)
            # 重命名收盘价列
            if '收盘' in data.columns:
                data = data.rename(columns={'收盘': 'Close'})
        else:
            print("数据格式异常,无法处理")
            return None
        
        return data
    
    
    def calculate_ema(data, window=20):
        """计算指数移动平均线 EMA"""
        ema = data['Close'].ewm(span=window, adjust=False).mean()
        return ema
    
    
    def calculate_std(data, window=20):
        """计算标准差 STD"""
        std = data['Close'].rolling(window=window).std()
        return std
    
    
    def plot_analysis(data, ema, std, symbol):
        """绘制图表分析结果"""
        plt.figure(figsize=(14, 8))
    
        plt.subplot(2, 1, 1)
        plt.plot(data['Close'], label='收盘价', color='blue')
        plt.plot(ema, label='EMA(20)', color='orange')
        plt.title(f'{symbol} 股票价格与EMA分析')
        plt.legend()
    
        plt.subplot(2, 1, 2)
        plt.plot(std, label='STD(20)', color='red')
        plt.title(f'{symbol} 股票价格波动性(STD)分析')
        plt.legend()
    
        plt.tight_layout()
        plt.show()
    
    
    # LSTM模型定义
    class LSTMStockPredictor(nn.Module):
        def __init__(self, input_size=1, hidden_size=64, num_layers=2, output_size=1, dropout=0.2):
            super(LSTMStockPredictor, self).__init__()
            self.hidden_size = hidden_size
            self.num_layers = num_layers
            
            # LSTM层
            self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout)
            
            # 全连接层
            self.fc = nn.Linear(hidden_size, output_size)
            
            # Sigmoid激活函数,用于分类
            self.sigmoid = nn.Sigmoid()
        
        def forward(self, x):
            # 初始化隐藏状态和细胞状态
            h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
            c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
            
            # 前向传播LSTM
            out, _ = self.lstm(x, (h0, c0))
            
            # 取最后一个时间步的输出
            out = self.fc(out[:, -1, :])
            
            # 使用sigmoid激活函数得到概率
            out = self.sigmoid(out)
            
            return out
    
    
    def prepare_data_for_lstm(data, look_back=60, test_size=0.2):
        """准备LSTM模型的数据"""
        # 使用收盘价作为特征
        close_prices = data['Close'].values.reshape(-1, 1)
        
        # 数据归一化
        scaler = MinMaxScaler(feature_range=(0, 1))
        scaled_data = scaler.fit_transform(close_prices)
        
        # 创建数据集
        X, y = [], []
        for i in range(len(scaled_data) - look_back):
            X.append(scaled_data[i:(i + look_back), 0])
            # 预测下一天是涨(1)还是跌(0)
            y.append(1 if scaled_data[i + look_back, 0] > scaled_data[i + look_back - 1, 0] else 0)
        
        X, y = np.array(X), np.array(y)
        
        # 转换为LSTM需要的形状 [样本数, 时间步, 特征数]
        X = np.reshape(X, (X.shape[0], X.shape[1], 1))
        
        # 划分训练集和测试集
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, shuffle=False)
        
        # 转换为PyTorch张量
        X_train = torch.tensor(X_train, dtype=torch.float32)
        X_test = torch.tensor(X_test, dtype=torch.float32)
        y_train = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
        y_test = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1)
        
        return X_train, X_test, y_train, y_test, scaler, len(X_train)
    
    
    def train_lstm_model(model, X_train, y_train, X_test, y_test, epochs=100, batch_size=32, learning_rate=0.001):
        """训练LSTM模型"""
        # 定义损失函数和优化器
        criterion = nn.BCELoss()  # 二元交叉熵损失
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
        
        # 创建DataLoader
        train_dataset = TensorDataset(X_train, y_train)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        
        # 训练模型
        train_losses = []
        test_losses = []
        
        for epoch in range(epochs):
            model.train()
            train_loss = 0.0
            
            for batch_X, batch_y in train_loader:
                # 前向传播
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)
                
                # 反向传播和优化
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                
                train_loss += loss.item()
            
            # 计算平均训练损失
            train_loss /= len(train_loader)
            train_losses.append(train_loss)
            
            # 在测试集上评估
            model.eval()
            with torch.no_grad():
                test_outputs = model(X_test)
                test_loss = criterion(test_outputs, y_test).item()
                test_losses.append(test_loss)
            
            # 打印训练进度
            if (epoch + 1) % 10 == 0:
                print(f'Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}')
        
        return model, train_losses, test_losses
    
    
    def predict_stock_trend(model, X_test, y_test):
        """使用训练好的模型预测股票趋势"""
        model.eval()
        with torch.no_grad():
            predictions = model(X_test)
            # 转换为0/1分类
            predicted_classes = (predictions > 0.5).float()
            
            # 计算准确率
            accuracy = (predicted_classes == y_test).float().mean()
            print(f'模型准确率: {accuracy:.4f}')
        
        return predictions, predicted_classes, accuracy
    
    
    def plot_prediction_results(data, predicted_classes, look_back, n_train):
        """绘制预测结果"""
        plt.figure(figsize=(14, 8))
        
        # 获取测试集对应的原始数据
        test_start_index = n_train + look_back
        test_data = data.iloc[test_start_index:]
        
        # 创建预测结果的DataFrame
        prediction_df = pd.DataFrame(index=test_data.index)
        prediction_df['Actual'] = data['Close'].iloc[test_start_index:]
        prediction_df['Predicted_Trend'] = predicted_classes.numpy().flatten()
        
        # 绘制收盘价和预测趋势
        plt.subplot(2, 1, 1)
        plt.plot(data['Close'], label='实际收盘价', color='blue')
        if not test_data.empty:
            plt.axvline(x=test_data.index[0], color='red', linestyle='--', label='测试集开始')
        plt.title('股票价格与测试集分割')
        plt.legend()
        
        # 绘制预测趋势
        plt.subplot(2, 1, 2)
        if not test_data.empty:
            plt.plot(test_data.index, prediction_df['Predicted_Trend'], label='预测涨跌', color='green', marker='o', linestyle='')
        plt.title('LSTM模型预测股票涨跌')
        plt.yticks([0, 1], ['跌', '涨'])
        plt.legend()
        
        plt.tight_layout()
        plt.show()
    
    
    def main():
        symbol = input("请输入股票代码(如 000001 或 600000):")
        data = fetch_stock_data(symbol)
        
        # 检查数据获取是否成功
        if data is not None:
            # 计算技术指标
            ema = calculate_ema(data)
            std = calculate_std(data)
            
            # 绘制技术分析图表
            plot_analysis(data, ema, std, symbol)
            
            # 准备LSTM模型数据
            look_back = 60
            test_size = 0.2
            X_train, X_test, y_train, y_test, scaler, n_train = prepare_data_for_lstm(data, look_back, test_size)
            
            # 初始化模型
            model = LSTMStockPredictor(input_size=1, hidden_size=64, num_layers=2, output_size=1, dropout=0.2)
            
            # 训练模型
            print("开始训练LSTM模型...")
            model, train_losses, test_losses = train_lstm_model(model, X_train, y_train, X_test, y_test, epochs=100, batch_size=32, learning_rate=0.001)
            
            # 预测股票趋势
            predictions, predicted_classes, accuracy = predict_stock_trend(model, X_test, y_test)
            
            # 绘制预测结果
            plot_prediction_results(data, predicted_classes, look_back, n_train)
            
            # 绘制损失曲线
            plt.figure(figsize=(10, 6))
            plt.plot(range(1, len(train_losses) + 1), train_losses, label='训练损失')
            plt.plot(range(1, len(test_losses) + 1), test_losses, label='测试损失')
            plt.xlabel('Epoch')
            plt.ylabel('Loss')
            plt.title('LSTM模型训练损失曲线')
            plt.legend()
            plt.show()
    
    
    if __name__ == '__main__':
        main()
    
     ```
    
    
    # 问豆包:Python股票分析,akshare获取数据,可视化长度傲气记忆网络(LSTM),神经模型采用pytorch。预测股票涨跌​
  • 某只股票波动及ema指标分析Stock analysis

    
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    # 设置中文显示
    plt.rcParams["font.family"] = ["SimHei", "Microsoft YaHei", "SimSun", "KaiTi", "FangSong"]
    plt.rcParams["axes.unicode_minus"] = False
    import akshare as ak
    from datetime import datetime, timedelta
    
    
    def fetch_stock_data(symbol, period='1y'):
        """获取股票数据"""
        end_date = datetime.now().strftime('%Y%m%d')
        start_date = (datetime.now() - timedelta(days=365)).strftime('%Y%m%d')
        
        # 处理股票代码格式,确保符合akshare要求
        if not symbol.startswith(('sh', 'sz')):
            if symbol.startswith(('6', '9')):
                symbol = f"sh{symbol}"
            else:
                symbol = f"sz{symbol}"
        
        # 使用更稳定的stock_zh_a_hist函数获取股票日线数据
        data = ak.stock_zh_a_hist(symbol=symbol[2:],  # 只传股票代码部分,不含sh/sz
                                 period="daily",
                                 start_date=start_date,
                                 end_date=end_date,
                                 adjust="qfq")  # 使用前复权价格
        
        # 检查数据是否获取成功
        if data.empty:
            print(f"获取股票{symbol}数据失败,请检查股票代码和网络连接")
            return None
        
        # 先转换日期格式
        if '日期' in data.columns:
            data['日期'] = pd.to_datetime(data['日期'])
            # 设置日期为索引
            data.set_index('日期', inplace=True)
            # 重命名收盘价列
            if '收盘' in data.columns:
                data = data.rename(columns={'收盘': 'Close'})
        else:
            print("数据格式异常,无法处理")
            return None
        
        return data
    
    
    def calculate_ema(data, window=20):
        """计算指数移动平均线 EMA"""
        ema = data['Close'].ewm(span=window, adjust=False).mean()
        return ema
    
    
    def calculate_std(data, window=20):
        """计算标准差 STD"""
        std = data['Close'].rolling(window=window).std()
        return std
    
    
    def plot_analysis(data, ema, std, symbol):
        """绘制图表分析结果"""
        plt.figure(figsize=(14, 8))
    
        plt.subplot(2, 1, 1)
        plt.plot(data['Close'], label='收盘价', color='blue')
        plt.plot(ema, label='EMA(20)', color='orange')
        plt.title(f'{symbol} 股票价格与EMA分析')
        plt.legend()
    
        plt.subplot(2, 1, 2)
        plt.plot(std, label='STD(20)', color='red')
        plt.title(f'{symbol} 股票价格波动性(STD)分析')
        plt.legend()
    
        plt.tight_layout()
        plt.show()
    
    
    def main():
        symbol = input("请输入股票代码(如 000001 或 600000):")
        data = fetch_stock_data(symbol)
        
        # 检查数据获取是否成功
        if data is not None:
            ema = calculate_ema(data)
            std = calculate_std(data)
            plot_analysis(data, ema, std, symbol)
    
    
    if __name__ == '__main__':
        main()
    

    我来为您详细解释这个股票分析程序输出的图表和EMA指标的用法:

    程序输出的图表说明

    程序运行后会生成一个包含两个子图的图表:

    1. 股票价格与EMA分析(上图)

    • 蓝色曲线:股票的每日收盘价走势
    • 橙色曲线:20日指数移动平均线(EMA(20))
    • 标题:显示当前分析的股票代码和分析类型
    • 图例:标识不同曲线的含义

    2. 股票价格波动性(STD)分析(下图)

    • 红色曲线:20日收盘价的标准差(STD(20))
    • 标题:显示当前分析的股票代码和波动性分析类型
    • 图例:标识标准差曲线

    EMA指标详解

    什么是EMA?

    EMA(Exponential Moving Average)即指数移动平均线,是一种加权移动平均线,对最近的价格给予更多权重,反应更灵敏。

    EMA的计算方法

    程序中使用的计算公式:

    ema = data['Close'].ewm(span=window, adjust=False).mean()
    • span=window:设置计算窗口大小(这里是20天)
    • adjust=False:使用标准的指数移动平均计算公式,不进行调整

    数学公式:

    EMA(t) = Price(t) × k + EMA(t-1) × (1 - k)
    其中:k = 2 / (n + 1),n是窗口大小

    EMA与SMA的区别

    • SMA(简单移动平均):对所有周期价格给予相同权重,反应较迟缓
    • EMA(指数移动平均):对近期价格给予更高权重,能更快反映价格变化

    20日EMA的具体应用

    1. 判断趋势方向

    • 当价格在EMA上方:通常表示处于上升趋势
    • 当价格在EMA下方:通常表示处于下降趋势
    • EMA斜率向上:趋势可能继续上升
    • EMA斜率向下:趋势可能继续下降

    2. 支撑位与阻力位

    • 支撑位:价格下跌到EMA附近时可能获得支撑反弹
    • 阻力位:价格上涨到EMA附近时可能遇到阻力回调

    3. 交易信号

    • 金叉:短期EMA(如5日)上穿长期EMA(如20日),可能发出买入信号
    • 死叉:短期EMA下穿长期EMA,可能发出卖出信号

    4. 价格与EMA的偏离程度

    • 价格与EMA偏离过大时,通常会回归,这是一种超买超卖的判断依据

    结合标准差(STD)的分析

    • 标准差:衡量价格的波动性或风险程度
    • 高标准差:价格波动大,风险高,可能处于趋势反转或剧烈波动期
    • 低标准差:价格波动小,风险低,可能处于横盘整理期

    实际应用示例

    1. 趋势确认:如果价格持续在20日EMA上方运行,且EMA斜率向上,说明上升趋势强劲
    2. 入场时机:当价格回调到20日EMA附近获得支撑,并伴随成交量放大时,可能是较好的买入时机
    3. 止损设置:可以将止损位设置在20日EMA下方一定比例处,保护利润
    4. 风险评估:结合标准差曲线,如果价格上升但标准差下降,说明趋势稳定;如果标准差同时上升,说明波动加大,风险增加

    这个程序提供了基础的技术分析工具,您可以根据自己的交易策略进一步优化参数(如调整EMA的窗口大小)或结合其他技术指标(如MACD、RSI等)进行更全面的分析。

  • 股市每年暴跌节点

    股市大跌;12月中下旬。4月份上旬至五-日前。6月中下旬。8月下旬。

  • 采用akshare数据源python的TA-lib库计算股票多个指标

    readme.md

    requirementst.txt

    源代码:

    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt
    import matplotlib.dates as mdates
    import talib as ta
    import akshare as ak
    import baostock as bs
    import datetime
    from matplotlib.gridspec import GridSpec
      
      # 设置中文显示
    plt.rcParams["font.family"] = ["SimHei", "Microsoft YaHei", "SimSun", "KaiTi", "FangSong"]
    plt.rcParams["axes.unicode_minus"] = False  # 正确显示负号
    
    class StockTechnicalAnalyzer:
        def __init__(self, data_source='baostock'):
            """
            初始化股票技术分析器
            data_source: 数据源,可选 'baostock' 或 'akshare'
            """
            self.data_source = data_source
            self.data = None
            self.stock_code = None
            self.start_date = None
            self.end_date = None
        
        def get_stock_data(self, stock_code, start_date, end_date):
            """从指定数据源获取股票数据"""
            self.stock_code = stock_code
            self.start_date = start_date
            self.end_date = end_date
            
            print(f"正在从{self.data_source}获取{stock_code}从{start_date}到{end_date}的数据...")
            
            if self.data_source == 'baostock':
                return self._get_baostock_data(stock_code, start_date, end_date)
            elif self.data_source == 'akshare':
                return self._get_akshare_data(stock_code, start_date, end_date)
            else:
                raise ValueError("数据源必须是 'baostock' 或 'akshare'")
        
        def _get_baostock_data(self, stock_code, start_date, end_date):
            """从baostock获取股票数据"""
            # 登录baostock
            lg = bs.login()
            if lg.error_code != '0':
                print(f"登录失败:{lg.error_msg}")
                return None
            
            # 获取股票数据
            rs = bs.query_history_k_data_plus(
                stock_code,
                "date,open,high,low,close,volume",
                start_date=start_date,
                end_date=end_date,
                frequency="d",
                adjustflag="2"  # 前复权
            )
            
            # 处理数据
            data_list = []
            while (rs.error_code == '0') & rs.next():
                data_list.append(rs.get_row_data())
            
            # 登出baostock
            bs.logout()
            
            # 转换为DataFrame并处理
            if not data_list:
                print("没有获取到数据")
                return None
                
            df = pd.DataFrame(data_list, columns=rs.fields)
            # 转换数据类型
            df['date'] = pd.to_datetime(df['date'])
            df['open'] = df['open'].astype(float)
            df['high'] = df['high'].astype(float)
            df['low'] = df['low'].astype(float)
            df['close'] = df['close'].astype(float)
            df['volume'] = df['volume'].astype(float)
            
            df.set_index('date', inplace=True)
            self.data = df
            print(f"成功获取{len(df)}条数据")
            return df
        
        def _get_akshare_data(self, stock_code, start_date, end_date):
            """从akshare获取股票数据"""
            try:
                # 对于A股,akshare的代码格式为 'sh600000' 或 'sz000000'
                # 转换baostock格式到akshare格式
                if stock_code.startswith('sh.'):
                    ak_code = 'sh' + stock_code[3:]
                elif stock_code.startswith('sz.'):
                    ak_code = 'sz' + stock_code[3:]
                else:
                    ak_code = stock_code
                    
                # 获取股票数据
                df = ak.stock_zh_a_daily(symbol=ak_code, start_date=start_date.replace('-', ''), 
                                       end_date=end_date.replace('-', ''), adjust="qfq")
                
                # 重命名列以统一格式
                if not df.empty:
                    df['date'] = pd.to_datetime(df['date'])
                    df.set_index('date', inplace=True)
                    self.data = df
                    print(f"成功获取{len(df)}条数据")
                    return df
                else:
                    print("没有获取到数据")
                    return None
            except Exception as e:
                print(f"获取数据时出错: {str(e)}")
                return None
        
        def calculate_indicators(self, rsi_period=14, macd_fast=12, macd_slow=26, macd_signal=9,
                               bb_period=20, bb_std=2, atr_period=14, cci_period=14, 
                               roc_period=12, wr_period=14, bias_period=6):
            """使用TA-Lib计算各种技术指标"""
            if self.data is None:
                print("请先获取股票数据")
                return None
            
            df = self.data.copy()
            
            # 计算RSI
            df['rsi'] = ta.RSI(df['close'], timeperiod=rsi_period)
            
            # 计算MACD
            df['macd'], df['macd_signal'], df['macd_hist'] = ta.MACD(
                df['close'], fastperiod=macd_fast, slowperiod=macd_slow, signalperiod=macd_signal
            )
            
            # 计算布林带
            df['bb_upper'], df['bb_middle'], df['bb_lower'] = ta.BBANDS(
                df['close'], timeperiod=bb_period, nbdevup=bb_std, nbdevdn=bb_std
            )
            
            # 计算ATR
            df['atr'] = ta.ATR(df['high'], df['low'], df['close'], timeperiod=atr_period)
            df['atr_pct'] = df['atr'] / df['close'] * 100
            
            # 计算MA
            df['ma5'] = ta.MA(df['close'], timeperiod=5)
            df['ma10'] = ta.MA(df['close'], timeperiod=10)
            df['ma20'] = ta.MA(df['close'], timeperiod=20)
            df['ma60'] = ta.MA(df['close'], timeperiod=60)
            
            # 计算成交量MA
            df['volume_ma5'] = ta.MA(df['volume'], timeperiod=5)
            df['volume_ma10'] = ta.MA(df['volume'], timeperiod=10)
            
            # 计算KDJ
            df['k'], df['d'] = ta.STOCH(df['high'], df['low'], df['close'])
            df['j'] = 3 * df['k'] - 2 * df['d']
            
            # 计算OBV
            df['obv'] = ta.OBV(df['close'], df['volume'])
            
            # 计算CCI (顺势指标)
            df['cci'] = ta.CCI(df['high'], df['low'], df['close'], timeperiod=cci_period)
            
            # 计算ROC (变化率)
            df['roc'] = ta.ROC(df['close'], timeperiod=roc_period)
            
            # 计算W&R (威廉指标)
            df['wr'] = -100 * ((df['high'].rolling(window=wr_period).max() - df['close']) / 
                              (df['high'].rolling(window=wr_period).max() - df['low'].rolling(window=wr_period).min()))
            
            # 计算BIAS (乖离率)
            df['bias'] = (df['close'] - df['close'].rolling(window=bias_period).mean()) / \
                         df['close'].rolling(window=bias_period).mean() * 100
            
            # 计算ENE指标 (轨道线)
            df['ene_middle'] = df['close'].rolling(window=10).mean()
            df['ene_upper'] = df['ene_middle'] * 1.1
            df['ene_lower'] = df['ene_middle'] * 0.9
            
            # 计算VI (波动率指标) - 基于真实波幅的移动平均
            df['vi_positive'] = ta.SMA(df['high'] - df['open'], timeperiod=14)
            df['vi_negative'] = ta.SMA(df['open'] - df['low'], timeperiod=14)
            
            # 计算DMI指标
            df['adx'] = ta.ADX(df['high'], df['low'], df['close'], timeperiod=14)
            df['plus_di'] = ta.PLUS_DI(df['high'], df['low'], df['close'], timeperiod=14)
            df['minus_di'] = ta.MINUS_DI(df['high'], df['low'], df['close'], timeperiod=14)
            
            # 计算PSY (心理线)
            df['price_change'] = df['close'].diff(1)
            df['positive_days'] = df['price_change'].apply(lambda x: 1 if x > 0 else 0)
            df['psy'] = df['positive_days'].rolling(window=12).sum() / 12 * 100
            
            # 清理临时列
            if 'price_change' in df.columns:
                df.drop('price_change', axis=1, inplace=True)
            if 'positive_days' in df.columns:
                df.drop('positive_days', axis=1, inplace=True)
            
            self.data = df
            return df
        
        def generate_signals(self):
            """基于技术指标生成交易信号"""
            if self.data is None:
                print("请先获取股票数据并计算指标")
                return None
            
            df = self.data.copy()
            df['signal'] = 0  # 0表示无信号,1表示买入,-1表示卖出
            
            # RSI超买超卖信号
            df.loc[df['rsi'] < 30, 'signal'] += 1  # RSI<30 考虑买入
            df.loc[df['rsi'] > 70, 'signal'] += -1  # RSI>70 考虑卖出
            
            # MACD金叉死叉信号
            macd_cross_up = (df['macd'] > df['macd_signal']) & (df['macd'].shift(1) <= df['macd_signal'].shift(1))
            macd_cross_down = (df['macd'] < df['macd_signal']) & (df['macd'].shift(1) >= df['macd_signal'].shift(1))
            df.loc[macd_cross_up, 'signal'] += 1  # MACD金叉 买入信号
            df.loc[macd_cross_down, 'signal'] += -1  # MACD死叉 卖出信号
            
            # 布林带突破信号
            bb_upper_break = (df['close'] > df['bb_upper']) & (df['close'].shift(1) <= df['bb_upper'].shift(1))
            bb_lower_break = (df['close'] < df['bb_lower']) & (df['close'].shift(1) >= df['bb_lower'].shift(1))
            df.loc[bb_upper_break, 'signal'] += 1  # 突破上轨 买入信号
            df.loc[bb_lower_break, 'signal'] += -1  # 跌破下轨 卖出信号
            
            # 简化信号:>0为买入,<0为卖出
            df['final_signal'] = 0
            df.loc[df['signal'] > 0, 'final_signal'] = 1
            df.loc[df['signal'] < 0, 'final_signal'] = -1
            
            self.data = df
            return df
        
        def plot_moving_averages(self):
            """单独绘制移动平均线图表"""
            if self.data is None:
                print("请先获取股票数据并计算指标")
                return None
            
            df = self.data.dropna()
            
            # 创建第二个图形
            fig = plt.figure(2, figsize=(16, 8))
            fig.clf()  # 清除图形内容但保留图形实例
            
            # 绘制价格和移动平均线
            plt.plot(df.index, df['close'], label='收盘价', linewidth=2)
            plt.plot(df.index, df['ma5'], label='MA5', alpha=0.7)
            plt.plot(df.index, df['ma10'], label='MA10', alpha=0.7)
            plt.plot(df.index, df['ma20'], label='MA20', alpha=0.7)
            plt.plot(df.index, df['ma60'], label='MA60', alpha=0.7)
            
            # 布林带
            plt.plot(df.index, df['bb_upper'], 'r--', label='上轨', alpha=0.6)
            plt.plot(df.index, df['bb_middle'], 'g--', label='中轨', alpha=0.6)
            plt.plot(df.index, df['bb_lower'], 'r--', label='下轨', alpha=0.6)
            plt.fill_between(df.index, df['bb_upper'], df['bb_lower'], alpha=0.1, color='gray')
            
            # ENE轨道线
            plt.plot(df.index, df['ene_upper'], 'c-.', label='ENE上轨', alpha=0.8)
            plt.plot(df.index, df['ene_middle'], 'm-.', label='ENE中轨', alpha=0.8)
            plt.plot(df.index, df['ene_lower'], 'c-.', label='ENE下轨', alpha=0.8)
            
            # 交易信号
            if 'final_signal' in df.columns:
                buy_signals = df[df['final_signal'] == 1]
                sell_signals = df[df['final_signal'] == -1]
                plt.scatter(buy_signals.index, buy_signals['close'], marker='^', color='g', 
                           label='买入信号', s=100, zorder=3)
                plt.scatter(sell_signals.index, sell_signals['close'], marker='v', color='r', 
                           label='卖出信号', s=100, zorder=3)
            
            plt.title(f'{self.stock_code} 价格与移动平均线 ({self.start_date} 至 {self.end_date})')
            plt.ylabel('价格')
            plt.grid(True, alpha=0.3)
            plt.legend()
            
            # 设置x轴日期格式
            plt.gca().xaxis.set_major_locator(mdates.MonthLocator(interval=3))
            plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
            plt.xticks(rotation=45)
            
            plt.tight_layout()
            plt.show(block=True)  # 阻塞显示,等待用户关闭
            
            return fig
        
        def plot_all_indicators(self):
            """绘制所有技术指标图表(不含均线图)"""
            if self.data is None:
                print("请先获取股票数据并计算指标")
                return None
            
            df = self.data.dropna()  # 去除NaN值
            
            # 创建第三个图形
            fig = plt.figure(3, figsize=(16, 30))
            fig.clf()  # 清除图形内容但保留图形实例
            gs = GridSpec(9, 1, height_ratios=[1, 2, 2, 2, 2, 2, 2, 2, 2])
            
            # 1. 价格
            ax1 = plt.subplot(gs[0])
            ax1.plot(df.index, df['close'], label='收盘价', linewidth=2, color='blue')
            ax1.set_title(f'{self.stock_code} 价格')
            ax1.set_ylabel('价格')
            ax1.grid(True, alpha=0.3)
            
            # 2. 成交量
            ax2 = plt.subplot(gs[1], sharex=ax1)
            ax2.bar(df.index, df['volume'], label='成交量', alpha=0.6, color='gray')
            ax2.plot(df.index, df['volume_ma5'], label='成交量MA5', color='blue')
            ax2.plot(df.index, df['volume_ma10'], label='成交量MA10', color='red')
            ax2.set_ylabel('成交量')
            ax2.legend()
            ax2.grid(True, alpha=0.3)
            
            # 3. RSI指标
            ax3 = plt.subplot(gs[2], sharex=ax1)
            ax3.plot(df.index, df['rsi'], label=f'RSI', color='purple', linewidth=2)
            ax3.axhline(70, color='r', linestyle='--', alpha=0.7, label='超买线(70)')
            ax3.axhline(30, color='g', linestyle='--', alpha=0.7, label='超卖线(30)')
            ax3.axhline(50, color='b', linestyle='--', alpha=0.5)
            ax3.set_ylabel('RSI值')
            ax3.set_ylim(0, 100)
            ax3.legend()
            ax3.grid(True, alpha=0.3)
            
            # 4. MACD指标
            ax4 = plt.subplot(gs[3], sharex=ax1)
            ax4.plot(df.index, df['macd'], label='MACD', color='blue')
            ax4.plot(df.index, df['macd_signal'], label='信号线', color='red')
            ax4.bar(df.index, df['macd_hist'], label='柱状图', alpha=0.5, color='gray')
            ax4.axhline(0, color='black', linestyle='-', alpha=0.3)
            ax4.set_ylabel('MACD值')
            ax4.legend()
            ax4.grid(True, alpha=0.3)
            
            # 5. KDJ指标
            ax5 = plt.subplot(gs[4], sharex=ax1)
            ax5.plot(df.index, df['k'], label='K线', color='blue')
            ax5.plot(df.index, df['d'], label='D线', color='red')
            ax5.plot(df.index, df['j'], label='J线', color='green')
            ax5.axhline(80, color='r', linestyle='--', alpha=0.7, label='超买线(80)')
            ax5.axhline(20, color='g', linestyle='--', alpha=0.7, label='超卖线(20)')
            ax5.set_ylabel('KDJ值')
            ax5.set_ylim(0, 100)
            ax5.legend()
            ax5.grid(True, alpha=0.3)
            
            # 6. CCI指标
            ax6 = plt.subplot(gs[5], sharex=ax1)
            ax6.plot(df.index, df['cci'], label='CCI', color='orange', linewidth=2)
            ax6.axhline(100, color='r', linestyle='--', alpha=0.7, label='超买线(100)')
            ax6.axhline(-100, color='g', linestyle='--', alpha=0.7, label='超卖线(-100)')
            ax6.axhline(0, color='b', linestyle='--', alpha=0.5)
            ax6.set_ylabel('CCI值')
            ax6.legend()
            ax6.grid(True, alpha=0.3)
            
            # 7. W&R和BIAS指标
            ax7 = plt.subplot(gs[6], sharex=ax1)
            
            # W&R
            color1 = 'tab:red'
            ax7.set_ylabel('W&R(%)', color=color1)
            ax7.plot(df.index, df['wr'], color=color1, label='W&R')
            ax7.axhline(-20, color='r', linestyle='--', alpha=0.7, label='超买线(-20)')
            ax7.axhline(-80, color='g', linestyle='--', alpha=0.7, label='超卖线(-80)')
            ax7.tick_params(axis='y', labelcolor=color1)
            ax7.grid(True, alpha=0.3)
            
            # BIAS
            ax8 = ax7.twinx()
            color2 = 'tab:blue'
            ax8.set_ylabel('BIAS(%)', color=color2)
            ax8.plot(df.index, df['bias'], color=color2, label='BIAS')
            ax8.tick_params(axis='y', labelcolor=color2)
            
            # 合并图例
            lines1, labels1 = ax7.get_legend_handles_labels()
            lines2, labels2 = ax8.get_legend_handles_labels()
            ax7.legend(lines1 + lines2, labels1 + labels2, loc='upper left')
            
            # 8. VI指标
            ax9 = plt.subplot(gs[7], sharex=ax1)
            ax9.plot(df.index, df['vi_positive'], 'g-', label='VI+', alpha=0.7)
            ax9.plot(df.index, df['vi_negative'], 'r-', label='VI-', alpha=0.7)
            ax9.set_ylabel('VI值')
            ax9.legend()
            ax9.grid(True, alpha=0.3)
            
            # 9. OBV指标
            ax10 = plt.subplot(gs[8], sharex=ax1)
            ax10.set_ylabel('OBV')
            ax10.plot(df.index, df['obv'], color='tab:blue', label='OBV')
            ax10.tick_params(axis='y')
            ax10.legend()
            ax10.grid(True, alpha=0.3)
            
            # 设置x轴日期格式
            ax10.xaxis.set_major_locator(mdates.MonthLocator(interval=3))
            ax10.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
            plt.xticks(rotation=45)
            
            plt.suptitle(f'{self.stock_code} 技术指标分析 ({self.start_date} 至 {self.end_date})', fontsize=16)
            plt.tight_layout()
            plt.subplots_adjust(top=0.97)
            plt.show(block=True)  # 阻塞显示,等待用户关闭
            
            return fig
        
        def plot_simplified(self):
            """绘制简化版图表(价格、RSI、MACD)"""
            if self.data is None:
                print("请先获取股票数据并计算指标")
                return None
            
            df = self.data.dropna()
            
            # 直接创建新图形,确保是第一个显示的图表
            fig = plt.figure(1, figsize=(16, 15))
            fig.clf()  # 清除图形内容但保留图形实例
            ax1 = plt.subplot(3, 1, 1)
            ax2 = plt.subplot(3, 1, 2, sharex=ax1)
            ax3 = plt.subplot(3, 1, 3, sharex=ax1)
            
            # 1. 价格和布林带
            ax1.plot(df.index, df['close'], label='收盘价', linewidth=2)
            ax1.plot(df.index, df['bb_upper'], 'r--', label='上轨')
            ax1.plot(df.index, df['bb_middle'], 'g--', label='中轨')
            ax1.plot(df.index, df['bb_lower'], 'r--', label='下轨')
            ax1.fill_between(df.index, df['bb_upper'], df['bb_lower'], alpha=0.1, color='gray')
            
            if 'final_signal' in df.columns:
                buy_signals = df[df['final_signal'] == 1]
                sell_signals = df[df['final_signal'] == -1]
                ax1.scatter(buy_signals.index, buy_signals['close'], marker='^', color='g', 
                           label='买入信号', s=100, zorder=3)
                ax1.scatter(sell_signals.index, sell_signals['close'], marker='v', color='r', 
                           label='卖出信号', s=100, zorder=3)
            
            ax1.set_title(f'{self.stock_code} 价格与布林带')
            ax1.set_ylabel('价格')
            ax1.legend()
            ax1.grid(True, alpha=0.3)
            
            # 2. RSI
            ax2.plot(df.index, df['rsi'], label='RSI', color='purple', linewidth=2)
            ax2.axhline(70, color='r', linestyle='--', alpha=0.7)
            ax2.axhline(30, color='g', linestyle='--', alpha=0.7)
            ax2.axhline(50, color='b', linestyle='--', alpha=0.5)
            ax2.set_ylabel('RSI值')
            ax2.set_ylim(0, 100)
            ax2.legend()
            ax2.grid(True, alpha=0.3)
            
            # 3. MACD
            ax3.plot(df.index, df['macd'], label='MACD', color='blue')
            ax3.plot(df.index, df['macd_signal'], label='信号线', color='red')
            ax3.bar(df.index, df['macd_hist'], label='柱状图', alpha=0.5)
            ax3.axhline(0, color='black', linestyle='-', alpha=0.3)
            ax3.set_ylabel('MACD值')
            ax3.legend()
            ax3.grid(True, alpha=0.3)
            
            # 设置x轴日期格式
            ax3.xaxis.set_major_locator(mdates.MonthLocator(interval=3))
            ax3.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
            plt.xticks(rotation=45)
            
            plt.tight_layout()
            plt.show(block=True)  # 阻塞显示,等待用户关闭
            
            return fig
    
    def main():
        # 创建分析器实例
        analyzer = StockTechnicalAnalyzer(data_source='baostock')  # 可选 'baostock' 或 'akshare'
        
        # 设置股票代码和日期范围
        #stock_code = 'sh.600938'  # 中国海油
        stock_code = 'sz.000651'  # 格力电器
        end_date = datetime.datetime.now().strftime("%Y-%m-%d")
        start_date = (datetime.datetime.now() - datetime.timedelta(days=365*2)).strftime("%Y-%m-%d")  # 过去2年
        
        # 获取数据
        data = analyzer.get_stock_data(stock_code, start_date, end_date)
        if data is None or len(data) < 60:
            print("数据不足,无法进行分析")
            return
        
        # 计算技术指标
        analyzer.calculate_indicators()
        
        # 生成交易信号
        analyzer.generate_signals()
        
        # 绘制简化图表
        print("正在绘制简化图表...")
        analyzer.plot_simplified()
        
        # 单独绘制移动平均线图表
        print("正在绘制移动平均线图表...")
        analyzer.plot_moving_averages()
        
        # 绘制完整图表(包含所有指标,不含详细均线)
        print("正在绘制完整指标图表...")
        analyzer.plot_all_indicators()
        
        # 显示最新的指标值
        latest_data = analyzer.data.iloc[-1]
        print(f"\n最新指标值({latest_data.name.date()}):")
        print(f"RSI(14): {latest_data['rsi']:.2f}")
        print(f"MACD: {latest_data['macd']:.4f}, 信号线: {latest_data['macd_signal']:.4f}, 柱状图: {latest_data['macd_hist']:.4f}")
        print(f"KDJ: K={latest_data['k']:.2f}, D={latest_data['d']:.2f}, J={latest_data['j']:.2f}")
        print(f"ATR百分比: {latest_data['atr_pct']:.2f}%")
        print(f"CCI: {latest_data['cci']:.2f}")
        print(f"W&R: {latest_data['wr']:.2f}%")
        print(f"BIAS: {latest_data['bias']:.2f}%")
        print(f"ROC: {latest_data['roc']:.2f}%")
        print(f"PSY: {latest_data['psy']:.2f}%")
        print(f"ENE: 上轨={latest_data['ene_upper']:.2f}, 中轨={latest_data['ene_middle']:.2f}, 下轨={latest_data['ene_lower']:.2f}")
        print(f"VI: VI+={latest_data['vi_positive']:.2f}, VI-={latest_data['vi_negative']:.2f}")
    
    if __name__ == "__main__":
        main()
  • 股票标示顶底、量价背离python代码

    实现了利用DVI和MACD指标判断股票量价顶底背离的Python代码。主要完成了以下工作:

    1. 创建了 vi_macd_divergence.py 文件,实现了完整的分析系统
    2. 使用baostock作为数据源获取股票历史数据
    3. 实现了DVI(Directional Volume Indicator)指标的计算,该指标比普通VI更能反映成交量与价格方向的关系
    4. 计算了标准MACD指标作为辅助判断。开发了多种背离检测算法:
    • 价格与MACD的顶底背离
    • 价格与DVI的顶底背离
    • 量价背离(价格变动与成交量变化不一致的情况)
    1. 实现了可视化功能,绘制包含价格、成交量、DVI和MACD的综合图表,并在图上标记各种背离点
    2. 添加了详细的背离统计摘要输出功能代码已经成功运行,能够检测并显示各种背离信号,并将结果保存为图片文件。用户可以通过修改 main() 函数中的股票代码和时间范围来分析不同的股票。
    import baostock as bs
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    import matplotlib.dates as mdates
    from datetime import datetime, timedelta
    import warnings
    warnings.filterwarnings('ignore')
    
    # 设置中文字体
    plt.rcParams['font.sans-serif'] = ['SimHei']  # 用来正常显示中文标签
    plt.rcParams['axes.unicode_minus'] = False  # 用来正常显示负号
    
    class StockAnalyzer:
        def __init__(self):
            # 登录baostock
            self.bs = bs
            lg = bs.login()
            print(f'登录状态: {lg.error_code} {lg.error_msg}')
            self.data = None
            self.stock_code = None
            self.start_date = None
            self.end_date = None
        
        def logout(self):
            # 登出baostock
            bs.logout()
        
        def get_stock_data(self, stock_code, start_date, end_date):
            """
            从baostock获取股票数据
            """
            self.stock_code = stock_code
            self.start_date = start_date
            self.end_date = end_date
            
            # 获取K线数据
            rs = bs.query_history_k_data_plus(
                stock_code,
                'date,open,high,low,close,volume',
                start_date=start_date,
                end_date=end_date,
                frequency='d',
                adjustflag='2'  # 前复权
            )
            
            # 处理数据
            data_list = []
            while (rs.error_code == '0') & rs.next():
                data_list.append(rs.get_row_data())
            
            if data_list:
                self.data = pd.DataFrame(data_list, columns=rs.fields)
                # 转换数据类型
                for col in ['open', 'high', 'low', 'close', 'volume']:
                    self.data[col] = pd.to_numeric(self.data[col])
                self.data['date'] = pd.to_datetime(self.data['date'])
                self.data.set_index('date', inplace=True)
                
                print(f'成功获取 {stock_code} 的数据,共 {len(self.data)} 条记录')
                return True
            else:
                print(f'获取数据失败: {rs.error_code} {rs.error_msg}')
                return False
        
        def calculate_indicators(self):
            """
            计算DVI和MACD指标
            DVI (Directional Volume Indicator) 是一个量价指标,用于分析成交量和价格之间的关系
            """
            if self.data is None:
                print("请先获取股票数据")
                return False
            
            # 计算DVI指标
            # 1. 计算价格变化方向
            self.data['price_change'] = self.data['close'] - self.data['close'].shift(1)
            self.data['direction'] = np.where(self.data['price_change'] > 0, 1, 
                                            np.where(self.data['price_change'] < 0, -1, 0))
            
            # 2. 计算方向成交量
            self.data['directional_volume'] = self.data['volume'] * self.data['direction']
            
            # 3. 计算DVI的两个主要组件
            # DVI_MoneyFlow: 反映价格方向上的资金流动
            self.data['dvi_moneyflow'] = self.data['directional_volume'] * self.data['price_change'].abs()
            
            # 4. 计算DVI线
            # 短期移动平均 (通常使用20天)
            self.data['dvi_short'] = self.data['dvi_moneyflow'].rolling(window=20).sum()
            # 长期移动平均 (通常使用50天)
            self.data['dvi_long'] = self.data['dvi_moneyflow'].rolling(window=50).sum()
            # 标准化DVI
            self.data['dvi'] = (self.data['dvi_short'] - self.data['dvi_long']) / \
                              (self.data['dvi_short'].abs() + self.data['dvi_long'].abs() + 1e-10)
            
            # 5. 计算DVI信号线 (通常是DVI的10天移动平均)
            self.data['dvi_signal'] = self.data['dvi'].rolling(window=10).mean()
            
            # 计算MACD
            exp1 = self.data['close'].ewm(span=12, adjust=False).mean()
            exp2 = self.data['close'].ewm(span=26, adjust=False).mean()
            self.data['macd'] = exp1 - exp2
            self.data['signal'] = self.data['macd'].ewm(span=9, adjust=False).mean()
            self.data['histogram'] = self.data['macd'] - self.data['signal']
            
            return True
        
        def detect_divergence(self, lookback_period=20):
            """
            检测量价顶底背离
            - 顶背离: 价格创新高但指标不创新高
            - 底背离: 价格创新低但指标不创新低
            """
            if self.data is None or 'dvi' not in self.data.columns or 'macd' not in self.data.columns:
                print("请先获取股票数据并计算指标")
                return False
            
            # 初始化背离标记列
            self.data['price_top_divergence'] = 0     # 价格顶背离
            self.data['price_bottom_divergence'] = 0  # 价格底背离
            self.data['dvi_top_divergence'] = 0       # DVI顶背离
            self.data['dvi_bottom_divergence'] = 0    # DVI底背离
            self.data['macd_top_divergence'] = 0      # MACD顶背离
            self.data['macd_bottom_divergence'] = 0   # MACD底背离
            self.data['volume_price_divergence'] = 0  # 量价背离
            
            for i in range(lookback_period, len(self.data)):
                # 获取回顾期内的数据
                lookback_data = self.data.iloc[i-lookback_period:i]
                current_data = self.data.iloc[i]
                
                # 1. 价格与MACD顶底背离检测
                # 顶背离: 价格创新高,MACD未创新高
                if (current_data['high'] > lookback_data['high'].max() and 
                    current_data['macd'] < lookback_data['macd'].max()):
                    self.data['price_top_divergence'].iloc[i] = 1
                    self.data['macd_top_divergence'].iloc[i] = 1
                
                # 底背离: 价格创新低,MACD未创新低
                if (current_data['low'] < lookback_data['low'].min() and 
                    current_data['macd'] > lookback_data['macd'].min()):
                    self.data['price_bottom_divergence'].iloc[i] = 1
                    self.data['macd_bottom_divergence'].iloc[i] = 1
                
                # 2. 价格与DVI顶底背离检测
                # DVI顶背离: 价格创新高,DVI未创新高
                if (current_data['high'] > lookback_data['high'].max() and 
                    current_data['dvi'] < lookback_data['dvi'].max()):
                    self.data['dvi_top_divergence'].iloc[i] = 1
                
                # DVI底背离: 价格创新低,DVI未创新低
                if (current_data['low'] < lookback_data['low'].min() and 
                    current_data['dvi'] > lookback_data['dvi'].min()):
                    self.data['dvi_bottom_divergence'].iloc[i] = 1
                
                # 3. 量价背离检测
                # 上升趋势中的量价背离: 价格上涨但成交量减少
                if (current_data['close'] > current_data['open'] and i > 0 and 
                    current_data['volume'] < self.data['volume'].iloc[i-1] * 0.9):
                    self.data['volume_price_divergence'].iloc[i] = 1
                
                # 下降趋势中的量价背离: 价格下跌但成交量减少
                if (current_data['close'] < current_data['open'] and i > 0 and 
                    current_data['volume'] < self.data['volume'].iloc[i-1] * 0.9):
                    self.data['volume_price_divergence'].iloc[i] = -1
            
            return True
        
        def plot_indicators(self):
            """
            绘制价格、成交量、DVI和MACD指标图,并标记背离点
            """
            if self.data is None:
                print("请先获取股票数据")
                return False
            
            fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, figsize=(15, 14), sharex=True)
            
            # 绘制价格图
            ax1.plot(self.data.index, self.data['close'], label='收盘价', color='blue')
            # 标记顶底背离
            top_div = self.data[self.data['price_top_divergence'] == 1]
            bottom_div = self.data[self.data['price_bottom_divergence'] == 1]
            ax1.scatter(top_div.index, top_div['high'], color='red', marker='v', s=150, label='顶背离', zorder=5)
            ax1.scatter(bottom_div.index, bottom_div['low'], color='green', marker='^', s=150, label='底背离', zorder=5)
            ax1.set_title(f'{self.stock_code} 价格与背离分析')
            ax1.set_ylabel('价格')
            ax1.legend()
            ax1.grid(True, alpha=0.3)
            
            # 绘制成交量图
            # 根据价格变动着色成交量柱
            colors = ['red' if close > open else 'green' for close, open in zip(self.data['close'], self.data['open'])]
            ax2.bar(self.data.index, self.data['volume'], color=colors, alpha=0.6, label='成交量')
            # 标记量价背离
            vol_div_pos = self.data[self.data['volume_price_divergence'] == 1]
            vol_div_neg = self.data[self.data['volume_price_divergence'] == -1]
            ax2.scatter(vol_div_pos.index, vol_div_pos['volume'], color='orange', marker='o', s=100, label='量价正背离', zorder=5)
            ax2.scatter(vol_div_neg.index, vol_div_neg['volume'], color='purple', marker='o', s=100, label='量价负背离', zorder=5)
            ax2.set_ylabel('成交量')
            ax2.legend()
            ax2.grid(True, alpha=0.3)
            
            # 绘制DVI指标图
            ax3.plot(self.data.index, self.data['dvi'], label='DVI', color='blue')
            ax3.plot(self.data.index, self.data['dvi_signal'], label='DVI Signal', color='red', linestyle='--')
            # 标记DVI背离
            dvi_top_div = self.data[self.data['dvi_top_divergence'] == 1]
            dvi_bottom_div = self.data[self.data['dvi_bottom_divergence'] == 1]
            ax3.scatter(dvi_top_div.index, dvi_top_div['dvi'], color='red', marker='v', s=100, label='DVI顶背离', zorder=5)
            ax3.scatter(dvi_bottom_div.index, dvi_bottom_div['dvi'], color='green', marker='^', s=100, label='DVI底背离', zorder=5)
            ax3.axhline(y=0, color='black', linestyle='--', alpha=0.3)
            ax3.set_ylabel('DVI指标')
            ax3.legend()
            ax3.grid(True, alpha=0.3)
            
            # 绘制MACD图
            ax4.plot(self.data.index, self.data['macd'], label='MACD', color='blue')
            ax4.plot(self.data.index, self.data['signal'], label='Signal', color='red')
            # 根据柱状图值着色
            histogram_colors = ['red' if x > 0 else 'green' for x in self.data['histogram']]
            ax4.bar(self.data.index, self.data['histogram'], label='Histogram', color=histogram_colors, alpha=0.3)
            # 标记MACD背离
            macd_top_div = self.data[self.data['macd_top_divergence'] == 1]
            macd_bottom_div = self.data[self.data['macd_bottom_divergence'] == 1]
            ax4.scatter(macd_top_div.index, macd_top_div['macd'], color='red', marker='v', s=100, label='MACD顶背离', zorder=5)
            ax4.scatter(macd_bottom_div.index, macd_bottom_div['macd'], color='green', marker='^', s=100, label='MACD底背离', zorder=5)
            ax4.axhline(y=0, color='black', linestyle='--', alpha=0.3)
            ax4.set_ylabel('MACD')
            ax4.set_xlabel('日期')
            ax4.legend()
            ax4.grid(True, alpha=0.3)
            
            # 设置日期格式
            locator = mdates.AutoDateLocator(minticks=10, maxticks=20)
            formatter = mdates.ConciseDateFormatter(locator)
            ax4.xaxis.set_major_locator(locator)
            ax4.xaxis.set_major_formatter(formatter)
            
            plt.tight_layout()
            plt.savefig(f'{self.stock_code}_dvi_macd_divergence.png', dpi=300, bbox_inches='tight')
            plt.show()
            
            return True
        
        def print_divergence_summary(self):
            """
            打印背离情况摘要
            """
            if self.data is None:
                print("请先获取股票数据")
                return False
            
            print(f"\n{self.stock_code} 背离分析摘要 ({self.start_date} 至 {self.end_date})")
            print(f"检测到的价格顶背离次数: {self.data['price_top_divergence'].sum()}")
            print(f"检测到的价格底背离次数: {self.data['price_bottom_divergence'].sum()}")
            print(f"检测到的DVI顶背离次数: {self.data['dvi_top_divergence'].sum()}")
            print(f"检测到的DVI底背离次数: {self.data['dvi_bottom_divergence'].sum()}")
            print(f"检测到的MACD顶背离次数: {self.data['macd_top_divergence'].sum()}")
            print(f"检测到的MACD底背离次数: {self.data['macd_bottom_divergence'].sum()}")
            print(f"检测到的量价正背离次数: {(self.data['volume_price_divergence'] == 1).sum()}")
            print(f"检测到的量价负背离次数: {(self.data['volume_price_divergence'] == -1).sum()}")
            
            # 最近的背离点
            recent_price_top = self.data[self.data['price_top_divergence'] == 1].index.max()
            recent_price_bottom = self.data[self.data['price_bottom_divergence'] == 1].index.max()
            recent_dvi_top = self.data[self.data['dvi_top_divergence'] == 1].index.max()
            recent_dvi_bottom = self.data[self.data['dvi_bottom_divergence'] == 1].index.max()
            
            if pd.notna(recent_price_top):
                print(f"\n最近的价格顶背离日期: {recent_price_top.strftime('%Y-%m-%d')}")
            if pd.notna(recent_price_bottom):
                print(f"最近的价格底背离日期: {recent_price_bottom.strftime('%Y-%m-%d')}")
            if pd.notna(recent_dvi_top):
                print(f"最近的DVI顶背离日期: {recent_dvi_top.strftime('%Y-%m-%d')}")
            if pd.notna(recent_dvi_bottom):
                print(f"最近的DVI底背离日期: {recent_dvi_bottom.strftime('%Y-%m-%d')}")
            
            return True
    
    def main():
        # 创建分析器实例
        analyzer = StockAnalyzer()
        
        try:
            # 设置参数
            stock_code = "sh.600938"  # 浦发银行
            end_date = datetime.now().strftime('%Y-%m-%d')
            start_date = (datetime.now() - timedelta(days=365)).strftime('%Y-%m-%d')
            
            # 获取数据
            if analyzer.get_stock_data(stock_code, start_date, end_date):
                # 计算指标
                analyzer.calculate_indicators()
                
                # 检测背离
                analyzer.detect_divergence()
                
                # 打印摘要
                analyzer.print_divergence_summary()
                
                # 绘制图形
                analyzer.plot_indicators()
        
        finally:
            # 登出
            analyzer.logout()
    
    if __name__ == "__main__":
        main()
  • 某只股票盘整分析pytthon程序解析

    格力电器 000651

     根据你提供的信息和代码片段,这个程序是一个股票盘整分析工具,主要用于识别股票价格的盘整区间并进行技术分析。下面是对程序功能的详细解释: 程序核心功能

    1. 股票数据获取
    • 程序使用baostock库获取股票的历史交易数据
    • 包括开盘价、收盘价、最高价、最低价和成交量等信息
    1. 技术指标计算
    • 计算布林带(Bollinger Bands)指标,包括上轨、中轨和下轨
    • 计算相对强弱指标(RSI),用于衡量股票的超买超卖状态
    • 计算平均真实波动幅度百分比(ATR%),用于衡量价格波动性
    1. 盘整区间识别
    • 程序能够自动识别股票价格的盘整区间
    • 盘整通常表现为价格在一定范围内波动,没有明显的上涨或下跌趋势
    • 可能通过比较价格与布林带的关系、RSI值的稳定性等来判断
    1. 数据可视化
    • 使用matplotlib创建图表展示分析结果
    • 包含至少两个子图:
      • 第一个子图显示股票价格和布林带指标
      • 第二个子图显示RSI指标和ATR百分比
    • 程序会在图表中标记识别出的盘整区间

    技术指标解释

    1. 布林带(Bollinger Bands)
    • 中轨(middle_band):通常是20日移动平均线
    • 上轨(upper_band):中轨加上2倍标准差
    • 下轨(lower_band):中轨减去2倍标准差
    • 当价格贴近上轨或下轨时,可能表示超买或超卖
    • 当布林带收窄时,通常预示着市场即将出现大的波动
    1. 相对强弱指标(RSI)
    • 取值范围0-100,通常以30和70作为超卖和超买的界限
    • RSI>70表示超买,可能面临回调
    • RSI<30表示超卖,可能即将反弹
    • 在盘整区间内,RSI通常在30-70之间波动
    1. 平均真实波动幅度百分比(ATR%)
    • 衡量价格波动性的指标,以百分比形式表示
    • 低ATR%值通常出现在盘整区间,表示波动性较小
    • 高ATR%值通常出现在趋势明显的市场,表示波动性较大

    程序结构推测

    基于常见的Python数据分析程序结构,该程序可能包含以下几个主要函数:

    1. 数据获取函数:负责从baostock获取股票数据
    2. 指标计算函数:计算布林带、RSI、ATR等技术指标
    3. 盘整识别函数:根据设定的条件识别盘整区间
    4. 可视化函数:创建图表展示分析结果
    5. 主函数:协调各个功能模块的执行流程

    实际应用价值

    1. 交易决策辅助:通过识别盘整区间,可以帮助交易者判断突破时机
    2. 风险控制:了解股票的波动特性,有助于设置合理的止损和止盈位
    3. 趋势判断:结合多个技术指标,可以更准确地判断股票的未来走势
    4. 量化分析基础:为进一步的量化交易策略开发提供技术支持

    这个程序适合对股票技术分析感兴趣的投资者和交易者使用,通过可视化的方式直观地展示股票的盘整特性和技术指标状态,帮助做出更明智的投资决策。

    python源代码

    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt
    import baostock as bs
    import time
    import matplotlib.font_manager as fm
    
    # 获取股票数据
    def get_stock_data(stock_code, start_date, end_date):
        # 登录baostock
        lg = bs.login()
        if lg.error_code != '0':
            print(f"登录失败: {lg.error_msg}")
            # 如果登录失败,使用模拟数据
            return generate_sample_data(start_date, end_date)
        
        # 获取日线数据
        rs = bs.query_history_k_data_plus(
            stock_code,
            "date,open,high,low,close,volume",
            start_date=start_date,
            end_date=end_date,
            frequency="d",
            adjustflag="3"  # 不复权
        )
        
        # 处理数据
        if rs.error_code != '0':
            print(f"获取数据失败: {rs.error_msg}")
            bs.logout()
            # 如果获取数据失败,使用模拟数据
            return generate_sample_data(start_date, end_date)
        
        # 转化为DataFrame
        data_list = []
        while (rs.error_code == '0') & rs.next():
            data_list.append(rs.get_row_data())
        
        if not data_list:
            print("未获取到数据,使用模拟数据")
            bs.logout()
            return generate_sample_data(start_date, end_date)
        
        df = pd.DataFrame(data_list, columns=rs.fields)
        
        # 数据类型转换
        df['date'] = pd.to_datetime(df['date'])
        df['open'] = df['open'].astype(float)
        df['high'] = df['high'].astype(float)
        df['low'] = df['low'].astype(float)
        df['close'] = df['close'].astype(float)
        df['volume'] = df['volume'].astype(float)
        
        # 设置索引并排序
        df = df.sort_values('date')
        df.set_index('date', inplace=True)
        
        # 登出baostock
        bs.logout()
        
        print(f"成功获取{stock_code}的{start_date}至{end_date}数据,共{len(df)}条")
        return df
    
    # 生成模拟数据(当无法获取真实数据时使用)
    def generate_sample_data(start_date, end_date):
        print("使用模拟数据进行演示")
        dates = pd.date_range(start=start_date, end=end_date)
        days = len(dates)
        
        # 生成一个有趋势和盘整的价格序列
        np.random.seed(42)
        base_price = 100
        trend = base_price + np.random.normal(0, 2, days).cumsum()
        
        # 在中间部分添加盘整行情
        consolidation_start = int(days * 0.3)
        consolidation_end = int(days * 0.7)
        consolidation_mean = trend[consolidation_start]
        trend[consolidation_start:consolidation_end] = consolidation_mean + np.random.normal(0, 1.5, consolidation_end-consolidation_start)
        
        df = pd.DataFrame({
            'open': trend - np.random.uniform(0, 1, days),
            'high': trend + np.random.uniform(0.5, 2, days),
            'low': trend - np.random.uniform(0.5, 2, days),
            'close': trend,
            'volume': np.random.randint(10000, 100000, days)
        }, index=dates)
        
        return df
    
    # 手动计算布林带宽度(不使用TA-Lib)
    def calculate_bollinger_band_width(df, window=20):
        # 计算移动平均线
        df['middle_band'] = df['close'].rolling(window=window).mean()
        # 计算标准差
        rolling_std = df['close'].rolling(window=window).std()
        # 计算上下布林带
        df['upper_band'] = df['middle_band'] + (rolling_std * 2)
        df['lower_band'] = df['middle_band'] - (rolling_std * 2)
        # 计算布林带宽度
        df['bb_width'] = (df['upper_band'] - df['lower_band']) / df['middle_band']
        return df
    
    # 手动计算RSI(不使用TA-Lib)
    def calculate_rsi(df, window=14):
        delta = df['close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
        
        # 避免除零错误
        rs = gain / loss.replace(0, np.nan)
        df['rsi'] = 100 - (100 / (1 + rs))
        return df
    
    # 手动计算ATR(不使用TA-Lib)
    def calculate_atr(df, window=14):
        # 计算每日真实波动幅度
        df['tr1'] = df['high'] - df['low']
        df['tr2'] = abs(df['high'] - df['close'].shift(1))
        df['tr3'] = abs(df['low'] - df['close'].shift(1))
        df['tr'] = df[['tr1', 'tr2', 'tr3']].max(axis=1)
        
        # 计算ATR(移动平均真实波动幅度)
        df['atr'] = df['tr'].rolling(window=window).mean()
        
        # 清理临时列
        df.drop(['tr1', 'tr2', 'tr3', 'tr'], axis=1, inplace=True)
        
        # 计算ATR百分比(相对于收盘价)
        df['atr_pct'] = df['atr'] / df['close'] * 100
        return df
    
    # 判断是否处于盘整状态
    def detect_consolidation(df, lookback_period=20):
        # 计算各种技术指标
        df = calculate_bollinger_band_width(df)
        df = calculate_rsi(df)
        df = calculate_atr(df)
        
        # 移除NaN值
        df = df.dropna()
        
        if len(df) < lookback_period:
            print(f"数据不足{lookback_period}天,无法进行盘整判断")
            return False, [], {}
        
        # 计算最近一段时间的平均指标值
        recent_df = df.tail(lookback_period)
        avg_bb_width = recent_df['bb_width'].mean()
        avg_rsi = recent_df['rsi'].mean()
        avg_atr_pct = recent_df['atr_pct'].mean()
        
        # 计算价格变动范围
        price_range_pct = (recent_df['close'].max() - recent_df['close'].min()) / recent_df['close'].min() * 100
        
        # 设置盘整判断阈值
        bb_width_threshold = 0.05  # 布林带宽度阈值
        rsi_threshold = 15  # RSI波动范围阈值
        atr_pct_threshold = 1.5  # ATR百分比阈值
        price_range_threshold = 8  # 价格变动范围阈值
        
        # 判断是否处于盘整状态
        is_consolidating = False
        signals = []
        
        if avg_bb_width < bb_width_threshold:
            signals.append("布林带收窄")
        if abs(avg_rsi - 50) < rsi_threshold:
            signals.append("RSI在中间区域")
        if avg_atr_pct < atr_pct_threshold:
            signals.append("波动率低")
        if price_range_pct < price_range_threshold:
            signals.append("价格波动范围小")
        
        # 如果满足多个条件,则判断为盘整
        if len(signals) >= 3:
            is_consolidating = True
        
        return is_consolidating, signals, {
            'avg_bb_width': avg_bb_width,
            'avg_rsi': avg_rsi,
            'avg_atr_pct': avg_atr_pct,
            'price_range_pct': price_range_pct
        }
    
    # 可视化结果
    # 在导入matplotlib后添加简单的字体设置
    import matplotlib.pyplot as plt
    # 只保留最基本的Windows中文字体设置
    plt.rcParams["font.family"] = ["SimHei"]
    plt.rcParams["axes.unicode_minus"] = False  # 解决负号显示问题
    
    def plot_consolidation(df, consolidation_periods, stock_code):
        plt.figure(figsize=(14, 8))
        
        # 绘制价格和布林带
        plt.subplot(2, 1, 1)
        plt.plot(df.index, df['close'], label='收盘价')
        plt.plot(df.index, df['upper_band'], 'r--', label='上布林带')
        plt.plot(df.index, df['middle_band'], 'g--', label='中布林带')
        plt.plot(df.index, df['lower_band'], 'r--', label='下布林带')
        
        # 标记盘整区间
        for period in consolidation_periods:
            if len(period) > 0:
                plt.axvspan(period[0], period[-1], color='gray', alpha=0.3)
        
        plt.title(f'{stock_code} 股票价格与布林带')
        plt.legend()
        
        # 绘制RSI和ATR百分比
        plt.subplot(2, 1, 2)
        plt.plot(df.index, df['rsi'], label='RSI')
        plt.axhline(70, color='r', linestyle='--')
        plt.axhline(30, color='g', linestyle='--')
        plt.axhline(50, color='b', linestyle='--', alpha=0.5)
        plt.ylabel('RSI')
        plt.legend(loc='upper left')
        
        plt.twinx()
        plt.plot(df.index, df['atr_pct'], 'c-', label='ATR百分比')
        plt.ylabel('ATR百分比 (%)', color='c')
        plt.grid(True, alpha=0.3)
        plt.title('RSI与ATR百分比')
        plt.legend(loc='upper right')
        
        plt.tight_layout()
        plt.show()
    
    # 主函数
    def main():
        # 获取股票数据
        #stock_code = 'sh.600938'  # 中国海油
        stock_code = 'sz.000651' # 格力电器
        start_date = '2024-01-01'
        end_date = '2025-12-31'
        
        df = get_stock_data(stock_code, start_date, end_date)
        
        # 检测最近一段时间是否处于盘整状态
        lookback_period = 20
        is_consolidating, signals, metrics = detect_consolidation(df, lookback_period)
        
        print(f"股票代码: {stock_code}")
        print(f"当前是否处于盘整状态: {'是' if is_consolidating else '否'}")
        print(f"盘整信号: {', '.join(signals) if signals else '无'}")
        if metrics:
            print(f"指标值: 布林带宽度={metrics['avg_bb_width']:.4f}, RSI={metrics['avg_rsi']:.2f}")
            print(f"        ATR百分比={metrics['avg_atr_pct']:.2f}%, 价格波动范围={metrics['price_range_pct']:.2f}%")
        
        # 检测整个时间序列中的盘整区间
        window_size = 20
        step = 5
        consolidation_periods = []
        
        # 先预处理数据以避免重复计算
        df = calculate_bollinger_band_width(df)
        df = calculate_rsi(df)
        df = calculate_atr(df)
        df = df.dropna()
        
        if len(df) >= window_size:
            for i in range(0, len(df) - window_size + 1, step):
                window_df = df.iloc[i:i+window_size][['close', 'high', 'low', 'bb_width', 'rsi', 'atr_pct']]
                
                # 计算窗口内的指标
                avg_bb_width = window_df['bb_width'].mean()
                avg_rsi = window_df['rsi'].mean()
                avg_atr_pct = window_df['atr_pct'].mean()
                price_range_pct = (window_df['close'].max() - window_df['close'].min()) / window_df['close'].min() * 100
                
                # 判断是否盘整
                signals_count = 0
                if avg_bb_width < 0.05: signals_count += 1
                if abs(avg_rsi - 50) < 15: signals_count += 1
                if avg_atr_pct < 1.5: signals_count += 1
                if price_range_pct < 8: signals_count += 1
                
                if signals_count >= 3:
                    consolidation_periods.append(window_df.index)
        
        # 可视化结果
        plot_consolidation(df, consolidation_periods, stock_code)
    
    if __name__ == "__main__":
        main()
    
  • 中国海油10月16号分红

    分红登记日当天股价26块4毛1。除权后25块7毛4。每股分红6块6毛6。本人两个账号持股5000股。分红总计3000多元。

  • 某只股票5、20均线策略python代码

    import baostock as bs
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    import matplotlib.dates as mdates
    from matplotlib.ticker import MaxNLocator
    import datetime
    
    # 设置中文显示
    plt.rcParams["font.family"] = ["SimHei", "Microsoft YaHei", "SimSun", "KaiTi", "FangSong"]
    plt.rcParams["axes.unicode_minus"] = False  # 正确显示负号
    
    def get_stock_data(code, start_date, end_date):
        """从baostock获取股票数据"""
        # 登录baostock
        lg = bs.login()
        if lg.error_code != '0':
            print(f"登录失败:{lg.error_msg}")
            return None
        
        # 获取股票数据
        rs = bs.query_history_k_data_plus(
            code,
            "date,open,high,low,close,volume",
            start_date=start_date,
            end_date=end_date,
            frequency="d",
            adjustflag="3"  # 复权类型,3表示不复权
        )
        
        # 处理数据
        data_list = []
        while (rs.error_code == '0') & rs.next():
            data_list.append(rs.get_row_data())
        
        # 登出baostock
        bs.logout()
        
        # 转换为DataFrame并处理
        if not data_list:
            print("没有获取到数据")
            return None
            
        df = pd.DataFrame(data_list, columns=rs.fields)
        # 转换数据类型
        df['date'] = pd.to_datetime(df['date'])
        df['open'] = df['open'].astype(float)
        df['high'] = df['high'].astype(float)
        df['low'] = df['low'].astype(float)
        df['close'] = df['close'].astype(float)
        df['volume'] = df['volume'].astype(float)
        
        df.set_index('date', inplace=True)
        return df
    
    def calculate_moving_averages(data, short_window=5, long_window=20):
        """计算移动平均线"""
        # 计算短期均线(5日均线)
        data['short_ma'] = data.loc[:, 'close'].rolling(window=short_window).mean()
        # 计算长期均线(20日均线)
        data['long_ma'] = data.loc[:, 'close'].rolling(window=long_window).mean()
        return data
    
    def generate_signals(data):
        """生成交易信号:5日均线上穿20日均线买入,下穿卖出"""
        # 初始化信号:0表示无信号,1表示买入,-1表示卖出
        data['signal'] = 0
        
        # 计算均线差,用于判断金叉死叉
        data['ma_diff'] = data['short_ma'] - data['long_ma']
        
        # 创建信号列
        # 金叉:短期均线上穿长期均线(ma_diff从负转正)
        data.loc[(data['ma_diff'] > 0) & (data['ma_diff'].shift(1) <= 0), 'signal'] = 1
        # 死叉:短期均线下穿长期均线(ma_diff从正转负)
        data.loc[(data['ma_diff'] < 0) & (data['ma_diff'].shift(1) >= 0), 'signal'] = -1
        
        return data
    
    def backtest_strategy(data, initial_capital=100000):
        """回测策略"""
        # 初始化资金和持仓,明确使用浮点数类型
        portfolio = pd.DataFrame(index=data.index).fillna(0.0)
        portfolio['cash'] = float(initial_capital)
        portfolio['shares'] = 0  # shares保持为整数
        portfolio['total'] = float(initial_capital)
        
        in_position = False  # 是否持仓
        
        for i in range(1, len(data)):
            date = data.index[i]
            prev_date = data.index[i-1]
            
            # 复制前一天的持仓和资金状态
            portfolio.loc[date, 'cash'] = portfolio.loc[prev_date, 'cash']
            portfolio.loc[date, 'shares'] = portfolio.loc[prev_date, 'shares']
            
            # 检查交易信号
            if data.loc[date, 'signal'] == 1 and not in_position:
                # 买入信号且未持仓,执行买入
                price = data.loc[date, 'close']
                max_shares = int(portfolio.loc[date, 'cash'] / price)
                if max_shares > 0:
                    portfolio.loc[date, 'shares'] = max_shares
                    portfolio.loc[date, 'cash'] -= max_shares * price
                    in_position = True
                    print(f"{date.date()} 发出买入信号,价格: {price:.2f}, 买入 {max_shares} 股")
            
            elif data.loc[date, 'signal'] == -1 and in_position:
                # 卖出信号且持仓,执行卖出
                price = data.loc[date, 'close']
                shares = portfolio.loc[date, 'shares']
                portfolio.loc[date, 'cash'] += shares * price
                portfolio.loc[date, 'shares'] = 0
                in_position = False
                print(f"{date.date()} 发出卖出信号,价格: {price:.2f}, 卖出 {shares} 股")
            
            # 计算总资产
            portfolio.loc[date, 'total'] = portfolio.loc[date, 'cash'] + portfolio.loc[date, 'shares'] * data.loc[date, 'close']
        
        # 将回测结果合并到数据中
        data['portfolio'] = portfolio['total']
        return data
    
    def plot_results(data):
        """绘制结果图表"""
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 14), sharex=True)
        
        # 价格和均线图
        ax1.plot(data.index, data['close'], label='收盘价', linewidth=2)
        ax1.plot(data.index, data['short_ma'], label='5日均线', color='blue', linewidth=1.5)
        ax1.plot(data.index, data['long_ma'], label='20日均线', color='orange', linewidth=1.5)
        ax1.scatter(data.index[data['signal'] == 1], data['close'][data['signal'] == 1], 
                    marker='^', color='g', label='买入信号', zorder=3)
        ax1.scatter(data.index[data['signal'] == -1], data['close'][data['signal'] == -1], 
                    marker='v', color='r', label='卖出信号', zorder=3)
        ax1.set_title('股票价格与均线策略交易信号')
        ax1.set_ylabel('价格 (元)')
        ax1.legend()
        ax1.grid(True)
        
        # 资金曲线
        ax2.plot(data.index, data['portfolio'], label='策略资产', color='b', linewidth=2)
        # 计算买入持有策略的资产
        initial_capital = data['portfolio'].iloc[0]
        buy_hold = initial_capital * (data['close'] / data['close'].iloc[0])
        ax2.plot(data.index, buy_hold, label='买入持有', color='gray', linestyle='--', linewidth=2)
        ax2.set_title('策略表现与买入持有对比')
        ax2.set_xlabel('日期')
        ax2.set_ylabel('资产 (元)')
        ax2.legend()
        ax2.grid(True)
        
        # 设置x轴日期格式
        ax2.xaxis.set_major_locator(mdates.MonthLocator(interval=3))
        ax2.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
        plt.xticks(rotation=45)
        
        plt.tight_layout()
        plt.show()
        
        return fig
    
    def calculate_performance_metrics(data):
        """计算绩效指标"""
        initial_capital = data['portfolio'].iloc[0]
        final_capital = data['portfolio'].iloc[-1]
        
        # 计算策略总收益
        total_return = (final_capital - initial_capital) / initial_capital * 100
        
        # 计算买入持有总收益
        initial_price = data['close'].iloc[0]
        final_price = data['close'].iloc[-1]
        buy_hold_return = (final_price - initial_price) / initial_price * 100
        
        # 计算交易次数
        buy_signals = sum(data['signal'] == 1)
        sell_signals = sum(data['signal'] == -1)
        
        # 计算持有天数
        days_held = len(data)
        
        # 计算年化收益率
        years = days_held / 252  # 假设一年252个交易日
        annualized_return = (pow((final_capital / initial_capital), 1/years) - 1) * 100 if years > 0 else 0
        
        print("\n====== 策略绩效指标 ======")
        print(f"回测时间段: {data.index[0].date()} 至 {data.index[-1].date()}")
        print(f"持有天数: {days_held} 天")
        print(f"初始资金: {initial_capital:.2f} 元")
        print(f"最终资金: {final_capital:.2f} 元")
        print(f"策略总收益率: {total_return:.2f}%")
        print(f"买入持有总收益率: {buy_hold_return:.2f}%")
        print(f"策略年化收益率: {annualized_return:.2f}%")
        print(f"买入信号次数: {buy_signals}")
        print(f"卖出信号次数: {sell_signals}")
        
        return {
            'total_return': total_return,
            'buy_hold_return': buy_hold_return,
            'annualized_return': annualized_return,
            'buy_signals': buy_signals,
            'sell_signals': sell_signals,
            'days_held': days_held
        }
    
    def main():
        # 设置股票代码和日期范围(最近3年数据)
        stock_code = "sh.600938"  # 600938的证券代码
        end_date = datetime.datetime.now().strftime("%Y-%m-%d")
        start_date = (datetime.datetime.now() - datetime.timedelta(days=2*365)).strftime("%Y-%m-%d")
        
        print(f"获取 {stock_code} 从 {start_date} 到 {end_date} 的数据...")
        
        # 获取股票数据
        data = get_stock_data(stock_code, start_date, end_date)
        if data is None or len(data) == 0:
            print("无法获取足够的股票数据进行分析")
            return
        
        # 计算移动平均线
        data = calculate_moving_averages(data, short_window=5, long_window=20)
        
        # 生成交易信号
        data = generate_signals(data)
        
        # 回测策略
        data = backtest_strategy(data)
        
        # 计算并显示绩效指标
        metrics = calculate_performance_metrics(data)
        
        # 绘制结果图表
        plot_results(data)
    
    if __name__ == "__main__":
        main()

    2023-11-28 发出买入信号,价格: 19.23, 买入 5200 股
    2023-12-19 发出卖出信号,价格: 19.71, 卖出 5200 股
    2023-12-20 发出买入信号,价格: 19.84, 买入 5166 股
    2024-01-19 发出卖出信号,价格: 20.50, 卖出 5166 股
    2024-01-25 发出买入信号,价格: 22.32, 买入 4744 股
    2024-03-28 发出卖出信号,价格: 28.15, 卖出 4744 股
    2024-03-29 发出买入信号,价格: 29.23, 买入 4569 股
    2024-04-24 发出卖出信号,价格: 28.68, 卖出 4569 股
    2024-05-06 发出买入信号,价格: 29.22, 买入 4484 股
    2024-05-08 发出卖出信号,价格: 29.06, 卖出 4484 股
    2024-05-29 发出买入信号,价格: 30.44, 买入 4281 股
    2024-07-17 发出卖出信号,价格: 32.25, 卖出 4281 股
    2024-08-19 发出买入信号,价格: 29.00, 买入 4761 股
    2024-08-23 发出卖出信号,价格: 27.93, 卖出 4761 股
    2024-08-27 发出买入信号,价格: 29.39, 买入 4524 股
    2024-09-04 发出卖出信号,价格: 26.75, 卖出 4524 股
    2024-09-25 发出买入信号,价格: 28.37, 买入 4266 股
    2024-10-23 发出卖出信号,价格: 28.28, 卖出 4266 股
    2024-12-04 发出买入信号,价格: 27.49, 买入 4389 股
    2025-01-15 发出卖出信号,价格: 28.72, 卖出 4389 股
    2025-01-16 发出买入信号,价格: 29.45, 买入 4280 股
    2025-01-21 发出卖出信号,价格: 27.74, 卖出 4280 股
    2025-03-18 发出买入信号,价格: 25.66, 买入 4627 股
    2025-04-07 发出卖出信号,价格: 23.23, 卖出 4627 股
    2025-04-25 发出买入信号,价格: 25.29, 买入 4250 股
    2025-06-26 发出卖出信号,价格: 26.37, 卖出 4250 股
    2025-07-30 发出买入信号,价格: 26.49, 买入 4231 股
    2025-08-06 发出卖出信号,价格: 26.01, 卖出 4231 股
    2025-08-08 发出买入信号,价格: 26.21, 买入 4198 股
    2025-08-14 发出卖出信号,价格: 25.80, 卖出 4198 股
    2025-09-03 发出买入信号,价格: 26.24, 买入 4128 股

    ====== 策略绩效指标 ======
    回测时间段: 2023-10-16 至 2025-10-14
    持有天数: 484 天
    初始资金: 100000.00 元
    最终资金: 109489.57 元
    策略总收益率: 9.49%
    买入持有总收益率: 27.07%
    策略年化收益率: 4.83%
    买入信号次数: 16
    卖出信号次数: 15