Skip to content

Petastorm hangs forever in DataBricks #804

Open
@juzzmac

Description

I've tried several different versions of the following code, all of which work when running locally but hang forever in DataBricks
(single node, 13.3 LTS ML runtime):

import os
import pandas as pd
import numpy as np
from petastorm.spark import SparkDatasetConverter, make_spark_converter
from pyspark.sql import SparkSession
from tensorflow.keras.optimizers import Adam

from tensorflow.keras.layers import (
    Dense,
    Input,
    Dropout,
    concatenate,
)

from tensorflow.keras.models import Model

spark = SparkSession.builder.getOrCreate()

n_rows = 100
n_outcomes = 15
input_shapes = [10, 5, 2]
possible_outcomes = np.eye(n_outcomes).astype(int)
df = pd.DataFrame(
    {
        'c1': [[np.random.rand() for _ in range(input_shapes[0])] for __ in range(n_rows)],
        'c2': [[np.random.rand() for _ in range(input_shapes[1])] for __ in range(n_rows)],
        'c3': [[np.random.rand() for _ in range(input_shapes[2])] for __ in range(n_rows)],
        'target': [[int(x) for x in possible_outcomes[np.random.choice(possible_outcomes.shape[0])]] for __ in range(n_rows)],
    }
)

from pyspark.sql.types import IntegerType, ArrayType, StructType, StructField, FloatType
schema = StructType([
    StructField("c1", ArrayType(FloatType())),
    StructField("c2", ArrayType(FloatType())),
    StructField("c3", ArrayType(FloatType())),
    StructField("target", ArrayType(IntegerType()))
])
sdf = spark.createDataFrame(df, schema=schema)
sdf = sdf.repartition(5)
sdf.show(4)

inputs = []
outputs = []
for shape in input_shapes:
    input_layer = Input(shape=(shape,))
    x = Dense(24, activation='relu')(input_layer)
    x = Dropout(0.2)(x)
    output_layer = Dense(24, activation='relu')(x)
    inputs.append(input_layer)
    outputs.append(output_layer)

combined_input = concatenate(outputs)
z = Dense(256, activation='relu')(combined_input)
z = Dropout(0.2)(z)
z = Dense(256, activation='relu')(z)
final_output = Dense(n_outcomes, activation='softmax')(z)
model = Model(inputs=inputs, outputs=final_output)

model.compile(
    loss='categorical_crossentropy',
    optimizer=Adam(),
    experimental_run_tf_function=False,
)

is_local_mode = True

if is_local_mode:
    cwd = os.getcwd()
    spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, f"file://{cwd}/cache")
else:
    spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, "file:///dbfs/tmp/petastorm/cache")

converter_train = make_spark_converter(sdf)
BATCH_SIZE = 10
NUM_EPOCHS = 5

with converter_train.make_tf_dataset(batch_size=BATCH_SIZE) as train_dataset:
    train_dataset = train_dataset.map(lambda x: (x[:-1], x[-1]))
    steps_per_epoch = len(converter_train) // BATCH_SIZE
    print([i for i in train_dataset.take(1)])

    hist = model.fit(train_dataset,
                     steps_per_epoch=steps_per_epoch,
                     epochs=NUM_EPOCHS,
                     verbose=2)

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions