import json
from pathlib import Path
from typing import Optional
import pandas as pd
import numpy as np
from IPython.display import display
from IPython.display import display_markdown
Data Preparation
We’ve got some great data to work with, but it’s nowhere near ready to feed into a recommender system. In this notebook, I’ll walk through the steps of transforming the data to a format that can be used in a recommender system.
Load Data
Let’s define a few paths for the data we’re working with. As shown in the last notebook, we left the raw data (in a pickle file) in the data/raw/ directory. I’ll use the following structure for this project:
- `data/raw/`: Uncleaned data from the input sources.
- `data/final/`: Data that is ready to be fed into a model, including train/test splits.
- `data/intermediate/`: Significant stopping points while transforming the data from its raw state to a final form.
# Data directories
root_dir = Path().resolve().parent
data_dir = root_dir / 'data'
raw_data_dir = data_dir / 'raw'
intermediate_data_dir = data_dir / 'intermediate'
final_data_dir = data_dir / 'final'
# Raw dataset
ucsd_playtime_path = raw_data_dir / 'ucsd_playtime.pkl'
# Intermediate Datasets
user_item_intermediate_path = intermediate_data_dir / 'user_item_intermediate.feather'
users_path = intermediate_data_dir / 'users.feather'
items_path = intermediate_data_dir / 'items.feather'
# Final datasets
user_items_full_path = final_data_dir / 'user_items_full.feather'
user_items_full_train_path = final_data_dir / 'user_items_full_train.feather'
user_items_full_val_path = final_data_dir / 'user_items_full_val.feather'
user_items_full_test_path = final_data_dir / 'user_items_full_test.feather'
user_items_full_meta_path = final_data_dir / 'user_items_full_meta.json'
user_items_sample_path = final_data_dir / 'user_items_1p_sample.feather'
user_items_sample_train_path = final_data_dir / 'user_items_1p_sample_train.feather'
user_items_sample_val_path = final_data_dir / 'user_items_1p_sample_val.feather'
user_items_sample_test_path = final_data_dir / 'user_items_1p_sample_test.feather'
user_items_sample_meta_path = final_data_dir / 'user_items_1p_sample_meta.json'
Before preparing the data, it helps to define a small function for summarizing a DataFrame. I’m primarily interested in seeing the column names, data types, null count/percentage, and first few rows. We can create a quick helper function to display this:
def summarize_df(
    df: pd.DataFrame,
    name: str | None = None,
    nulls: bool = True,
    head: int | None = 5,
    formatted: bool = True,
) -> None:
    '''Displays dtypes, null counts, and the first rows of a DataFrame.'''
    summary = pd.DataFrame({
        'DType': df.dtypes,
        'Null': df.isna().sum(),
        'Total': len(df),
        '% Null': df.isna().mean(),
    })
    if formatted:
        summary['Null'] = summary['Null'].map('{:,.0f}'.format)
        summary['Total'] = summary['Total'].map('{:,.0f}'.format)
        summary['% Null'] = summary['% Null'].map('{:.2%}'.format)
    if name:
        display_markdown(f'### {name}', raw=True)
    if nulls:
        display(summary)
    if head:
        display(df.head(head))
    return

ucsd_playtime = pd.read_pickle(ucsd_playtime_path)
# For type hints (read_pickle() can return a Series)
assert isinstance(ucsd_playtime, pd.DataFrame)
summarize_df(ucsd_playtime, 'UCSD Playtime - Raw Data')
UCSD Playtime - Raw Data
| | DType | Null | Total | % Null |
|---|---|---|---|---|
| user_id | object | 0 | 88,310 | 0.00% |
| items_count | int64 | 0 | 88,310 | 0.00% |
| steam_id | object | 0 | 88,310 | 0.00% |
| user_url | object | 0 | 88,310 | 0.00% |
| items | object | 0 | 88,310 | 0.00% |

| | user_id | items_count | steam_id | user_url | items |
|---|---|---|---|---|---|
| 0 | 76561197970982479 | 277 | 76561197970982479 | http://steamcommunity.com/profiles/76561197970... | [{'item_id': '10', 'item_name': 'Counter-Strik... |
| 1 | js41637 | 888 | 76561198035864385 | http://steamcommunity.com/id/js41637 | [{'item_id': '10', 'item_name': 'Counter-Strik... |
| 2 | evcentric | 137 | 76561198007712555 | http://steamcommunity.com/id/evcentric | [{'item_id': '1200', 'item_name': 'Red Orchest... |
| 3 | Riot-Punch | 328 | 76561197963445855 | http://steamcommunity.com/id/Riot-Punch | [{'item_id': '10', 'item_name': 'Counter-Strik... |
| 4 | doctr | 541 | 76561198002099482 | http://steamcommunity.com/id/doctr | [{'item_id': '300', 'item_name': 'Day of Defea... |
Convert Nested to Expanded Form
As shown in the previous notebook, the data has the following structure:
- user_id: str
- items_count: int
- steam_id: str
- user_url: str
- items: list[dict], with nested dictionaries:
  - item_id: str
  - item_name: str
  - playtime_forever: int
  - playtime_2weeks: int
Since we have item information stored in a nested list of dictionaries in the items column, we’ll need to expand it in order to have one row per user-item pair. There are a few other cleaning steps required, so let’s summarize them here:
- Correct names of user identifier column.
  - The input data has `user_id` and `steam_id`, with the former more akin to a username, while the Steam ID is a unique integer for each user.
  - We’ll want to keep the `steam_id` column, but to keep consistent with recommender system conventions, we’ll rename this to `user_id` and drop the original version.
- Expand nested items
  - We need to have one row correspond to a user/item pair.
  - Current structure has one row per user, with `items` containing all games for that user.
  - The `items` data type can be described as `list[dict[str, str|int]]`.
  - We’ll need to expand this to get one row per user/item.
- General Cleaning, such as:
  - Dropping unneeded columns
  - Renaming `playtime_forever` to just `playtime` for simplicity
  - Dropping null values & duplicates
  - Correcting data types
I’d like to make sure these transformations happen all at once, rather than trying to remember which steps have been performed to the data. So, I’ll define each of these steps in a separate function, to be applied all-at-once.
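One way to apply a fixed sequence of steps in a single pass is `DataFrame.pipe`, which chains functions that each take and return a DataFrame. The sketch below uses two hypothetical toy steps (`drop_username` and `rename_steam_id` are illustrative names, not functions from this notebook) and made-up data:

```python
import pandas as pd

def drop_username(df: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical step: remove the username-like column
    return df.drop(columns='user_id')

def rename_steam_id(df: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical step: promote steam_id to the user identifier
    return df.rename(columns={'steam_id': 'user_id'})

toy = pd.DataFrame({'user_id': ['js41637'], 'steam_id': ['76561198035864385']})

# Each step returns a new DataFrame, so the input is left untouched
result = toy.pipe(drop_username).pipe(rename_steam_id)
```

Because every step returns a new DataFrame, rerunning the chain from the raw data always yields the same result, regardless of which cells were run before.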
Correcting the User Identifying Column
This part is pretty quick, though I’ll leave a docstring here to add additional clarity to the purpose.
def correct_naming_of_user_columns(df: pd.DataFrame) -> pd.DataFrame:
    '''
    1. Drop `user_id` as it corresponds to a username set by the user.
    2. Rename `steam_id` to `user_id` to match recommender system conventions.
    '''
    df = df.drop(columns='user_id')
    df = df.rename(columns={'steam_id': 'user_id'})
    return df
Expanding the Nested Items Data
Expanding the nested items data will need to happen in 3 steps:
1. Explode the nested lists, leaving one nested dictionary per row.
2. Convert the form from one dictionary per row to multiple columns, with one column per dictionary key.
3. Join the exploded data onto the original.
We can use pd.DataFrame.explode() and pd.json_normalize() for steps 1 and 2 respectively, and pd.concat() for step 3.
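To make the three steps concrete, here is a toy run on a two-user DataFrame with the same nested shape as the raw data. The values are made up and only two of the dictionary keys are included:

```python
import pandas as pd

# Toy data in the same nested shape as the raw dataset (values are made up)
toy = pd.DataFrame({
    'user_id': ['a', 'b'],
    'items': [
        [{'item_id': '10', 'playtime_forever': 6},
         {'item_id': '20', 'playtime_forever': 0}],
        [{'item_id': '10', 'playtime_forever': 42}],
    ],
})

# Step 1: one row per (user, item dict); reset the index so rows align later
exploded = toy.explode('items').reset_index(drop=True)

# Step 2: one column per dictionary key
item_columns = pd.json_normalize(exploded['items'].tolist())

# Step 3: join side by side on the shared positional index
expanded = pd.concat([exploded.drop(columns='items'), item_columns], axis=1)
```

Resetting the index after `explode` matters here: `explode` repeats the original index for each list element, and `pd.concat(..., axis=1)` aligns rows on the index.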
def expand_nested_items(df: pd.DataFrame) -> pd.DataFrame:
    df = df.explode('items').reset_index(drop=True)                 # 1
    items_df = pd.json_normalize(df['items'].tolist())              # 2
    df = pd.concat([df.drop(columns='items'), items_df], axis=1)    # 3
    return df
Other Data Cleaning
These steps are fairly typical, so I won’t explain them in detail:
1. Drop `items_count`, `user_url`, and `playtime_2weeks` columns.
2. Rename `playtime_forever` to just `playtime`.
3. Drop null values & duplicate values from `user_id`, `item_id`, and `playtime`.
4. Convert `user_id`/`item_id` to integers and `playtime` to a float.
5. Sort & reindex the data.
def clean_expanded_data(df: pd.DataFrame) -> pd.DataFrame:
    df = df.drop(columns=['items_count', 'user_url', 'playtime_2weeks'])     # 1
    df = df.rename(columns={'playtime_forever': 'playtime'})                 # 2
    df = df.dropna(subset=['user_id', 'item_id', 'playtime'], how='any')     # 3
    df = df.drop_duplicates(subset=['user_id', 'item_id'])                   # 3
    df = df.astype({'user_id': int, 'item_id': int, 'playtime': float})      # 4
    df = df.sort_values(by=['user_id', 'item_id']).reset_index(drop=True)    # 5
    return df
Apply Transformations
Now we can apply all transformations to the data. As discussed earlier, this marks a significant point in our data preparation, as we have moved from a nested/compact form to an expanded form, so we’ll save the data here to mark our progress.
user_items_inter = ucsd_playtime.copy()
user_items_inter = correct_naming_of_user_columns(user_items_inter)
user_items_inter = expand_nested_items(user_items_inter)
user_items_inter = clean_expanded_data(user_items_inter)
summarize_df(user_items_inter, 'User-Items - Intermediate Data')
User-Items - Intermediate Data
| | DType | Null | Total | % Null |
|---|---|---|---|---|
| user_id | int64 | 0 | 5,094,082 | 0.00% |
| item_id | int64 | 0 | 5,094,082 | 0.00% |
| item_name | object | 0 | 5,094,082 | 0.00% |
| playtime | float64 | 0 | 5,094,082 | 0.00% |

| | user_id | item_id | item_name | playtime |
|---|---|---|---|---|
| 0 | 76561197960269200 | 10 | Counter-Strike | 15170.0 |
| 1 | 76561197960269200 | 20 | Team Fortress Classic | 0.0 |
| 2 | 76561197960269200 | 50 | Half-Life: Opposing Force | 0.0 |
| 3 | 76561197960269200 | 70 | Half-Life | 33.0 |
| 4 | 76561197960269200 | 80 | Counter-Strike: Condition Zero | 1.0 |
# The target path has a .feather extension, so write Feather rather than pickle
user_items_inter.to_feather(user_item_intermediate_path)
Long-Form User-Items - Corrections Before Modeling
There are a few issues with our dataset that we must correct before using it in any models.
- The “Cold-Start” Problem. Models typically struggle to provide recommendations to users with very few interactions. To adjust for this, we’ll drop any users who have interacted with fewer than 10 items, so that our model can focus on recommending to users with more interactions.
- The long tail. Some users have only logged a few hours on each game (the “casual” gamer), while others have logged tens of thousands of hours. To balance the two, we can use the log of playtime as a rating variable, instead of the raw playtime. Since many playtimes are 0, we use log(1 + playtime), which is defined at 0.
- ID values. Many models expect the IDs to be continuous integers (0, 1, 2, ...), but we can see that this isn’t the case for either `user_id` or `item_id`. We don’t want to lose the original IDs, so we can create and save an ID map (from raw ID to continuous ID) and use the updated IDs for modeling.
- The `item_name` column. This column is quite useful for identifying a game, but adds no information beyond the `item_id`. We can save this column along with the item ID mappings and drop it from the final dataset.
As before, I’ll define some helper functions and apply the transformation all at once.
Drop Users with Fewer Than 10 Items
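As a toy illustration of this filter (made-up data and a threshold of 2 rather than 10), `groupby(...).transform('size')` broadcasts each user's row count back onto that user's rows, so the filter becomes a single boolean mask. This is an alternative formulation for intuition, not the function used in this notebook:

```python
import pandas as pd

toy = pd.DataFrame({
    'user_id': [1, 1, 1, 2, 3, 3],
    'item_id': [10, 20, 30, 10, 10, 20],
})
min_items = 2  # illustrative threshold

# Number of rows (items) per user, repeated on every row of that user
items_per_user = toy.groupby('user_id')['item_id'].transform('size')

# Keep only rows belonging to users with enough items
filtered = toy.loc[items_per_user >= min_items].reset_index(drop=True)
```

Note that `size` counts rows while `count` skips nulls; the two agree here because null IDs were already dropped during cleaning.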
def filter_users_by_num_items(df: pd.DataFrame, min_items: int) -> pd.DataFrame:
    '''
    Filters DataFrame for users who have interacted with at least
    `min_items` items.
    '''
    n_items_by_user = df.groupby('user_id')['item_id'].count()
    users_with_enough_items = n_items_by_user.loc[lambda n: n >= min_items]
    df = df.loc[df['user_id'].isin(users_with_enough_items.index)]
    return df.reset_index(drop=True)
Convert Playtime to Log-Playtime
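A quick numeric check of the transform, using the first four raw playtimes from the intermediate data shown earlier. `np.log1p` computes log(1 + x), and `np.expm1` inverts it, which is handy if predictions ever need to be converted back to raw playtime:

```python
import numpy as np

# Raw playtime values from the first rows of the intermediate data
playtime = np.array([15170.0, 0.0, 33.0, 1.0])

log_playtime = np.log1p(playtime)    # log(1 + x), defined at x = 0
recovered = np.expm1(log_playtime)   # inverse transform, back to raw playtime

print(log_playtime.round(6))
```

A raw range of 0 to 15,170 compresses to roughly 0 to 9.6, which keeps heavy players from dominating the rating scale.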
def convert_to_log_playtime(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    # log1p computes log(1 + x) on the whole column at once
    df['playtime'] = np.log1p(df['playtime'])
    return df
Map & Save User IDs
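For intuition, here is a toy version of the mapping using `pd.factorize` with `sort=True`, which assigns zero-based codes in sorted order of the unique values. This is an alternative sketch rather than the approach used in this notebook, and the IDs and column names are made up:

```python
import pandas as pd

raw_ids = pd.Series([76561197960269200, 76561197960270000,
                     76561197960269200, 76561197960268000])

# codes[i] is the position of raw_ids[i] among the sorted unique values
codes, uniques = pd.factorize(raw_ids, sort=True)

# Lookup table from raw ID to zero-based ID, as one might save to disk
id_map = pd.DataFrame({
    'raw_id': uniques,
    'continuous_id': range(len(uniques)),
})
```

Either way, the important part is saving the lookup table, since the zero-based IDs are meaningless without it.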
def map_to_continuous_id(data: pd.Series) -> pd.Series:
    '''Maps unique values to a continuous integer ID.'''
    current_ids = sorted(data.unique())
    continuous_ids = range(len(current_ids))
    id_map = dict(zip(current_ids, continuous_ids))
    return data.map(id_map)

def extract_and_save_user_id_map(df: pd.DataFrame) -> pd.DataFrame:
    current_ids = df['user_id'].drop_duplicates().sort_values()
    continuous_ids = map_to_continuous_id(current_ids)
    id_map = pd.DataFrame({
        'user_id_steam': current_ids,
        'user_id_continuous': continuous_ids,
    })
    id_map.to_feather(users_path)
    return id_map

def map_user_ids_to_continuous_integers(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    users_df = extract_and_save_user_id_map(df)
    id_map = users_df.set_index('user_id_steam')['user_id_continuous']
    df['user_id'] = df['user_id'].map(id_map)
    return df
Map & Save Item IDs/Names
def extract_and_save_item_name_and_id_map(df: pd.DataFrame) -> pd.DataFrame:
    # One row per item, sorted by the raw item ID
    df = df[['item_id', 'item_name']]
    df = df.drop_duplicates(subset='item_id')
    df = df.sort_values(by='item_id')
    item_names = df['item_name']
    current_ids = df['item_id']  # already unique and sorted
    continuous_ids = map_to_continuous_id(current_ids)
    items_data = pd.DataFrame({
        'item_id_steam': current_ids.values,
        'item_id_continuous': continuous_ids.values,
        'item_name': item_names.values,
    })
    items_data.to_feather(items_path)
    return items_data.drop(columns='item_name')

def map_item_ids_to_continuous_integers(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    items_df = extract_and_save_item_name_and_id_map(df)
    id_map = items_df.set_index('item_id_steam')['item_id_continuous']
    df['item_id'] = df['item_id'].map(id_map)
    df = df.drop(columns='item_name')
    return df
Apply Transformations
# Parameters
min_num_items = 10
# Apply Transformations
user_items_long = user_items_inter.copy()
user_items_long = filter_users_by_num_items(user_items_long, min_num_items)
user_items_long = convert_to_log_playtime(user_items_long)
user_items_long = map_user_ids_to_continuous_integers(user_items_long)
user_items_long = map_item_ids_to_continuous_integers(user_items_long)
summarize_df(user_items_long, 'Long-Form User-Items Data')
display(
    user_items_long[['user_id', 'item_id']]
    .nunique()
    .map('{:,}'.format)
    .to_frame('# Unique')
)
Long-Form User-Items Data
| | DType | Null | Total | % Null |
|---|---|---|---|---|
| user_id | int64 | 0 | 5,038,365 | 0.00% |
| item_id | int64 | 0 | 5,038,365 | 0.00% |
| playtime | float64 | 0 | 5,038,365 | 0.00% |

| | user_id | item_id | playtime |
|---|---|---|---|
| 0 | 0 | 0 | 9.627141 |
| 1 | 0 | 1 | 0.000000 |
| 2 | 0 | 4 | 0.000000 |
| 3 | 0 | 6 | 3.526361 |
| 4 | 0 | 7 | 0.693147 |

| | # Unique |
|---|---|
| user_id | 57,333 |
| item_id | 10,976 |
user_items_long.to_feather(user_items_full_path)
Additional Datasets
There are a couple of other datasets that will be useful to save:
- Train/validation/test splits. These are standard in machine learning, but we need a small adjustment to the usual process. Instead of a simple percentage split over rows, we need to ensure all users are present in each split. If we didn’t, the model would attempt to make recommendations to users it has never seen before, which isn’t the point of a recommender.
- Sample data. With about 57,000 users and 11,000 items, we’ll be creating a user-item matrix with over 600 million elements. That’s huge! If we tried to test various models with the full data, we’d end up spending most of our time waiting on the models to train, instead of developing the model architectures. We’ll create a couple of samples of the full dataset for experimenting, then run the full dataset when we’re ready for a full evaluation.
Testing Various Sample Sizes
def sample_by_user(
    df: pd.DataFrame,
    frac: float,
    random_state: Optional[int] = None,
) -> pd.DataFrame:
    '''Samples a fraction of users and keeps all rows for those users.'''
    users = df['user_id'].drop_duplicates()
    sample_users = users.sample(frac=frac, random_state=random_state)
    df_sample = df.loc[df['user_id'].isin(sample_users)]
    return df_sample.reset_index(drop=True)

random_state = 0
fractions = [0.001, 0.01, 0.1, 1.0]
sample_sizes = []
for frac in fractions:
    df_sample = sample_by_user(user_items_long, frac, random_state)
    n_users = df_sample['user_id'].nunique()
    n_items = df_sample['item_id'].nunique()
    sample_sizes.append([n_users, n_items])
sample_sizes_df = pd.DataFrame(
    sample_sizes,
    index=pd.Index(fractions, name='frac').map('{:.1%}'.format),
    columns=['Users', 'Items'],
)
sample_sizes_df['User-Item Elements'] = sample_sizes_df.product(axis=1)
sample_sizes_df.map('{:,}'.format)
| frac | Users | Items | User-Item Elements |
|---|---|---|---|
| 0.1% | 57 | 1,590 | 90,630 |
| 1.0% | 573 | 4,110 | 2,355,030 |
| 10.0% | 5,733 | 7,911 | 45,353,763 |
| 100.0% | 57,333 | 10,976 | 629,287,008 |
The sizes differ enormously. The 1% sample (about 2.4 million user-item elements) is small enough for quick experiments, so we’ll keep it in addition to the full dataset.
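To put the full-size matrix in perspective, here is a back-of-the-envelope calculation using the counts reported above. It assumes a dense float64 matrix at 8 bytes per element, which is an assumption about storage, not something this notebook builds:

```python
n_users, n_items = 57_333, 10_976   # unique counts reported above
n_interactions = 5_038_365          # rows in the long-form dataset

n_cells = n_users * n_items
dense_gb = n_cells * 8 / 1e9        # float64 uses 8 bytes per cell
density = n_interactions / n_cells  # share of cells with an observed playtime

print(f'{n_cells:,} cells, {dense_gb:.2f} GB dense, {density:.2%} filled')
# 629,287,008 cells, 5.03 GB dense, 0.80% filled
```

Fewer than 1 in 100 cells holds an observed interaction, which is why a sparse representation is the usual choice at full scale.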
Apply Train/Valid/Test Splits & Sampling
We’ll need train/validation/test splits for both the full dataset and the sample dataset. A single split doesn’t necessarily contain every item, so to reconstruct a user-item matrix of the right shape (even for just the train split), we’ll also save the number of users and items.
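Here is a minimal sketch of why the saved counts matter, using made-up values and assuming zero-based IDs. The `meta` dictionary stands in for the saved JSON metadata, and a dense NumPy array is used only because the toy is tiny:

```python
import numpy as np
import pandas as pd

meta = {'n_users': 3, 'n_items': 4}  # illustrative, as if read from the JSON
train = pd.DataFrame({
    'user_id': [0, 0, 1, 2],
    'item_id': [0, 2, 1, 2],
    'playtime': [1.5, 0.7, 2.0, 3.1],
})

# Shape comes from the metadata; item 3 never appears in this split
matrix = np.zeros((meta['n_users'], meta['n_items']))
matrix[train['user_id'].to_numpy(), train['item_id'].to_numpy()] = (
    train['playtime'].to_numpy()
)
```

Inferring the shape from `train['item_id'].max() + 1` would give 3 columns here instead of 4, so matrices built from different splits would not line up.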
def train_valid_test_split_by_item(
    df: pd.DataFrame,
    test_size: float = 0.2,
    valid_size: float = 0.2,
    random_state: Optional[int] = None,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    '''
    Creates train/valid/test splits by splitting the items of each user,
    so that every user appears in all three splits.
    '''
    rng = np.random.default_rng(random_state)
    train_dfs, valid_dfs, test_dfs = [], [], []
    for _, df_user in df.groupby('user_id'):
        n_items = len(df_user)
        n_items_test = max(1, round(n_items * test_size))
        n_items_valid = max(1, round(n_items * valid_size))
        # At least one item must remain for the train split
        assert (n_items_test + n_items_valid) < n_items
        # Shuffle row positions, then slice into test / valid / train
        indexes = np.arange(n_items)
        rng.shuffle(indexes)
        test_indexes = indexes[0 : n_items_test]
        valid_indexes = indexes[n_items_test : n_items_test+n_items_valid]
        train_indexes = indexes[n_items_test+n_items_valid :]
        train_dfs.append(df_user.iloc[train_indexes])
        valid_dfs.append(df_user.iloc[valid_indexes])
        test_dfs.append(df_user.iloc[test_indexes])
    df_train = pd.concat(train_dfs, ignore_index=True)
    df_valid = pd.concat(valid_dfs, ignore_index=True)
    df_test = pd.concat(test_dfs, ignore_index=True)
    return df_train, df_valid, df_test

# Save train/valid/test splits for full dataset
test_size = 0.2
valid_size = 0.2
random_state = 0
df = user_items_long
df_train, df_valid, df_test = train_valid_test_split_by_item(
    df, test_size, valid_size, random_state
)
df_train.to_feather(user_items_full_train_path)
df_valid.to_feather(user_items_full_val_path)
df_test.to_feather(user_items_full_test_path)
meta = {
'n_users': int(df['user_id'].nunique()),
'n_items': int(df['item_id'].nunique()),
'test_size': test_size,
'valid_size': valid_size,
'random_state': random_state,
}
with user_items_full_meta_path.open('w', encoding='utf-8') as file:
json.dump(meta, file, indent=2)
meta
{'n_users': 57333,
 'n_items': 10976,
 'test_size': 0.2,
 'valid_size': 0.2,
 'random_state': 0}
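A sanity check one could run after splitting (not part of the original notebook): every user should appear in all three splits, and no user-item pair should appear in more than one. `check_splits` is a hypothetical helper, shown here on toy data:

```python
import pandas as pd

def check_splits(
    train: pd.DataFrame, valid: pd.DataFrame, test: pd.DataFrame
) -> bool:
    '''Hypothetical helper: True if the splits share users but not rows.'''
    users = [set(part['user_id']) for part in (train, valid, test)]
    same_users = users[0] == users[1] == users[2]

    # A (user, item) pair appearing twice means two splits overlap
    combined = pd.concat([train, valid, test], ignore_index=True)
    no_overlap = not combined.duplicated(subset=['user_id', 'item_id']).any()
    return same_users and no_overlap

toy_train = pd.DataFrame({'user_id': [0, 0, 1], 'item_id': [0, 1, 0]})
toy_valid = pd.DataFrame({'user_id': [0, 1], 'item_id': [2, 1]})
toy_test = pd.DataFrame({'user_id': [0, 1], 'item_id': [3, 2]})
splits_ok = check_splits(toy_train, toy_valid, toy_test)
```

With the real data, the equivalent call would be `check_splits(df_train, df_valid, df_test)`.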
# Save train/test splits for sample dataset
sample_frac = 0.01
test_size = 0.2
valid_size = 0.2
random_state = 0
df_sample = sample_by_user(user_items_long, sample_frac, random_state)
df_sample['user_id'] = map_to_continuous_id(df_sample['user_id'])
df_sample['item_id'] = map_to_continuous_id(df_sample['item_id'])
df_sample_train, df_sample_valid, df_sample_test = train_valid_test_split_by_item(
    df_sample, test_size, valid_size, random_state
)
df_sample_train.to_feather(user_items_sample_train_path)
df_sample_valid.to_feather(user_items_sample_val_path)
df_sample_test.to_feather(user_items_sample_test_path)
meta = {
'n_users': int(df_sample['user_id'].nunique()),
'n_items': int(df_sample['item_id'].nunique()),
'sample_frac': sample_frac,
'test_size': test_size,
'valid_size': valid_size,
'random_state': random_state,
}
with user_items_sample_meta_path.open('w', encoding='utf-8') as file:
json.dump(meta, file, indent=2)
meta
{'n_users': 573,
 'n_items': 4110,
 'sample_frac': 0.01,
 'test_size': 0.2,
 'valid_size': 0.2,
 'random_state': 0}