tgaddair released this
Jan 14, 2020
In version 0.19.0, Horovod adds tighter integration with Apache Spark, including a new high-level Horovod Spark Estimator framework and support for accelerator-aware task-level scheduling in the upcoming Spark 3.0 release. This release also contains experimental new features including a join operation for PyTorch and the ability to launch Horovod jobs programmatically from environments like notebooks using a new interactive run mode.
To bridge the gap between large-scale data processing in Spark and large-scale deep learning training with Horovod, we’re introducing a new open source API called Horovod Spark Estimators.
With Horovod Spark Estimators, you can train your deep neural network directly on your existing Spark DataFrame, leveraging Horovod’s ability to scale to hundreds of workers in parallel without any specialized code for distributed training:
from tensorflow import keras
import tensorflow as tf
import horovod.spark.keras as hvd
from horovod.spark.common.store import HDFSStore

# Toy model; layer sizes and column names below are illustrative
model = keras.models.Sequential()
model.add(keras.layers.Dense(8, input_dim=2, activation='tanh'))
model.add(keras.layers.Dense(1, activation='sigmoid'))

# NOTE: unscaled learning rate -- the Estimator scales it by the number of workers
optimizer = keras.optimizers.SGD(lr=0.1)
loss = 'binary_crossentropy'

store = HDFSStore('/user/username/experiments')
keras_estimator = hvd.KerasEstimator(
    num_proc=4,  # number of parallel workers (illustrative)
    store=store,
    model=model,
    optimizer=optimizer,
    loss=loss,
    feature_cols=['features'],
    label_cols=['label'],
    batch_size=32,
    epochs=10)

keras_model = keras_estimator.fit(train_df) \
    .setOutputCols(['predict'])
predict_df = keras_model.transform(test_df)
Horovod Spark Estimators provide a single abstraction — the Estimator — which hides the complexity of gluing Spark DataFrames to a deep learning training script, reading data into a format interpretable by the training framework, and distributing the training using Horovod. The user only needs to provide a model written in the deep learning framework of their choice, and the Estimator will do the work of fitting it to the DataFrame.
After training, the Estimator returns a Transformer representation of the trained model. The model transformer can be used like any Spark ML transformer to make predictions on an input DataFrame, writing them as new columns in the output DataFrame.
Estimators can be used to track experiment history through model checkpointing, hot-start retraining, and metric logging (for TensorBoard) using the Estimator Store abstraction. Stores persist all training artifacts, including intermediate representations of the training data. Horovod natively supports stores for HDFS and local filesystems.
Horovod Spark Estimators are available for Keras (both tf.keras and standalone Keras) and PyTorch, with more frameworks (including pure TensorFlow) coming soon.
Apache Spark 3.0 introduces a new accelerator-aware scheduling capability. This allows a production ETL job running on CPUs to hand off data to Horovod running distributed deep learning training on GPUs within the same pipeline, breaking down the barrier between ETL and continuous model training.
Horovod users can now request GPU resources directly from their Spark application, without assuming which tasks should map to which GPUs:
from horovod.spark.task import get_available_devices
import horovod.tensorflow.keras as hvd
import tensorflow as tf

hvd.init()
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
# Pin this process to the first GPU Spark assigned to the task
config.gpu_options.visible_device_list = get_available_devices()[0]
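When Spark assigns more than one GPU to a task, get_available_devices() returns several device index strings, while TF1's visible_device_list field expects a single comma-separated string. A minimal sketch of that formatting, with a hypothetical stub standing in for the real Horovod call:

```python
# Hypothetical stub for horovod.spark.task.get_available_devices(),
# which returns the GPU indices assigned to this Spark task as strings.
def get_available_devices():
    return ['0', '1']

# visible_device_list expects a comma-separated string of device indices.
visible_device_list = ','.join(get_available_devices())
print(visible_device_list)  # -> 0,1
```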
Check out the keras_spark3_rossmann.py script for a complete example.
Spark 3.0 is currently in preview release, with the full release forthcoming.
The ability for different workers to train on a different number of batches in each epoch has been one of the most requested features for Horovod. This problem frequently arises when a dataset doesn't split evenly among all workers, forcing the user to truncate the extra examples or risk deadlock during training.
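To make the problem concrete, here is the plain arithmetic of sharding a dataset that doesn't divide evenly (the numbers are illustrative, not a Horovod API):

```python
import math

num_examples, num_workers, batch_size = 130, 4, 32

# Split examples as evenly as possible: the first (130 % 4) = 2 workers
# each receive one extra example.
shard_sizes = [num_examples // num_workers + (1 if r < num_examples % num_workers else 0)
               for r in range(num_workers)]
batches = [math.ceil(s / batch_size) for s in shard_sizes]
print(shard_sizes)  # [33, 33, 32, 32]
print(batches)      # [2, 2, 1, 1]
# Workers 0 and 1 run a second batch whose allreduce would wait forever
# on workers 2 and 3 -- the deadlock that the join operation avoids.
```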
With the new join operation, users no longer need to worry about how evenly their dataset divides when training. Just add a join step at the end of each epoch, and Horovod will train on any extra batches without causing the waiting workers to deadlock:
import horovod.torch as hvd

for epoch in range(epochs):
    for batch in dataset:
        train_step(batch)  # forward/backward/optimizer step as usual
    hvd.join()  # blocks until every worker has finished its batches
The join operation is currently supported in Horovod for PyTorch, with support for TensorFlow and Apache MXNet coming soon.
With horovod.spark.run, Horovod already supported launching training jobs programmatically by defining Python functions to be executed on Spark Executors. With the new interactive run mode, we've created a similar API that can launch training jobs on any visible hosts, much like the command-line horovodrun tool:
from horovod.run import run as hvdrun
import horovod.tensorflow as hvd

def train():
    hvd.init()  # training logic goes here

results = hvdrun(train, np=2)
Interactive mode supports most of the functionality provided by horovodrun. See the API documentation for a complete reference.
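Under the hood, a programmatic run amounts to launching the function once per worker and gathering each worker's return value into a list. A rough pure-Python analogy of that launch-and-collect pattern (multiprocessing stands in for Horovod's actual MPI/Gloo launcher; the names here are illustrative):

```python
from multiprocessing import Pool

def train(rank):
    # Real Horovod code would call hvd.init() and use hvd.rank()/hvd.size();
    # here each worker just returns a per-rank result.
    return {'rank': rank}

def run_like_hvdrun(fn, np):
    # Launch np worker processes and collect one result per worker,
    # mirroring the list of results that a programmatic run returns.
    with Pool(np) as pool:
        return pool.map(fn, range(np))

if __name__ == '__main__':
    results = run_like_hvdrun(train, np=2)
    print(results)  # one entry per worker
```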