使用 ArcticDB 选择终极英超梦幻球队

ArcticDB

2024年8月15日

1*zvXOX31gLyupWpCDuFPj3A

想挑选一支终极梦幻足球队?在这篇博文中,我们将使用 ArcticDB 管理和分析球员原始统计数据以及过去比赛周的梦幻足球队数据。然后,通过摄取、处理和模拟球队选择,我们可以利用数据驱动的洞察力,发掘表现最佳的梦幻足球队。项目分解:

  1. 摄取球员数据并将其存储在 ArcticDB 中。

  2. 将球员数据合并到一个包含关键统计数据的单一 DataFrame 中。

  3. 分析 FPL(英超梦幻联赛)球员数据,并运行蒙特卡洛模拟,根据预定义约束选择最佳球队。

免责声明: 不能保证或承诺 ArcticDB 将选择未来表现最佳的梦幻英超球队,因为过去的表现并不代表未来的结果。本项目仅供娱乐,请勿认真对待或使用。

数据摄取

首先,我们克隆英超梦幻联赛数据仓库,如下所示,并使用 ArcticDB 和 Pandas 等 Python 库来处理数据。

git clone https://github.com/vaastav/Fantasy-Premier-League.git

使用 pandas 读取包含球员数据的 CSV 文件,并更新 ArcticDB 库。该库是所有后续数据操作的骨干。包括比赛周统计数据和原始球员属性在内的球员数据被摄取到 ArcticDB 中。我们对球员姓名进行规范化处理,去除非 ASCII 字符,以便每个数据帧的主 ID 易于输入。

每个球员的数据作为数据库中的一个独立符号存储,由规范化名称和球员 ID 标识,您可以在下方看到 KDB(凯文·德布劳内)比赛周球员数据的示例。然后我们将摄取一个名为

players_raw.csv
的文件。该文件包含球员位置 (element_type)、球队和球员 ID。

1*MAwv2yba71LDCUu3K0aB9A1*lQOLHiPiKwbfGovstr6Z1g

将数据保存到 ArcticDB 比读取 CSV 文件具有以下几个优势

  1. 更快的数据检索:对于大型数据集至关重要,对于英超梦幻联赛分析等较小数据集也有帮助。

  2. 高效的数据访问:无需重复从许多不同的 CSV 文件读取不同的球员数据。

  3. 无缝数据更新:支持在一个位置追加和更新数据。

  4. 简化数据管理:减少管理多个 CSV 文件的需求,最大限度地减少错误。

  5. 强大的查询能力:能够高效地进行复杂查询和聚合。

  6. 针对时间序列数据进行优化:非常适合随时间跟踪球员表现,提供结构化和可扩展的解决方案。

在此处了解更多关于 ArcticDB 与 CSV 文件的信息:为什么你应该使用 ArcticDB 而不是 CSV 来保存你的 Pandas DataFrames | 作者 Matthew Simpson | ArcticDB | Medium

在下面的代码片段中,您可以看到我们如何设置 ArcticDB 和一个库,以及如何使用 ArcticDB 中的写入函数将数据从 CSV 文件摄取到 ArcticDB 中的示例。

import os
import pandas as pd
import unicodedata
import arcticdb as adb
 
def normalize_name(name):
    """Normalize the name to remove non-ASCII characters."""
    return unicodedata.normalize('NFKD', name).encode('ASCII', 'ignore').decode('ASCII')
 
 
def ingest_player_data(players_dir, lib):
    """Ingest player game week data into ArcticDB."""
    for player_folder in os.scandir(players_dir):
        if player_folder.is_dir():
            player_name, player_id = player_folder.name.rsplit('_', 1)
            csv_file_path = os.path.join(player_folder, 'gw.csv')
            symbol_name = f"{normalize_name(player_name)}_{player_id}"
            lib.write(symbol_name, pd.read_csv(csv_file_path))
 
# Constants
PLAYERS_DIR = './Fantasy-Premier-League/data/2023-24/players/'
RAW_DATA_PATH = './Fantasy-Premier-League/data/2023-24/players_raw.csv'
GAME_WEEK = 38
MAX_PLAYERS_PER_TEAM = 4
MAX_SPEND = 1000
RUNS = 100000
 
# Connect to ArcticDB
arctic = adb.Arctic("lmdb://fantasy_football")
library = arctic.get_library('players', create_if_missing=True)
 
# Main execution
ingest_player_data(PLAYERS_DIR, library)
library.write('players_raw', pd.read_csv(RAW_DATA_PATH))

数据转换

接下来,我们将原始球员统计数据与球员比赛周数据合并,创建一个综合数据集。在这个 DataFrame 中,我们将只保留运行模拟所需的数据,例如“element_type”、“team”、“second_name”、“first_name”、“id”、“total_points”、“value”和“DateTime”,您可以在下方看到示例。

1*8vdNMZJ1czSSz7wFlfX8Lg

我们想找到一种方法来表示每个球员的近期状态。为此,我们将使用过去 5 个比赛周的数据,并计算每个球员在这 5 个比赛周内的平均得分。

为此,我们首先使用 ArcticDB 的

QueryBuilder
进行分组聚合。这会按球员 ID 对球员数据进行分组,并计算过去 5 个比赛周的平均得分来表示球员状态。结果数据从 ArcticDB 库中读取并重置为标准索引。创建另一个查询以过滤特定比赛周的球员数据。然后将过滤后的数据与平均总得分数据合并。合并后的 DataFrame 会在适用情况下使用计算出的平均值更新总得分,删除不必要的列,并将更新后的数据写回 ArcticDB 库中,命名为 `game_week_filter` 的新符号下。您可以在下方看到输出 DataFrame 的示例以及在 ArcticDB 中执行分组和聚合的代码片段。

1*0wX7zdCDk7PEiZJOTXjs Q

def merge_player_data(lib):
    """Read raw player stats and merge with game week data."""
    raw_stats_df = lib.read('players_raw', columns=['element_type', 'team', 'second_name', 'first_name', 'id']).data
 
    df = pd.DataFrame()
    for _, row in raw_stats_df.iterrows():
        player_id = row['id']
        player_name = f"{row['first_name']}_{row['second_name']}_{player_id}"
        symbol_name = normalize_name(player_name)
        player_gw_data = lib.read(symbol_name, columns=['total_points', 'value']).data
        player_gw_data.reset_index(inplace=True)
        player_gw_data['id'] = player_id
        for col in row.index:
            player_gw_data[col] = row[col]
        df = pd.concat([df, player_gw_data], ignore_index=True)
 
    df = df[['element_type', 'team', 'second_name', 'first_name', 'id', 'total_points', 'value', 'index']]
    df = df.rename(columns={'index': 'Game_Week'})
    df["element_type"] = df["element_type"].map({1: 'GK', 2: 'DEF', 3: 'MID', 4: 'FWD'})
    return df
 
library.write('all_data', merge_player_data(library))
 
# Get data grouped by player ID and calculate the mean of total points
q2 = adb.QueryBuilder()
q2 = q2[(q2["Game_Week"] >= GAME_WEEK - 5)].groupby("id").agg({"total_points": "mean"})
new_total_points = library.read("all_data", query_builder=q2).data.reset_index()
 
# Get data filtered for the previous game week
q3 = adb.QueryBuilder()
q3 = q3[(q3["Game_Week"] == GAME_WEEK - 1)]
game_week_filter = library.read("all_data", query_builder=q3).data
 
# Merge the filtered game week data with the new total points data based on player ID
merged_df = game_week_filter.merge(new_total_points, on='id', how='left', suffixes=('', '_new'))
 
# Fill missing total point values with the original total points and create a new column for average total points
merged_df['average_total_points'] = merged_df['total_points_new'].fillna(merged_df['total_points'])
 
# Drop the temporary column used for merging
merged_df = merged_df.drop(columns=['total_points_new'])
 
# Write the merged DataFrame back to the ArcticDB library
library.write('game_week_filter', merged_df)

球队选择

项目m的核心是球员选择过程,旨在组建一支梦幻足球队。选择基于球员位置,并对每队球员最大数量和总预算设置了约束。使用 ArcticDB 的

QueryBuilder
,我们过滤球员并从可用池中随机选择,同时确保满足约束。球员根据其过往表现进行评分。例如,如果我们考察球员 KDB,脚本会获取 KDB 过去 5 个比赛周的梦幻积分并计算平均值。这就是我们评估状态积分的方式。这个过程重复进行,直到球队组建完成。下面的代码片段是一个如何选择一支球队的示例。

import random
from collections import Counter
 
def select_position(position, count, max_players_per_team, max_spend, lib, current_players, current_spend, current_teams):
    """Select players for a specific position until the required count is reached."""
    q = adb.QueryBuilder()
    q = q[(q["element_type"] == position)]
    players_df = lib.read('game_week_filter', query_builder=q).data
    current_player_ids = [player['id'] for player in current_players]
    current_teams = current_teams.copy()
 
    selected_players = []
    if players_df.empty:
        return selected_players
 
    while count > 0:
        player = players_df.sample().iloc[0]
        team_id = player['team']
        player_value = player['value']
 
        # if we haven't selected player already
        # and we haven't selected our max from each team
        # and we haven't spent too much
        if (player['id'] not in current_player_ids) and \
           (current_teams.get(team_id, 0) < max_players_per_team) and \
           (current_spend + player_value <= max_spend):
            # then add to roster
            selected_players.append(player)
            current_spend += player_value
            current_teams[team_id] += 1
            current_player_ids.append(player['id'])
            count -= 1
 
    return selected_players
 
 
 
def select_random_team(team_structure, max_players_per_team, max_spend, lib):
    """Select a random team of players based on position, budget, and team constraints."""
    total_spend = 0
    team_counts = Counter()
    selected_players = []
    keys = list(team_structure.keys())
    random.shuffle(keys)
    randomized_team_structure = {key: team_structure[key] for key in keys}
   
    for position, count in randomized_team_structure.items():
        players = select_position(position, count, max_players_per_team, max_spend, lib, selected_players, total_spend, team_counts)
        for player in players:
            total_spend+=player['value']
            team_counts[player['team']] += 1
            selected_players.append(player)
   
    return pd.DataFrame(selected_players) # Define the team structure with required player counts for each position
 
 
# Team Selection Simulation
team_structure = {'GK': 2, 'DEF': 5, 'MID': 5, 'FWD': 3}
all_teams = []
 
for run_id in range(RUNS):
    team_df = select_random_team(team_structure, MAX_PLAYERS_PER_TEAM, MAX_SPEND, library)
    team_df['run_ID'] = run_id
    all_teams.append(team_df)

为了找到最佳球队配置,以上代码运行 100,000 次迭代,本质上是执行蒙特卡洛模拟。每次迭代包括以下内容

  • 根据预定义结构组建球队(例如,2 名守门员,5 名后卫等)。

  • 从数据集中完全随机选择一名球员。

  • 计算球队的总积分和花费:球队选择的总积分基于球员在本赛季最后五场比赛的状态。这是通过计算这五场比赛周积分的平均值来完成的。然后价格基于球员的最新可用价格。

运行模拟后,根据获得的最高总积分确定最佳球队。然后显示该球队,展示所选球员、他们的位置和总积分。随着迭代次数的增加,最佳球队的总积分显著提高,如下方图表所示。

1*n BVsjt5Vo53WSgKxzLfvQ

请注意,本项目还有潜在的改进空间

  • 回测:实施回测以评估过往赛季的策略。

  • 保留部分样本外数据,以便我们最终测试策略是否存在过拟合。

  • 优化算法:利用优化算法进行更高效的球队选择。

  • 用户界面:开发用户友好的界面,以更好地与数据交互。

虽然本项目旨在娱乐,但此 notebook 展示了 ArcticDB 在定量分析方面的多功能性,适用于个人用户以及 Man Group 等组织的大规模生产系统。代码中的基本步骤具有普遍适用性。

此代码的一个关键优势是其适应性。尽管目前用于选择梦幻球队,但相同的数据可以轻松地重新用于未来的项目。由于数据以易于访问的格式存储,只需一行代码即可检索这些数据。这种方法不仅限于梦幻数据;它也可以是中央存储桶中的报价数据,组织中的任何人只需一行代码即可访问。想象一下这能带来多少生产力提升!

完整代码如下。

import os
import pandas as pd
import unicodedata
import arcticdb as adb
import random
from collections import Counter
from tabulate import tabulate

def normalize_name(name):
    """Normalize the name to remove non-ASCII characters."""
    return unicodedata.normalize('NFKD', name).encode('ASCII', 'ignore').decode('ASCII')
 
def ingest_player_data(players_dir, lib):
    """Ingest player game week data into ArcticDB."""
    for player_folder in os.scandir(players_dir):
        if player_folder.is_dir():
            player_name, player_id = player_folder.name.rsplit('_', 1)
            csv_file_path = os.path.join(player_folder, 'gw.csv')
            symbol_name = f"{normalize_name(player_name)}_{player_id}"
            lib.write(symbol_name, pd.read_csv(csv_file_path))

def merge_player_data(lib):
    """Read raw player stats and merge with game week data."""
    raw_stats_df = lib.read('players_raw', columns=['element_type', 'team', 'second_name', 'first_name', 'id']).data
 
    df = pd.DataFrame()
    for _, row in raw_stats_df.iterrows():
        player_id = row['id']
        player_name = f"{row['first_name']}_{row['second_name']}_{player_id}"
        symbol_name = normalize_name(player_name)
        player_gw_data = lib.read(symbol_name, columns=['total_points', 'value']).data
        player_gw_data.reset_index(inplace=True)
        player_gw_data['id'] = player_id
        for col in row.index:
            player_gw_data[col] = row[col]
        df = pd.concat([df, player_gw_data], ignore_index=True)
 
    df = df[['element_type', 'team', 'second_name', 'first_name', 'id', 'total_points', 'value', 'index']]
    df = df.rename(columns={'index': 'Game_Week'})
    df["element_type"] = df["element_type"].map({1: 'GK', 2: 'DEF', 3: 'MID', 4: 'FWD'})
    return df

def select_position(position, count, max_players_per_team, max_spend, lib, current_players, current_spend, current_teams):
    """Select players for a specific position until the required count is reached."""
    q = adb.QueryBuilder()
    q = q[(q["element_type"] == position)]
    players_df = lib.read('game_week_filter', query_builder=q).data
    current_player_ids = [player['id'] for player in current_players]
    current_teams = current_teams.copy()
 
    selected_players = []
    if players_df.empty:
        return selected_players
 
    while count > 0:
        player = players_df.sample().iloc[0]
        team_id = player['team']
        player_value = player['value']
 
        # if we haven't selected player already
        # and we haven't selected our max from each team
        # and we haven't spent too much
        if (player['id'] not in current_player_ids) and \
           (current_teams.get(team_id, 0) < max_players_per_team) and \
           (current_spend + player_value <= max_spend):
            # then add to roster
            selected_players.append(player)
            current_spend += player_value
            current_teams[team_id] += 1
            current_player_ids.append(player['id'])
            count -= 1
 
    return selected_players
 
def select_random_team(team_structure, max_players_per_team, max_spend, lib):
    """Select a random team of players based on position, budget, and team constraints."""
    total_spend = 0
    team_counts = Counter()
    selected_players = []
    keys = list(team_structure.keys())
    random.shuffle(keys)
    randomized_team_structure = {key: team_structure[key] for key in keys}
   
    for position, count in randomized_team_structure.items():
        players = select_position(position, count, max_players_per_team, max_spend, lib, selected_players, total_spend, team_counts)
        for player in players:
            total_spend+=player['value']
            team_counts[player['team']] += 1
            selected_players.append(player)
   
    return pd.DataFrame(selected_players) # Define the team structure with required player counts for each position


# Constants
PLAYERS_DIR = './Fantasy-Premier-League/data/2023-24/players/'
RAW_DATA_PATH = './Fantasy-Premier-League/data/2023-24/players_raw.csv'
GAME_WEEK = 38
MAX_PLAYERS_PER_TEAM = 4
MAX_SPEND = 1000
RUNS = 100000
 
# Connect to ArcticDB
arctic = adb.Arctic("lmdb://fantasy_football")
library = arctic.get_library('players', create_if_missing=True)
 
# Main execution
ingest_player_data(PLAYERS_DIR, library)
library.write('players_raw', pd.read_csv(RAW_DATA_PATH))

library.write('all_data', merge_player_data(library))
 
# Get data grouped by player ID and calculate the mean of total points
q2 = adb.QueryBuilder()
q2 = q2[(q2["Game_Week"] >= GAME_WEEK - 5)].groupby("id").agg({"total_points": "mean"})
new_total_points = library.read("all_data", query_builder=q2).data.reset_index()
 
# Get data filtered for the previous game week
q3 = adb.QueryBuilder()
q3 = q3[(q3["Game_Week"] == GAME_WEEK - 1)]
game_week_filter = library.read("all_data", query_builder=q3).data
 
# Merge the filtered game week data with the new total points data based on player ID
merged_df = game_week_filter.merge(new_total_points, on='id', how='left', suffixes=('', '_new'))
 
# Fill missing total point values with the original total points and create a new column for average total points
merged_df['average_total_points'] = merged_df['total_points_new'].fillna(merged_df['total_points'])
 
# Drop the temporary column used for merging
merged_df = merged_df.drop(columns=['total_points_new'])
 
# Write the merged DataFrame back to the ArcticDB library
library.write('game_week_filter', merged_df)

 
# Team Selection Simulation
team_structure = {'GK': 2, 'DEF': 5, 'MID': 5, 'FWD': 3}
all_teams = []
 
for run_id in range(RUNS):
    team_df = select_random_team(team_structure, MAX_PLAYERS_PER_TEAM, MAX_SPEND, library)
    team_df['run_ID'] = run_id
    all_teams.append(team_df)

all_teams_df = pd.concat(all_teams, ignore_index=True)
total_points_per_run = all_teams_df.groupby('run_ID')['average_total_points'].sum()
best_run_id = total_points_per_run.idxmax()
best_team_df = all_teams_df[all_teams_df['run_ID'] == best_run_id]
total_spend_best_team = best_team_df['value'].sum()
 
# Display the best team
print("\nBest Team (Run ID with highest total points):")
print(tabulate(best_team_df, headers='keys', tablefmt='fancy_grid'))
print("\nTotal Points of Best Team:", best_team_df['average_total_points'].sum())
print("Total Spend on Best Team:", total_spend_best_team)

希望您喜欢这篇文章,并希望它能帮助您找到完美的梦幻球队。祝您赛季愉快!