分类: Python

  • 股票RSI(20买、80卖)指标python代码

    import baostock as bs
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    import matplotlib.dates as mdates
    from matplotlib.ticker import MaxNLocator
    import datetime
    
    # 设置中文显示
    plt.rcParams["font.family"] = ["SimHei", "Microsoft YaHei", "SimSun", "KaiTi", "FangSong"]
    plt.rcParams["axes.unicode_minus"] = False  # 正确显示负号
    
    def get_stock_data(code, start_date, end_date):
        """从baostock获取股票数据"""
        # 登录baostock
        lg = bs.login()
        if lg.error_code != '0':
            print(f"登录失败:{lg.error_msg}")
            return None
        
        # 获取股票数据
        rs = bs.query_history_k_data_plus(
            code,
            "date,open,high,low,close,volume",
            start_date=start_date,
            end_date=end_date,
            frequency="d",
            adjustflag="3"  # 复权类型,3表示不复权
        )
        
        # 处理数据
        data_list = []
        while (rs.error_code == '0') & rs.next():
            data_list.append(rs.get_row_data())
        
        # 登出baostock
        bs.logout()
        
        # 转换为DataFrame并处理
        if not data_list:
            print("没有获取到数据")
            return None
            
        df = pd.DataFrame(data_list, columns=rs.fields)
        # 转换数据类型
        df['date'] = pd.to_datetime(df['date'])
        df['open'] = df['open'].astype(float)
        df['high'] = df['high'].astype(float)
        df['low'] = df['low'].astype(float)
        df['close'] = df['close'].astype(float)
        df['volume'] = df['volume'].astype(float)
        
        df.set_index('date', inplace=True)
        return df
    
    def calculate_rsi(data, window=14):
        """计算RSI指标"""
        delta = data.loc[:, 'close'].diff()
        
        # 分离涨跌
        gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
        
        # 计算RSI
        rs = gain / loss
        data['rsi'] = 100 - (100 / (1 + rs))
        return data
    
    def generate_signals(data):
        """生成交易信号"""
        # 初始化信号:0表示无信号,1表示买入,-1表示卖出
        data['signal'] = 0
        
        # RSI < 20 发出买入信号
        data.loc[data['rsi'] < 20, 'signal'] = 1
        
        # RSI > 80 发出卖出信号
        data.loc[data['rsi'] > 80, 'signal'] = -1
        
        return data
    
    def backtest_strategy(data, initial_capital=100000):
        """回测策略"""
        # 初始化资金和持仓,明确使用浮点数类型
        portfolio = pd.DataFrame(index=data.index).fillna(0.0)
        portfolio['cash'] = float(initial_capital)
        portfolio['shares'] = 0  # shares保持为整数
        portfolio['total'] = float(initial_capital)
        
        in_position = False  # 是否持仓
        
        for i in range(1, len(data)):
            date = data.index[i]
            prev_date = data.index[i-1]
            
            # 复制前一天的持仓和资金状态
            portfolio.loc[date, 'cash'] = portfolio.loc[prev_date, 'cash']
            portfolio.loc[date, 'shares'] = portfolio.loc[prev_date, 'shares']
            
            # 检查交易信号
            if data.loc[date, 'signal'] == 1 and not in_position:
                # 买入信号且未持仓,执行买入
                price = data.loc[date, 'close']
                max_shares = int(portfolio.loc[date, 'cash'] / price)
                if max_shares > 0:
                    portfolio.loc[date, 'shares'] = max_shares
                    portfolio.loc[date, 'cash'] -= max_shares * price
                    in_position = True
                    print(f"{date.date()} 发出买入信号,价格: {price:.2f}, 买入 {max_shares} 股")
            
            elif data.loc[date, 'signal'] == -1 and in_position:
                # 卖出信号且持仓,执行卖出
                price = data.loc[date, 'close']
                shares = portfolio.loc[date, 'shares']
                portfolio.loc[date, 'cash'] += shares * price
                portfolio.loc[date, 'shares'] = 0
                in_position = False
                print(f"{date.date()} 发出卖出信号,价格: {price:.2f}, 卖出 {shares} 股")
            
            # 计算总资产
            portfolio.loc[date, 'total'] = portfolio.loc[date, 'cash'] + portfolio.loc[date, 'shares'] * data.loc[date, 'close']
        
        # 将回测结果合并到数据中
        data['portfolio'] = portfolio['total']
        return data
    
    def plot_results(data):
        """绘制结果图表"""
        fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(16, 18), sharex=True)
        
        # 价格和交易信号图
        ax1.plot(data.index, data['close'], label='收盘价', linewidth=2)
        ax1.scatter(data.index[data['signal'] == 1], data['close'][data['signal'] == 1], 
                    marker='^', color='g', label='买入信号', zorder=3)
        ax1.scatter(data.index[data['signal'] == -1], data['close'][data['signal'] == -1], 
                    marker='v', color='r', label='卖出信号', zorder=3)
        ax1.set_title('股票价格与交易信号')
        ax1.set_ylabel('价格 (元)')
        ax1.legend()
        ax1.grid(True)
        
        # RSI指标图
        ax2.plot(data.index, data['rsi'], label='RSI (14)', color='purple', linewidth=2)
        ax2.axhline(70, color='orange', linestyle='--', alpha=0.7)
        ax2.axhline(30, color='orange', linestyle='--', alpha=0.7)
        ax2.axhline(80, color='r', linestyle='--')
        ax2.axhline(20, color='g', linestyle='--')
        ax2.set_title('RSI指标 (14周期)')
        ax2.set_ylabel('RSI值')
        ax2.set_ylim(0, 100)
        ax2.legend()
        ax2.grid(True)
        
        # 资金曲线
        ax3.plot(data.index, data['portfolio'], label='策略资产', color='b', linewidth=2)
        # 计算买入持有策略的资产
        initial_capital = data['portfolio'].iloc[0]
        buy_hold = initial_capital * (data['close'] / data['close'].iloc[0])
        ax3.plot(data.index, buy_hold, label='买入持有', color='gray', linestyle='--', linewidth=2)
        ax3.set_title('策略表现与买入持有对比')
        ax3.set_xlabel('日期')
        ax3.set_ylabel('资产 (元)')
        ax3.legend()
        ax3.grid(True)
        
        # 设置x轴日期格式
        ax3.xaxis.set_major_locator(mdates.MonthLocator(interval=3))
        ax3.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
        plt.xticks(rotation=45)
        
        plt.tight_layout()
        plt.show()
        
        return fig
    
    def calculate_performance_metrics(data):
        """计算绩效指标"""
        initial_capital = data['portfolio'].iloc[0]
        final_capital = data['portfolio'].iloc[-1]
        
        # 计算策略总收益
        total_return = (final_capital - initial_capital) / initial_capital * 100
        
        # 计算买入持有总收益
        initial_price = data['close'].iloc[0]
        final_price = data['close'].iloc[-1]
        buy_hold_return = (final_price - initial_price) / initial_price * 100
        
        # 计算交易次数
        buy_signals = sum(data['signal'] == 1)
        sell_signals = sum(data['signal'] == -1)
        
        # 计算持有天数
        days_held = len(data)
        
        # 计算年化收益率
        years = days_held / 252  # 假设一年252个交易日
        annualized_return = (pow((final_capital / initial_capital), 1/years) - 1) * 100 if years > 0 else 0
        
        print("\n====== 策略绩效指标 ======")
        print(f"回测时间段: {data.index[0].date()} 至 {data.index[-1].date()}")
        print(f"持有天数: {days_held} 天")
        print(f"初始资金: {initial_capital:.2f} 元")
        print(f"最终资金: {final_capital:.2f} 元")
        print(f"策略总收益率: {total_return:.2f}%")
        print(f"买入持有总收益率: {buy_hold_return:.2f}%")
        print(f"策略年化收益率: {annualized_return:.2f}%")
        print(f"买入信号次数: {buy_signals}")
        print(f"卖出信号次数: {sell_signals}")
        
        return {
            'total_return': total_return,
            'buy_hold_return': buy_hold_return,
            'annualized_return': annualized_return,
            'buy_signals': buy_signals,
            'sell_signals': sell_signals,
            'days_held': days_held
        }
    
    def main():
        # 设置股票代码和日期范围(最近5年数据)
        stock_code = "sh.600938"  # 600938的证券代码
        end_date = datetime.datetime.now().strftime("%Y-%m-%d")
        start_date = (datetime.datetime.now() - datetime.timedelta(days=5*365)).strftime("%Y-%m-%d")
        
        print(f"获取 {stock_code} 从 {start_date} 到 {end_date} 的数据...")
        
        # 获取股票数据
        data = get_stock_data(stock_code, start_date, end_date)
        if data is None or len(data) == 0:
            print("无法获取足够的股票数据进行分析")
            return
        
        # 计算RSI指标
        data = calculate_rsi(data)
        
        # 生成交易信号
        data = generate_signals(data)
        
        # 回测策略
        data = backtest_strategy(data)
        
        # 计算并显示绩效指标
        metrics = calculate_performance_metrics(data)
        
        # 绘制结果图表
        plot_results(data)
    
    if __name__ == "__main__":
        main()
    '''
    脚本现在可以顺利执行RSI交易策略回测,输出显示: 成功获取了sh.600938股票的数据  
    生成了多个买入和卖出交易信号  
    计算了详细的策略绩效指标,
    包括: 
    总收益率:61.16%  
    年化收益率:15.33%  
    与买入持有策略对比(买入持有收益率:92.31%)     
    '''
    
    
  • apache2日志分析简单版python代码

    import re
    import argparse
    from collections import Counter, defaultdict
    from datetime import datetime
    import matplotlib
    import matplotlib.pyplot as plt
    import tkinter as tk
    from tkinter import filedialog, messagebox
    import os
    
    # 使用Windows系统上通用的中文字体
    plt.rcParams["font.family"] = ["SimHei", "Microsoft YaHei", "SimSun", "Arial"]
    # 设置字体查找的回退机制
    plt.rcParams["axes.unicode_minus"] = False  # 解决负号显示问题
    matplotlib.use('Agg')  # 使用非交互式后端
    
    class ApacheLogAnalyzer:
        def __init__(self, log_file_path):
            self.log_file_path = log_file_path
            # 针对用户提供的日志格式优化的正则表达式
            # 特别处理了状态码后面可能是'-'的情况
            self.log_pattern = r'^(\S+) - - \[(.*?)\] "(.*?)" (\d+) (\S+)'
            self.log_entries = []
            
        def parse_log_file(self):
            """解析日志文件并提取关键信息"""
            try:
                print(f"开始解析日志文件: {self.log_file_path}")
                print(f"文件大小: {os.path.getsize(self.log_file_path)} 字节")
                
                with open(self.log_file_path, 'r', encoding='utf-8', errors='replace') as file:
                    lines_processed = 0
                    for line in file:
                        lines_processed += 1
                        if lines_processed <= 5:  # 显示前5行用于调试
                            print(f"示例行 {lines_processed}: {line.strip()}")
                        
                        match = re.match(self.log_pattern, line)
                        if match:
                            ip = match.group(1)
                            timestamp_str = match.group(2)
                            request = match.group(3)
                            status_code = int(match.group(4))
                            
                            # 处理响应大小字段,可能是'-'
                            size_str = match.group(5)
                            size = int(size_str) if size_str != '-' else 0
                            
                            try:
                                # 解析时间戳,适配用户日志格式
                                timestamp = datetime.strptime(timestamp_str.split()[0], '%d/%b/%Y:%H:%M:%S')
                            except ValueError as e:
                                print(f"警告: 无法解析时间戳 '{timestamp_str}',错误: {e}")
                                # 使用当前时间作为备用,确保至少有数据用于图表
                                timestamp = datetime.now()
                            
                            # 尝试从请求中提取请求方法和路径
                            request_parts = request.split()
                            request_method = request_parts[0] if len(request_parts) > 0 else "Unknown"
                            request_path = request_parts[1] if len(request_parts) > 1 else "Unknown"
                            
                            entry = {
                                'ip': ip,
                                'timestamp': timestamp,
                                'request': request,
                                'request_method': request_method,
                                'request_path': request_path,
                                'status_code': status_code,
                                'size': size
                            }
                            self.log_entries.append(entry)
                
                print(f"共处理 {lines_processed} 行,成功解析 {len(self.log_entries)} 条记录")
                
                if len(self.log_entries) == 0:
                    print("警告: 没有解析到任何日志记录,请检查日志格式是否匹配")
                    messagebox.showwarning("警告", "没有解析到任何日志记录,请检查日志格式是否匹配")
                    return False
                
                return True
            except Exception as e:
                print(f"解析日志文件时出错: {e}")
                messagebox.showerror("错误", f"解析日志文件时出错: {e}")
                return False
        
        def analyze_top_ips(self, limit=10):
            """分析访问量最高的IP地址"""
            if not self.log_entries:
                print("没有数据可供分析")
                return Counter()
                
            ip_counter = Counter(entry['ip'] for entry in self.log_entries)
            print(f"\n访问量最高的{limit}个IP地址:")
            for ip, count in ip_counter.most_common(limit):
                print(f"{ip}: {count} 次访问")
            return ip_counter
        
        def analyze_status_codes(self):
            """分析HTTP状态码分布"""
            if not self.log_entries:
                print("没有数据可供分析")
                return Counter()
                
            status_counter = Counter(entry['status_code'] for entry in self.log_entries)
            print(f"\nHTTP状态码分布:")
            total = sum(status_counter.values())
            for status, count in sorted(status_counter.items()):
                percentage = (count / total) * 100
                print(f"{status}: {count} 次 ({percentage:.2f}%)")
            return status_counter
        
        def analyze_requests(self, limit=10):
            """分析最常见的请求"""
            if not self.log_entries:
                print("没有数据可供分析")
                return Counter()
                
            request_counter = Counter(entry['request_path'] for entry in self.log_entries)
            
            print(f"\n最常见的{limit}个请求路径:")
            for path, count in request_counter.most_common(limit):
                print(f"{path}: {count} 次")
            return request_counter
        
        def analyze_traffic_by_hour(self):
            """分析每小时的访问流量"""
            if not self.log_entries:
                print("没有数据可供分析")
                return defaultdict(int)
                
            hourly_traffic = defaultdict(int)
            for entry in self.log_entries:
                hour_key = entry['timestamp'].strftime('%Y-%m-%d %H:00')
                hourly_traffic[hour_key] += 1
            
            print(f"\n每小时访问量:")
            sorted_hours = sorted(hourly_traffic.items())
            for hour, count in sorted_hours[:10]:  # 只显示前10个小时的数据
                print(f"{hour}: {count} 次")
            if len(sorted_hours) > 10:
                print(f"... 还有 {len(sorted_hours) - 10} 个小时的数据未显示")
            
            return hourly_traffic
        
        def analyze_request_methods(self):
            """分析请求方法分布(GET, POST等)"""
            if not self.log_entries:
                print("没有数据可供分析")
                return Counter()
                
            method_counter = Counter(entry['request_method'] for entry in self.log_entries)
            
            print(f"\n请求方法分布:")
            total = sum(method_counter.values())
            for method, count in sorted(method_counter.items()):
                percentage = (count / total) * 100
                print(f"{method}: {count} 次 ({percentage:.2f}%)")
            return method_counter
        
        def generate_hourly_traffic_chart(self, hourly_traffic):
            """可视化每小时的访问流量"""
            if not hourly_traffic:
                print("没有数据可生成图表")
                messagebox.showwarning("警告", "没有数据可生成图表")
                return
                
            try:
                hours = [item[0] for item in sorted(hourly_traffic.items())]
                counts = [item[1] for item in sorted(hourly_traffic.items())]
                
                # 确保有足够的数据点
                if len(hours) < 2:
                    print("数据点太少,无法生成有意义的图表")
                    messagebox.showwarning("警告", "数据点太少,无法生成有意义的图表")
                    # 创建一个简单的示例图表,避免用户看到空白图表
                    plt.figure(figsize=(12, 6))
                    plt.plot(["示例时间1", "示例时间2"], [10, 15], marker='o')
                    plt.title('示例流量趋势(实际数据点不足)')
                    plt.xlabel('时间')
                    plt.ylabel('访问次数')
                    plt.tight_layout()
                    plt.savefig('hourly_traffic.png')
                    plt.show()
                    messagebox.showinfo("成功", "由于实际数据点不足,已生成示例流量趋势图: hourly_traffic.png")
                    return
                
                plt.figure(figsize=(12, 6))
                plt.plot(hours, counts, marker='o')
                plt.title('每小时访问流量')
                plt.xlabel('时间')
                plt.ylabel('访问次数')
                
                # 自动调整x轴标签,避免过于拥挤
                if len(hours) > 12:
                    step = max(1, len(hours) // 12)
                    plt.xticks(hours[::step], rotation=45)
                else:
                    plt.xticks(hours, rotation=45)
                    
                plt.tight_layout()
                
                # 保存图表
                plt.savefig('hourly_traffic.png', dpi=300, bbox_inches='tight')
                # 移除下面这行代码,因为Agg后端不支持交互式显示
                # plt.show()
                
                messagebox.showinfo("成功", "流量趋势图已生成并显示: hourly_traffic.png")
            except Exception as e:
                print(f"生成图表时出错: {e}")
                messagebox.showerror("错误", f"生成图表时出错: {e}")
    
        def run_full_analysis(self):
            """运行完整的日志分析"""
            if not self.parse_log_file():
                print("解析失败,无法继续分析")
                return
            
            print("\n===== Apache日志分析报告 =====")
            ip_counter = self.analyze_top_ips()
            status_counter = self.analyze_status_codes()
            request_counter = self.analyze_requests()
            method_counter = self.analyze_request_methods()  # 新增的请求方法分析
            hourly_traffic = self.analyze_traffic_by_hour()
            
            # 生成可视化图表
            # 确保将hourly_traffic作为参数传递
            self.generate_hourly_traffic_chart(hourly_traffic)
            
            print("\n===== 分析完成 =====")
            messagebox.showinfo("完成", "日志分析已完成!")
    
    def select_log_file():
        """弹出文件选择对话框,让用户选择日志文件"""
        # 创建一个隐藏的Tk根窗口
        root = tk.Tk()
        root.withdraw()  # 隐藏主窗口
        
        # 设置中文字体支持
        root.option_add("*Font", "SimHei 10")
        
        # 弹出文件选择对话框
        file_path = filedialog.askopenfilename(
            title="选择Apache日志文件",
            filetypes=[
                ("日志文件", "*.log"),
                ("文本文件", "*.txt"),
                ("所有文件", "*.*")
            ]
        )
        
        return file_path
    
    if __name__ == "__main__":
        # 创建命令行参数解析器
        parser = argparse.ArgumentParser(description='Apache日志分析工具')
        parser.add_argument('--log_file', help='Apache日志文件路径(可选,不提供则弹出文件选择对话框)')
        args = parser.parse_args()
        
        log_file_path = args.log_file
        
        # 如果没有提供日志文件路径,弹出文件选择对话框
        if not log_file_path:
            log_file_path = select_log_file()
            
            # 检查用户是否取消了文件选择
            if not log_file_path:
                print("未选择日志文件,程序退出。")
                exit(0)
        
        # 检查文件是否存在
        if not os.path.exists(log_file_path):
            print(f"错误:找不到文件 '{log_file_path}'")
            messagebox.showerror("错误", f"找不到文件 '{log_file_path}'")
            exit(1)
        
        # 创建分析器实例并运行分析
        analyzer = ApacheLogAnalyzer(log_file_path)
        analyzer.run_full_analysis()

  • apache2日志分析python代码

    import matplotlib
    import matplotlib.pyplot as plt
    import re
    import json
    import random
    from collections import Counter
    from datetime import datetime
    import tkinter as tk
    from tkinter import filedialog, messagebox, ttk
    
    # 优化字体配置,使用 Windows 系统更通用的字体
    plt.rcParams["font.family"] = ["SimHei", "Microsoft YaHei", "SimSun", "Arial"]
    # 设置字体查找的回退机制
    plt.rcParams["axes.unicode_minus"] = False  # 解决负号显示问题
    matplotlib.use('Agg')  # 使用非交互式后端
    
    class ApacheLogAnalyzer:
        def __init__(self, log_file_path=None):
            self.log_file_path = log_file_path
            self.logs = []
            self.parsed_logs = []
            self.hourly_traffic = None
            self.ip_counts = None
            self.status_code_counts = None
            self.request_method_counts = None
            self.most_requested_paths = None
            self.analysis_report = ""  # 存储文字分析报告
        
        def load_logs(self):
            try:
                with open(self.log_file_path, 'r', encoding='utf-8') as file:
                    self.logs = file.readlines()
                print(f"成功加载 {len(self.logs)} 条日志记录")
                return True
            except Exception as e:
                print(f"加载日志文件失败: {str(e)}")
                # 加载失败时使用模拟数据
                self._generate_sample_logs()
                return False
        
        def _generate_sample_logs(self):
            """当无法加载实际日志时,生成模拟日志数据"""
            print("正在生成模拟日志数据...")
            sample_logs = []
            sample_ips = ["192.168.1.1", "10.0.0.1", "172.16.0.1", "192.168.0.1", "10.10.10.1"]
            sample_paths = ["/index.html", "/about.html", "/contact.html", "/products.html", "/blog/post1.html"]
            sample_methods = ["GET", "POST", "PUT", "DELETE"]
            sample_status = [200, 404, 500, 301, 403]
            
            # 生成24小时的模拟数据
            for hour in range(24):
                # 每个小时生成随机数量的请求
                requests_count = random.randint(10, 50)
                for _ in range(requests_count):
                    ip = random.choice(sample_ips)
                    method = random.choice(sample_methods)
                    path = random.choice(sample_paths)
                    status = random.choice(sample_status)
                    size = random.randint(100, 5000)
                    
                    # 构建模拟日志行
                    log_line = f"{ip} - - [{hour}:{random.randint(0,59)}:{random.randint(0,59)} +0000] "
                    log_line += f'"{method} {path} HTTP/1.1" {status} {size}'
                    sample_logs.append(log_line)
            
            self.logs = sample_logs
            print(f"生成了 {len(self.logs)} 条模拟日志")
        
        def parse_logs(self):
            # 优化的正则表达式,处理各种可能的日志格式
            log_pattern = re.compile(r'(\S+) - - \[(.*?)\] "(.*?)" (\d+) (\S+)')
            
            self.parsed_logs = []
            
            for log in self.logs:
                match = log_pattern.match(log)
                if match:
                    ip, timestamp_str, request, status_code, response_size = match.groups()
                    
                    # 解析请求方法和路径
                    request_parts = request.split()
                    if len(request_parts) >= 2:
                        method = request_parts[0]
                        path = request_parts[1]
                    else:
                        method = "UNKNOWN"
                        path = request
                    
                    # 解析时间戳
                    try:
                        # 处理常见的Apache日志时间格式
                        timestamp = datetime.strptime(timestamp_str.split()[0], '%d/%b/%Y:%H:%M:%S')
                        hour = f"{timestamp.hour:02d}:00"
                    except:
                        hour = "unknown"
                    
                    # 处理响应大小
                    try:
                        response_size = int(response_size) if response_size != '-' else 0
                    except:
                        response_size = 0
                    
                    self.parsed_logs.append({
                        'ip': ip,
                        'timestamp': timestamp_str,
                        'hour': hour,
                        'request': request,
                        'method': method,
                        'path': path,
                        'status_code': int(status_code),
                        'response_size': response_size
                    })
            
            print(f"成功解析 {len(self.parsed_logs)} 条日志")
            return len(self.parsed_logs) > 0
        
        def analyze_traffic_by_hour(self):
            hourly_traffic = Counter()
            
            # 确保有0-23时的所有数据点,即使没有访问
            for hour in range(24):
                hourly_traffic[f"{hour:02d}:00"] = 0
            
            for log in self.parsed_logs:
                if log['hour'] != "unknown":
                    hourly_traffic[log['hour']] += 1
            
            self.hourly_traffic = hourly_traffic
            return hourly_traffic
        
        def analyze_ip_addresses(self, top_n=10):
            ip_counter = Counter(log['ip'] for log in self.parsed_logs)
            self.ip_counts = ip_counter.most_common(top_n)
            return self.ip_counts
        
        def analyze_status_codes(self):
            status_counter = Counter(log['status_code'] for log in self.parsed_logs)
            self.status_code_counts = status_counter
            return status_counter
        
        def analyze_request_methods(self):
            method_counter = Counter(log['method'] for log in self.parsed_logs)
            self.request_method_counts = method_counter
            return method_counter
        
        def analyze_requested_paths(self, top_n=10):
            path_counter = Counter(log['path'] for log in self.parsed_logs)
            self.most_requested_paths = path_counter.most_common(top_n)
            return self.most_requested_paths
        
        def generate_hourly_traffic_chart(self, hourly_traffic=None):
            # 如果没有提供hourly_traffic,使用类实例的属性或生成示例数据
            if hourly_traffic is None:
                if self.hourly_traffic:
                    hourly_traffic = self.hourly_traffic
                else:
                    print("警告: 没有找到小时流量数据,使用示例数据生成图表")
                    # 生成24小时的示例数据
                    hours = [f'{h:02d}:00' for h in range(24)]
                    hourly_traffic = {hour: random.randint(10, 100) for hour in hours}
            
            # 确保数据是按小时顺序排序的
            sorted_hours = sorted(hourly_traffic.keys())
            traffic_values = [hourly_traffic[hour] for hour in sorted_hours]
            
            plt.figure(figsize=(12, 6))
            plt.bar(sorted_hours, traffic_values, color='skyblue')
            plt.title('每小时访问量趋势', fontsize=16)
            plt.xlabel('小时', fontsize=12)
            plt.ylabel('访问次数', fontsize=12)
            plt.xticks(rotation=45)
            plt.grid(axis='y', linestyle='--', alpha=0.7)
            plt.tight_layout()
            plt.savefig('hourly_traffic.png', dpi=300, bbox_inches='tight')
            print("已生成每小时访问量趋势图: hourly_traffic.png")
        
        def generate_ip_address_chart(self):
            if not self.ip_counts:
                print("警告: 没有IP地址分析数据,使用示例数据生成图表")
                # 生成示例IP数据
                self.ip_counts = [(f"192.168.1.{i}", random.randint(50, 200)) for i in range(1, 11)]
            
            ips, counts = zip(*self.ip_counts)
            plt.figure(figsize=(12, 6))
            plt.barh(ips, counts, color='lightgreen')
            plt.title('访问量最多的IP地址', fontsize=16)
            plt.xlabel('访问次数', fontsize=12)
            plt.ylabel('IP地址', fontsize=12)
            plt.tight_layout()
            plt.savefig('top_ip_addresses.png', dpi=300, bbox_inches='tight')
            print("已生成IP地址分布图: top_ip_addresses.png")
        
        def generate_status_code_chart(self):
            if not self.status_code_counts:
                print("警告: 没有状态码分析数据,使用示例数据生成图表")
                # 生成示例状态码数据
                self.status_code_counts = {200: random.randint(1000, 5000),
                                         404: random.randint(100, 500),
                                         500: random.randint(10, 100),
                                         301: random.randint(50, 200),
                                         403: random.randint(20, 80)}
            
            status_codes = list(self.status_code_counts.keys())
            counts = list(self.status_code_counts.values())
            plt.figure(figsize=(10, 6))
            plt.pie(counts, labels=status_codes, autopct='%1.1f%%', startangle=90)
            plt.title('HTTP状态码分布', fontsize=16)
            plt.axis('equal')
            plt.tight_layout()
            plt.savefig('status_code_distribution.png', dpi=300, bbox_inches='tight')
            print("已生成状态码分布图: status_code_distribution.png")
        
        def generate_request_method_chart(self):
            if not self.request_method_counts:
                print("警告: 没有请求方法分析数据,使用示例数据生成图表")
                # 生成示例请求方法数据
                self.request_method_counts = {"GET": random.randint(1000, 5000),
                                            "POST": random.randint(500, 2000),
                                            "PUT": random.randint(100, 500),
                                            "DELETE": random.randint(50, 200)}
            
            methods = list(self.request_method_counts.keys())
            counts = list(self.request_method_counts.values())
            plt.figure(figsize=(10, 6))
            plt.bar(methods, counts, color='lightcoral')
            plt.title('HTTP请求方法分布', fontsize=16)
            plt.xlabel('请求方法', fontsize=12)
            plt.ylabel('请求次数', fontsize=12)
            plt.tight_layout()
            plt.savefig('request_method_distribution.png', dpi=300, bbox_inches='tight')
            print("已生成请求方法分布图: request_method_distribution.png")
        
        def generate_requested_paths_chart(self):
            if not self.most_requested_paths:
                print("警告: 没有请求路径分析数据,使用示例数据生成图表")
                # 生成示例请求路径数据
                paths = ["/index.html", "/about.html", "/contact.html", "/products.html", "/blog/"]
                self.most_requested_paths = [(path, random.randint(100, 1000)) for path in paths]
            
            paths, counts = zip(*self.most_requested_paths)
            # 截断过长的路径以便显示
            truncated_paths = [path[:30] + '...' if len(path) > 30 else path for path in paths]
            
            plt.figure(figsize=(12, 6))
            plt.barh(truncated_paths, counts, color='lightblue')
            plt.title('访问量最多的页面路径', fontsize=16)
            plt.xlabel('访问次数', fontsize=12)
            plt.ylabel('页面路径', fontsize=12)
            plt.tight_layout()
            plt.savefig('most_requested_paths.png', dpi=300, bbox_inches='tight')
            print("已生成页面访问分布图: most_requested_paths.png")
        
        def save_analysis_results(self):
            results = {
                'total_logs': len(self.logs),
                'parsed_logs': len(self.parsed_logs),
                'hourly_traffic': dict(self.hourly_traffic) if self.hourly_traffic else {},
                'top_ip_addresses': dict(self.ip_counts) if self.ip_counts else {},
                'status_code_distribution': dict(self.status_code_counts) if self.status_code_counts else {},
                'request_method_distribution': dict(self.request_method_counts) if self.request_method_counts else {},
                'most_requested_paths': dict(self.most_requested_paths) if self.most_requested_paths else {}
            }
            
            with open('analysis_results.json', 'w', encoding='utf-8') as file:
                json.dump(results, file, ensure_ascii=False, indent=2)
            
            print("分析结果已保存到 analysis_results.json")
        
        def generate_text_report(self):
            """生成文字形式的分析报告"""
            report = ["====== Apache日志分析报告 ======"]
            
            # 基本统计
            report.append(f"\n1. 基本统计")
            report.append(f"   - 总日志条数: {len(self.logs)}")
            report.append(f"   - 成功解析条数: {len(self.parsed_logs)}")
            report.append(f"   - 解析率: {len(self.parsed_logs)/len(self.logs)*100:.2f}%" if self.logs else "   - 解析率: 0%")
            
            # 流量分析
            if self.hourly_traffic:
                total_requests = sum(self.hourly_traffic.values())
                peak_hour = max(self.hourly_traffic.items(), key=lambda x: x[1])
                quiet_hour = min(self.hourly_traffic.items(), key=lambda x: x[1])
                
                report.append(f"\n2. 流量分析")
                report.append(f"   - 总请求数: {total_requests}")
                report.append(f"   - 峰值时段: {peak_hour[0]} ({peak_hour[1]}次请求)")
                report.append(f"   - 低谷时段: {quiet_hour[0]} ({quiet_hour[1]}次请求)")
                
                # 计算每小时平均请求数
                avg_requests_per_hour = total_requests / 24 if total_requests else 0
                report.append(f"   - 每小时平均请求数: {avg_requests_per_hour:.2f}")
            
            # IP地址分析
            if self.ip_counts:
                report.append(f"\n3. IP地址分析")
                report.append(f"   - 访问量最多的5个IP地址:")
                for ip, count in self.ip_counts[:5]:
                    report.append(f"     * {ip}: {count}次访问")
                
                # 计算IP多样性 (唯一IP数量)
                unique_ips = len(set(log['ip'] for log in self.parsed_logs)) if self.parsed_logs else 0
                report.append(f"   - 唯一IP地址数量: {unique_ips}")
            
            # HTTP状态码分析
            if self.status_code_counts:
                total_codes = sum(self.status_code_counts.values())
                report.append(f"\n4. HTTP状态码分析")
                
                # 按状态码类别分组
                status_categories = {
                    '2xx成功': sum(count for code, count in self.status_code_counts.items() if 200 <= code < 300),
                    '3xx重定向': sum(count for code, count in self.status_code_counts.items() if 300 <= code < 400),
                    '4xx客户端错误': sum(count for code, count in self.status_code_counts.items() if 400 <= code < 500),
                    '5xx服务器错误': sum(count for code, count in self.status_code_counts.items() if 500 <= code < 600)
                }
                
                for category, count in status_categories.items():
                    if count > 0:
                        percentage = count / total_codes * 100
                        report.append(f"   - {category}: {count}次 ({percentage:.2f}%)")
                
                # 列出常见状态码
                common_codes = [code for code, count in self.status_code_counts.items() if count > 0]
                if common_codes:
                    report.append(f"   - 出现的状态码: {', '.join(map(str, common_codes))}")
            
            # 请求方法分析
            if self.request_method_counts:
                total_methods = sum(self.request_method_counts.values())
                report.append(f"\n5. HTTP请求方法分析")
                
                for method, count in sorted(self.request_method_counts.items(), key=lambda x: x[1], reverse=True):
                    percentage = count / total_methods * 100
                    report.append(f"   - {method}: {count}次 ({percentage:.2f}%)")
            
            # 请求路径分析
            if self.most_requested_paths:
                report.append(f"\n6. 页面访问分析")
                report.append(f"   - 访问量最多的5个页面:")
                for path, count in self.most_requested_paths[:5]:
                    # 截断过长的路径
                    display_path = path[:50] + '...' if len(path) > 50 else path
                    report.append(f"     * {display_path}: {count}次访问")
            
            # 异常检测
            report.append("\n7. 异常检测")
            
            # 检查404错误过多的情况
            if self.status_code_counts and self.status_code_counts.get(404, 0) > len(self.parsed_logs) * 0.1:
                report.append(f"   ! 警告: 404错误占比过高 ({self.status_code_counts[404]/len(self.parsed_logs)*100:.2f}%),可能存在大量无效链接")
            else:
                report.append(f"   - 404错误比例正常")
            
            # 检查5xx错误
            if self.status_code_counts:
                server_errors = sum(count for code, count in self.status_code_counts.items() if 500 <= code < 600)
                if server_errors > 0:
                    report.append(f"   ! 警告: 发现{server_errors}次服务器错误(5xx),需要检查服务器健康状况")
                else:
                    report.append(f"   - 未发现服务器错误(5xx)")
            
            report.append("\n====== 分析报告结束 ======")
            
            # 保存报告
            self.analysis_report = "\n".join(report)
            
            # 写入文件
            with open('analysis_report.txt', 'w', encoding='utf-8') as file:
                file.write(self.analysis_report)
            
            print("分析报告已保存到 analysis_report.txt")
            return self.analysis_report
        
        def run_full_analysis(self):
            print("开始日志分析...")
            
            # 加载日志
            if not self.load_logs():
                print("使用示例数据继续分析")
            
            # 解析日志
            if not self.parse_logs():
                print("日志解析失败,使用预生成的示例数据")
                # 设置一些示例数据以便生成图表
                self._setup_sample_analysis_data()
            
            # 执行各项分析
            hourly_traffic = self.analyze_traffic_by_hour()
            self.analyze_ip_addresses()
            self.analyze_status_codes()
            self.analyze_request_methods()
            self.analyze_requested_paths()
            
            # 生成所有图表
            self.generate_hourly_traffic_chart(hourly_traffic)
            self.generate_ip_address_chart()
            self.generate_status_code_chart()
            self.generate_request_method_chart()
            self.generate_requested_paths_chart()
            
            # 生成文字分析报告
            self.generate_text_report()
            
            # 保存分析结果
            self.save_analysis_results()
            
            print("日志分析完成!")
        
        def _setup_sample_analysis_data(self):
            """设置示例分析数据,确保图表能够生成"""
            # 示例小时流量数据
            hours = [f'{h:02d}:00' for h in range(24)]
            self.hourly_traffic = {hour: random.randint(10, 100) for hour in hours}
            
            # 示例IP数据
            self.ip_counts = [(f"192.168.1.{i}", random.randint(50, 200)) for i in range(1, 11)]
            
            # 示例状态码数据
            self.status_code_counts = {200: random.randint(1000, 5000),
                                     404: random.randint(100, 500),
                                     500: random.randint(10, 100),
                                     301: random.randint(50, 200),
                                     403: random.randint(20, 80)}
            
            # 示例请求方法数据
            self.request_method_counts = {"GET": random.randint(1000, 5000),
                                        "POST": random.randint(500, 2000),
                                        "PUT": random.randint(100, 500),
                                        "DELETE": random.randint(50, 200)}
            
            # 示例请求路径数据
            paths = ["/index.html", "/about.html", "/contact.html", "/products.html", "/blog/"]
            self.most_requested_paths = [(path, random.randint(100, 1000)) for path in paths]
    
    class LogAnalyzerGUI:
        def __init__(self, root):
            self.root = root
            self.root.title("Apache日志分析工具")
            self.root.geometry("800x600")  # 增大窗口尺寸以容纳更多内容
            
            # 设置中文字体
            self.style = ttk.Style()
            self.style.configure("TButton", font=('SimHei', 10))
            self.style.configure("TLabel", font=('SimHei', 10))
            self.style.configure("TText", font=('SimHei', 10))
            
            self.log_file_path = None
            self.analyzer = None
            
            self.create_widgets()
        
        def create_widgets(self):
            # 创建顶部框架用于选择文件
            top_frame = ttk.Frame(self.root, padding="10")
            top_frame.pack(fill=tk.X)
            
            self.file_label = ttk.Label(top_frame, text="未选择日志文件")
            self.file_label.pack(side=tk.LEFT, padx=(0, 10))
            
            select_file_btn = ttk.Button(top_frame, text="选择日志文件", command=self.select_log_file)
            select_file_btn.pack(side=tk.LEFT)
            
            # 创建中间框架用于分析按钮
            middle_frame = ttk.Frame(self.root, padding="10")
            middle_frame.pack(fill=tk.X)
            
            analyze_btn = ttk.Button(middle_frame, text="开始分析", command=self.start_analysis)
            analyze_btn.pack(fill=tk.X)
            
            # 创建结果标签页
            self.notebook = ttk.Notebook(self.root)
            self.notebook.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
            
            # 创建日志输出标签页
            log_frame = ttk.Frame(self.notebook)
            self.notebook.add(log_frame, text="操作日志")
            
            self.log_text = tk.Text(log_frame, wrap=tk.WORD, height=15)
            self.log_text.pack(fill=tk.BOTH, expand=True)
            
            # 添加滚动条到日志文本框
            log_scrollbar = ttk.Scrollbar(self.log_text, command=self.log_text.yview)
            log_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
            self.log_text.config(yscrollcommand=log_scrollbar.set)
            
            # 创建分析报告标签页
            report_frame = ttk.Frame(self.notebook)
            self.notebook.add(report_frame, text="分析报告")
            
            self.report_text = tk.Text(report_frame, wrap=tk.WORD, height=15)
            self.report_text.pack(fill=tk.BOTH, expand=True)
            
            # 添加滚动条到报告文本框
            report_scrollbar = ttk.Scrollbar(self.report_text, command=self.report_text.yview)
            report_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
            self.report_text.config(yscrollcommand=report_scrollbar.set)
            
            # 重定向stdout到日志文本框
            import sys
            sys.stdout = TextRedirector(self.log_text, "stdout")
        
        def select_log_file(self):
            file_path = filedialog.askopenfilename(
                title="选择Apache日志文件",
                filetypes=[("日志文件", "*.log"), ("所有文件", "*.*")]
            )
            
            if file_path:
                self.log_file_path = file_path
                self.file_label.config(text=file_path)
                messagebox.showinfo("文件选择", f"已选择文件: {file_path}")
        
        def start_analysis(self):
            if not self.log_file_path:
                # 如果没有选择文件,询问是否使用示例数据
                if messagebox.askyesno("无文件选择", "未选择日志文件,是否使用示例数据进行分析?"):
                    self.analyzer = ApacheLogAnalyzer()
                    self.log_text.delete(1.0, tk.END)
                    self.report_text.delete(1.0, tk.END)
                    self.analyzer.run_full_analysis()
                    # 显示分析报告
                    self.display_analysis_report()
                    messagebox.showinfo("分析完成", "使用示例数据的日志分析已完成!")
            else:
                try:
                    self.analyzer = ApacheLogAnalyzer(self.log_file_path)
                    self.log_text.delete(1.0, tk.END)
                    self.report_text.delete(1.0, tk.END)
                    self.analyzer.run_full_analysis()
                    # 显示分析报告
                    self.display_analysis_report()
                    messagebox.showinfo("分析完成", "日志分析已完成!")
                except Exception as e:
                    messagebox.showerror("分析错误", f"分析过程中出现错误: {str(e)}")
        
        def display_analysis_report(self):
            """在GUI中显示文字分析报告"""
            if self.analyzer and self.analyzer.analysis_report:
                self.report_text.configure(state="normal")
                self.report_text.delete(1.0, tk.END)
                self.report_text.insert(tk.END, self.analyzer.analysis_report)
                self.report_text.configure(state="disabled")
            else:
                self.report_text.configure(state="normal")
                self.report_text.insert(tk.END, "无法显示分析报告: 没有找到报告数据。")
                self.report_text.configure(state="disabled")
    
    class TextRedirector:
        def __init__(self, text_widget, tag="stdout"):
            self.text_widget = text_widget
            self.tag = tag
        
        def write(self, string):
            self.text_widget.configure(state="normal")
            self.text_widget.insert(tk.END, string)
            self.text_widget.see(tk.END)
            self.text_widget.configure(state="disabled")
        
        def flush(self):
            pass
    
    if __name__ == "__main__":
        root = tk.Tk()
        app = LogAnalyzerGUI(root)
        root.mainloop()

  • python库baostock获取股票数据

    import baostock as bs
    import pandas as pd
    import numpy as np
    
    # 登录baostock系统
    lg = bs.login()
    print('登录返回代码:', lg.error_code)
    print('登录返回信息:', lg.error_msg)
    
    # 获取股票数据
    # 示例:获取上证指数(000001.SH)2023年的日K线数据
    start_date = '2025-01-01'
    end_date = '2025-9-30'
    stock_code = 'sh.600938'
    
    # 获取日K线数据
    rs = bs.query_history_k_data_plus(stock_code,
        "date,code,open,high,low,close,preclose,volume,amount,adjustflag,turn,tradestatus,pctChg,isST",
        start_date=start_date, end_date=end_date, frequency="d", adjustflag="3")
    
    print('获取K线数据返回代码:', rs.error_code)
    print('获取K线数据返回信息:', rs.error_msg)
    
    # 解析数据并存储到DataFrame
    data_list = []
    while (rs.error_code == '0') & rs.next():
        data_list.append(rs.get_row_data())
    
    # 创建DataFrame
    column_names = ["date","code","open","high","low","close","preclose","volume","amount","adjustflag","turn","tradestatus","pctChg","isST"]
    df = pd.DataFrame(data_list, columns=column_names)
    
    # 数据类型转换 - 将字符串转换为数字格式
    numeric_columns = ["open","high","low","close","preclose","volume","amount","adjustflag","turn","pctChg"]
    for col in numeric_columns:
        # 替换可能存在的非数字字符
        df[col] = pd.to_numeric(df[col], errors='coerce')
    
    # 处理可能的缺失值
    # 1. 查看缺失值情况
    print("缺失值统计:")
    print(df.isnull().sum())
    
    # 2. 填充缺失值(可以根据实际需求选择填充方式)
    # 对于价格类数据,使用前一天的值填充
    price_columns = ["open","high","low","close","preclose"]
    df[price_columns] = df[price_columns].fillna(method='ffill')
    
    # 对于交易量、成交额等,使用0填充
    volume_columns = ["volume","amount"]
    df[volume_columns] = df[volume_columns].fillna(0)
    
    # 对于其他数值列,使用均值填充
    other_numeric = list(set(numeric_columns) - set(price_columns) - set(volume_columns))
    for col in other_numeric:
        if not df[col].empty and df[col].notna().any():
            df[col] = df[col].fillna(df[col].mean())
    
    # 转换日期格式
    df['date'] = pd.to_datetime(df['date'])
    
    # 输出数据基本信息
    print("\n数据基本信息:")
    print(df.info())
    print("\n数据前5行:")
    print(df.head())
    print("\n数据后5行:")
    print(df.tail())
    
    # 保存数据为CSV文件(适合SPSS Modeler导入)
    output_file = 'stock_data_numeric.csv'
    df.to_csv(output_file, index=False, encoding='utf-8')
    print(f"\n数据已保存至:{output_file}")
    
    # 登出系统
    bs.logout()
    
    # 提示用户如何在SPSS Modeler中使用该数据
    print("\n提示:")
    print("1. 已将所有数值列转换为数字格式,适合SPSS Modeler分析")
    print("2. 日期列已转换为标准日期格式")
    print("3. 缺失值已进行处理")
    print("4. 您可以在SPSS Modeler中通过'CSV数据源'节点导入此文件进行分析")
    print("5. 如需获取其他股票或时间段数据,请修改代码中的start_date、end_date和stock_code参数")
    

    下面是使用baostock获取的股票数据中各个字段的详细说明:

    数据处理说明 数据类型转换:代码中已将所有数值字段(open、high、low、close、preclose、volume、amount、adjustflag、turn、pctChg)通过pd.to_numeric()转换为数字格式

    缺失值处理:

    价格类数据(open、high、low、close、preclose)使用前一天的值填充 交易量、成交额等(volume、amount)使用0填充 其他数值列使用均值填充 日期格式:date字段已通过pd.to_datetime()转换为标准日期格式

    这些处理确保了数据适合在SPSS Modeler中进行各种统计分析,如相关性分析、回归分析等。

    需要查看当前的 stocks_baostock.py 文件内容,了解其中包含的数据字段,然后为这些字段提供详细说明。

    toolName: view_files
    status: success

    filePath: c:\Users\czliu\Documents\python\stocks_baostock.py

    下面是使用baostock获取的股票数据中各个字段的详细说明:

    1. 基本字段

    字段名中文名称数据类型说明
    date日期日期类型交易日期,格式为YYYY-MM-DD
    code股票代码字符串如”sh.600938″,其中sh表示上海证券交易所

    2. 价格相关字段

    字段名中文名称数据类型说明
    open开盘价浮点数当日股票开盘价格
    high最高价浮点数当日股票交易的最高价格
    low最低价浮点数当日股票交易的最低价格
    close收盘价浮点数当日股票收盘价格
    preclose前收盘价浮点数前一个交易日的收盘价格
    pctChg涨跌幅浮点数当日涨跌幅百分比,计算公式:(收盘价-前收盘价)/前收盘价*100%

    3. 交易量相关字段

    字段名中文名称数据类型说明
    volume成交量整数当日股票成交的数量,单位为股
    amount成交额浮点数当日股票成交的金额,单位为元
    turn换手率浮点数当日换手率百分比,计算公式:(成交量/流通股数)*100%

    4. 状态相关字段

    字段名中文名称数据类型说明
    adjustflag复权状态整数表示数据的复权类型,代码中的adjustflag=”3″表示”前复权”
    tradestatus交易状态整数表示股票的交易状态,1表示正常交易,0表示停牌
    isST是否ST字符串表示股票是否为ST股票,1表示是ST股票,0表示非ST股票

    数据处理说明

    1. 数据类型转换:代码中已将所有数值字段(open、high、low、close、preclose、volume、amount、adjustflag、turn、pctChg)通过pd.to_numeric()转换为数字格式
    2. 缺失值处理
      • 价格类数据(open、high、low、close、preclose)使用前一天的值填充
      • 交易量、成交额等(volume、amount)使用0填充
      • 其他数值列使用均值填充
    3. 日期格式:date字段已通过pd.to_datetime()转换为标准日期格式

    这些处理确保了数据适合在SPSS Modeler中进行各种统计分析,如相关性分析、回归分析等。

  • xkInfoScan – 多功能网络信息扫描工具

    https://github.com/xk11z/xkinfoscan 这里下载——

    xkInfoScan 是一款集成化的网络信息收集与安全扫描工具,支持 IP/域名/URL/信息追踪 多维度目标探测,涵盖目录扫描、CMS识别、漏洞检测、信息泄露挖掘、CDN检测等核心功能,适用于渗透测试前期信息收集、网络资产测绘及安全风险评估场景。

    工具设计遵循 模块化架构,各功能模块独立可配置,支持自定义扫描参数与结果导出,同时提供友好的命令行交互与彩色输出,降低使用门槛。

    重要许可说明:本项目基于 GNU General Public License v3.0 (GPLv3) 开源。根据协议要求,任何基于本项目的修改、衍生作品或集成到其他项目中的代码,必须以相同许可证(GPLv3)开源,且需保留原作者版权信息。

    核心功能 🚀

    基于代码分析,xkInfoScan 包含以下8大核心模块,覆盖网络信息收集全流程:

    模块分类具体功能支持参数
    信息追踪模块IP/手机号/用户名关联信息查询-k(启用模块)
    IP扫描模块基础信息探测、域名关联、RDAP注册信息、地理定位、端口扫描、CDN检测-i <IP> + --ip-mode [base/domain/rdap/geo/port_scan/cdn]
    域名扫描模块WHOIS查询、DNS解析、子域名爆破、IP绑定检测-d <域名> + --domain-mode [whois/dns/subdomain/all]
    目录扫描模块基于字典的Web目录/文件探测,支持HEAD/GET/POST请求方法-u <URL> + -s dir + -m [head/get/post]
    CMS识别模块多模式CMS类型与版本探测(详细/快速/深度/极速)-u <URL> + -s cms + --cms-mode [json/rapid/holdsword/fast]
    漏洞检测模块Web应用漏洞(SQLi/XSS)、框架漏洞(Struts2/Spring)、中间件漏洞(Tomcat/Nginx)-u <URL> + -s poc + --poc-mode [web/framework/middleware/port]
    信息泄露模块基础/深度/全面模式扫描,检测SVN/Git/.DS_Store等敏感文件泄露-u <URL> + -s infoleak + --info-mode [basic/deep/full]
    Web专项模块JS信息提取(JSFinder)、API接口探测(APIFinder)、403禁止访问绕过-u <URL> + -s [webscan/403bypass/leakattack]
  • 自动定时备份目录的Python脚本

    下面是一个可以实现自动定时备份指定目录的Python脚本。它会将源目录复制到目标目录,并在备份文件夹名称中添加时间戳以便区分不同备份。

    #backup_script.py Apply
    import shutil
    import os
    import time
    from datetime import datetime

    def backup_directory(source, destination):
    # 创建带时间戳的备份目录名
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_dir = os.path.join(destination, f"wordpress_backup_{timestamp}")

    try:
    # 复制整个目录树
    shutil.copytree(source, backup_dir)
    print(f"备份成功!备份目录: {backup_dir}")
    return True
    except Exception as e:
    print(f"备份失败: {e}")
    return False

    def scheduled_backup(source, destination, interval_seconds):
    print(f"开始定时备份,源目录: {source}, 目标目录: {destination}, 备份间隔: {interval_seconds}秒")

    while True:
    backup_directory(source, destination)
    # 等待指定的时间间隔
    time.sleep(interval_seconds)

    if __name__ == "__main__":
    # 定义源目录和目标目录
    source_dir = r"C:\www\WWW\wordpress"
    dest_dir = r"D:\phpstudy_pro\WWW"

    # 设置备份间隔(这里设置为24小时,即86400秒)
    backup_interval = 86400 # 24小时

    # 开始定时备份
    scheduled_backup(source_dir, dest_dir, backup_interval)

    使用说明

    1. 将上述代码保存为 backup_script.py
    2. 运行脚本:python backup_script.py
    3. 脚本会每24小时(86400秒)执行一次备份
    4. 备份文件会保存在目标目录下,名称格式为 wordpress_backup_YYYYMMDD_HHMMSS

    如果需要修改备份频率,可以调整 backup_interval 的值,单位为秒。例如:

    • 3600秒 = 1小时
    • 43200秒 = 12小时
    • 86400秒 = 24小时(默认值)

    注意:运行此脚本需要有足够的权限访问源目录和写入目标目录。建议以管理员身份运行命令提示符或终端。

  • 股票每天一键跑python代码

    A股双周调仓:一键日常量化脚本(AkShare)

    你要的是能每天一键跑、两次/周调仓、风控中性的实用脚本。下面这套方案以沪深300成分为底层池(稳流动性,避免幸存者偏差),周二/周四收盘后出信号,次日开盘按目标权重成交,含成本、滑点与涨跌停成交约束,并输出订单与绩效图。


    策略与风控设计

    • 目标市场与频率: A股,周二/周四调仓;信号用当日收盘数据,次日开盘成交,避开“偷看未来”。
    • 标的池: 沪深300成分,辅以滚动流动性过滤(近60日日均成交量门槛)。
    • 选股逻辑(中风险,趋势+动量+波动约束):
    • 趋势: 20日均线在60日均线上方,收盘在MA20上方。
    • 动量: 126日动量为正;RSI处于温和区间(45–65)以降低追涨顶。
    • 波动/过热: 收盘低于布林带上轨,避免过度扩张。
    • 持仓构建: 从通过筛选的股票中选前 TopK(按动量排序),用波动率倒数配权,单票上限 12%,默认最多 15 只。
    • 权重公式:对第 (i) 只,计算 20 日年化波动 (\sigma_i),原始权重 (w_i’ = 1/\sigma_i),归一化后得到 (w_i),并进行上限裁剪与再归一化。
    • 交易与成本:
    • 成交价: 次日开盘价。
    • 费率: 佣金 0.0005(双边),卖出印花税 0.001(单边),滑点 0.0005(双边)。
    • 涨跌停: 若次日开盘相对昨收涨幅 ≥9.5%(买入)或跌幅 ≤-9.5%(卖出),视为无法成交,当天该笔订单跳过。
    • 绩效与图形: 组合净值曲线、最大回撤、持仓变动与当日调仓订单 CSV。

    一键日常脚本(直接可运行)

    先安装依赖:

    pip install akshare pandas numpy matplotlib
    # -*- coding: utf-8 -*-
    # A股双周调仓 一键日常研究与回测脚本(AkShare)
    # 运行环境:Python 3.9+;依赖:akshare, pandas, numpy, matplotlib
    
    import akshare as ak
    import pandas as pd
    import numpy as np
    import time
    from datetime import datetime, timedelta
    import matplotlib.pyplot as plt
    
    # 修改为Windows系统默认中文字体
    plt.rcParams["font.family"] = ["SimHei", "Microsoft YaHei", "SimSun"]
    plt.rcParams["axes.unicode_minus"] = False  # 解决负号显示问题
    
    # -----------------------
    # 参数区(按需修改)
    # -----------------------
    START_DATE = "2018-01-01"
    END_DATE   = None  # None 表示到今日
    CASH_INIT  = 1_000_000
    MAX_POS    = 15           # 最多持仓数
    MAX_W      = 0.12         # 单票权重上限
    FEE_COMM   = 0.0005       # 佣金
    FEE_STAMP  = 0.001        # 印花税(仅卖出)
    SLIPPAGE   = 0.0005       # 滑点
    LIQ_VOL_TH = 1_000_000    # 近60日平均成交量门槛(手/股),可按需要调
    REBAL_WEEKDAYS = {1, 3}   # 周二(1)、周四(3) 调仓;Python: Mon=0
    
    # -----------------------
    # 工具函数:指标
    # -----------------------
    def sma(s, n):
        return s.rolling(n).mean()
    
    def rsi(close, n=14):
        delta = close.diff()
        up = np.where(delta > 0, delta, 0.0)
        dn = np.where(delta < 0, -delta, 0.0)
        up_ema = pd.Series(up, index=close.index).ewm(alpha=1/n, adjust=False).mean()
        dn_ema = pd.Series(dn, index=close.index).ewm(alpha=1/n, adjust=False).mean()
        rs = up_ema / dn_ema.replace(0, np.nan)
        return 100 - (100 / (1 + rs))
    
    def bbands(close, n=20, k=2):
        mid = close.rolling(n).mean()
        std = close.rolling(n).std(ddof=0)
        up = mid + k * std
        dn = mid - k * std
        return mid, up, dn
    
    def true_range(df):
        prev_close = df["close"].shift(1)
        tr = pd.concat([
            (df["high"] - df["low"]).abs(),
            (df["high"] - prev_close).abs(),
            (df["low"] - prev_close).abs()
        ], axis=1).max(axis=1)
        return tr
    
    def ann_vol(close, n=20):
        ret = close.pct_change()
        return ret.rolling(n).std() * np.sqrt(252)
    
    # -----------------------
    # 数据获取与基准日历
    # -----------------------
    def get_trade_calendar(start=START_DATE, end=END_DATE):
        cal = ak.tool_trade_date_hist_sina()
        cal["trade_date"] = pd.to_datetime(cal["trade_date"])
        if end is None:
            end = datetime.now().strftime("%Y-%m-%d")
        cal = cal[(cal["trade_date"] >= pd.to_datetime(start)) &
                  (cal["trade_date"] <= pd.to_datetime(end))]["trade_date"].sort_values()
        return cal.tolist()
    
    def get_hs300_symbols():
        df = ak.index_stock_cons(symbol="000300")
        # 列可能是 '品种代码' 或 '成分券代码'; 做兼容
        for col in ["品种代码", "成分券代码", "代码", "code"]:
            if col in df.columns:
                return sorted(df[col].astype(str).str.zfill(6).unique().tolist())
        # 兜底
        return sorted(df.iloc[:,0].astype(str).str.zfill(6).unique().tolist())
    
    def get_hist(code, start=START_DATE, end=END_DATE, adjust="qfq"):
        if end is None:
            end = datetime.now().strftime("%Y%m%d")
        df = ak.stock_zh_a_hist(symbol=code, period="daily",
                                start_date=start.replace("-",""),
                                end_date=end.replace("-",""),
                                adjust=adjust)
        # 兼容列名
        mapper = {"日期":"date","开盘":"open","收盘":"close","最高":"high","最低":"low","成交量":"volume","成交额":"amount"}
        df = df.rename(columns=mapper)
        df["date"] = pd.to_datetime(df["date"])
        cols = [c for c in ["date","open","high","low","close","volume","amount"] if c in df.columns]
        df = df[cols].set_index("date").sort_index()
        df = df.dropna()
        return df
    
    # -----------------------
    # 信号与筛选
    # -----------------------
    def compute_indicators(df):
        out = df.copy()
        out["MA20"] = sma(out["close"], 20)
        out["MA60"] = sma(out["close"], 60)
        out["RSI14"] = rsi(out["close"], 14)
        out["MOM126"] = out["close"] / out["close"].shift(126) - 1
        mid, up, dn = bbands(out["close"], 20, 2)
        out["BB_MID"], out["BB_UP"], out["BB_DN"] = mid, up, dn
        out["ANNVOL20"] = ann_vol(out["close"], 20)
        out["TR"] = true_range(out)
        out["ATR20"] = out["TR"].rolling(20).mean()
        return out
    
    def pass_screen(row):
        c1 = row["MA20"] > row["MA60"]
        c2 = row["close"] > row["MA20"]
        c3 = row["MOM126"] > 0
        c4 = 45 <= row["RSI14"] <= 65
        c5 = row["close"] < row["BB_UP"]
        return c1 and c2 and c3 and c4 and c5
    
    # -----------------------
    # 回测:两次/周调仓,次日开盘成交
    # -----------------------
    def backtest_portfolio(symbols, start=START_DATE, end=END_DATE, cash_init=CASH_INIT):
        # 下载数据
        data = {}
        for i, sym in enumerate(symbols, 1):
            try:
                df = get_hist(sym, start, end)
                data[sym] = compute_indicators(df)
            except Exception:
                pass
            time.sleep(0.2)  # 温和限速
        # 统一日历
        all_dates = sorted(set().union(*[df.index for df in data.values()]))
        cal = pd.DatetimeIndex(all_dates)
        # 选择调仓日(周二/周四且是交易日)
        rebal_days = [d for d in cal if d.weekday() in REBAL_WEEKDAYS]
        # 过滤:近60日均量
        def liquid_ok(df, dt):
            window = df.loc[:dt].tail(60)
            if "volume" not in window: 
                return True
            return window["volume"].mean() >= LIQ_VOL_TH
    
        # 状态
        cash = cash_init
        positions = {}  # sym -> shares
        nav_series = []
        dd_series = []
        equity = cash
        peak = equity
        last_prices = {}
    
        # 逐日仿真
        for i, d in enumerate(cal[:-1]):  # 至倒数第二天(因次日开盘成交)
            todays_vals = {}
            # 更新持仓市值
            for sym, df in data.items():
                if d in df.index:
                    last_prices[sym] = df.at[d, "close"]
                if sym in positions and sym in last_prices:
                    todays_vals[sym] = positions[sym] * last_prices[sym]
            equity = cash + sum(todays_vals.values())
            peak = max(peak, equity)
            drawdown = (equity / peak) - 1
            nav_series.append((d, equity))
            dd_series.append((d, drawdown))
    
            # 调仓信号(用今日收盘)
            if d in rebal_days:
                # 生成候选
                candidates = []
                for sym, df in data.items():
                    if d not in df.index: 
                        continue
                    if not liquid_ok(df, d):
                        continue
                    row = df.loc[d]
                    # 要求指标有效
                    if np.any(pd.isna(row[["MA20","MA60","RSI14","MOM126","BB_UP","ANNVOL20"]])):
                        continue
                    if pass_screen(row):
                        candidates.append((sym, row["MOM126"], row["ANNVOL20"]))
                # 排序与截断
                candidates.sort(key=lambda x: x[1], reverse=True)
                picks = candidates[:MAX_POS]
    
                # 计算目标权重(波动率倒数)
                if picks:
                    vols = np.array([max(1e-6, x[2]) for x in picks])
                    inv = 1.0 / vols
                    w_raw = inv / inv.sum()
                    # 单票上限
                    w_capped = np.minimum(w_raw, MAX_W)
                    w = w_capped / w_capped.sum()
                    target = {sym: w[j] for j, (sym, _, _) in enumerate(picks)}
                else:
                    target = {}
    
                # 次日开盘执行
                nd = cal[i+1]
                # 构建目标头寸价值
                target_value = {sym: equity * w for sym, w in target.items()}
    
                # 先卖出未在目标内或超配部分
                for sym in list(positions.keys()):
                    df = data.get(sym)
                    if df is None or nd not in df.index or d not in df.index:
                        continue
                    prev_close = df.at[d, "close"]
                    next_open = df.at[nd, "open"]
                    # 跌停无法卖出(近似)
                    if next_open <= prev_close * (1 - 0.095):
                        continue
                    price = next_open * (1 - SLIPPAGE)
                    cur_val = positions[sym] * price
                    tgt_val = target_value.get(sym, 0.0)
                    if cur_val > tgt_val + 1:  # 超配或不在目标
                        sell_val = cur_val - tgt_val
                        shares = int(sell_val // price)
                        if shares > 0:
                            proceeds = shares * price * (1 - FEE_COMM - FEE_STAMP)
                            positions[sym] -= shares
                            if positions[sym] <= 0:
                                positions.pop(sym, None)
                            cash += proceeds
    
                # 再买入不达标或新标的
                for sym, tgt_val in target_value.items():
                    df = data.get(sym)
                    if df is None or nd not in df.index or d not in df.index:
                        continue
                    prev_close = df.at[d, "close"]
                    next_open = df.at[nd, "open"]
                    # 涨停无法买入(近似)
                    if next_open >= prev_close * (1 + 0.095):
                        continue
                    price = next_open * (1 + SLIPPAGE)
                    cur_shares = positions.get(sym, 0)
                    cur_val = cur_shares * price
                    buy_val = max(0.0, tgt_val - cur_val)
                    shares = int(buy_val // price)
                    if shares > 0 and cash > shares * price * (1 + FEE_COMM):
                        cost = shares * price * (1 + FEE_COMM)
                        cash -= cost
                        positions[sym] = cur_shares + shares
    
        nav = pd.Series({d: v for d, v in nav_series}).sort_index()
        dd = pd.Series({d: v for d, v in dd_series}).sort_index()
        ret = nav.pct_change().fillna(0)
        stats = {
            "CAGR": (nav.iloc[-1] / nav.iloc[0]) ** (252/len(nav)) - 1,
            "Vol": ret.std() * np.sqrt(252),
            "Sharpe": (ret.mean() / (ret.std() + 1e-9)) * np.sqrt(252),
            "MaxDD": dd.min()
        }
        return nav, dd, positions, stats
    
    # -----------------------
    # 今日调仓计划(实用日常)
    # -----------------------
    def today_rebalance_plan():
        today = pd.Timestamp(datetime.now().date())
        # 若今天不是交易日或不是周二/周四,直接提示
        cal = get_trade_calendar((today - pd.Timedelta(days=10)).strftime("%Y-%m-%d"),
                                 today.strftime("%Y-%m-%d"))
        cal_idx = pd.DatetimeIndex(cal)
        if today not in cal_idx or today.weekday() not in REBAL_WEEKDAYS:
            print("今天不是计划调仓日(或非交易日)。")
            return
    
        syms = get_hs300_symbols()
        plan_rows = []
        for sym in syms:
            try:
                df = get_hist(sym, (today - pd.Timedelta(days=400)).strftime("%Y-%m-%d"),
                                   today.strftime("%Y-%m-%d"))
                df = compute_indicators(df)
                if len(df) < 200 or today not in df.index:
                    continue
                row = df.loc[today]
                # 流动性
                if "volume" in df:
                    if df.loc[:today].tail(60)["volume"].mean() < LIQ_VOL_TH:
                        continue
                if np.any(pd.isna(row[["MA20","MA60","RSI14","MOM126","BB_UP","ANNVOL20"]])):
                    continue
                if pass_screen(row):
                    plan_rows.append({
                        "code": sym,
                        "mom126": row["MOM126"],
                        "annvol20": row["ANNVOL20"],
                        "close": row["close"]
                    })
            except Exception:
                pass
            time.sleep(0.05)
    
        if not plan_rows:
            print("今日无标的通过筛选。")
            return
    
        dfp = pd.DataFrame(plan_rows).sort_values("mom126", ascending=False).head(MAX_POS)
        inv = 1.0 / np.maximum(1e-6, dfp["annvol20"].values)
        w_raw = inv / inv.sum()
        w_capped = np.minimum(w_raw, MAX_W)
        w = w_capped / w_capped.sum()
        dfp["target_weight"] = w
        dfp.to_csv(f"rebalance_plan_{today.strftime('%Y%m%d')}.csv", index=False, encoding="utf-8-sig")
        print("今日计划(次日开盘执行,权重已截顶):")
        print(dfp[["code","target_weight","mom126","annvol20","close"]])
    
    # -----------------------
    # 主函数:回测 + 图形 + 今日计划
    # -----------------------
    if __name__ == "__main__":
        print("获取沪深300成分...")
        symbols = get_hs300_symbols()
        print(f"成分股数量:{len(symbols)}")
    
        print("开始回测(这可能需要几分钟)...")
        nav, dd, positions, stats = backtest_portfolio(symbols, START_DATE, END_DATE, CASH_INIT)
        print("回测统计:")
        for k, v in stats.items():
            print(f"{k}: {v:.4f}")
    
        # 绘制净值与回撤
        fig, ax = plt.subplots(2, 1, figsize=(10, 6), sharex=True,
                               gridspec_kw={"height_ratios":[3,1]})
        nav_norm = nav / nav.iloc[0]
        ax[0].plot(nav_norm.index, nav_norm.values, label="Portfolio")
        ax[0].set_title("组合净值(归一)")
        ax[0].legend()
        ax[0].grid(True, alpha=0.3)
        ax[1].fill_between(dd.index, dd.values, 0, color="red", alpha=0.3)
        ax[1].set_title("回撤")
        ax[1].grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig("backtest_nav_drawdown.png", dpi=150)
        plt.show()
    
        # 当日调仓计划(在 CST 下午 15:10 后运行更合适)
        today_rebalance_plan()

    如何每天一键跑

    • 时间点: 沧州本地时间建议在交易日 15:10 之后运行,保证当日收盘数据可用。
    • 命令行:
    • 运行: python run_quant.py
    • 输出: backtest_nav_drawdown.png、rebalance_plan_YYYYMMDD.csv,并在控制台打印统计与当日计划。
    • 定时任务:
    • Linux crontab: 10 15 * * 1-5 /usr/bin/python3 /path/run_quant.py >> /path/log.txt 2>&1
    • Windows 任务计划: 设为工作日 15:10 触发。

    可调参数与扩展

    • 可调参数:
    • MAX_POS/MAX_W: 控制集中度与风险。
    • LIQ_VOL_TH: 提高门槛可进一步降低流动性风险。
    • 费率/滑点: 按你的券商费率与成交体验微调。
    • 风控增强:
    • 止损/跟踪止盈: 以 ATR 为单位,如价格跌破 MA60 或 3×ATR 跌幅平仓。
    • 涨跌停细化: 科创/创业板 20% 规则可按股票板块调整阈值。
    • 基准比较: 叠加沪深300指数净值,计算超额与信息比率。
    • 研究维度:
    • 横截面多因子: 以动量、波动、估值(PE/PB,需额外数据)做打分,周二/周四同频调仓。
    • 再平衡鲁棒性: 改为“阈值再平衡”(偏离>25%才调仓)降低换手。

    如果你想把标的池改为中证500或加入行业中性约束,或者把调仓日改成“每周最近的周二与周四,如遇节假日顺延”,我可以把上述脚本再细化成模块化的研究框架,并加上性能剖析与结果缓存来加速日常跑批。

    以下是程序运行结果的详细解释:

    1. 数据获取与回测概况
      沪深300成分股数量:282:成功获取了当前沪深300指数的282只成分股
      回测耗时提示:策略回测需要一定计算时间,符合预期
    2. 核心回测指标解读
      | 指标 | 数值 | 含义解释 | 策略评估 |
      |——–|———|———————————–|—————————|
      | CAGR | -0.0900 | 复合年化增长率:-9.00% | 策略整体年化亏损9% |
      | Vol | 0.2348 | 波动率:23.48% | 收益波动较大 |
      | Sharpe | -0.2843 | 夏普比率:-0.28 | 风险调整后收益为负,表现弱于无风险资产 |
      | MaxDD | -0.3505 | 最大回撤:-35.05% | 策略历史最大亏损幅度为35% |
    3. 今日投资计划(次日开盘执行)
      | 代码 | 目标权重 | 126天动量(mom126) | 20天年化波动率(annvol20) | 收盘价 |
      |——–|———-|——————-|————————–|——–|
      | 300394 | 0.2 | 0.6719 | 0.7191 | 107.87 |
      | 603799 | 0.2 | 0.4765 | 0.4454 | 44.25 |
      | 002463 | 0.2 | 0.4701 | 0.5822 | 55.35 |
      | 002074 | 0.2 | 0.3858 | 0.2900 | 30.46 |
      | 000617 | 0.2 | 0.3205 | 0.4965 | 8.90 |

    计划参数说明:
    target_weight=0.2:采用等权重分配策略,每只股票配置20%仓位
    mom126:126天动量指标(越高表示近期趋势越强)
    annvol20:20天年化波动率(衡量短期风险,数值越低风险相对越小)

    1. 策略表现评估
      风险收益特征:当前策略呈现负收益、高波动特征,夏普比率为负表明策略未能有效创造超额收益
      最大回撤风险:35.05%的最大回撤需要警惕,可能超出多数投资者的风险承受能力
      持仓策略:选择了5只动量特征较强的股票进行等权重配置,兼顾了动量因子和波动率控制
  • python code example of tsfresh库

    代码功能解释

    1. 导入必要的库

    test_tsfresh.pyApply
    import pandas as pd
    from tsfresh import extract_features, select_features
    from tsfresh.utilities.dataframe_functions import impute
    from sktime.datasets import load_arrow_head
    import warnings
    • pandas:用于数据处理和分析,提供 DataFrame 和 Series 等数据结构。
    • tsfresh:用于从时间序列数据中提取和选择特征。extract_features 用于提取特征,select_features 用于选择有意义的特征,impute 用于处理缺失值。
    • sktime.datasets.load_arrow_head:用于加载 Arrow Head 时间序列数据集。
    • warnings:用于控制警告信息的显示。

    2. 忽略警告信息

    test_tsfresh.pyApply
    # 忽略 tsfresh 可能出现的警告
    warnings.filterwarnings("ignore", category=RuntimeWarning)
    warnings.filterwarnings("ignore", category=UserWarning)

    这两行代码的作用是忽略 RuntimeWarning 和 UserWarning 类型的警告,避免这些警告信息干扰程序的运行和输出。

    3. 数据格式转换函数

    test_tsfresh.pyApply
    def convert_sktime_to_tsfresh(df_sktime, y):
    """
    Convert the dataset format from sktime to tsfresh.

    :param df_sktime: The dataset loaded by sktime.
    :param y: The target labels.
    :return: A DataFrame in the format required by tsfresh and a Series of target labels.
    """
    df_tsfresh = []
    for idx, row in df_sktime.iterrows():
    # 假设第一列包含时间序列数据
    series = row.iloc[0]
    for time, value in series.items():
    df_tsfresh.append([idx, time, value])
    df_tsfresh = pd.DataFrame(df_tsfresh, columns=['id', 'time', 'value'])
    y = pd.Series(y)
    return df_tsfresh, y
    • 该函数的作用是将 sktime 加载的数据集格式转换为 tsfresh 所需的格式。
    • df_sktime 是 sktime 加载的数据集,y 是对应的目标标签。
    • 函数通过遍历 df_sktime 的每一行,提取第一列的时间序列数据,将其转换为 [id, time, value] 的形式,存储在 df_tsfresh 列表中。
    • 最后将 df_tsfresh 列表转换为 DataFrame,并将 y 转换为 Series 后返回。

    4. 主程序

    test_tsfresh.pyApply
    if __name__ == "__main__":
    try:
    # 加载 Arrow Head 数据集
    X, y = load_arrow_head(split="train", return_X_y=True)
    # 转换数据集格式
    df, y = convert_sktime_to_tsfresh(X, y)

    # 提取特征
    X_extracted = extract_features(df, column_id='id', column_sort='time')
    # 处理缺失值
    X_extracted = impute(X_extracted)
    # 选择特征
    X_selected = select_features(X_extracted, y)

    print(X_selected)
    except Exception as e:
    print(f"An error occurred: {e}")
    • load_arrow_head(split="train", return_X_y=True):加载 Arrow Head 数据集的训练集,返回特征 X 和目标标签 y
    • convert_sktime_to_tsfresh(X, y):将 sktime 格式的数据集转换为 tsfresh 所需的格式。
    • extract_features(df, column_id='id', column_sort='time'):从转换后的 DataFrame 中提取特征,column_id 指定样本的标识列,column_sort 指定时间排序列。
    • impute(X_extracted):处理提取特征后可能出现的缺失值。
    • select_features(X_extracted, y):根据目标标签 y 选择有意义的特征。
    • try-except 块用于捕获程序运行过程中可能出现的异常,并打印错误信息。

    运算结果解释

    最终打印的 X_selected 是一个 DataFrame,包含经过特征提取和特征选择后的数据。

    • :代表不同的样本,每个样本对应 Arrow Head 数据集中的一个时间序列。
    • :代表选择出的有意义的特征,这些特征是从原始时间序列数据中提取出来的,例如均值、方差、最大值、最小值等统计特征。
    • :每个单元格的值是对应样本在该特征下的取值。

    通过这种方式,可以将原始的时间序列数据转换为结构化的特征数据,方便后续进行机器学习模型的训练和预测。

    完整代码

    import pandas as pd
    from tsfresh import extract_features, select_features
    from tsfresh.utilities.dataframe_functions import impute
    from sktime.datasets import load_arrow_head
    import warnings
    
    # 忽略 tsfresh 可能出现的警告
    warnings.filterwarnings("ignore", category=RuntimeWarning)
    warnings.filterwarnings("ignore", category=UserWarning)
    
    
    def convert_sktime_to_tsfresh(df_sktime, y):
        """
        Convert the dataset format from sktime to tsfresh.
    
        :param df_sktime: The dataset loaded by sktime.
        :param y: The target labels.
        :return: A DataFrame in the format required by tsfresh and a Series of target labels.
        """
        df_tsfresh = []
        for idx, row in df_sktime.iterrows():
            # 假设第一列包含时间序列数据
            series = row.iloc[0]
            for time, value in series.items():
                df_tsfresh.append([idx, time, value])
        df_tsfresh = pd.DataFrame(df_tsfresh, columns=['id', 'time', 'value'])
        y = pd.Series(y)
        return df_tsfresh, y
    
    
    if __name__ == "__main__":
        try:
            # 加载 Arrow Head 数据集
            X, y = load_arrow_head(split="train", return_X_y=True)
            # 转换数据集格式
            df, y = convert_sktime_to_tsfresh(X, y)
    
            # 提取特征
            X_extracted = extract_features(df, column_id='id', column_sort='time')
            # 处理缺失值
            X_extracted = impute(X_extracted)
            # 选择特征
            X_selected = select_features(X_extracted, y)
    
            print(X_selected)
        except Exception as e:
            print(f"An error occurred: {e}")
  • 股票基本特征计算绘图

    import akshare as ak
    from numba.core.event import end_event
    import pandas as pd
    import numpy as np
    import time
    import os
    
    # 尝试获取数据,添加错误处理
    max_retries = 3
    retry_count = 0
    stock_data = None
    
    data_file = 'stock_data.csv'
    
    # 检查是否已有数据文件
    if os.path.exists(data_file):
        print(f"从本地文件 {data_file} 加载数据...")
        try:
            stock_data = pd.read_csv(data_file, encoding='utf-8')
            print("数据加载成功!")
        except Exception as e:
            print(f"加载本地数据失败: {e}")
            stock_data = None
    
    # 如果没有本地数据,尝试从akshare获取
    while retry_count < max_retries and stock_data is None:
        try:
            print(f"尝试第 {retry_count+1}/{max_retries} 次获取股票数据...")
            stock_data = ak.stock_zh_a_hist(
                symbol="000937",  # 股票代码
                start_date="20230101",  # 开始日期
                end_date="20250801",  # 结束日期
                adjust="qfq"  # 前复权
            )
            # 保存数据到本地,以便下次使用
            stock_data.to_csv(data_file, index=False, encoding='utf-8')
            print("数据获取并保存成功!")
        except Exception as e:
            print(f"获取数据失败: {e}")
            retry_count += 1
            if retry_count < max_retries:
                print(f"{5}秒后重试...")
                time.sleep(5)
    
    if stock_data is None:
        print("多次尝试获取数据失败,请检查网络连接后再试。")
        exit(1)
    
    # 移除缺失值
    stock_data = stock_data.dropna()
    print(stock_data.head())
    
    # 暂时注释掉实时行情获取,避免额外的网络请求
    # # 获取A股所有股票实时行情
    # real_time_data = ak.stock_zh_a_spot_em()
    # 
    # # 筛选特定股票
    # stock_code = "000937"
    # filtered_data = real_time_data[real_time_data["代码"] == stock_code]
    # print(filtered_data.head())
    # 计算移动平均线(MA)
    stock_data['MA5'] = stock_data['收盘'].rolling(window=5).mean()  # 5日均线
    stock_data['MA10'] = stock_data['收盘'].rolling(window=10).mean()  # 10日均线
    stock_data['MA20'] = stock_data['收盘'].rolling(window=20).mean()  # 20日均线
    
    # 计算MACD指标
    stock_data['EMA12'] = stock_data['收盘'].ewm(span=12, adjust=False).mean()
    stock_data['EMA26'] = stock_data['收盘'].ewm(span=26, adjust=False).mean()
    stock_data['DIF'] = stock_data['EMA12'] - stock_data['EMA26']
    stock_data['DEA'] = stock_data['DIF'].ewm(span=9, adjust=False).mean()
    stock_data['MACD'] = (stock_data['DIF'] - stock_data['DEA']) * 2
    
    # 计算相对强弱指数(RSI)
    delta = stock_data['收盘'].diff(1)
    gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    stock_data['RSI'] = 100 - (100 / (1 + gain / loss))
    
    # 计算布林带
    stock_data['BB_MID'] = stock_data['收盘'].rolling(window=20).mean()
    stock_data['BB_UP'] = stock_data['BB_MID'] + 2 * stock_data['收盘'].rolling(window=20).std()
    stock_data['BB_LOW'] = stock_data['BB_MID'] - 2 * stock_data['收盘'].rolling(window=20).std()
    
    #. 波动率特征
    # 计算日收益率
    stock_data['return'] = stock_data['收盘'].pct_change()
    
    # 计算波动率(标准差)
    stock_data['volatility_5d'] = stock_data['return'].rolling(window=5).std() * np.sqrt(5)
    stock_data['volatility_20d'] = stock_data['return'].rolling(window=20).std() * np.sqrt(20)
    ## . 量价关系特征
    # 计算量比
    stock_data['volume_ratio'] = stock_data['成交量'] / stock_data['成交量'].rolling(window=5).mean()
    # 计算成交额
    stock_data['amount'] = stock_data['收盘'] * stock_data['成交量']
    # 计算资金流向(简易版)
    stock_data['money_flow'] = (stock_data['收盘'] - stock_data['开盘']) * stock_data['成交量']
    # 基本面特征
    # 4. 可视化特征指标
    import matplotlib.pyplot as plt
    import matplotlib.dates as mdates
    from matplotlib.ticker import MaxNLocator
    
    # 设置中文显示
    plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
    plt.rcParams['axes.unicode_minus'] = False  # 解决负号显示问题
    
    # 创建画布和子图
    fig, axes = plt.subplots(4, 1, figsize=(12, 16), sharex=True)
    
    # 1. 绘制价格和移动平均线
    ax1 = axes[0]
    ax1.plot(stock_data['日期'], stock_data['收盘'], label='收盘价', color='blue')
    ax1.plot(stock_data['日期'], stock_data['MA5'], label='MA5', color='red')
    ax1.plot(stock_data['日期'], stock_data['MA10'], label='MA10', color='green')
    ax1.plot(stock_data['日期'], stock_data['MA20'], label='MA20', color='orange')
    ax1.set_title('股票价格与移动平均线')
    ax1.set_ylabel('价格')
    ax1.legend()
    ax1.grid(True)
    
    # 2. 绘制MACD
    ax2 = axes[1]
    ax2.plot(stock_data['日期'], stock_data['DIF'], label='DIF', color='blue')
    ax2.plot(stock_data['日期'], stock_data['DEA'], label='DEA', color='red')
    ax2.bar(stock_data['日期'], stock_data['MACD'], label='MACD', color='green', alpha=0.5)
    ax2.set_title('MACD指标')
    ax2.set_ylabel('值')
    ax2.legend()
    ax2.grid(True)
    
    # 3. 绘制RSI
    ax3 = axes[2]
    ax3.plot(stock_data['日期'], stock_data['RSI'], label='RSI', color='purple')
    ax3.axhline(y=70, color='red', linestyle='--', label='超买线(70)')
    ax3.axhline(y=30, color='green', linestyle='--', label='超卖线(30)')
    ax3.set_title('相对强弱指数(RSI)')
    ax3.set_ylabel('RSI值')
    ax3.set_ylim(0, 100)
    ax3.legend()
    ax3.grid(True)
    
    # 4. 绘制布林带
    ax4 = axes[3]
    ax4.plot(stock_data['日期'], stock_data['收盘'], label='收盘价', color='blue')
    ax4.plot(stock_data['日期'], stock_data['BB_MID'], label='中轨', color='black')
    ax4.plot(stock_data['日期'], stock_data['BB_UP'], label='上轨', color='red', linestyle='--')
    ax4.plot(stock_data['日期'], stock_data['BB_LOW'], label='下轨', color='green', linestyle='--')
    ax4.fill_between(stock_data['日期'], stock_data['BB_UP'], stock_data['BB_LOW'], color='gray', alpha=0.1)
    ax4.set_title('布林带指标')
    ax4.set_xlabel('日期')
    ax4.set_ylabel('价格')
    ax4.legend()
    ax4.grid(True)
    
    # 设置x轴日期格式
    for ax in axes:
        ax.xaxis.set_major_locator(MaxNLocator(prune='both', nbins=10))
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
        plt.setp(ax.get_xticklabels(), rotation=45)
    
    plt.tight_layout()
    plt.savefig('stock_indicators.png', dpi=300, bbox_inches='tight')
    print("指标图形已保存为 stock_indicators.png")
    plt.show()
    
    # 5. 保存特征数据
    # 使用UTF-8编码保存CSV文件
    stock_data.to_csv('stock_features.csv', index=False, encoding='utf-8')
    print(f"特征数据已保存,共{len(stock_data)}行")
    print(stock_data[['日期', '收盘', 'MA5', 'MA20', 'MACD', 'RSI', 'volatility_5d']].head())
    
    # """
    # 用户输入不完整,无法确定具体需求。但从工作区存在stock_basic_features.py等股票相关文件推测,可能与股票指标学习有关。我需要先查看stock_basic_features.py文件内容,了解其中涉及的指标,以便提供对应学习资料。
    
    # stock_basic_features.py 2 - 200
    
    # 以下是对stock_basic_features.py文件中涉及的股票技术指标的学习资料整理:
    
    # ### 移动平均线(MA)
    # - 概念 :移动平均线是将一定时期内的股票价格加以平均,连成一条线,用以观察股价变动趋势的技术指标。
    # - 文件中的实现 :代码计算了MA5(5日均线)、MA10(10日均线)和MA20(20日均线)。
    # - 学习建议 :理解不同周期均线的意义,如短期均线(5日、10日)反映短期趋势,中长期均线(20日、60日)反映中长期趋势。学习均线金叉、死叉、多头排列、空头排列等形态。
    # ### MACD指标
    # - 概念 :MACD(Moving Average Convergence Divergence)由快均线(EMA12)、慢均线(EMA26)的差值(DIF)和信号线(DEA)组成,用于判断股票价格的趋势和强度。
    # - 文件中的实现 :计算了EMA12、EMA26、DIF、DEA和MACD值((DIF-DEA)*2)。
    # - 学习建议 :掌握MACD的零轴穿越、金叉死叉、背离等形态的判断和应用。
    # ### 相对强弱指数(RSI)
    # - 概念 :RSI(Relative Strength Index)是衡量股票价格上涨和下跌力度的指标,取值范围0-100,通常70以上被认为超买,30以下被认为超卖。
    # - 文件中的实现 :使用14日周期计算RSI。
    # - 学习建议 :了解RSI的计算原理,学习超买超卖区域的判断,以及RSI背离现象的应用。
    # ### 布林带(BOLL)
    # - 概念 :布林带由中轨(BB_MID)、上轨(BB_UP)和下轨(BB_LOW)组成,用于衡量股价的波动性和趋势。
    # - 文件中的实现 :中轨为20日均线,上轨和下轨分别为中轨加减2倍标准差。
    # - 学习建议 :掌握布林带的开口、收缩形态,以及股价触碰上下轨的含义。
    # ### 波动率
    # - 概念 :波动率衡量股票价格的波动程度,通常用收益率的标准差表示。
    # - 文件中的实现 :计算了5日和20日波动率(volatility_5d, volatility_20d)。
    # - 学习建议 :了解波动率在风险评估和交易策略中的应用,如高波动率通常意味着高风险高收益。
    # ### 量价关系特征
    # - 概念 :量价关系研究成交量和价格之间的关系,帮助判断市场趋势的强度。
    # - 文件中的实现 :计算了量比(volume_ratio)、成交额(amount)和简易资金流向(money_flow)。
    # - 学习建议 :学习量价配合(如价升量增)和量价背离(如价升量减)的形态及其市场含义。
    # ### 学习资源推荐
    # 1. 1.
    #    书籍:《期货市场技术分析》(约翰·墨菲)、《日本蜡烛图技术》(史蒂夫·尼森)
    # 2. 2.
    #    在线课程:可以通过平台如Coursera、Udemy查找技术分析相关课程
    # 3. 3.
    #    实践:结合实际股票数据(如文件中生成的stock_features.csv)进行分析和验证
    # 通过以上指标的学习,可以帮助理解股票价格的短期波动和长期趋势,为投资决策提供参考。
    # """
    
  • 一劳永逸修改pip源-https://mirrors.aliyun.com/pypi/simple

    mkdir ~/.pip 
    vim ~/.pip/pip.conf  
    #windows system: notepad ~/.pip/pip.conf
    [global]
    index-url = https://mirrors.aliyun.com/pypi/simple/
    [install]
    trusted-host = mirrors.aliyun.com