
William Gann Methods (Part III): Does Astrology Work?
Overview
Financial market participants are constantly searching for new methods of market analysis and forecasting. Even the most improbable concepts have never been dismissed out of hand. One such unconventional and entirely distinctive approach is the application of astrology to trading, popularized by the famous trader William Gann.
We have already touched on Gann's tools in previous articles: here are the first and second parts. Now we will focus on exploring how the positions of planets and stars might influence global markets.
Let's try to combine cutting-edge technology with ancient knowledge. We will use the Python programming language together with the MetaTrader 5 platform to look for links between astronomical phenomena and movements of the EURUSD exchange rate. We will cover the theoretical side of financial astrology and try to put it into practice by developing a forecasting system.
In addition, we will look at collecting and synchronizing astronomical and financial data, building correlation matrices, and visualizing the results.
Theoretical Foundations of Financial Astrology
I have been interested in this topic for a long time, and today I would like to share my view of astrology's influence on financial markets. It is a fascinating area, although a highly controversial one.
As I understand it, the basic premise of financial astrology is that there is some connection between the motion of celestial bodies and market cycles. The concept has a long and rich history, and it was popularized above all by William Gann, a famous trader of the last century.
I have thought a great deal about the basic tenets of this theory. Take the idea of cyclicality: the motion of stars and planets is periodic, and so, the argument goes, are market movements. As for planetary aspects, some believe that certain planetary configurations exert a strong influence on the market. And what about the zodiac? Planets passing through different zodiac signs are also thought to affect markets.
Lunar cycles and solar activity deserve a mention as well. I have come across claims that lunar phases are related to short-term market fluctuations, while solar flares are linked to long-term trends. Intriguing hypotheses, aren't they?
William Gann was a true pioneer in this field. He developed a whole range of tools, such as his famous Square of Nine, based on astronomy, geometry, and numerical sequences. His achievements still provoke heated debate today.
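For readers who have never met the Square of Nine, here is a minimal sketch of one popular square-root formulation of it: a full 360° turn of the spiral adds 2 to the square root of the price, so each 90° step adds 0.5. The function name, the step size, and the rescaling of the EURUSD quote are my own illustrative assumptions, not Gann's original construction.

import math

def square_of_nine_levels(price, steps=4):
    # One common square-root reading of the Square of Nine:
    # a full 360-degree turn of the spiral adds 2 to sqrt(price),
    # so each 90-degree step adds 0.5. This is an illustrative
    # approximation, not necessarily Gann's original method.
    root = math.sqrt(price)
    levels = []
    for k in range(1, steps + 1):
        increment = 0.5 * k  # 90 degrees per step
        levels.append({
            'angle_deg': 90 * k,
            'resistance': round((root + increment) ** 2, 2),
            'support': round((root - increment) ** 2, 2),
        })
    return levels

# Forex quotes are usually rescaled before applying the formula,
# e.g. EURUSD 1.0800 treated as 108.00 (an assumption of this sketch)
for level in square_of_nine_levels(108.00):
    print(level)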
Of course, we cannot ignore the fact that the scientific community is, on the whole, skeptical of astrology. In many countries it is officially classified as a pseudoscience. Frankly, there is so far no rigorous evidence that astrological methods work in finance. Often, the correlations people observe turn out to be nothing more than cognitive bias.
Nevertheless, many traders continue to defend financial astrology actively.
That is why I decided to conduct my own study. I want to try to evaluate objectively, using statistical methods and a large amount of data, whether astrology has any influence on financial markets. Who knows, perhaps we will find something interesting. Either way, it promises to be a fascinating journey into the world where the stars meet stock charts.
Overview of the Python Libraries Used
I will need a whole set of Python libraries.
First, I decided to use the Skyfield package for the astronomical data. I spent a long time choosing the right tool, and Skyfield's precision impressed me. With it, I can collect information about the positions of celestial bodies and lunar phases with very high accuracy (to several decimal places), which is everything I need to build the dataset.
For market data I chose the official Python MetaTrader 5 library. It can download the price history of currency pairs and, if necessary, even open trades.
Pandas will be our faithful companion for data processing. I have used this library a lot in the past, and it is indispensable for working with time series. I will use it to preprocess and synchronize all the collected data.
For statistical analysis I chose the SciPy library. Its capabilities are impressive, particularly its tools for correlation and regression analysis. I hope they will help me find interesting patterns.
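To give an idea of what this looks like in practice, here is a minimal sketch of a correlation check with SciPy that also returns a p-value, something the plain corr() call used later does not provide. It assumes the merged CSV file and the moon_phase and close columns that are built further down in the article.

import pandas as pd
from scipy import stats

# Assumes the merged dataset produced later in the article
merged = pd.read_csv('merged_astro_financial_data.csv')
sample = merged[['moon_phase', 'close']].dropna()

# Pearson (linear) and Spearman (rank) correlation with p-values
r, p = stats.pearsonr(sample['moon_phase'], sample['close'])
rho, p_rank = stats.spearmanr(sample['moon_phase'], sample['close'])

print(f"Pearson  r   = {r:+.3f} (p = {p:.4f})")
print(f"Spearman rho = {rho:+.3f} (p = {p_rank:.4f})")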
To visualize the results I decided to rely on my old friends, Matplotlib and Seaborn. I like how flexible these libraries are when building charts, and I am sure they will help visualize all the findings.
The toolkit is ready. It is like assembling a powerful computer from quality components. We now have everything we need for a thorough study of the influence of astrological factors on financial markets. I cannot wait to dive into the data and start testing my hypotheses!
Collecting Astronomical Data
import pandas as pd
import numpy as np
from skyfield.api import load, wgs84, utc
from skyfield.data import mpc
from datetime import datetime, timedelta
import requests

# Loading planet ephemerides
planets = load('de421.bsp')
earth = planets['earth']
ts = load.timescale()

def get_planet_positions(date):
    t = ts.from_datetime(date.replace(tzinfo=utc))
    planet_positions = {}
    planet_ids = {
        'mercury': 'MERCURY BARYCENTER',
        'venus': 'VENUS BARYCENTER',
        'mars': 'MARS BARYCENTER',
        'jupiter': 'JUPITER BARYCENTER',
        'saturn': 'SATURN BARYCENTER',
        'uranus': 'URANUS BARYCENTER',
        'neptune': 'NEPTUNE BARYCENTER'
    }
    for planet, planet_id in planet_ids.items():
        planet_obj = planets[planet_id]
        astrometric = earth.at(t).observe(planet_obj)
        ra, dec, _ = astrometric.radec()
        planet_positions[planet] = {'ra': ra.hours, 'dec': dec.degrees}
    return planet_positions

def get_moon_phase(date):
    t = ts.from_datetime(date.replace(tzinfo=utc))
    eph = load('de421.bsp')
    moon, sun, earth = eph['moon'], eph['sun'], eph['earth']
    e = earth.at(t)
    _, m, _ = e.observe(moon).apparent().ecliptic_latlon()
    _, s, _ = e.observe(sun).apparent().ecliptic_latlon()
    phase = (m.degrees - s.degrees) % 360
    return phase

def get_solar_activity(date):
    # Get solar activity data from NOAA API
    url = f"https://services.swpc.noaa.gov/json/solar-cycle/observed-solar-cycle-indices.json"
    response = requests.get(url)
    data = response.json()
    # Convert date to 'YYYY-MM' format
    target_date = date.strftime("%Y-%m")
    # Find the closest date in the data
    closest_data = min(data, key=lambda x: abs(datetime.strptime(x['time-tag'], "%Y-%m") - datetime.strptime(target_date, "%Y-%m")))
    return {
        'sunspot_number': closest_data.get('ssn', None),
        'f10.7_flux': closest_data.get('f10.7', None)
    }

def calculate_aspects(positions):
    aspects = {}
    planets = list(positions.keys())
    for i in range(len(planets)):
        for j in range(i+1, len(planets)):
            planet1 = planets[i]
            planet2 = planets[j]
            ra1 = positions[planet1]['ra']
            ra2 = positions[planet2]['ra']
            angle = abs(ra1 - ra2) % 24
            angle = min(angle, 24 - angle) * 15  # Convert to degrees
            if abs(angle - 0) <= 10 or abs(angle - 180) <= 10:
                aspects[f"{planet1}_{planet2}"] = "conjunction" if abs(angle - 0) <= 10 else "opposition"
            elif abs(angle - 90) <= 10:
                aspects[f"{planet1}_{planet2}"] = "square"
            elif abs(angle - 120) <= 10:
                aspects[f"{planet1}_{planet2}"] = "trine"
    return aspects

start_date = datetime(2024, 4, 1, tzinfo=utc)
end_date = datetime(2024, 5, 31, tzinfo=utc)

current_date = start_date
astronomical_data = []

while current_date <= end_date:
    planet_positions = get_planet_positions(current_date)
    moon_phase = get_moon_phase(current_date)
    try:
        solar_activity = get_solar_activity(current_date)
    except Exception as e:
        print(f"Error getting solar activity for {current_date}: {e}")
        solar_activity = {'sunspot_number': None, 'f10.7_flux': None}
    aspects = calculate_aspects(planet_positions)

    data = {
        'date': current_date,
        'moon_phase': moon_phase,
        'sunspot_number': solar_activity.get('sunspot_number'),
        'f10.7_flux': solar_activity.get('f10.7_flux'),
        **planet_positions,
        **aspects
    }
    astronomical_data.append(data)

    current_date += timedelta(days=1)
    print(f"Processed: {current_date}")

# Convert data to DataFrame
df = pd.DataFrame(astronomical_data)

# Save data to CSV file
df.to_csv('astronomical_data_2018_2024.csv', index=False)
print("Data saved to astronomical_data_2018_2024.csv")
This Python code collects the astronomical data we will later use for market analysis.
The script walks day by day over the period defined by start_date and end_date (April 1 through May 31, 2024 in the listing above; extend the range, for example to 2018–2024, if you need more history) and collects the following data:
- Planetary positions: Mercury, Venus, Mars, Jupiter, Saturn, Uranus, and Neptune
- Lunar phase
- Solar activity
- Planetary aspects (the angular relationships between planets)
The script consists of the library imports, the main loop, and saving the data to a CSV file. It uses the Skyfield library mentioned earlier to calculate planetary positions, Pandas for data handling, and requests to fetch the solar activity data.
The functions worth noting are get_planet_positions(), which retrieves planetary positions (right ascension and declination); get_moon_phase(), which determines the current lunar phase; get_solar_activity(), which fetches solar activity data directly from the NOAA API; and calculate_aspects(), which computes the aspects, that is, the angular relationships between planets.
We collect all the data day by day in a loop. In the end, everything is saved to a CSV file for later use.
Obtaining Financial Data via MetaTrader 5
To obtain financial data we will use the MetaTrader 5 library for Python. It allows us to download data directly from the broker and retrieve price time series for any instrument. Here is our code for loading the historical data:
import MetaTrader5 as mt5
import pandas as pd
from datetime import datetime

# Connect to MetaTrader 5
if not mt5.initialize():
    print("initialize() failed")
    mt5.shutdown()

# Set query parameters
symbol = "EURUSD"
timeframe = mt5.TIMEFRAME_D1
start_date = datetime(2018, 1, 1)
end_date = datetime(2024, 12, 31)

# Request historical data
rates = mt5.copy_rates_range(symbol, timeframe, start_date, end_date)

# Convert data to DataFrame
df = pd.DataFrame(rates)
df['time'] = pd.to_datetime(df['time'], unit='s')

# Save data to CSV file
df.to_csv(f'{symbol}_data.csv', index=False)

# Terminate the connection to MetaTrader 5
mt5.shutdown()
The script connects to the trading terminal, requests EURUSD D1 (daily) data, builds a dataframe, and saves it to a CSV file.
Synchronizing the Astronomical and Financial Data
We now have the astronomical data and the EURUSD financial data. The next step is to synchronize them: we merge the two sets by date so that a single dataset contains all the necessary information, both financial and astronomical.
import pandas as pd

# Load data
astro_data = pd.read_csv('astronomical_data_2018_2024.csv')
financial_data = pd.read_csv('EURUSD_data.csv')

# Convert date columns to datetime
astro_data['date'] = pd.to_datetime(astro_data['date'])
financial_data['time'] = pd.to_datetime(financial_data['time'])

# Merge data
merged_data = pd.merge(financial_data, astro_data, left_on='time', right_on='date', how='inner')

# Save merged data
merged_data.to_csv('merged_astro_financial_data.csv', index=False)
The script loads the saved files, converts the date columns to datetime format, and merges the datasets by date. The result is a single CSV file containing everything we need for the subsequent analysis.
Statistical Analysis of Correlations
So, let's move on! We now have a combined dataset, and it is time to find out whether there is any relationship between astronomical phenomena and market movements. We will use the corr() function from the Pandas library for this, and we will also merge the previous two code fragments into one complete script.
Here is the final script:
import pandas as pd
import numpy as np
from skyfield.api import load, wgs84, utc
from skyfield.data import mpc
from datetime import datetime, timedelta
import requests
import MetaTrader5 as mt5
import seaborn as sns
import matplotlib.pyplot as plt

# Part 1: Collecting astronomical data

# Loading planetary ephemerides
planets = load('de421.bsp')
earth = planets['earth']
ts = load.timescale()

def get_planet_positions(date):
    t = ts.from_datetime(date.replace(tzinfo=utc))
    planet_positions = {}
    planet_ids = {
        'mercury': 'MERCURY BARYCENTER',
        'venus': 'VENUS BARYCENTER',
        'mars': 'MARS BARYCENTER',
        'jupiter': 'JUPITER BARYCENTER',
        'saturn': 'SATURN BARYCENTER',
        'uranus': 'URANUS BARYCENTER',
        'neptune': 'NEPTUNE BARYCENTER'
    }
    for planet, planet_id in planet_ids.items():
        planet_obj = planets[planet_id]
        astrometric = earth.at(t).observe(planet_obj)
        ra, dec, _ = astrometric.radec()
        planet_positions[planet] = {'ra': ra.hours, 'dec': dec.degrees}
    return planet_positions

def get_moon_phase(date):
    t = ts.from_datetime(date.replace(tzinfo=utc))
    eph = load('de421.bsp')
    moon, sun, earth = eph['moon'], eph['sun'], eph['earth']
    e = earth.at(t)
    _, m, _ = e.observe(moon).apparent().ecliptic_latlon()
    _, s, _ = e.observe(sun).apparent().ecliptic_latlon()
    phase = (m.degrees - s.degrees) % 360
    return phase

def get_solar_activity(date):
    url = f"https://services.swpc.noaa.gov/json/solar-cycle/observed-solar-cycle-indices.json"
    response = requests.get(url)
    data = response.json()
    target_date = date.strftime("%Y-%m")
    closest_data = min(data, key=lambda x: abs(datetime.strptime(x['time-tag'], "%Y-%m") - datetime.strptime(target_date, "%Y-%m")))
    return {
        'sunspot_number': closest_data.get('ssn', None),
        'f10.7_flux': closest_data.get('f10.7', None)
    }

def calculate_aspects(positions):
    aspects = {}
    planets = list(positions.keys())
    for i in range(len(planets)):
        for j in range(i+1, len(planets)):
            planet1 = planets[i]
            planet2 = planets[j]
            ra1 = positions[planet1]['ra']
            ra2 = positions[planet2]['ra']
            angle = abs(ra1 - ra2) % 24
            angle = min(angle, 24 - angle) * 15  # Convert to degrees
            if abs(angle - 0) <= 10 or abs(angle - 180) <= 10:
                aspects[f"{planet1}_{planet2}"] = "conjunction" if abs(angle - 0) <= 10 else "opposition"
            elif abs(angle - 90) <= 10:
                aspects[f"{planet1}_{planet2}"] = "square"
            elif abs(angle - 120) <= 10:
                aspects[f"{planet1}_{planet2}"] = "trine"
    return aspects

# Collecting astronomical data
start_date = datetime(2024, 3, 1, tzinfo=utc)
end_date = datetime(2024, 7, 30, tzinfo=utc)

current_date = start_date
astronomical_data = []

while current_date <= end_date:
    planet_positions = get_planet_positions(current_date)
    moon_phase = get_moon_phase(current_date)
    try:
        solar_activity = get_solar_activity(current_date)
    except Exception as e:
        print(f"Error getting solar activity for {current_date}: {e}")
        solar_activity = {'sunspot_number': None, 'f10.7_flux': None}
    aspects = calculate_aspects(planet_positions)

    data = {
        'date': current_date,
        'moon_phase': moon_phase,
        'sunspot_number': solar_activity.get('sunspot_number'),
        'f10.7_flux': solar_activity.get('f10.7_flux'),
        **planet_positions,
        **aspects
    }
    astronomical_data.append(data)

    current_date += timedelta(days=1)
    print(f"Processed: {current_date}")

# Convert data to DataFrame and save
astro_df = pd.DataFrame(astronomical_data)
astro_df.to_csv('astronomical_data_2018_2024.csv', index=False)
print("Astronomical data saved to astronomical_data_2018_2024.csv")

# Part 2: Retrieving financial data via MetaTrader 5

# Initialize connection to MetaTrader 5
if not mt5.initialize():
    print("initialize() failed")
    mt5.shutdown()

# Set request parameters
symbol = "EURUSD"
timeframe = mt5.TIMEFRAME_D1
start_date = datetime(2024, 3, 1)
end_date = datetime(2024, 7, 30)

# Request historical data
rates = mt5.copy_rates_range(symbol, timeframe, start_date, end_date)

# Convert data to DataFrame
financial_df = pd.DataFrame(rates)
financial_df['time'] = pd.to_datetime(financial_df['time'], unit='s')

# Save financial data
financial_df.to_csv(f'{symbol}_data.csv', index=False)
print(f"Financial data saved to {symbol}_data.csv")

# Shutdown MetaTrader 5 connection
mt5.shutdown()

# Part 3: Synchronizing astronomical and financial data

# Load data
astro_df = pd.read_csv('astronomical_data_2018_2024.csv')
financial_df = pd.read_csv('EURUSD_data.csv')

# Convert date columns to datetime
astro_df['date'] = pd.to_datetime(astro_df['date']).dt.tz_localize(None)
financial_df['time'] = pd.to_datetime(financial_df['time'])

# Merge data
merged_data = pd.merge(financial_df, astro_df, left_on='time', right_on='date', how='inner')

# Save merged data
merged_data.to_csv('merged_astro_financial_data.csv', index=False)
print("Merged data saved to merged_astro_financial_data.csv")

# Part 4: Statistical analysis of correlations

# Select numeric columns for correlation analysis
numeric_columns = merged_data.select_dtypes(include=[np.number]).columns

# Create lags for astronomical data
for col in numeric_columns:
    if col not in ['open', 'high', 'low', 'close', 'tick_volume', 'spread', 'real_volume']:
        for lag in range(1, 6):  # Create lags from 1 to 5
            merged_data[f'{col}_lag{lag}'] = merged_data[col].shift(lag)

# Update list of numeric columns
numeric_columns = merged_data.select_dtypes(include=[np.number]).columns

# Calculate correlation matrix
correlation_matrix = merged_data[numeric_columns].corr()

# Create heatmap of correlations
plt.figure(figsize=(20, 16))
sns.heatmap(correlation_matrix, annot=False, cmap='coolwarm', vmin=-1, vmax=1, center=0)
plt.title('Correlation Matrix of Astronomical Factors (with Lags) and EURUSD Prices')
plt.tight_layout()
plt.savefig('correlation_heatmap_with_lags.png')
plt.close()

# Output the most significant correlations with the closing price
significant_correlations = correlation_matrix['close'].sort_values(key=abs, ascending=False)
print("Most significant correlations with the closing price:")
print(significant_correlations)

# Create a separate correlation matrix for astronomical data with lags and the current price
astro_columns = [col for col in numeric_columns if col not in ['open', 'high', 'low', 'tick_volume', 'spread', 'real_volume']]
astro_columns.append('close')  # Add the current closing price
astro_correlation_matrix = merged_data[astro_columns].corr()

# Create heatmap of correlations for astronomical data with lags and the current price
# Increase the title and tick label font
plt.figure(figsize=(18, 14))
sns.heatmap(astro_correlation_matrix, annot=False, cmap='coolwarm', vmin=-1, vmax=1, center=0, cbar_kws={'label': 'Correlation'})
plt.title('Correlation matrix of astronomical factors (with lags) and current EURUSD price', fontsize=24)
plt.xticks(fontsize=30)
plt.yticks(fontsize=30)
plt.tight_layout()
plt.savefig('astro_correlation_heatmap_with_lags.png')
plt.close()

print("Analysis completed. Results saved in CSV and PNG files.")
The script builds a correlation map of all the numeric values in the dataset, displays the full correlation matrix as a heatmap, and prints a list of the most significant correlations with the closing price.
The presence or absence of correlation says nothing about the presence or absence of causation. Even if we found a strong correlation between astronomical data and price movements, it would not mean that one drives the other, or vice versa. Further research would be needed; a correlation map is only the most basic starting point.
Digging deeper into the data, it turns out that no significant correlations can be found: there is no clear relationship between the historical astronomical data and the market indicators.
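One way to make this check more rigorous, without claiming anything about causation, is to attach p-values to the correlations and correct them for the fact that dozens of columns are tested at once; most "interesting" coefficients do not survive such a correction. Below is a minimal sketch, assuming the merged CSV and column names used above; the Bonferroni adjustment is my own choice, not part of the original workflow.

import numpy as np
import pandas as pd
from scipy import stats

merged = pd.read_csv('merged_astro_financial_data.csv')
numeric = merged.select_dtypes(include=[np.number])

# Columns that belong to price data rather than astronomy
price_cols = {'open', 'high', 'low', 'close', 'tick_volume', 'spread', 'real_volume'}
astro_cols = [c for c in numeric.columns if c not in price_cols]

results = []
for col in astro_cols:
    sample = numeric[[col, 'close']].dropna()
    if len(sample) < 3 or sample[col].nunique() < 2:
        continue  # skip constant or nearly empty columns
    r, p = stats.pearsonr(sample[col], sample['close'])
    results.append((col, r, p))

n_tests = len(results)
print(f"{n_tests} correlations tested against 'close'")
for col, r, p in sorted(results, key=lambda item: item[2])[:10]:
    p_adj = min(p * n_tests, 1.0)  # Bonferroni-adjusted p-value
    print(f"{col:25s} r={r:+.3f}  p={p:.4f}  adjusted p={p_adj:.4f}")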
Bringing in Machine Learning
I thought about what to do next and decided to apply a machine learning model. Using the CatBoost library, I wrote two scripts that try to predict future prices using the dataset's columns as features. Here is the first one, a regression model:
import pandas as pd
import numpy as np
from catboost import CatBoostRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder

# Loading data
data = pd.read_csv('merged_astro_financial_data.csv')

# Converting date to datetime
data['date'] = pd.to_datetime(data['date'])

# Creating lags for financial data
for col in ['open', 'high', 'low', 'close']:
    for lag in range(1, 6):  # Creating lags from 1 to 5
        data[f'{col}_lag{lag}'] = data[col].shift(lag)

# Creating lags for astronomical data
astro_cols = ['mercury', 'venus', 'mars', 'jupiter', 'saturn', 'uranus', 'neptune']
for col in astro_cols:
    data[f'{col}_ra'] = data[col].apply(lambda x: eval(x)['ra'] if pd.notna(x) else np.nan)
    data[f'{col}_dec'] = data[col].apply(lambda x: eval(x)['dec'] if pd.notna(x) else np.nan)
    for lag in range(1, 6):  # Lags from 1 to 5
        data[f'{col}_ra_lag{lag}'] = data[f'{col}_ra'].shift(lag)
        data[f'{col}_dec_lag{lag}'] = data[f'{col}_dec'].shift(lag)
    data.drop(columns=[col, f'{col}_ra', f'{col}_dec'], inplace=True)

# Converting aspects to numerical features
aspect_cols = ['mercury_saturn', 'venus_mars', 'venus_jupiter', 'venus_uranus', 'mars_jupiter', 'mars_uranus',
               'jupiter_uranus', 'mercury_neptune', 'venus_saturn', 'venus_neptune', 'mars_saturn', 'mercury_venus',
               'mars_neptune', 'mercury_uranus', 'saturn_neptune', 'mercury_jupiter', 'mercury_mars', 'jupiter_saturn']

# Using LabelEncoder for encoding aspects
label_encoders = {}
for col in aspect_cols:
    label_encoders[col] = LabelEncoder()
    data[col] = label_encoders[col].fit_transform(data[col].astype(str))

# Filling missing values with mean values for numeric columns
numeric_cols = data.select_dtypes(include=[np.number]).columns
data[numeric_cols] = data[numeric_cols].fillna(data[numeric_cols].mean())

# Removing rows with missing values
data = data.dropna()

# Preparing features and target variable
features = [col for col in data.columns if col not in ['date', 'time', 'close']]
X = data[features]
y = data['close']

# Splitting data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=1)

# Creating and training the CatBoost model
model = CatBoostRegressor(iterations=500, learning_rate=0.1, depth=9, random_state=1)
model.fit(X_train, y_train, eval_set=(X_test, y_test), early_stopping_rounds=200, verbose=100)

# Evaluating the model
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)

print(f"Mean Squared Error: {mse}")
print(f"Mean Absolute Error: {mae}")
print(f"R-squared Score: {r2}")

# Visualizing feature importance
feature_importance = model.feature_importances_
feature_names = X.columns
sorted_idx = np.argsort(feature_importance)
pos = np.arange(sorted_idx.shape[0]) + 0.5

plt.figure(figsize=(12, 6))
plt.barh(pos, feature_importance[sorted_idx], align='center')
plt.yticks(pos, np.array(feature_names)[sorted_idx])
plt.xlabel('Feature Importance')
plt.title('Feature Importance')
plt.show()

# Predicting the next value
def predict_next():
    # Selecting the last row of data
    last_data = data.iloc[-1]
    input_features = last_data[features].values.reshape(1, -1)

    # Prediction
    prediction = model.predict(input_features)
    print(f"Prediction for the next closing price: {prediction[0]}")

# Example of using the function to predict the next value
predict_next()
The second one is a classification model:
import pandas as pd
import numpy as np
from skyfield.api import load, utc
from datetime import datetime, timedelta
import requests
import MetaTrader5 as mt5
import seaborn as sns
import matplotlib.pyplot as plt
from catboost import CatBoostClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.preprocessing import LabelEncoder

# Part 1: Collecting astronomical data
planets = load('de421.bsp')
earth = planets['earth']
ts = load.timescale()

def get_planet_positions(date):
    t = ts.from_datetime(date.replace(tzinfo=utc))
    planet_positions = {}
    planet_ids = {
        'mercury': 'MERCURY BARYCENTER',
        'venus': 'VENUS BARYCENTER',
        'mars': 'MARS BARYCENTER',
        'jupiter': 'JUPITER BARYCENTER',
        'saturn': 'SATURN BARYCENTER',
        'uranus': 'URANUS BARYCENTER',
        'neptune': 'NEPTUNE BARYCENTER'
    }
    for planet, planet_id in planet_ids.items():
        planet_obj = planets[planet_id]
        astrometric = earth.at(t).observe(planet_obj)
        ra, dec, _ = astrometric.radec()
        planet_positions[planet] = {'ra': ra.hours, 'dec': dec.degrees}
    return planet_positions

def get_moon_phase(date):
    t = ts.from_datetime(date.replace(tzinfo=utc))
    eph = load('de421.bsp')
    moon, sun, earth = eph['moon'], eph['sun'], eph['earth']
    e = earth.at(t)
    _, m, _ = e.observe(moon).apparent().ecliptic_latlon()
    _, s, _ = e.observe(sun).apparent().ecliptic_latlon()
    phase = (m.degrees - s.degrees) % 360
    return phase

def get_solar_activity(date):
    url = f"https://services.swpc.noaa.gov/json/solar-cycle/observed-solar-cycle-indices.json"
    response = requests.get(url)
    data = response.json()
    target_date = date.strftime("%Y-%m")
    closest_data = min(data, key=lambda x: abs(datetime.strptime(x['time-tag'], "%Y-%m") - datetime.strptime(target_date, "%Y-%m")))
    return {
        'sunspot_number': closest_data.get('ssn', None),
        'f10.7_flux': closest_data.get('f10.7', None)
    }

def calculate_aspects(positions):
    aspects = {}
    planets = list(positions.keys())
    for i in range(len(planets)):
        for j in range(i+1, len(planets)):
            planet1 = planets[i]
            planet2 = planets[j]
            ra1 = positions[planet1]['ra']
            ra2 = positions[planet2]['ra']
            angle = abs(ra1 - ra2) % 24
            angle = min(angle, 24 - angle) * 15  # Convert to degrees
            if abs(angle - 0) <= 10 or abs(angle - 180) <= 10:
                aspects[f"{planet1}_{planet2}"] = "conjunction" if abs(angle - 0) <= 10 else "opposition"
            elif abs(angle - 90) <= 10:
                aspects[f"{planet1}_{planet2}"] = "square"
            elif abs(angle - 120) <= 10:
                aspects[f"{planet1}_{planet2}"] = "trine"
    return aspects

# Part 2: Obtaining financial data through MetaTrader 5
def get_financial_data(symbol, start_date, end_date):
    if not mt5.initialize():
        print("initialize() failed")
        mt5.shutdown()
        return None
    timeframe = mt5.TIMEFRAME_D1
    rates = mt5.copy_rates_range(symbol, timeframe, start_date, end_date)
    mt5.shutdown()
    financial_df = pd.DataFrame(rates)
    financial_df['time'] = pd.to_datetime(financial_df['time'], unit='s')
    return financial_df

# Part 3: Synchronizing astronomical and financial data
def sync_data(astro_df, financial_df):
    astro_df['date'] = pd.to_datetime(astro_df['date']).dt.tz_localize(None)
    financial_df['time'] = pd.to_datetime(financial_df['time'])
    merged_data = pd.merge(financial_df, astro_df, left_on='time', right_on='date', how='inner')
    return merged_data

# Part 4: Training the model and making predictions
def train_and_predict(merged_data):
    # Converting aspects to numerical features
    aspect_cols = [col for col in merged_data.columns if '_' in col and col not in ['date', 'time']]
    label_encoders = {}
    for col in aspect_cols:
        label_encoders[col] = LabelEncoder()
        merged_data[col] = label_encoders[col].fit_transform(merged_data[col].astype(str))

    # Creating lags for financial data
    for col in ['open', 'high', 'low', 'close']:
        for lag in range(1, 6):
            merged_data[f'{col}_lag{lag}'] = merged_data[col].shift(lag)

    # Creating lags for astronomical data
    astro_cols = ['mercury', 'venus', 'mars', 'jupiter', 'saturn', 'uranus', 'neptune']
    for col in astro_cols:
        merged_data[f'{col}_ra'] = merged_data[col].apply(lambda x: eval(x)['ra'] if pd.notna(x) else np.nan)
        merged_data[f'{col}_dec'] = merged_data[col].apply(lambda x: eval(x)['dec'] if pd.notna(x) else np.nan)
        for lag in range(1, 6):
            merged_data[f'{col}_ra_lag{lag}'] = merged_data[f'{col}_ra'].shift(lag)
            merged_data[f'{col}_dec_lag{lag}'] = merged_data[f'{col}_dec'].shift(lag)
        merged_data.drop(columns=[col, f'{col}_ra', f'{col}_dec'], inplace=True)

    # Filling missing values with mean values for numeric columns
    numeric_cols = merged_data.select_dtypes(include=[np.number]).columns
    merged_data[numeric_cols] = merged_data[numeric_cols].fillna(merged_data[numeric_cols].mean())
    merged_data = merged_data.dropna()

    # Creating binary target variable
    merged_data['price_change'] = (merged_data['close'].shift(-1) > merged_data['close']).astype(int)

    # Removing rows with missing values in the target variable
    merged_data = merged_data.dropna(subset=['price_change'])

    features = [col for col in merged_data.columns if col not in ['date', 'time', 'close', 'price_change']]
    X = merged_data[features]
    y = merged_data['price_change']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=1)

    model = CatBoostClassifier(iterations=500, learning_rate=0.1, depth=9, random_state=1)
    model.fit(X_train, y_train, eval_set=(X_test, y_test), early_stopping_rounds=200, verbose=100)

    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    clf_report = classification_report(y_test, y_pred)
    conf_matrix = confusion_matrix(y_test, y_pred)

    print(f"Accuracy: {accuracy}")
    print("Classification Report:")
    print(clf_report)
    print("Confusion Matrix:")
    print(conf_matrix)

    # Visualizing feature importance
    feature_importance = model.feature_importances_
    feature_names = X.columns
    sorted_idx = np.argsort(feature_importance)
    pos = np.arange(sorted_idx.shape[0]) + 0.5

    plt.figure(figsize=(12, 6))
    plt.barh(pos, feature_importance[sorted_idx], align='center')
    plt.yticks(pos, np.array(feature_names)[sorted_idx])
    plt.xlabel('Feature Importance')
    plt.title('Feature Importance')
    plt.show()

    # Predicting the next value
    def predict_next():
        last_data = merged_data.iloc[-1]
        input_features = last_data[features].values.reshape(1, -1)
        prediction = model.predict(input_features)
        print(f"Price change prediction (0: will decrease, 1: will increase): {prediction[0]}")

    predict_next()

# Main program
start_date = datetime(2023, 3, 1)
end_date = datetime(2024, 7, 30)

astro_data = []
current_date = start_date
while current_date <= end_date:
    planet_positions = get_planet_positions(current_date)
    moon_phase = get_moon_phase(current_date)
    try:
        solar_activity = get_solar_activity(current_date)
    except Exception as e:
        print(f"Error getting solar activity for {current_date}: {e}")
        solar_activity = {'sunspot_number': None, 'f10.7_flux': None}
    aspects = calculate_aspects(planet_positions)
    astro_data.append({
        'date': current_date,
        'mercury': str(planet_positions['mercury']),
        'venus': str(planet_positions['venus']),
        'mars': str(planet_positions['mars']),
        'jupiter': str(planet_positions['jupiter']),
        'saturn': str(planet_positions['saturn']),
        'uranus': str(planet_positions['uranus']),
        'neptune': str(planet_positions['neptune']),
        'moon_phase': moon_phase,
        **solar_activity,
        **aspects
    })
    current_date += timedelta(days=1)

astro_df = pd.DataFrame(astro_data)

symbol = "EURUSD"
financial_data = get_financial_data(symbol, start_date, end_date)
if financial_data is not None:
    merged_data = sync_data(astro_df, financial_data)
    train_and_predict(merged_data)
Unfortunately, neither model turned out to be accurate enough. The classification model's accuracy is barely above 50%, which means we might as well flip a coin.
Perhaps the results could be improved: the regression model did not receive much attention, and predicting prices from planetary positions and lunar and solar activity might still be possible. If the mood strikes me, I will write another article on the subject.
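One change I would make before drawing firm conclusions concerns the validation scheme: both scripts use a random train_test_split, which lets rows from the "future" leak into the training set of a time series. Below is a minimal sketch of a chronological walk-forward evaluation with TimeSeriesSplit; it assumes the X and y variables built in the regression script above and is meant to replace the random split there.

import numpy as np
from catboost import CatBoostRegressor
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error

# X, y are the feature matrix and target built in the regression script above
tscv = TimeSeriesSplit(n_splits=5)
fold_mae = []

for fold, (train_idx, test_idx) in enumerate(tscv.split(X), start=1):
    # Each fold trains only on the past and tests only on the future
    X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
    y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

    model = CatBoostRegressor(iterations=500, learning_rate=0.1,
                              depth=6, random_state=1, verbose=0)
    model.fit(X_train, y_train)

    mae = mean_absolute_error(y_test, model.predict(X_test))
    fold_mae.append(mae)
    print(f"Fold {fold}: MAE = {mae:.5f}")

print(f"Mean walk-forward MAE: {np.mean(fold_mae):.5f}")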
Results
So, it is time to sum up. After performing a simple analysis and writing two forecasting models, we have the results of our study of astrology's potential influence on the market.
Correlation analysis. The correlation map we obtained does not reveal any strong relationship between planetary positions and the EURUSD closing price. All the coefficients are below 0.3 in absolute value, which gives us no grounds to claim any meaningful link between the positions of stars or planets and the financial market.
CatBoost regression model. The regression results show that the ability to predict an exact future closing price from astronomical data is very poor.
The model's final performance metrics, such as mean squared error (MSE), mean absolute error (MAE), and R-squared, are weak, and the model explains the data poorly. At the same time, the most important features turned out to be the lagged values of previous prices, not planetary positions. Does this mean that price itself is a better indicator than the position of any planet in our solar system?
CatBoost classification model. The classification model performs very poorly at predicting whether the price will rise or fall. Its accuracy barely exceeds 50%, which further confirms that astrology does not work on the real market.
Conclusion
The results of the study are quite clear: the astrological approach, and attempts to predict prices on the real market from astronomical data, are completely ineffective. Perhaps I will return to this topic some day, but for now William Gann's teachings look like an attempt to dress up solutions that simply do not work, designed to sell books and trading courses.
Would an improved model that also used Gann angle values, Square of Nine values, and Gann grid values perform better? We do not know yet. I am somewhat disappointed with the results of the study.
Still, I believe Gann angles could, to some extent, be used to obtain workable price forecasts. Price is influenced by angles to a certain degree and reacts to changes in them, as the results of earlier studies suggest. Perhaps angles could also serve as useful features for training a model; a rough sketch of one possible angle feature follows below. I will try to build such a dataset and see what comes of it.
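To show what such a dataset might look like, here is a rough sketch of a single Gann-angle feature: the distance of each close from a 1x1 line drawn up from the lowest low of a trailing window. The lookback length, the anchor choice, and above all the "one price unit per bar" scale (here 0.001 per daily bar for EURUSD) are purely illustrative assumptions that would have to be calibrated per instrument.

import pandas as pd

def gann_1x1_feature(df, lookback=100, price_per_bar=0.001):
    # Distance between the close and a rising 1x1 angle anchored at the
    # lowest low of the trailing `lookback` window. The price_per_bar
    # scale is an assumption and must be tuned per instrument.
    feature = pd.Series(index=df.index, dtype=float)
    for i in range(lookback, len(df)):
        window = df.iloc[i - lookback:i]
        anchor_label = window['low'].idxmin()        # label of the swing low
        anchor_pos = df.index.get_loc(anchor_label)  # its bar number
        bars_elapsed = i - anchor_pos
        gann_line = df['low'].loc[anchor_label] + bars_elapsed * price_per_bar
        feature.iloc[i] = df['close'].iloc[i] - gann_line
    return feature

# Usage: add the feature to the merged dataset before training a model
data = pd.read_csv('merged_astro_financial_data.csv')
data['gann_1x1_dist'] = gann_1x1_feature(data)
print(data[['close', 'gann_1x1_dist']].tail())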
Translated from Russian by MetaQuotes Ltd.
Original article: https://www.mql5.com/ru/articles/15625


