Machine Learning At Scale With Apache Spark MLlib Python Example

June 30, 2019

For most of their history, computer processors became faster every year. Unfortunately, this trend in hardware stopped around 2005. Due to limits in heat dissipation, hardware developers stopped increasing the clock frequency of individual processors and opted for parallel CPU cores. This is fine for playing video games on a desktop computer. However, when it involves processing petabytes of data, we have to go a step further and pool the processing power from multiple computers together in order to complete tasks in any reasonable amount of time. The need for horizontal scaling led to the Apache Hadoop project. Apache Hadoop provides a way of breaking up a given task, concurrently executing it across multiple nodes inside of a cluster and aggregating the result.
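The split, execute and aggregate pattern described above can be sketched on a single machine. This is only a toy illustration using Python's standard library, not Hadoop itself; the helper names `count_words` and `merge_counts` and the sample lines are made up for the example, and threads stand in for cluster nodes.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def count_words(chunk):
    """'Map' step: count the words in one chunk of lines."""
    counts = Counter()
    for line in chunk:
        counts.update(line.split())
    return counts


def merge_counts(partials):
    """'Reduce' step: aggregate the partial counts into one result."""
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total


lines = ["spark hadoop spark", "hadoop cluster", "spark cluster cluster"]

# Break the task into two chunks, one per pretend node.
chunks = [lines[i::2] for i in range(2)]

# Execute the chunks concurrently, then aggregate the partial results.
with ThreadPoolExecutor(max_workers=2) as pool:
    partial_counts = list(pool.map(count_words, chunks))

word_counts = merge_counts(partial_counts)
print(word_counts["spark"], word_counts["hadoop"], word_counts["cluster"])
```

On a real cluster, the chunks would live on different machines and the framework would handle scheduling and failures, which this sketch ignores.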

Apache Spark began at the UC Berkeley AMPLab in 2009. At the time, Hadoop MapReduce was the dominant parallel programming engine for clusters. The AMPLab created Apache Spark to address some of the drawbacks of Apache Hadoop. One of the most notable limitations of Hadoop MapReduce is that it writes intermediate results to disk. In contrast, Spark keeps intermediate results in memory wherever possible and, as a consequence, tends to be much faster. By 2013, the project had grown to widespread use, with more than 100 contributors from more than 30 organizations outside UC Berkeley. The AMPLab contributed Spark to the Apache Software Foundation. The early AMPLab team also launched a company, Databricks, to improve the project.

Although Python libraries such as scikit-learn are great for Kaggle competitions and the like, they are rarely, if ever, used at scale. In my own experience, I’ve run into situations where I could only load a portion of the data, since loading all of it would fill my computer’s RAM and crash the program. Spark can perform machine learning at scale with a built-in library called MLlib. The MLlib API, although not as extensive as scikit-learn, can be used for classification, regression and clustering problems. In this article, we’ll train a machine learning model using the traditional scikit-learn/pandas stack and then repeat the process using Spark.

Jupyter Notebook

import pandas as pd

In the following example, we’ll attempt to predict whether an adult’s income exceeds $50K/year based on census data. The data can be downloaded from the UC Irvine Machine Learning Repository.

The dataset we’re working with contains 14 features and 1 label. The csv file doesn’t include a header, so we must define the column names ourselves.

column_names = [  
    'age',  
    'workclass',  
    'fnlwgt',  
    'education',  
    'education-num',  
    'marital-status',  
    'occupation',  
    'relationship',  
    'race',  
    'sex',  
    'capital-gain',  
    'capital-loss',  
    'hours-per-week',  
    'native-country',  
    'salary'  
]
train_df = pd.read_csv('adult.data', names=column_names)
test_df = pd.read_csv('adult.test', names=column_names)

You’ll notice that every feature is separated by a comma and a space. Although Pandas can handle this under the hood, Spark cannot. Therefore, we remove the spaces. In addition, we remove any rows with a native country of Holand-Netherlands from our training set because there aren’t any instances in our testing set and it would cause issues when we encode our categorical variables. We save the resulting dataframes to csv files so that we can use them later in the Spark section.

train_df = train_df.apply(lambda x: x.str.strip() if x.dtype == 'object' else x)
train_df_cp = train_df.copy()
train_df_cp = train_df_cp.loc[train_df_cp['native-country'] != 'Holand-Netherlands']
train_df_cp.to_csv('train.csv', index=False, header=False)
test_df = test_df.apply(lambda x: x.str.strip() if x.dtype == 'object' else x)
test_df.to_csv('test.csv', index=False, header=False)
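To see why the stray spaces matter, here is a small standard-library sketch of the same cleanup. The two rows are shortened, made-up samples in the style of the dataset; a reader that splits on the comma alone keeps the leading space as part of each value.

```python
import csv
import io

raw = "39, State-gov, 77516, <=50K\n50, Self-emp-not-inc, 83311, <=50K\n"

# Splitting on the comma alone leaves a leading space on every field after the first.
naive_rows = list(csv.reader(io.StringIO(raw)))
print(naive_rows[0])

# Stripping each field removes the stray whitespace.
clean_rows = [[field.strip() for field in row] for row in naive_rows]
print(clean_rows[0])

# Write the cleaned rows back out without a header, separated by commas only.
buffer = io.StringIO()
csv.writer(buffer, lineterminator="\n").writerows(clean_rows)
cleaned_csv = buffer.getvalue()
print(cleaned_csv)
```

Without the cleanup, ' State-gov' and 'State-gov' would be treated as two different categories by anything that compares the strings exactly.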

Next, let’s take a look to see what we’re working with. The training set contains a little over 30 thousand rows.

print('Training data shape: ', train_df.shape)  
train_df.head()

On the other hand, the testing set contains a little over 15 thousand rows.

print('Testing data shape: ', test_df.shape)  
test_df.head()

Oftentimes, we’ll have to handle missing data prior to training our model. The following line returns the number of null values for each feature. It reports none here, although keep in mind that this dataset marks unknown values with a '?' string, which isnull() does not count.

train_df.isnull().sum()

Categorical variables will have a type of object. Categorical variables must be encoded in order to be interpreted by machine learning models (other than decision trees).

train_df.dtypes.value_counts()

The following code prints the distinct number of categories for each categorical variable.

train_df.select_dtypes('object').apply(pd.Series.nunique, axis=0)

test_df.select_dtypes('object').apply(pd.Series.nunique, axis=0)

We manually encode salary to avoid having it create two columns when we perform one hot encoding. After transforming our data, every string is replaced with an array of 1s and 0s where the location of the 1 corresponds to a given category.

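# The values were stripped of whitespace earlier, and the test labels may carry a trailing period, so we match on the prefix.
train_df['salary'] = train_df['salary'].apply(lambda x: 0 if str(x).strip().startswith('<=50K') else 1)  
test_df['salary'] = test_df['salary'].apply(lambda x: 0 if str(x).strip().startswith('<=50K') else 1)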
train_df = pd.get_dummies(train_df)  
test_df = pd.get_dummies(test_df)
print('Training Features shape: ', train_df.shape)  
print('Testing Features shape: ', test_df.shape)
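For intuition, one hot encoding can be sketched in a few lines of plain Python. This is a simplified stand-in for what pd.get_dummies does to a single column, not its actual implementation; the function name `one_hot` and the sample values are made up for the example.

```python
def one_hot(values):
    """Return the sorted categories and one row of 0s and 1s per input value."""
    categories = sorted(set(values))
    position = {category: i for i, category in enumerate(categories)}
    rows = []
    for value in values:
        row = [0] * len(categories)
        row[position[value]] = 1  # the 1 marks which category this value belongs to
        rows.append(row)
    return categories, rows


sex = ["Male", "Female", "Female", "Male"]
categories, encoded = one_hot(sex)
print(categories)
print(encoded)
```

Each category becomes its own column, which is why a column with many distinct values, such as native-country, adds many features.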

There is a discrepancy between the number of distinct native-country categories in the testing and training sets (the testing set doesn’t have a person whose native country is Holand-Netherlands). As a result, when we applied one hot encoding, we ended up with a different number of features. Before we can use logistic regression, we must ensure that the features in our training and testing sets match. We can do so by performing an inner join on the columns.

# Align the training and testing data, keep only columns present in both dataframes  
train_df, test_df = train_df.align(test_df, join = 'inner', axis = 1)
print('Training Features shape: ', train_df.shape)  
print('Testing Features shape: ', test_df.shape)
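Conceptually, the inner join keeps only the column names that appear in both dataframes. A rough sketch in plain Python, with a made-up handful of columns and a single made-up row, and with the shared columns kept in training order for readability:

```python
train_columns = ["age", "native-country_Holand-Netherlands", "native-country_Mexico", "salary"]
test_columns = ["age", "native-country_Mexico", "salary"]

# Inner join on column names: keep only the columns present in both sets.
test_set = set(test_columns)
shared = [column for column in train_columns if column in test_set]
dropped = [column for column in train_columns if column not in test_set]
print(shared)
print(dropped)

# Apply the shared column list to a (hypothetical) training row.
train_row = {
    "age": 39,
    "native-country_Holand-Netherlands": 0,
    "native-country_Mexico": 1,
    "salary": 0,
}
aligned_row = {column: train_row[column] for column in shared}
print(aligned_row)
```

The column that only exists in the training set is discarded, so both sets end up with the same features.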

train_df.head()

Next, we break up the dataframes into dependent and independent variables.

X_train = train_df.drop('salary', axis=1)  
y_train = train_df['salary']
X_test = test_df.drop('salary', axis=1)  
y_test = test_df['salary']

We don’t need to scale variables for plain logistic regression as long as we keep units in mind when interpreting the coefficients. However, by default, the scikit-learn implementation of logistic regression uses L2 regularization. L2 regularization penalizes large values of all parameters equally. Hence, a feature for height in metres would be penalized much more than the same feature in millimetres, because its coefficient must be 1,000 times larger to have the same effect. Therefore, we scale our data before passing it to our model.

from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler(feature_range = (0, 1))
scaler.fit(X_train)  
X_train = scaler.transform(X_train)  
X_test = scaler.transform(X_test)
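The arithmetic behind min-max scaling is simple enough to write out by hand. The sketch below uses made-up values for a single feature and hypothetical helper names; note that the minimum and maximum are learned from the training values only and then reused for the testing values, mirroring the fit and transform calls above.

```python
def fit_min_max(column):
    """Learn the minimum and maximum from the training values only."""
    return min(column), max(column)


def transform_min_max(column, lo, hi):
    """Map values to the [0, 1] range defined by the training minimum and maximum."""
    span = hi - lo
    return [(x - lo) / span if span else 0.0 for x in column]


train_hours = [20, 40, 60]
test_hours = [30, 80]

lo, hi = fit_min_max(train_hours)
scaled_train = transform_min_max(train_hours, lo, hi)
scaled_test = transform_min_max(test_hours, lo, hi)
print(scaled_train)  # [0.0, 0.5, 1.0]
print(scaled_test)   # [0.25, 1.5]
```

A testing value larger than anything seen during training, such as 80 here, lands outside the 0 to 1 range. That is expected when the scaler is fitted on the training set alone.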

Finally, we can train our model and measure its performance on the testing set.

from sklearn.linear_model import LogisticRegression
lr = LogisticRegression()
lr.fit(X_train, y_train)
lr_pred = lr.predict(X_test)
from sklearn.metrics import accuracy_score
accuracy_score(y_test, lr_pred)
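Accuracy is simply the fraction of predictions that match the true labels. A hand-rolled version, with made-up labels and a hypothetical function name, looks roughly like this:

```python
def accuracy(y_true, y_pred):
    """Fraction of predictions that equal the corresponding true label."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    matches = sum(1 for truth, guess in zip(y_true, y_pred) if truth == guess)
    return matches / len(y_true)


y_true = [0, 0, 1, 1, 0]
y_pred = [0, 1, 1, 0, 0]
score = accuracy(y_true, y_pred)
print(score)  # 3 of 5 predictions match, so 0.6
```

When one class is much more common than the other, it is worth comparing this number against the accuracy of always predicting the majority class.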

PySpark

Let’s see how we could go about accomplishing the same thing using Spark. Depending on your preference, you can write Spark code in Java, Scala or Python. Given that most data scientists are used to working with Python, we’ll use that.

All of the code in the following section will be running on our local machine. However, if we were to set up a Spark cluster with multiple nodes, the operations would run concurrently on every computer inside the cluster without any modifications to the code.

The easiest way to start using Spark is to use the Docker container provided by Jupyter. For simplicity, we create a docker-compose.yml file with the following content. Make sure to modify the path to match the directory that contains the data downloaded from the UCI Machine Learning Repository.

version: '2'  
services:  
  spark:  
    image: jupyter/pyspark-notebook:latest  
    ports:  
      - 8888:8888  
    volumes:  
      - /home/cory/kaggle/adult:/home/jovyan/work

Then, run the following command.

docker-compose up

To access the Jupyter Notebook, open a browser and go to localhost:8888.

Go ahead and import the following libraries.

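from pyspark import SparkConf, SparkContext  
from pyspark.sql import SparkSession  
from pyspark.ml.classification import LogisticRegression  
from pyspark.ml.feature import StringIndexer, VectorAssembler  
from pyspark.ml import Pipeline  
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# OneHotEncoderEstimator exists in Spark 2.3 and 2.4; Spark 3.0 renamed it to OneHotEncoder.
try:
    from pyspark.ml.feature import OneHotEncoderEstimator
except ImportError:
    from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator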

Prior to doing anything else, we need to initialize a Spark session.

spark = SparkSession.builder.appName("Predict Adult Salary").getOrCreate()

Just like before, we define the column names which we’ll use when reading in the data.

schema = StructType([  
    StructField("age", IntegerType(), True),  
    StructField("workclass", StringType(), True),  
    StructField("fnlwgt", IntegerType(), True),  
    StructField("education", StringType(), True),  
    StructField("education-num", IntegerType(), True),  
    StructField("marital-status", StringType(), True),  
    StructField("occupation", StringType(), True),  
    StructField("relationship", StringType(), True),  
    StructField("race", StringType(), True),  
    StructField("sex", StringType(), True),  
    StructField("capital-gain", IntegerType(), True),  
    StructField("capital-loss", IntegerType(), True),  
    StructField("hours-per-week", IntegerType(), True),  
    StructField("native-country", StringType(), True),  
    StructField("salary", StringType(), True)  
])

Like Pandas, Spark provides an API for loading the contents of a csv file into our program. We use the files that we created in the beginning.

train_df = spark.read.csv('train.csv', header=False, schema=schema)
test_df = spark.read.csv('test.csv', header=False, schema=schema)

We can run the following line to view the first 5 rows.

train_df.head(5)

If, for whatever reason, you’d like to convert the Spark dataframe into a Pandas dataframe, you can do so. Personally, I find the output cleaner and easier to read.

train_df.limit(5).toPandas()

There are a couple of important distinctions between Spark and scikit-learn/Pandas which must be understood before moving forward.

  • Spark DataFrames are immutable. A transformation never modifies a DataFrame in place; it returns a new DataFrame, typically with additional columns.
  • MLlib expects all features to be contained within a single column.

The following code block is where we apply all of the necessary transformations to the categorical variables.

The StringIndexer class performs label encoding and must be applied before the OneHotEncoderEstimator, which in turn performs one hot encoding. The VectorAssembler class takes multiple columns as input and outputs a single column whose contents are a vector containing the values of all of the input columns.

categorical_variables = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country']
indexers = [StringIndexer(inputCol=column, outputCol=column+"-index") for column in categorical_variables]
encoder = OneHotEncoderEstimator(  
    inputCols=[indexer.getOutputCol() for indexer in indexers],  
    outputCols=["{0}-encoded".format(indexer.getOutputCol()) for indexer in indexers]  
)
assembler = VectorAssembler(  
    inputCols=encoder.getOutputCols(),  
    outputCol="categorical-features"  
)
pipeline = Pipeline(stages=indexers + [encoder, assembler])
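# Fit the pipeline on the training set only, then reuse the fitted stages so that both sets share the same category indices.
pipeline_model = pipeline.fit(train_df)
train_df = pipeline_model.transform(train_df)
test_df = pipeline_model.transform(test_df)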
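To make the two encoding steps more concrete, here is a plain Python sketch of roughly what they compute. It is an approximation based on my understanding of the default settings, not Spark's implementation: the indexer assigns index 0 to the most frequent category, and the encoder drops the last category so that it is represented by all zeros. The alphabetical tie-breaking, the function names and the sample values are assumptions made for the example.

```python
from collections import Counter


def string_index(values):
    """Label encoding: the most frequent category gets index 0, the next gets 1, and so on."""
    counts = Counter(values)
    # Sort by descending frequency; ties are broken alphabetically (an assumption).
    ordered = sorted(counts, key=lambda category: (-counts[category], category))
    return {category: index for index, category in enumerate(ordered)}


def one_hot_drop_last(index, num_categories):
    """One hot encoding that omits the last category, which becomes all zeros."""
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[index] = 1.0
    return vector


race = ["White", "Black", "White", "Other", "White", "Black"]
mapping = string_index(race)
encoded = [one_hot_drop_last(mapping[value], len(mapping)) for value in race]
print(mapping)
print(encoded[0], encoded[1], encoded[3])
```

Because the indices depend on category frequencies, the mapping should be learned once and then reused for every dataset that passes through the model.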

Let’s view all the different columns that were created in the previous step.

train_df.printSchema()

After applying the transformations, we end up with a single column that contains an array with every encoded categorical variable.

df = train_df.limit(5).toPandas()  
df['categorical-features'][1]

We combine our continuous variables with our categorical variables into a single column.

continuous_variables = ['age', 'fnlwgt', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week']
assembler = VectorAssembler(  
    inputCols=['categorical-features', *continuous_variables],  
    outputCol='features'  
)
train_df = assembler.transform(train_df)
test_df = assembler.transform(test_df)

Let’s take a look at the final column which we’ll use to train our model. As you can see, it outputs a SparseVector. To save space, sparse vectors do not store the 0s produced by one hot encoding.

train_df.limit(5).toPandas()['features'][0]
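The idea behind a sparse vector is to record the total size plus the positions and values of the non-zero entries. The following plain Python sketch illustrates that representation with made-up numbers and hypothetical helper names; it does not use Spark's own classes.

```python
def to_sparse(dense):
    """Return (size, indices, values), keeping only the non-zero entries."""
    indices = [i for i, value in enumerate(dense) if value != 0]
    values = [dense[i] for i in indices]
    return len(dense), indices, values


def to_dense(size, indices, values):
    """Rebuild the full list, filling every unlisted position with 0.0."""
    dense = [0.0] * size
    for i, value in zip(indices, values):
        dense[i] = value
    return dense


# A short made-up row: one hot encoded categories followed by two continuous values.
dense_row = [0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 39.0, 40.0]
sparse_row = to_sparse(dense_row)
print(sparse_row)  # (8, [1, 5, 6, 7], [1.0, 1.0, 39.0, 40.0])
print(to_dense(*sparse_row) == dense_row)
```

With dozens of one hot encoded columns per row and only a single 1 per categorical variable, storing just the non-zero positions saves a considerable amount of memory.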

Finally, we encode our target label.

indexer = StringIndexer(inputCol='salary', outputCol='label')
train_df = indexer.fit(train_df).transform(train_df)
test_df = indexer.fit(test_df).transform(test_df)
train_df.limit(10).toPandas()['label']

We fit our model on the training set.

lr = LogisticRegression(featuresCol='features', labelCol='label')
model = lr.fit(train_df)

The transform method is used to make predictions for the testing set.

pred = model.transform(test_df)
pred.limit(10).toPandas()[['label', 'prediction']]

Final Thoughts

Spark is a distributed computing platform which can be used to perform operations on dataframes and train machine learning models at scale.


Written by Cory Maklin