前言
在刚刚结束的kaggle比赛M5 Forecasting - Accuracy中,因为是第一次参加,笔者也是花了大量的时间和精力在上面,历时4个月,最终拿到一块银牌(所以我拿到了大学第一个考试挂科。。。求求电磁场老师高抬贵手给点平时分放我一马吧。。。早上看到kaggle成绩异常兴奋,中午考完直接爆炸=。=),当然运气占了很大因素,这次比赛private leadboard的shake up非常大,排名波动几千名都存在的。笔者运气不错,是向上的shake:)。这里总结一下销量预测的基本流程。
从下图可以看到这次比赛排名波动非常大。
正文
读取数据
首先导入我们常用的包,因为是时间序列所以导入datetime包处理时序;因为kaggle的数据集非常大,笔者的内存实在顶不住,需要gc.collect()帮助垃圾回收;numpy、pandas、sklearn自不必说;threading和queue用多线程加速特征工程;笔者常用的融合模型,万金油的lightgbm框架。
from datetime import date, timedelta,datetime
import gc
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
import lightgbm as lgb
from threading import Thread
from queue import Queue
然后我们加上一些宏变量,便于后续的特征处理和日后的代码移植。
date_last = 1913 # 最大天数
max_lags = 1000 # 用于训练的天数
fday = datetime(2016, 4, 25) # 训练集最大日期
start_day = date_last - max_lags # 开始天数
p_horizon = 28 # 预测天数
TARGET = 'sales' # 目标值
导入数据,因为数据特征会分成文件,需要使用pd.merge()进行合并,不要想着就是一个train.csv文件,这是不可能的,因为时序特征会用到rolling(窗口滑动)的方法,需要前面的销量作为后续预测的特征。这里就拿price和sales文件举个例子。
这里解释一下为什么要转化为categories类型,因为例如item_id这种特征属于字符串类型,这是模型无法接受的数据,所以我们需要将其进行编码,比如one-hot,但是种类实在太多了,ont-hot会造成数据稀疏(且占据大量内存),而且树模型可以自行划分特征,所以直接利用categories类型转化为数字编码了。
# 数据类型
numcols = [f"d_{day}" for day in range(start_day,date_last+1)]
catcols = ['id', 'item_id', 'dept_id','store_id', 'cat_id', 'state_id']
dtype = {numcol:"float32" for numcol in numcols}
dtype.update({col: "category" for col in catcols if col != "id"})
price_dtypes = {'store_id': 'category', 'item_id': 'category', 'wm_yr_wk': 'int16','sell_price':'float32' }
# 将price编码
price=pd.read_csv('数据/sell_prices.csv', dtype = price_dtypes)
for col,col_dtype in price_dtypes.items():
if col_dtype == 'category':
price[col] = price[col].cat.codes.astype('int16')
price[col] -= price[col].min()
# 将train_sale编码
train_sale=pd.read_csv('数据/sales_train_validation.csv',usecols = catcols + numcols, dtype = dtype)
for col in catcols:
if col != "id":
train_sale[col] = train_sale[col].cat.codes.astype('int16')
train_sale[col] -= train_sale[col].min()
# 增加要预测的天数
for day in range(date_last+1, date_last+ 28 +1):
train_sale[f"d_{day}"] = np.nan
# 将 train_sale、calendar和 price合并
df = df.merge(calendar,on='d',copy=False)
df = df.merge(price,on=["store_id", "item_id", "wm_yr_wk"],copy=False)
将时间信息转化为模型可以学习的方式,比如2020/7/1日可以转化为年份特征year:2020,月份特征month:7,星期特征wday:3等。
date_features = {
"wday": "weekday",
"week": "weekofyear",
"month": "month",
"quarter": "quarter",
"year": "year",
"mday": "day",
}
for date_feat_name, date_feat_func in date_features.items():
if date_feat_name in df.columns:
df[date_feat_name] = df[date_feat_name].astype("int16")
else:
df[date_feat_name] = getattr(df["date"].dt, date_feat_func).astype("int16")
特征处理
然后进行我们的重头戏,特征处理,这一步非常重要,直接影响你的模型的预测精度,做机器学习基本一大半时间花在这个上面(当然如果你是神经网络deepNN爱好者,你可能也要花大量的时间在调参上面,所以我非常抗拒神经网络,能不用就不用)。常说特征工程决定你模型的上限,调参是为了接近那个上限。这里给一个万金油写法(将过去的数据作为未来的特征):
def get_timespan(df, time, minus, periods, freq='D'):
return df[pd.date_range(time - timedelta(days=minus), periods=periods, freq=freq)]
# 创建特征
def prepare_dataset(df, time, name_prefix=None):
X = {}
# 这里创建你想要的特征,下面这里是举个例子
# for i in [3, 7, 14, 30, 60, 140]:
# tmp = get_timespan(df, time, i, i)
# X['diff_%s_mean' % i] = tmp.diff(axis=1).mean(axis=1).values
# X['mean_%s_decay' % i] = (tmp * np.power(0.9, np.arange(i)[::-1])).sum(axis=1).values
# X['mean_%s' % i] = tmp.mean(axis=1).values
# X['median_%s' % i] = tmp.median(axis=1).values
# X['min_%s' % i] = tmp.min(axis=1).values
# X['max_%s' % i] = tmp.max(axis=1).values
# X['std_%s' % i] = tmp.std(axis=1).values
X = pd.DataFrame(X, index = df.index) # 转换为dataframe
if name_prefix is not None:
X.columns = ['%s_%s' % (name_prefix, c) for c in X.columns]
if name_prefix == 'sales':
X['date'] = time # 加上时间信息
return X
因为速度感人,这里使用多线程加快处理速度(因为GIL锁的存在,没法实现真正的多线程,加速的效果不尽如意,但是windows下多进程的进程之间互相通信特别麻烦,我懒写)
# 每种商品每天的销量
df_sales = df.loc[:,['id','dept_id','store_id','date',TARGET]]
df_sales = df_sales.set_index(['id','dept_id','store_id','date'])[TARGET].unstack()
df_sales = df_sales.reset_index()
df_sales = df_sales.set_index('id')
# 准备数据
print("Preparing dataset...")
num_days = 800 # 需要创建特征的天数
t2016 = fday - timedelta(num_days)
dt = []
thread_list = []
q_day = Queue()
for i in range(num_days):# 加入队列
delta = timedelta(days=i)
q_day.put(t2016 + delta)
def func():
while q_day.empty() == 0:
day = q_day.get()
X_tmp = prepare_dataset(df_sales, day, name_prefix = 'sales') # 创建特征
X_tmp = df.merge(X_tmp, on=['date','id'], copy=False) # 特征与训练集合并
dt.append(X_tmp)
print(day)
gc.collect()
for i in range(4):# 创建线程
t = Thread(target=func)
thread_list.append(t)
for t in thread_list:# 启动线程
time.sleep(3) # 线程之间错开运行
t.start()
for t in thread_list:# 等待线程结束
t.join()
dt = pd.concat(dt, axis=0)
准备训练集和测试集
分出训练集和测试集,因为时序问题的特殊性,应该用过去的数据预测未来,用未来的数据预测过去是不科学的,这也造成了我们做local cv的苦难,不能将数据集一分为四进行验证模型。这里我用了最近的5天数据作为验证集。
# 训练集和验证集
x_train = dt[dt.date < (fday - timedelta(5))]
x_test = dt[(dt.date < fday) & (dt.date > (fday - timedelta(5)))] # 5天的数据
y_train = x_train[TARGET] # 训练集target
y_test = x_test[TARGET] # 测试集target
print('训练集数量:'+str(x_train.shape))
建模预测
终于到了我们激动人心的模型训练环节(谁不想做一个优雅的调参者呢,特征处理这种脏活让别人干最好233)
首先设置模型参数,这里根据自己的情况设定
# lightgbm建模预测
params = {
'boosting_type': 'gbdt',
'objective': 'tweedie',
'tweedie_variance_power': 1.1,
'metric': 'rmse',
'subsample': 0.5,
'subsample_freq': 1,
'learning_rate': 0.03,
'num_leaves': 2**11-1,
'min_data_in_leaf': 2**12-1,
'feature_fraction': 0.5,
'max_bin': 100,
'n_estimators': 1400,
'boost_from_average': False,
'verbose': -1,
# 'device':'gpu',
}
训练模型,这里采用lightgbm的原生接口。
# 转换数据集格式
lgb_train = lgb.Dataset(x_train, label = y_train)
lgb_test = lgb.Dataset(x_test, label = y_test)
# 训练
m_lgb = lgb.train(params, lgb_train, valid_sets = [lgb_test],verbose_eval=100)
m_lgb.save_model("model.lgb")
前面提到我们需要采用滚动窗口预测的方式将过去的数据作为现在的特征,我们只能一天一天预测,将前一天预测的结果作为后一天的特征
# 预测
for days in range(0, 28):
current_day = fday + timedelta(days=days)
X_tmp = prepare_dataset(df_sales, current_day, name_prefix = 'sales')
# X_tmp2 = prepare_dataset(df_price, current_day, name_prefix = 'price')
# X_tmp = pd.concat([X_tmp, X_tmp2], axis=1)
x_predict = df.merge(X_tmp, on=['date','id'], copy=False)
x_predict = x_predict
y_predict = m_lgb.predict(x_predict,axis=1))
print(current_day) # 打印日期
大功告成!这么一系列操作你应该可以得到一个还算可以的结果,当然争奖牌是远远不够的。后续我们还需要进行模型验证、参数优化等一系列操作,这里就不展开讲了:)
tip
因为数据集一般都非常大,本次比赛的数据全部展开要达到千万级的数据量,占用非常大的内存,在服务器上直接爆内存,只能用家里的电脑借助虚拟内存撑一撑。这里附一个降低精度的代码,可以有效缓解内存不够的窘境。
import pandas as pd
import numpy as np
def reduce_mem_usage(df):
start_mem = df.memory_usage().sum() / 1024**2
for col in df.columns:
col_type = df[col].dtype
if str(col_type)[:3] == 'int' or str(col_type)[:5] == 'float':
c_min = df[col].min()
c_max = df[col].max()
if str(col_type)[:3] == 'int':
if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
df[col] = df[col].astype(np.int8)
elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
df[col] = df[col].astype(np.int16)
elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
df[col] = df[col].astype(np.int32)
elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
df[col] = df[col].astype(np.int64)
else:
# pandas没有float16类型, 可能会报错
if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
df[col] = df[col].astype(np.float16)
elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
df[col] = df[col].astype(np.float32)
else:
df[col] = df[col].astype(np.float64)
elif col_type == 'object':
df[col] = df[col].astype('category')
end_mem = df.memory_usage().sum() / 1024**2
if verbose: print('Mem. usage decreased to {:5.2f} Mb ({:.1f}% reduction)'.format(end_mem, 100 * (start_mem - end_mem) / start_mem))
return df