Overview
This post is about training a Matrix Factorization model with BigQuery ML and deploying it as a Docker container. The end-to-end process is orchestrated through a Vertex AI pipeline.
The post is closely based on this tutorial:
- YouTube: Recommendation Engine Pipeline with BigQuery ML and Vertex AI Pipelines using Matrix Factorization
But there are some key differences:
- This post documents the whole process, from loading the data into BigQuery to how to make recommendations in different ways.
- On July 5th there was a Transition to BigQuery editions, which required some changes to the scripts shown in the original video.
- When trying to replicate the video tutorial I had to solve some issues with the pipeline failing to run. Most of the issues were very hard to debug, with misleading error messages and layers upon layers of abstraction between Python libraries and component definitions. In the end, most of the errors came down to the Default Service Account not having the required permissions. So,
- In this post we can see and run each step and command to create a Service Account and grant it granular permissions to the Google Cloud resources needed for the end-to-end process to run. This is what the Google documentation recommends.
- There are also the commands to enable each service API needed for the project.
At the end we should get a flow like this:
Additional Resources
- YouTube: Lesson 7: Practical Deep Learning for Coders 2022 - Collaborative filtering deep dive
- Tutorial: Use BigQuery ML to make recommendations from Google analytics data
This notebook is better suited for running in Colab.
Blog Post Parts
Part 1. Train and Deploy
- Loading MovieLens data into BigQuery
- Setup services and accounts for the Vertex AI Pipeline to run
- Create and run the Pipeline and its Components
- Make sure no reservation or slot assignment remains active
Part 2. Inference
- Use the Cloud Run endpoint to get recommendations
- Get recommendations from BigQuery ML
- Get Weights and Biases (Embeddings) and do the math
Before you begin
- Select or create a Google Cloud project.
- Make sure that billing is enabled for your project.
- You can run the code locally or in Colab. If you run it locally, you need to install the gcloud CLI (a quick check is sketched below).
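A minimal sanity check for a local setup, assuming the gcloud CLI is already installed and on your PATH:

! gcloud --version
! gcloud auth list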
Loading data into BigQuery
Reference: Load the Movielens dataset into BigQuery
Authenticate your Google Cloud account
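In Colab this uses the interactive Google auth flow; if you run locally, point GOOGLE_APPLICATION_CREDENTIALS to your service account key file.

import sys

if "google.colab" in sys.modules:
    IS_COLAB = True
    from google.colab import auth as google_auth
    google_auth.authenticate_user()
else:
    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.
    %env GOOGLE_APPLICATION_CREDENTIALS ''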
Set your project ID
= "[your-project-id]" # @param {type:"string"} PROJECT_ID
Download MovieLens 1M movie ratings dataset
! curl -O 'http://files.grouplens.org/datasets/movielens/ml-1m.zip'
! unzip -o ml-1m.zip
Change the existing :: delimiter and save the files as .csv. The ratings file uses a comma, while the movie titles file uses @ since titles can contain commas:
! sed 's/::/,/g' ml-1m/ratings.dat > ratings.csv
! sed 's/::/@/g' ml-1m/movies.dat > movie_titles.csv
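Optionally, spot-check the converted files before loading them:

! head -3 ratings.csv
! head -3 movie_titles.csv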
Create BigQuery datasets and populate the tables
Define the names of the datasets.
= "[bq-model-dataset]" # @param {type:"string"}
MODEL_DATASET = "[bq-data-dataset]" # @param {type:"string"} MOVIELENS_DATASET
# To store the model
! bq mk --location=US --dataset {PROJECT_ID}:{MODEL_DATASET}
# To store movies and reviews tables
! bq mk --location=US --dataset {PROJECT_ID}:{MOVIELENS_DATASET}
Create and populate the tables movielens_1m and movie_titles.
# Reviews table
! bq load --project_id={PROJECT_ID} --source_format=CSV {PROJECT_ID}:{MOVIELENS_DATASET}.movielens_1m ratings.csv user_id:INT64,item_id:INT64,rating:FLOAT64,timestamp:TIMESTAMP
# Movies table
! bq load --project_id={PROJECT_ID} --source_format=CSV --field_delimiter=@ {PROJECT_ID}:{MOVIELENS_DATASET}.movie_titles movie_titles.csv movie_id:INT64,movie_title:STRING,genre:STRING
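Optionally, run a quick row count to confirm the load worked (the MovieLens 1M ratings table should contain roughly one million rows):

! bq query --project_id={PROJECT_ID} --use_legacy_sql=false 'SELECT COUNT(*) AS n_ratings FROM `{PROJECT_ID}.{MOVIELENS_DATASET}.movielens_1m`'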
Vertex AI Pipeline. Install Libraries and Setup Services and Accounts
Install Libraries
%%capture
! pip install google-cloud-aiplatform==1.21.0 --upgrade
! pip install kfp==2.0.1 --upgrade
! pip install google-cloud-pipeline-components==2.0.0 --upgrade
Restart the Kernel
# Automatically restart kernel after installs so that your environment can access the new packages
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)
Set Project Variables
= "[your-project-id]" # @param {type:"string"}
PROJECT_ID = "us-central1" # @param {type: "string"}
REGION
= "[pipeline-bucket]" # @param {type: "string"}
BUCKET_NAME
= f"gs://{BUCKET_NAME}/"
PIPELINE_ROOT
= PIPELINE_ROOT + "recommender_model" # @param {type: "string"} MODEL_DIR
Authenticate your Google Cloud account (again)
Since the kernel was restarted, authenticate again.
import sys

if "google.colab" in sys.modules:
    IS_COLAB = True
    from google.colab import auth as google_auth
    google_auth.authenticate_user()
else:
    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.
    %env GOOGLE_APPLICATION_CREDENTIALS <path-to-json-credential>
! gcloud config set project {PROJECT_ID}
Enable Service APIs
Enabling the APIs with commands instead of doing it in the UI keeps everything documented. For this project the following services are needed:
- Identity and Access Management (IAM) API
- Vertex AI API
- Cloud Build API
- BigQuery API (although this should already be enabled if the datasets were created and the tables populated)
- BigQuery Reservation API
- Cloud Run Admin API
- Cloud Storage API
! gcloud services enable iam.googleapis.com --project={PROJECT_ID}
! gcloud services enable aiplatform.googleapis.com --project={PROJECT_ID}
! gcloud services enable cloudbuild.googleapis.com --project={PROJECT_ID}
! gcloud services enable bigquery.googleapis.com --project={PROJECT_ID}
! gcloud services enable bigqueryreservation.googleapis.com --project={PROJECT_ID}
! gcloud services enable run.googleapis.com --project={PROJECT_ID}
! gcloud services enable storage-component.googleapis.com --project={PROJECT_ID}
Create a Service Account for the pipeline to run
= "[your-service-account-id]" # @param {type:"string"} SERVICE_ACCOUNT_ID
If the following cell returns an error in Colab, run it in your local machine. Or create the account directly in the Google Cloud Console UI.
! gcloud iam service-accounts create {SERVICE_ACCOUNT_ID} --description="Vertex AI Pipeline Service Account" --display-name="vertex_service_account"
= f"{SERVICE_ACCOUNT_ID}@{PROJECT_ID}.iam.gserviceaccount.com" SERVICE_ACCOUNT
If you don't create a dedicated service account, use the Default Service Account instead
shell_output = ! gcloud projects describe {PROJECT_ID}
PROJECT_NUMBER = shell_output[-1].split(":")[1].strip().replace("'", "")
DEFAULT_SERVICE_ACCOUNT = f"{PROJECT_NUMBER}-compute@developer.gserviceaccount.com"
import sys

IS_COLAB = "google.colab" in sys.modules

if (
    SERVICE_ACCOUNT_ID == ""
    or SERVICE_ACCOUNT_ID is None
    or SERVICE_ACCOUNT_ID == "[your-service-account-id]"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        DEFAULT_SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    if IS_COLAB:
        shell_output = ! gcloud projects describe {PROJECT_ID}
        PROJECT_NUMBER = shell_output[-1].split(":")[1].strip().replace("'", "")
        DEFAULT_SERVICE_ACCOUNT = f"{PROJECT_NUMBER}-compute@developer.gserviceaccount.com"

    print("Service Account:", DEFAULT_SERVICE_ACCOUNT)
    SERVICE_ACCOUNT = DEFAULT_SERVICE_ACCOUNT
Grant the Service Account granular permissions to GCP resources
The most challenging part of the project was figuring out how to give the service account the right granular permissions. I didn’t want to give the service account the Editor or Owner role, and it took me a while to find the right roles. As I mentioned, some errors in the pipeline were caused by missing permissions, but they were hard to troubleshoot because there wasn’t any mention of the actual role needed.
Grant the aiplatform.user role
= f"serviceAccount:{SERVICE_ACCOUNT}"
service_arg ! gcloud projects add-iam-policy-binding {PROJECT_ID} --member={service_arg} --role="roles/aiplatform.user"
Create the GCS bucket and assign the storage.objectAdmin role
I first assigned the storage.objectCreator and storage.objectViewer roles. It took me hours to find out that the trained model couldn’t be exported without the storage.objectAdmin role, which is needed to modify existing data in the bucket.
! gsutil mb -p {PROJECT_ID} -l {REGION} {PIPELINE_ROOT}
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectAdmin {PIPELINE_ROOT}
Grant Cloud Run’s run.developer role
! gcloud projects add-iam-policy-binding {PROJECT_ID} --member={service_arg} --role="roles/run.developer"
Grant Cloud Build’s cloudbuild.builds.editor role
I thought the Cloud Run developer role was enough for the model to be deployed. It took me a while to find out that the service account also needed a permission from the Cloud Build service.
! gcloud projects add-iam-policy-binding {PROJECT_ID} --member={service_arg} --role="roles/cloudbuild.builds.editor"
Assign roles to Cloud Build’s service account
When Cloud Build’s API is enabled, its service account is automatically created.
By default – for security reasons – the Cloud Build Service Account does not have the permissions to manage Cloud Run. Google Cloud Build + Google Cloud Run
First, let’s grab the default Cloud Build and Compute Engine service accounts.
# Cloud Build default service account
= f"{PROJECT_NUMBER}@cloudbuild.gserviceaccount.com"
cloud_build_sa = f"serviceAccount:{cloud_build_sa}"
service_build_arg # Compute Engine default service account
= f"{PROJECT_NUMBER}-compute@developer.gserviceaccount.com" compute_engine_sa
! gcloud projects add-iam-policy-binding {PROJECT_ID} --member={service_build_arg} --role="roles/run.admin"
And the last part was to “Grant the IAM Service Account User role to the Cloud Build service account on the Cloud Run runtime service account”.
Meaning that the Cloud Build service account will be able to impersonate the Cloud Run runtime service account, which is the Compute Engine default service account. More on impersonation here: YouTube: Service Account Impersonation in Google Cloud - IAM in GCP
! gcloud iam service-accounts add-iam-policy-binding {compute_engine_sa} --member={service_build_arg} --role="roles/iam.serviceAccountUser"
And that’s it: all services are enabled and the service account has the granular permissions for the pipeline to run.
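Since most of the pipeline failures came down to missing roles, an optional sanity check is to list the roles currently bound to the pipeline service account (a sketch using standard gcloud flags):

! gcloud projects get-iam-policy {PROJECT_ID} --flatten="bindings[].members" --filter="bindings.members:serviceAccount:{SERVICE_ACCOUNT}" --format="table(bindings.role)"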
Vertex AI Pipeline - Create and Run the Pipeline
Import Libraries
import kfp
from typing import NamedTuple
from kfp.dsl import (
    pipeline, component, OutputPath, InputPath, Model,
    Input, Artifact, Output, Metrics
)
from kfp import compiler
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
Initialize Vertex AI SDK for Python
aiplatform.init(project=PROJECT_ID, location=REGION)
Declare Pipeline Components
Create BigQuery Reservation Component
@component(packages_to_install=["google-cloud-bigquery-reservation==1.11.2"])
def create_reservation(
    project: str,
    location: str,
    commitment_slots: int,
    reservation_id: str
) -> NamedTuple("outputs", [("reservation_name", str), ("assignment_name", str)]):
    """
    Create BigQuery Reservation and Assignment
    """
    print('project', project, 'location', location, 'commitment_slots', commitment_slots, 'reservation_id', reservation_id)

    # import libraries
    import time
    from google.cloud.bigquery_reservation_v1 import (
        CapacityCommitment, Reservation, Assignment, ReservationServiceClient
    )

    reservation_client = ReservationServiceClient()
    parent_arg = f"projects/{project}/locations/{location}"

    # Reservation (autoscaling)
    autosc = Reservation.Autoscale(current_slots=0, max_slots=commitment_slots)  # NEW Jul-05 Update
    # reservation_slots = commitment_slots
    slot_capacity = 0
    reservation_config = Reservation(
        # slot_capacity=reservation_slots,
        slot_capacity=slot_capacity, autoscale=autosc,  # NEW Jul-05 Update
        edition='ENTERPRISE',  # NEW Jul-05 Update (Could be "STANDARD" ?)
        ignore_idle_slots=False
    )
    reservation = reservation_client.create_reservation(
        parent=parent_arg, reservation_id=reservation_id, reservation=reservation_config
    )
    reservation_name = reservation.name
    print('reservation_name', reservation_name)

    # Assignment
    print("Creating Assignment...")
    assignment_config = Assignment(
        job_type='QUERY', assignee='projects/{}'.format(project)
    )
    assignment = reservation_client.create_assignment(
        parent=reservation_name, assignment=assignment_config
    )
    assignment_name = assignment.name
    print('assignment_name', assignment_name)

    # it can take a while for the slots to become available
    print("Waiting for 300 seconds...")
    time.sleep(300)

    return reservation_name, assignment_name
After the reservation is created, the account is going to be billed by time and number of slots. It can cost about $3 for each pipeline run, but you need to be very cautious about the pipeline failing before the reservation is deleted. To be sure the reservation is deleted you can go to https://console.cloud.google.com/bigquery/admin/reservations and select the project of interest. Check that SLOT RESERVATIONS and SLOT COMMITMENTS are empty and look like this:
[Screenshots: the SLOT RESERVATIONS and SLOT COMMITMENTS tabs, both empty]
If not empty, delete them in the UI. Below you can find some scripts you can run to delete the reservations in case the pipeline fails before doing it.
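If you prefer checking from the notebook instead of the UI, here is a minimal sketch using the same client library as the pipeline components; it should print nothing when everything has been cleaned up:

from google.cloud.bigquery_reservation_v1 import ReservationServiceClient

client = ReservationServiceClient()
parent = f"projects/{PROJECT_ID}/locations/us"

# Reservations still listed here keep being billed.
for reservation in client.list_reservations(parent=parent):
    print("Reservation:", reservation.name)
    # Assignments are children of the reservation they belong to.
    for assignment in client.list_assignments(parent=reservation.name):
        print("  Assignment:", assignment.name)

# Capacity commitments (the SLOT COMMITMENTS tab) can be listed as well.
for commitment in client.list_capacity_commitments(parent=parent):
    print("Commitment:", commitment.name)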
Delete Reservation Component
@component(packages_to_install=["google-cloud-bigquery-reservation==1.11.2"])
def delete_reservation(
    reservation_name: str,
    assignment_name: str
):
    # import libraries
    from google.cloud.bigquery_reservation_v1 import ReservationServiceClient

    # Delete Assignment, Reservation and Capacity
    reservation_client = ReservationServiceClient()
    reservation_client.delete_assignment(name=assignment_name)
    reservation_client.delete_reservation(name=reservation_name)

    print('reservation_name', reservation_name, 'deleted')
    print('assignment_name', assignment_name, 'deleted')
Log Eval Metrics Component
@component()
def log_eval_metrics(
    eval_metrics: Input[Artifact], metrics: Output[Metrics]
) -> dict:
    # import libraries
    import math

    metadata = eval_metrics.metadata
    for r in metadata["rows"]:
        rows = r["f"]
    schema = metadata["schema"]["fields"]

    output = {}
    for metric, value in zip(schema, rows):
        metric_name = metric["name"]
        val = float(value["v"])
        output[metric_name] = val
        metrics.log_metric(metric_name, val)

    print(output)
    return output
Deploy Model Component
The names in this function are hard-coded. It assumes you keep recommender_model as the model directory. If you changed it earlier, change it here accordingly. Also replace [PIPELINE_ROOT] with your chosen bucket name.
@component(packages_to_install=[
    "google-cloud-build==3.18.0",
    "google-api-python-client==2.93.0"])
def deploy_recommendations_model(
    artifact_uri: str,
    project: str
):
    # import libraries
    from google.cloud.devtools import cloudbuild
    from googleapiclient.discovery import build

    # Deploy Model
    client = cloudbuild.CloudBuildClient()
    build = cloudbuild.Build()

    # Fill [PIPELINE_ROOT] with your selected parameter
    # If you didn't keep `recommender_model` change it accordingly
    build.steps = [
        { "name": "gcr.io/cloud-builders/gsutil",
          "args": ["cp", "-r", '[PIPELINE_ROOT]/recommender_model', "."] },
        { "name": "gcr.io/cloud-builders/gsutil",
          "args": ["cp", "-r", '[PIPELINE_ROOT]/dockerfile/Dockerfile/', "Dockerfile"] },
        { "name": "gcr.io/cloud-builders/docker",
          "args": ["build", "-t", f"gcr.io/{project}/recommender_model", "."] },
        { "name": "gcr.io/cloud-builders/docker",
          "args": ["push", f"gcr.io/{project}/recommender_model"] },
        { "name": "gcr.io/cloud-builders/gcloud",
          "args": [
              "run", "deploy",
              "recommender-model", "--image", f"gcr.io/{project}/recommender_model",
              "--region", "us-central1", "--platform", "managed", "--memory", "500Mi",
              "--allow-unauthenticated", "--max-instances", "5", "--port", "8501"
          ]
        }
    ]

    operation = client.create_build(project_id=project, build=build)
    # Print the in-progress operation
    print("IN PROGRESS:")
    print(operation.metadata)

    result = operation.result()
    # Print the completed status
    print("RESULT:", result.status)
Create the Docker file and copy it to GCS
Keep recommender_model or change it accordingly.
%%writefile Dockerfile
FROM tensorflow/serving:2.4.4
ENTRYPOINT ["/usr/bin/env"]
ENV MODEL_NAME=recommender_model
ENV PORT=8501
COPY recommender_model /models/recommender_model/1
CMD tensorflow_model_server --port=8500 --rest_api_port=$PORT --model_base_path=/models/recommender_model --model_name=$MODEL_NAME
docker_path = PIPELINE_ROOT + 'dockerfile/'

! gsutil cp Dockerfile {docker_path}
Pipeline and Job
Declare the Pipeline
@pipeline(
    name="bigquery-recommender-pipeline",
    pipeline_root=PIPELINE_ROOT + "bigquery-recommender-pipeline"
)
def recommendation_pipeline(
    artifact_uri: str,
    display_name: str
):
    # import libraries
    from google_cloud_pipeline_components.v1.bigquery import (
        BigqueryCreateModelJobOp,
        BigqueryEvaluateModelJobOp,
        BigqueryExportModelJobOp
    )

    # NODE create-reservation
    project = PROJECT_ID
    location = "us"
    slots = 100
    reservation_id = "matrix-factorization-reservation"

    create_reservation_task = create_reservation(
        project=project, location=location, commitment_slots=slots, reservation_id=reservation_id
    )

    # NODE create-model-task
    # Query for training
    num_factors = 60
    q = f"""
    CREATE OR REPLACE MODEL {MODEL_DATASET}.model_00
    OPTIONS
        (model_type='matrix_factorization',
        user_col='user_id',
        item_col='item_id',
        RATING_COL='rating',
        feedback_type='EXPLICIT',
        l2_reg=9.83,
        num_factors={num_factors}) AS
    SELECT user_id, item_id, rating FROM {MOVIELENS_DATASET}.movielens_1m
    """
    create_model_task = BigqueryCreateModelJobOp(
        project=project,
        location=location,
        query=q,
    ).after(create_reservation_task)

    # NODE evaluate-model-task
    bq_evaluate_task = BigqueryEvaluateModelJobOp(
        project=project, location=location, model=create_model_task.outputs["model"]
    ).after(create_model_task)

    # NODE delete-reservation
    delete_reservation_task = delete_reservation(
        reservation_name=create_reservation_task.outputs["reservation_name"],
        assignment_name=create_reservation_task.outputs["assignment_name"]
    ).after(bq_evaluate_task)

    # NODE log-eval-metrics
    bqml_eval_metrics_raw = bq_evaluate_task.outputs["evaluation_metrics"]
    log_eval_metrics_task = log_eval_metrics(
        eval_metrics=bqml_eval_metrics_raw
    )

    # NODE export-bq-model-to-gcs
    bq_export_task = BigqueryExportModelJobOp(
        project=project,
        location=location,
        model=create_model_task.outputs["model"],
        model_destination_path=artifact_uri,
    ).after(create_model_task)

    # NODE deploy-model-cloud-run
    deploy_recommendations_model_task = deploy_recommendations_model(
        artifact_uri=artifact_uri,
        project=project
    ).after(bq_export_task)
Compile the Pipeline
compiler.Compiler().compile(
    pipeline_func=recommendation_pipeline,
    package_path="recommendation_pipeline.json"
)
Create and Run the Job
job = pipeline_jobs.PipelineJob(
    enable_caching=False,
    display_name="recommendation-pipeline",
    template_path="recommendation_pipeline.json",
    parameter_values={
        "artifact_uri": MODEL_DIR,
        "display_name": "recommendation",
    }
)

job.run(
    service_account=SERVICE_ACCOUNT,
    sync=False
)
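Because the job is submitted with sync=False, the cell returns immediately. If you want the notebook to block until the run finishes, something along these lines should work with the aiplatform SDK:

# Optional: wait for the pipeline run to complete and print its final state.
job.wait()
print("Pipeline state:", job.state)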
The pipeline can be monitored here:
- GCP Console: Vertex AI Pipelines
And if all went fine, you should see a beautiful picture like this:
Make sure no reservation or slot assignment is active
If the pipeline fails before deleting the reservation, this command lists the reservations, if any.
! bq ls --reservation --project_id={PROJECT_ID} --location=us
And this command shows the assignment.
! bq show --project_id={PROJECT_ID} --location=us --reservation_assignment --job_type=QUERY --assignee_id={PROJECT_ID} --assignee_type=PROJECT
Delete any reservation and assignment
shell_output = ! bq show --project_id={PROJECT_ID} --location=us --reservation_assignment --job_type=QUERY --assignee_id={PROJECT_ID} --assignee_type=PROJECT

assignment_id = shell_output[2].split(' ')[2].split('.')[-1]
reservation_id = 'matrix-factorization-reservation'  # As declared in the pipeline
# remove assignment
! bq rm --project_id={PROJECT_ID} --location=us --reservation_assignment {reservation_id}.{assignment_id}
# remove reservation
! bq rm --project_id={PROJECT_ID} --location=us --reservation {reservation_id}
Monitor the process
- https://console.cloud.google.com/cloud-build/builds?project=[your-project-id]
- https://console.cloud.google.com/run?project=[your-project-id]
- https://console.cloud.google.com/vertex-ai/pipelines/runs?project=[your-project-id]
Conclusions
And that’s about it: the automated process for training and deploying the model using BigQuery ML and a Vertex AI pipeline. In Part 2 we will see how to do inference and how to get insights from the embeddings (latent factors) learned by the model for each item and user.