TensorFlow模型训练高级API—Estimator使用经验总结
▉1 概述
Estimator是TensorFlow提供的一个高级API,可与dataset,feature_column等API一起实现数据预处理,模型构建,模型训练、评估以及测试等完整流程。
本文主要针对estimator使用过程中可能遇到的疑问进行总结,由于TensorFlow的版本迭代速度非常快,Python接口变化尤其剧烈,因此本文所总结的一些用法,仅基于v1.10.0版本,其他版本可参考官方文档。
当然,本文的总结也主要来自于官方文档,辅以日常使用过程中的经验。
▉2 local和distributed模式
Estimator实现了只要编写一份代码,无需任何改动,即可在local和多机distributed模式下运行,运行时的判定条件就是是否设置了系统的环境变量TF_CONFIG。
如果TF_CONFIG不为空且符合分布式cluster配置的规定,则运行分布式模式(当前仅支持between graph模式),如果该系统环境变量没有设置,则运行单机模式。
TF_CONFIG='{
"cluster": {
"chief": ["host0:2222"],
"worker": ["host1:2222", "host2:2222", "host3:2222"],
"ps": ["host4:2222", "host5:2222"]
},
"task": {"type": "chief", "index": 0}
}'
▉3 train/evaluate和train_and_evaluate
- 3.1 train/evaluate
对于每一个构建好的estimator,比如estimator = tf.estimator.DNNClassifier(…),分别提供了train和evaluate的接口,
即estimator.train(…)和estimator.evaluate(…)。其中
- train的参数为(input_fn, hooks=None, steps=None, max_steps=None, saving_listeners=None):
- input_fn提供batch大小的训练数据,其返回值需是一个(features, labels)的tuple,其中features可以是一个Tensor或者一个dict(key为feature_name,value为feature_value),labels的形式与features类似,features和labels最终都是传给model_fn,因此最主要的是要与model_fn接收的要求保持一致。
- hooks主要是用于监控运行时的一些状态,并在状态的不同阶段(开始前,开始后,结束时等)做相应的事情,此处不做详细讨论。
- saving_listeners主要是用于监听checkpoint保存前或保存后需要额外自定义进行的操作。
- 此处要重点总结的是steps和max_steps(一个step计算一个batch),官方文档对此也做了详细的解释:首先,steps和max_steps只能任选其一,不能两个参数同时使用。steps: 模型要训练多少步,即训练多少个batch的数据。如果设置为`None`,那么就会一直训练下去,永不停息或者直到input_fn产生`OutOfRange`或`StopIteration`错误。`steps`是增量统计的,调多少次train,steps就相应增加多少轮。如果你设置了train(steps=10),并且调用了2次train,那么一共就会训练10*2=20个step。当然,这是在保证input_fn能够提供至少20个batch的情况下,如果在调用train的过程中,抛出了`OutOfRange`或`StopIteration`错误,那么训练就会提前终止。一句话就是,如果数据充足,训练到指定step个数就停止,如果数据不足够step个batch,那么有多少训练多少。如果不想要step增量统计,那就使用max_steps这个参数。max_steps: 模型总共要训练的steps。如果设置为`None`,那么就会一直训练下去,永不停息或者直到input_fn产生`OutOfRange`或`StopIteration`错误。与steps有所不同,max_steps限定了训练step的总数,steps参数是只要数据充足,可以无限调用下去,而max_steps则是训练步数达到max_steps,则终止训练。两次调用train(steps=100),意味着要训练2*100=200个step。但两次调用train(max_steps=100),则意味着第二次调用时一步都不会训练,因为第一次调用已经把最大步数100步的数据都训练完了,总步数已经达到了max_steps。
- evaluate的参数为(input_fn, steps=None, hooks=None, checkpoint_path=None, name=None),steps为每次evaluate时需要对steps个batch的数据进行评估。
- 3.2 train_and_evaluate
TensorFlow还提供了一个更为高级的接口: tf.estimator.train_and_evaluate(…)。
对于某一构建好的estimator,比如estimator = tf.estimator.DNNClassifier(…),与直接使用estimator.train(…)和estimator.evaluate(…)有所不同,使用tf.estimator.train_and_evaluate(…)进行train和evaluate需要定义tf.estimator.TrainSpec(…)和tf.estimator.EvalSpec(…)。
- tf.estimator.TrainSpec(…)的参数(input_fn, max_steps=None, hooks=None),其中:
- max_steps表示模型训练的最大步数,需要注意的是训练的输入函数input_fn在train_and_evaluate这一API下不会产生`OutOfRangeError` 或者 `StopIteration`错误来终止训练过程,如果max_steps设置为`None`,训练会永远进行下去。具体的终止条件,后面会专门讨论。
- tf.estimator.EvalSpec(…)的参数(input_fn, steps=100, name=None, hooks=None, exporters=None, start_delay_secs=120, throttle_secs=600)。
- steps: 每次evaluate时需要对steps个batch的数据进行评估。
- exporters: 每次evaluation之后均会被调用,如果设置为exporter = tf.estimator.FinalExporter('./final_export', serving_input_receiver_fn=serving_input_fn),那么会在训练完成后输出一个模型。
- start_delay_secs: 在等待start_delay_secs秒后进行evaluation。
- throttle_secs: 新的evaluation需距离上次间隔throttle_secs秒才会再次进行。不过如果没有新的checkpoints可用,新的evaluation也不会发生,所以throttle_secs只是最小间隔(如果该间隔时间段内没有新的checkpoints产生,那么还会继续等待)。
▉4 训练的终止条件
从train/evaluate和train_and_evaluate接口的文档可以看出,训练终止的条件其实就是两种:
一是训练的步数达到max_steps(即消耗max_steps个batch的数据),
二是input_fn产生的数据被消耗完。
但是由于训练又有local和distributed两种模式,因此有以下需要注意的地方:
1. 如果使用train/evaluate这种分开训练和评估的接口,那么不管是local还是distributed模式,
1.1 如果steps和max_steps均设置为`None`,那么在input_fn的数据消耗完,则训练停止。因此如果在input_fn中将epoch设置为2,即全部训练数据训练两次,那么训练数据训练两次后训练即停止。
1.2 如果设置了max_steps,那么如果全部数据的条数total_num如果小于max_steps*batch_size,那么训练到max_steps即停止训练,反之则将所有训练数据训练完方才停止。
def run():
feature_columns = create_feature_columns()
estimator = create_classifier(
feature_columns,
model_dir='./checkpoints_dir')
# Train and evaluate the model every `flags.epochs_between_evals` epochs.
for n in range(train_epochs // epochs_between_evals):estimator.train(
input_fn=input_fn(train_file_path,
num_epochs=epochs_between_evals))
results = estimator.evaluate(
input_fn=input_fn(eval_file_path,
mode=tf.estimator.ModeKeys.EVAL,
num_epochs=1))
可以看出,estimator.train(...)并为设置max_steps,每次循环都会训练epoch_between_evals个epoch的数据。
2 如果使用train_and_evaluate接口,先看看官方文档的描述:
>>>
为了可靠地支持distributed和non-distributed两种模式,模型训练终止的唯一方式是`train_spec.max_steps`。如果`train_spec.max_steps`是`None`,那么模型将会永远训练下去。
*使用时需注意*如果模型训练的终止条件不是`train_spec.max_steps`。比如,如果模型训练的epoch=1,训练的`input_fn`被配置为完成1个epoch就抛出`OutOfRangeError`并且停止`Estimator.train`。那么对于一个配置有3个worker的分布式训练中,每一个worker都会讲数据训练1个epoch,实际上就是训练了3个epoch的数据而不是1个。
<<<
从上述官方文档中可以看到,官方推荐的终止条件就是设置最大训练步数`train_spec.max_steps`。实际的情况是,在local模式下,如果不设置`train_spec.max_steps`,那么模型会一直训练下去,但是如果在distributed环境下,即使设置`train_spec.max_steps`为`None`,input_fn的数据如果被消耗完,也会终止训练。因此,如果分布式训练需要训练n个epoch的数据,则不妨先做好数据的分发策略,将一个epoch的数据分配到各个worker上,让每个worker训练n个epoch的数据。
def run_train_evaluate():
train_input_fn = generate_input_fn(
data_conf['train_file_path'],
num_epochs=train_epochs,
batch_size=batch_size)
eval_input_fn = generate_input_fn(
data_conf['eval_file_path'],
mode=tf.estimator.ModeKeys.EVAL,
batch_size=batch_size,
num_epochs=1)
exporter = tf.estimator.FinalExporter(
name='final_export',
serving_input_receiver_fn=serving_input_fn)
feature_columns = create_feature_columns()
estimator = lr_model.create_classifier(
feature_columns,
model_dir='./checkpoints_dir',
learning_rate=learning_rate)
train_spec = tf.estimator.TrainSpec(
train_input_fn,
max_steps=train_max_steps)
eval_spec = tf.estimator.EvalSpec(
eval_input_fn,
steps=eval_steps,
exporters=[exporter],
throttle_secs=600)
tf.estimator.train_and_evaluate(
estimator=estimator,
train_spec=train_spec,
eval_spec=eval_spec)
▉5 分布式训练的两个注意事项
1. chief和ps
在分布式训练中,需要保证`RunConfig.model_dir`在所有worker中都一样,即所有worker都需要在自己的环境下的同一路径保存checkpoints,并且需要保证这些不同worker下的文件是在一个共享文件系统中,即各个worker都能读这个文件夹进行读写。
如果没有这样的共享文件系统,每个worker间的文件夹不能相互读写,那么在分布式训练时,需要保证chief和ps在同一个ip下(使用不同端口),否则checkpoints就无法正常读取(ps的文件和chief的文件共同组成一个完整的checkpoints,如果两者不能共享,那么就无法读取完整的checkpoints。)
2. chief和master
在设置TF_CONFIG的cluster环境时,可以设置chief或master,如果设置成chief,那么这个chief同其他的worker并无明显区别,功能相近,如果设置为master则会有一些额外功能,比如训练过程中会输出各种eval的评价指标(auc,accuracy等)。
▉6 模型保存的几个文件夹
在使用estimator训练和评估的过程中,涉及到几个dir的设置。首先就是在构建estimator的时候,参数(model_fn, model_dir=None, config=None, params=None, warm_start_from=None)里面有一个model_dir,这个model_dir是保存checkpoints的文件夹,在train_and_evaluate的参数中的tf.estimator.EvalSpec的参数中有一个exporter的参数,设置这个参数时有一个
exporter = tf.estimator.FinalExporter(name='final_export', serving_input_receiver_fn=serving_input_fn),
这时会把模型保存在checkpoints文件夹下的export/final_export文件下。
▉7 离线预测和模型评估
当前TensorFlow实现了还没提供离线评估的接口,因此通常需要先导出模型,然后加载模型预测结果并自己实现相关评价的接口(比如AUC,Accuracy等),对模型效果进行评估。
import tensorflow as tf
import numpy as np
from sklearn import metrics
data = []
y_true = []
for line in open('./data/eval_data'):
data.append(line.strip())
y_true.append(int(line.strip().split(',')[1]))
y_true = np.asarray(y_true)
data = np.array(data)
with tf.Session(graph=tf.Graph()) as sess:
tf.saved_model.loader.load(sess, ['serve'], './model_dir/1539937287')
y_pred = sess.run('linear/head/predictions/probabilities:0', feed_dict={'inputs:0': data})
y_pred = y_pred[:, 1]
fpr, tpr, thresholds = metrics.roc_curve(y_true, y_pred, pos_label=1)
auc_score = metrics.auc(fpr, tpr)
print(auc_score)