Exploring Amazon SageMaker Processing
Collecting and labeling data samples is only the first step in preparing a dataset. Indeed, it's very likely that you'll have to pre-process your dataset in order to do the following, for example:
Convert it to the input format expected by the machine learning algorithm you're using.
Rescale or normalize numerical features.
Engineer higher-level features, for example, one-hot encoding.
Clean and tokenize text for natural language processing applications.
And more!
Once training is complete, you may want to run additional jobs to post-process the predicted data and to evaluate your model on different datasets.
In this section, you'll learn about Amazon SageMaker Processing, a SageMaker capability that helps you run batch jobs related to your machine learning project.
Discovering the Amazon SageMaker Processing API
The Amazon SageMaker Processing API is part of the SageMaker SDK, which we already installed in Chapter 1, Introducing Amazon SageMaker. Its documentation is available at https://sagemaker.readthedocs.io.
SageMaker Processing provides you with a built-in Docker container that can run Python batch jobs written with scikit-learn (https://scikit-learn.org). You can also use your own container if you'd like. Logs are available in Amazon CloudWatch Logs in the /aws/sagemaker/ProcessingJobs log group.
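If you'd rather browse those logs programmatically than in the CloudWatch console, here's a minimal sketch with boto3, assuming your credentials grant read access to CloudWatch Logs:
# A minimal sketch: list the most recent processing job log streams
# in the /aws/sagemaker/ProcessingJobs log group.
import boto3

logs = boto3.client('logs')
streams = logs.describe_log_streams(
    logGroupName='/aws/sagemaker/ProcessingJobs',
    orderBy='LastEventTime',
    descending=True,
    limit=5)

for stream in streams['logStreams']:
    print(stream['logStreamName'])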
Let's first see how we can use scikit-learn and SageMaker Processing to prepare a dataset for training.
Note:
You can run this example, and all future examples, on your local machine, on a Notebook instance, or in SageMaker Studio. Please make sure to enable the appropriate conda or virtualenv environment, as explained in Chapter 1, Introducing Amazon SageMaker.
For the rest of the book, I also recommend that you follow along and run the code available in the companion GitHub repository. Every effort has been made to check all code samples present in the text. However, for those of you who have an electronic version, copying and pasting may have unpredictable results: formatting issues, weird quotes, and so on.
Processing a dataset with scikit-learn
Here's the high-level process:
Upload your unprocessed dataset to Amazon S3.
Write a script with scikit-learn in order to load the dataset, process it, and save the processed features and labels.
Run this script with SageMaker Processing on managed infrastructure.
Uploading the dataset to Amazon S3
First, we need a dataset. We'll use the direct marketing dataset published by S. Moro, P. Cortez, and P. Rita in "A Data-Driven Approach to Predict the Success of Bank Telemarketing", Decision Support Systems, Elsevier, 62:22-31, June 2014.
This dataset describes a binary classification problem: will a customer accept a marketing offer, yes or no? It contains a little more than 41,000 labeled customer samples. Let's dive in:
In a new Jupyter notebook, let's first download and extract the dataset:
%%sh
apt-get -y install unzip # Only needed in SageMaker Studio
wget -N https://sagemaker-sample-data-us-west-2.s3-us-west-2.amazonaws.com/autopilot/direct_marketing/bank-additional.zip
unzip -o bank-additional.zip
Then, we load it with pandas:
import pandas as pd
data = pd.read_csv('./bank-additional/bank-additional-full.csv')
print(data.shape)

(41188, 21)
Now, let's display the first five lines:
data[:5]
This prints out the table visible in the following figure:
Scrolling to the right, we can see a column named y, storing the labels.
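If you'd like a quick look at how the labels are distributed before going any further, a pandas one-liner does the trick:
# Count how many samples belong to each class ('yes' or 'no')
print(data['y'].value_counts())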
Now, let's upload the dataset to Amazon S3. We'll use a default bucket automatically created by SageMaker in the region we're running in. We'll just add a prefix to keep things nice and tidy:
import sagemaker
prefix = 'sagemaker/DEMO-smprocessing/input'
input_data = sagemaker.Session().upload_data(path='./bank-additional/bank-additional-full.csv', key_prefix=prefix)
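If you're curious where the file ended up, upload_data() returns the full S3 URI. The sample output below is illustrative: the bucket name depends on your account and region:
# Print the S3 URI returned by upload_data(); we'll pass it to the
# processing job later on.
print(input_data)

s3://sagemaker-eu-west-1-123456789012/sagemaker/DEMO-smprocessing/input/bank-additional-full.csv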
Writing a processing script with scikit-learn
As SageMaker Processing takes care of all infrastructure concerns, we can focus on the script itself. We don't have to worry about Amazon S3 either: SageMaker Processing will automatically copy the input dataset from S3 into the container, and the processed datasets from the container to S3.
Container paths are provided when we configure the job itself. Here's what we'll use:
The input dataset: /opt/ml/processing/input
The processed training set: /opt/ml/processing/train
The processed test set: /opt/ml/processing/test
In our Jupyter environment, let's start writing a new Python file named preprocessing.py. As you would expect, this script will load the dataset, perform basic feature engineering, and save the processed dataset:
First, we read our single command-line parameter with the argparse library (https://docs.python.org/3/library/argparse.html): the ratio for splitting the dataset into training and test sets. The actual value will be passed to the script when we launch the processing job with the SageMaker SDK:
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--train-test-split-ratio', type=float, default=0.3)
args, _ = parser.parse_known_args()
print('Received arguments {}'.format(args))
split_ratio = args.train_test_split_ratio
We load the input dataset using pandas. When the job starts, SageMaker Processing automatically copies it from S3 to a user-defined location inside the container; here, it is /opt/ml/processing/input:
import os
import pandas as pd
input_data_path = os.path.join('/opt/ml/processing/input', 'bank-additional-full.csv')
df = pd.read_csv(input_data_path)
Then, we remove any line with missing values, as well as duplicate lines:
df.dropna(inplace=True)
df.drop_duplicates(inplace=True)
Then, we count negative and positive samples, and display the class ratio. This will tell us how unbalanced the dataset is:
one_class = df[df['y']=='yes']
one_class_count = one_class.shape[0]
zero_class = df[df['y']=='no']
zero_class_count = zero_class.shape[0]
zero_to_one_ratio = zero_class_count/one_class_count
print("Ratio: %.2f" % zero_to_one_ratio)
Looking at the dataset, we can see a column named pdays, telling us how many days ago a customer was last contacted. Some lines have a 999 value, and that looks pretty suspicious: indeed, this is a placeholder value meaning that a customer has never been contacted. To help the model capture this information, let's add a new column stating it explicitly:
import numpy as np
df['no_previous_contact'] = np.where(df['pdays'] == 999, 1, 0)
In the job column, we can see three categories (student, retired, and unemployed) that should probably be grouped to indicate that these customers don't have a full-time job. Let's add another column:
df['not_working'] = np.where(np.in1d(df['job'], ['student', 'retired', 'unemployed']), 1, 0)
Now, let's split the dataset into training and test sets. Scikit-learn has a convenient API for this, and we set the split ratio according to a command-line argument passed to the script:
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
    df.drop('y', axis=1),
    df['y'],
    test_size=split_ratio,
    random_state=0)
The next step is to scale numerical features and to one-hot encode the categorical features. We'll use StandardScaler for the former, and OneHotEncoder for the latter:
from sklearn.compose import make_column_transformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
preprocess = make_column_transformer(
    (['age', 'duration', 'campaign', 'pdays', 'previous'],
     StandardScaler()),
    (['job', 'marital', 'education', 'default', 'housing', 'loan',
      'contact', 'month', 'day_of_week', 'poutcome'],
     OneHotEncoder(sparse=False))
)
Then, we fit the transformer on the training set and use it to process both the training and test sets (fitting only on the training data avoids leaking information from the test set):
train_features = preprocess.fit_transform(X_train)
test_features = preprocess.transform(X_test)
Finally, we save the processed datasets, separating the features and labels. They're saved to user-defined locations in the container, and SageMaker Processing will automatically copy the files to S3 before terminating the job:
train_features_output_path = os.path.join('/opt/ml/processing/train', 'train_features.csv')
train_labels_output_path = os.path.join('/opt/ml/processing/train', 'train_labels.csv')
test_features_output_path = os.path.join('/opt/ml/processing/test', 'test_features.csv')
test_labels_output_path = os.path.join('/opt/ml/processing/test', 'test_labels.csv')

pd.DataFrame(train_features).to_csv(train_features_output_path, header=False, index=False)
pd.DataFrame(test_features).to_csv(test_features_output_path, header=False, index=False)
y_train.to_csv(train_labels_output_path, header=False, index=False)
y_test.to_csv(test_labels_output_path, header=False, index=False)
That's it. As you can see, this code is vanilla scikit-learn, so it shouldn't be difficult to adapt your own scripts for SageMaker Processing. Now let's see how we can actually run this.
Running a processing script
Coming back to our Jupyter notebook, we use the SKLearnProcessor object from the SageMaker SDK to configure the processing job:
First, we define which version of scikit-learn we want to use, and what our infrastructure requirements are. Here, we go for an ml.m5.xlarge instance, an all-round good choice:
from sagemaker.sklearn.processing import SKLearnProcessor
sklearn_processor = SKLearnProcessor(
    framework_version='0.20.0',
    role=sagemaker.get_execution_role(),
    instance_type='ml.m5.xlarge',
    instance_count=1)
Then, we simply launch the job, passing the name of the script, the dataset input path in S3, the user-defined dataset paths inside the SageMaker Processing environment, and the command-line arguments:
from sagemaker.processing import ProcessingInput, ProcessingOutput
sklearn_processor.run(
    code='preprocessing.py',
    inputs=[ProcessingInput(
        source=input_data,   # Our data in S3
        destination='/opt/ml/processing/input')
    ],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/train',
            output_name='train_data'),
        ProcessingOutput(
            source='/opt/ml/processing/test',
            output_name='test_data')
    ],
    arguments=['--train-test-split-ratio', '0.2']
)
As the job starts, SageMaker automatically provisions a managed ml.m5.xlarge instance, pulls the appropriate container to it, and runs our script inside the container. Once the job is complete, the instance is terminated, and we only pay for the amount of time we used it. There is zero infrastructure management, and we'll never leave idle instances running for no reason.
After a few minutes, the job is complete, and we can see the output of the script as follows:
Received arguments Namespace(train_test_split_ratio=0.2)
Reading input data from /opt/ml/processing/input/bank-additional-full.csv
Positive samples: 4639
Negative samples: 36537
Ratio: 7.88
Splitting data into train and test sets with ratio 0.2
Running preprocessing and feature engineering transformations
Train data shape after preprocessing: (32940, 58)
Test data shape after preprocessing: (8236, 58)
Saving training features to /opt/ml/processing/train/train_features.csv
Saving test features to /opt/ml/processing/test/test_features.csv
Saving training labels to /opt/ml/processing/train/train_labels.csv
Saving test labels to /opt/ml/processing/test/test_labels.csv
The following screenshot shows the log in CloudWatch:
Finally, we can describe the job and see the location of the processed datasets:
preprocessing_job_description = sklearn_processor.jobs[-1].describe()
output_config = preprocessing_job_description['ProcessingOutputConfig']
for output in output_config['Outputs']:
    print(output['S3Output']['S3Uri'])
This results in the following output:
s3://sagemaker-eu-west-1-123456789012/sagemaker-scikit-learn-2020-04-22-10-09-43-146/output/train_data
s3://sagemaker-eu-west-1-123456789012/sagemaker-scikit-learn-2020-04-22-10-09-43-146/output/test_data
In a terminal, we can use the AWS CLI to fetch the processed training set located at the preceding path, and take a look at the first sample and label:
$ aws s3 cp s3://sagemaker-eu-west-1-123456789012/sagemaker-scikit-learn-2020-04-22-10-09-43-146/output/train_data/train_features.csv .
$ aws s3 cp s3://sagemaker-eu-west-1-123456789012/sagemaker-scikit-learn-2020-04-22-10-09-43-146/output/train_data/train_labels.csv .
$ head -1 train_features.csv
0.09604515376959515,-0.6572847857673993,-0.20595554104907898,0.19603112301129622,-0.35090125695736246,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0
$ head -1 train_labels.csv
no
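Back in our notebook, a quick sanity check is to load these two files and confirm that their shapes match what the processing job reported. Here's a minimal sketch, assuming the files were copied to the current directory:
# Load the processed features and labels we just copied from S3, and check
# that their shapes are consistent with the processing job's log output.
import pandas as pd

train_features = pd.read_csv('train_features.csv', header=None)
train_labels = pd.read_csv('train_labels.csv', header=None)

print(train_features.shape)   # Expected: (32940, 58)
print(train_labels.shape)     # Expected: (32940, 1)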
Now that the dataset has been processed with our own code, we could use it to train a machine learning model. In real life, we would also automate these steps instead of running them manually inside a notebook.
Processing a dataset with your own code
In the previous example, we used a built-in container to run our scikit-learn code. SageMaker Processing also makes it possible to use your own container. Here's the high-level process:
Upload your dataset to Amazon S3.
Write a processing script with your language and library of choice: load the dataset, process it, and save the processed features and labels.
Define a Docker container that contains your script and its dependencies. As you would expect, the processing script should be the entry point of the container.
Build the container and push it to Amazon ECR (https://aws.amazon.com/ecr/), AWS' Docker registry service.
Using your container, run your script on the infrastructure managed by Amazon SageMaker (a minimal sketch follows this list).
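To give you an idea of what that last step looks like in code, here's a minimal sketch using the ScriptProcessor class from the SageMaker SDK. The ECR image URI and script name below are placeholders that you would replace with your own. Note that ScriptProcessor injects your script into the container at run time; if you prefer to bake the script into the image as its entry point, as described in step 3, you can use the generic Processor class instead, which takes no code parameter:
# A minimal sketch, assuming you've already built an image containing your
# script's dependencies and pushed it to Amazon ECR.
import sagemaker
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput

processor = ScriptProcessor(
    image_uri='123456789012.dkr.ecr.eu-west-1.amazonaws.com/my-processing:latest',  # Placeholder
    command=['python3'],
    role=sagemaker.get_execution_role(),
    instance_type='ml.m5.xlarge',
    instance_count=1)

processor.run(
    code='my_processing_script.py',       # Your own script
    inputs=[ProcessingInput(
        source=input_data,                # Your dataset in S3
        destination='/opt/ml/processing/input')],
    outputs=[ProcessingOutput(
        source='/opt/ml/processing/output',
        output_name='output_data')])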
Here are some additional resources if you'd like to explore SageMaker Processing:
A primer on Amazon ECR: https://docs.aws.amazon.com/AmazonECR/latest/userguide/what-is-ecr.html
Documentation on building your own container: https://docs.aws.amazon.com/sagemaker/latest/dg/build-your-own-processing-container.html
A full example based on Spark MLlib: https://github.com/awslabs/amazon-sagemaker-examples/tree/master/sagemaker_processing/feature_transformation_with_sagemaker_processing
Additional notebooks: https://github.com/awslabs/amazon-sagemaker-examples/tree/master/sagemaker_processing
As you can see, SageMaker Processing makes it really easy to run data processing jobs. You can focus on writing and running your script, without having to worry about provisioning and managing infrastructure.