A Cloud Run service in Go calling a Workflows callback endpoint

It’s all Richard Seroter’s fault: I ended up dabbling with Go! We were chatting about a use case involving Google Cloud Workflows and a Cloud Run service implemented in Go, so it was a good opportunity to play a bit with the language. Well, I still don’t like its error handling… But let’s rewind the story a bit!


Workflows is a fully-managed service/API orchestrator on Google Cloud. You can create some advanced business workflows using YAML syntax. I’ve built numerous little projects using it, and blogged about it. I particularly like its ability to pause a workflow execution, creating a callback endpoint that you can call from an external system to resume the execution of the workflow. With callbacks, you’re able to implement human validation steps, for example in an expense report application where a manager validates or rejects an expense from someone in their team (this is what I implemented in this article).


For my use case with Richard, we had a workflow that created such a callback endpoint. That endpoint is then called by a Cloud Run service implemented in Go. Let’s see how to implement the workflow:


main:
    params: [input]
    steps:
    - create_callback:
        call: events.create_callback_endpoint
        args:
            http_callback_method: "POST"
        result: callback_details
    - log_callback_creation:
        call: sys.log
        args:
            text: ${"Callback created, awaiting calls on " + callback_details.url}
    - await_callback:
        call: events.await_callback
        args:
            callback: ${callback_details}
            timeout: 86400
        result: callback_request
    - log_callback_received:
        call: sys.log
        args:
            json: ${callback_request.http_request}
    - return_callback_request:
        return: ${callback_request.http_request}



The above workflow definition creates a callback endpoint, and the URL of that endpoint is returned by the first step. The workflow then waits for the callback endpoint to be called externally. When that happens, the execution resumes, logs some information about the incoming call, and returns.


I deployed that workflow with a service account that has the Workflows Editor role, the Logs Writer role (to log information), and the Service Account Token Creator role (to create OAuth2 tokens), as explained in the documentation.
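For reference, the deployment command could look something like the following; the workflow name, YAML file, location, and service account here are placeholders, not the actual values from my project:

gcloud workflows deploy callback-workflow \
    --source=callback-workflow.yaml \
    --location=europe-west1 \
    --service-account=my-workflow-sa@MY_PROJECT_ID.iam.gserviceaccount.com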


Now let’s look at the Go service. I ran go mod init to create a new Go module.
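Something along these lines, where the module path is just an example, not the one I actually used:

go mod init example.com/callback-caller

I then created a main.go source file with the following content: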


package main


import (
    metadata "cloud.google.com/go/compute/metadata"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "os"
    "strings"
)

The metadata module is used to fetch an OAuth2 token from the Cloud Run metadata server.


// OAuth2TokenInfo maps the JSON document returned by the metadata server
type OAuth2TokenInfo struct {
    Token      string `json:"access_token"`
    TokenType  string `json:"token_type"`
    Expiration uint32 `json:"expires_in"`
}

The metadata endpoint instance/service-accounts/default/token returns a JSON document that we map to the above struct. We’re interested in the access_token field, which we use further down to make the authenticated call to the Workflows callback endpoint.
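If you’re curious about the shape of that document, you can query the metadata server directly from inside the container; this is just a sketch, and the values in the comment are illustrative:

curl -H "Metadata-Flavor: Google" \
    "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token"
# returns something like {"access_token":"ya29....","expires_in":3599,"token_type":"Bearer"}

Next, the main() function bootstraps the HTTP server: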


func main() {
    log.Print("Starting server...")
    http.HandleFunc("/", handler)

    // Determine port for HTTP service.
    port := os.Getenv("PORT")
    if port == "" {
        port = "8080"
        log.Printf("Defaulting to port %s", port)
    }

    // Start HTTP server.
    log.Printf("Listening on port %s", port)
    if err := http.ListenAndServe(":"+port, nil); err != nil {
        log.Fatal(err)
    }
}




The main() function starts our Go service. Let’s now see the handler() function in more detail:


func handler(w http.ResponseWriter, r *http.Request) {
    callbackUrl := r.URL.Query().Get("callback_url")
    log.Printf("Callback URL: %s", callbackUrl)

We retrieve the ?callback_url query parameter that will contain our callback endpoint URL.


    // Fetch an OAuth2 access token from the metadata server
    oauthToken, errAuth := metadata.Get("instance/service-accounts/default/token")
    if errAuth != nil {
        log.Fatal(errAuth)
    }

Above, we make a call to the metadata server thanks to the metadata Go module. We then unmarshal the returned JSON document into our previously defined struct, with the following code:


    data := OAuth2TokenInfo{}
    errJson := json.Unmarshal([]byte(oauthToken), &data)
    if errJson != nil {
        fmt.Println(errJson.Error())
    }
    log.Printf("OAuth2 token: %s", data.Token)

Now it’s time to prepare the call to our workflow callback endpoint, with a POST request:


    workflowReq, errWorkflowReq := http.NewRequest("POST", callbackUrl, strings.NewReader("{}"))
    if errWorkflowReq != nil {
        fmt.Println(errWorkflowReq.Error())
    }

We add the OAuth2 token as a bearer token in the Authorization header:


	workflowReq.Header.Add("authorization", "Bearer "+data.Token)
workflowReq.Header.Add("accept", "application/json")
workflowReq.Header.Add("content-type", "application/json")

client := &http.Client{}

workflowResp, workflowErr := client.Do(workflowReq)

if workflowErr != nil {

fmt.Printf("Error making callback request: %s\n", workflowErr)

}

log.Printf("Status code: %d", workflowResp.StatusCode)


fmt.Fprintf(w, "Workflow callback called. Status code: %d", workflowResp.StatusCode)

}



We simply return the status code at the end of our Go service.


To deploy the Go service, I simply used the source deployment approach, by running gcloud run deploy and answering a few questions (service name, deployment region, etc.). After a couple of minutes, the service is up and running.
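As a rough sketch, with a made-up service name and region, and assuming unauthenticated invocations are fine for this experiment:

gcloud run deploy callback-caller \
    --source=. \
    --region=europe-west1 \
    --allow-unauthenticated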


I create a new execution of the workflow from the Google Cloud console. Once it’s started, it logs the callback endpoint URL. I copy its value, then call my Cloud Run service with the ?callback_url= query string pointing at that URL. And voila, the service resumes the execution of the workflow, and the workflow finishes.
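You can do the same thing from the command line; the workflow name, region, and service URL below are placeholders:

gcloud workflows execute callback-workflow --location=europe-west1
# copy the callback URL from the workflow logs, then:
curl "https://callback-caller-xyz-ew.a.run.app/?callback_url=<CALLBACK_URL>"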


Monitoring website changes with Workflows, Cloud Functions and SendGrid

Every year in France, around mid-September, there’s a special weekend where everyone can visit some famous places, usually closed the rest of the year. That’s “Journée du Patrimoine”. For example, you can visit places like the Elysée Palace or the Matignon Palace, if you want to see where the French president, or the French prime minister work. However, for some of those places, it’s tricky to register online to book a slot, as there’s always a high demand for them. Furthermore, you have to be there at the right time to register, and often, you don’t even know when that day or time is! So I thought I could monitor the website of the Elysée Palace to see when the registration would open, by tracking changes on the Elysée website.


To monitor web page or website changes, there are a ton of online services available. There are often some limitations to the number of free requests, or to the frequency of the change checks. Being a developer on Google Cloud, I decided to write a simple solution that would take advantage of various Google Cloud services, namely:

  • Workflows: to define the various steps of my site change workflow,

  • Cloud Scheduler: to execute my workflow on a regular basis,

  • Cloud Functions: to compute a hash of the webpage, to see if the page changed,

  • Cloud Storage: to store the hashes,

  • SendGrid (not a Google Cloud product): to send me an email when changes have appeared,

  • Secret Manager: to store my SendGrid API key securely.


Let’s have a look first at the function that computes the hash of a web page. As there’s no hash function in the Workflows standard library, I decided to use a Cloud Function to do that job. I used the Node.js runtime, with the crypto module, which contains a SHA-1 implementation:


const crypto = require('crypto');

exports.checksum = (req, res) => {
    const webpageBody = req.body.webpage;
    const shasum = crypto.createHash('sha1');
    shasum.update(webpageBody);
    const sha1 = shasum.digest('hex');
    res.status(200).send({sha1: sha1});
};

The function receives the web page content from the workflow. I create the SHA-1 hash of that content and return it in hexadecimal form, in a small JSON payload.
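For completeness, such a function can be deployed with a command along these lines; the region matches the function URL used later in the workflow, while the runtime version and the unauthenticated access are assumptions on my part:

gcloud functions deploy checksum \
    --region=europe-west1 \
    --runtime=nodejs20 \
    --trigger-http \
    --allow-unauthenticated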


I created a Google Cloud Storage bucket to contain my web page hashes.
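Something like the following would do; the bucket name is the one referenced in the workflow below, and the location is an assumption:

gsutil mb -l europe-west1 gs://hash_results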



Since I’m using SendGrid to notify me by email on changes, I store the API key securely in Secret Manager.
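For example, with the secret name the workflow reads below, and an obviously fake key value:

echo -n "SG.my-sendgrid-api-key" | gcloud secrets create SENDGRID_API_KEY --data-file=-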



Now let’s zoom in on our workflow, piece by piece.


First, I define some variables, like the name of my bucket, the name of my hashes text file, and I retrieve my SendGrid API key (see this previous article about using Secret Manager with Workflows):

main:
    params: [input]
    steps:
    - assignment:
        assign:
        - bucket: hash_results
        - file_name: hash.txt
    - get_email_api_key:
        call: googleapis.secretmanager.v1.projects.secrets.versions.accessString
        args:
            secret_id: SENDGRID_API_KEY
        result: EMAIL_API_KEY

Then I read the content of the previous hash in GCS (you can also check this article on how to read and write JSON data to a file in a bucket from a workflow):

    - read_hash_from_gcs:
        call: http.get
        args:
            url: ${"https://storage.googleapis.com/download/storage/v1/b/" + bucket + "/o/" + file_name}
            auth:
                type: OAuth2
            query:
                alt: media
        result: hash_from_gcs

It’s time to make a simple HTTP GET call to the website. Currently, the URL is hard-coded, but we could parameterize the workflow to get that URL from the workflow execution input parameters instead.

    - retrieve_web_page:
        call: http.get
        args:
            url: https://evenements.elysee.fr/
        result: web_page

Once I’ve retrieved the content of the URL (the result of that request is stored in the web_page variable), I can compute the hash by calling my Cloud Function:

    - compute_hash:
        call: http.post
        args:
            url: https://europe-west1-patrimoine-check.cloudfunctions.net/checksum
            body:
                webpage: ${web_page.body}
        result: hash_result

That’s where we introduce some branching in the workflow. If the web page hasn’t changed, we finish early; if it has changed, we store the new hash in GCS:

    - assign_hashes:
        assign:
            - old_hash: ${hash_from_gcs.body.sha1}
            - new_hash: ${hash_result.body.sha1}
            - hash_msg: ${"Old hash = " + old_hash + " / New hash = " + new_hash}
    - conditionalSwitch:
        switch:
        - condition: ${new_hash != old_hash}
          next: write_hash_to_gcs
        next: returnOutput
    - write_hash_to_gcs:
        call: http.post
        args:
            url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + bucket + "/o"}
            auth:
                type: OAuth2
            query:
                name: ${file_name}
            body:
                sha1: ${hash_result.body.sha1}

I log the fact that the website has changed, and call the SendGrid API (as in this article on using SendGrid to send emails from Workflows):

    - site_changed_log:
        call: sys.log
        args:
            text: Website has changed
    - notify_by_email:
        call: http.post
        args:
            url: https://api.sendgrid.com/v3/mail/send
            headers:
                Content-Type: "application/json"
                Authorization: ${"Bearer " + EMAIL_API_KEY}
            body:
                personalizations:
                    - to:
                        - email: me@gmail.com
                from:
                    email: you@gmail.com
                subject: "Elysée, page mise à jour"
                content:
                    - type: text/plain
                      value: "La page de l'Élysée a été mise à jour"
    - log_hashes:
        call: sys.log
        args:
            text: ${hash_msg}
    - returnOutput:
        return: ${hash_msg}

The workflow needs to be invoked at regular intervals. Workflows can be configured to be invoked on a schedule via Cloud Scheduler (again, check this article on scheduling workflow executions). I configured my workflow to be triggered every minute, with the * * * * * cron pattern.
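As a sketch, the corresponding Cloud Scheduler job could be created like this; the workflow name, region, and invoker service account are placeholders (the general approach is detailed in the scheduling article below):

gcloud scheduler jobs create http site-checker-every-minute \
    --schedule="* * * * *" \
    --uri="https://workflowexecutions.googleapis.com/v1/projects/MY_PROJECT_ID/locations/europe-west1/workflows/site-checker/executions" \
    --oauth-service-account-email="workflows-caller-sa@MY_PROJECT_ID.iam.gserviceaccount.com"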


And voila! I have my little workflow being invoked every minute to check if a web page has changed, and send me an email if so!


To be honest with you, the workflow worked perfectly… but the true story is that I wasn’t monitoring the right URL, I should have monitored the front page instead. Furthermore, the page I was monitoring included some dynamic JavaScript code, but the HTML fetched wasn’t really changing. I missed the registration window, and all the slots filled super rapidly before I even had the time to register my family for a visit! Shame on me, better check my URL next time, or create webpage screenshots with a headless Chrome running in Cloud Run or in Cloud Functions! Or, of course, use online services that have solved those problems with their years of experience! Hopefully, next year, I won’t miss the registration! But it was fun to glue together all those useful services from Google Cloud, to solve a concrete problem.

Schedule a workflow execution

There are different ways to launch the execution of a workflow. In previous articles, we mentioned that you can use the gcloud command-line tool to create an execution, use the various client libraries to invoke Workflows, or call the REST API. A workflow itself can also invoke other workflows!


But today, I’d like to tell you how to schedule the execution of a workflow. For that purpose, we’ll take advantage of Cloud Scheduler. The documentation actually covers this topic in detail, so be sure to grab all the info there. However, I’ll quickly go through the steps, and tell you about a nice new feature in the Cloud console that eases the scheduling of workflows!


First, you need to have both Workflows and Cloud Scheduler enabled:


gcloud services enable cloudscheduler.googleapis.com workflows.googleapis.com


Cloud Scheduler will need a service account with the workflows.invoker role, to be allowed to call Workflows:


gcloud iam service-accounts create workflows-caller-sa
gcloud projects add-iam-policy-binding MY_PROJECT_ID \
  --member serviceAccount:workflows-caller-sa@MY_PROJECT_ID.iam.gserviceaccount.com \
  --role roles/workflows.invoker


Now it’s time to create the cron job:


gcloud scheduler jobs create http every_5_minute_schedule \
    --schedule="*/5 * * * *" \
    --uri="https://workflowexecutions.googleapis.com/v1/projects/MY_PROJECT_ID/locations/REGION_NAME/workflows/WORKFLOW_NAME/executions" \
    --message-body="{\"argument\": \"DOUBLE_ESCAPED_JSON_STRING\"}" \
    --time-zone="America/New_York" \
    --oauth-service-account-email="workflows-caller-sa@MY_PROJECT_ID.iam.gserviceaccount.com"


Here, you can see that Scheduler will run every 5 minutes (using the cron notation), and that it’s going to call the Workflows REST API to create a new execution. You can also pass an argument for the workflow input. 


The cool new feature I was eager to mention today is the direct integration of scheduling into the Workflows creation flow, in the Cloud console.


Now, when you create a new workflow, you can select a trigger:



Click on the “ADD NEW TRIGGER” button, and select “Scheduler”. A side panel will show up on the right, where you can define the schedule directly, instead of having to head over to the Cloud Scheduler product section:

And there, you can specify the various details of the schedule! It’s nice to see both products nicely integrated, to ease the flow of creating a scheduled workflow.

Using the Secret Manager connector for Workflows to call an authenticated service

Workflows allows you to call APIs, whether they’re hosted on Google Cloud or anywhere else in the wild. A few days ago, for example, we saw how to use the SendGrid API to send emails from a workflow. However, in that article, I had the API key hard-coded into my workflow, which is a bad practice. Instead, we can store secrets in Secret Manager. Workflows has a specific connector for Secret Manager, and a useful method to access secrets.


In this article, we’ll learn two things:

  • How to access secrets stored in Secret Manager with the Workflows connector

  • How to call an API that requires basic authentication


Let’s access the secrets I need to make a basic auth call to my API:


- get_secret_user:
    call: googleapis.secretmanager.v1.projects.secrets.versions.accessString
    args:
      secret_id: basicAuthUser
    result: secret_user


- get_secret_password:
    call: googleapis.secretmanager.v1.projects.secrets.versions.accessString
    args:
      secret_id: basicAuthPassword
    result: secret_password


The user login and password are now stored in variables that I can reuse in my workflow. Next, I create the Base64-encoded user:password string required by the Authorization header:


- assign_user_password:
    assign:
    - encodedUserPassword: ${base64.encode(text.encode(secret_user + ":" + secret_password))}


Equipped with my encoded user:password string, I can now call my API (here, a Cloud Function) by adding an Authorization header with basic authentication (and return the output of the function):


- call_function:
    call: http.get
    args:
        url: https://europe-west1-workflows-days.cloudfunctions.net/basicAuthFn
        headers:
            Authorization: ${"Basic " + encodedUserPassword}
    result: fn_output
- return_result:
    return: ${fn_output.body}


Workflows has built-in OAuth2 and OIDC support for authenticating to Google-hosted APIs, functions, and Cloud Run services, but it’s also useful to know how to invoke other authenticated services, like those requiring basic auth or other bearer tokens.

Load and use JSON data in your workflow from GCS

Following up on the article about writing and reading JSON files in Cloud Storage buckets, we saw that we could access the data of the JSON file and use it in our workflow. Let’s have a look at a concrete use for this.


Today, we’ll take advantage of this mechanism to avoid hard-coding the URLs of the APIs we call from our workflow. That way, the workflow becomes more portable across environments.


Let’s group the logic for reading and loading the JSON data into a reusable subworkflow:


read_env_from_gcs:
    params: [bucket, object]
    steps:
    - read_from_gcs:
        call: http.get
        args:
            url: ${"https://storage.googleapis.com/download/storage/v1/b/" + bucket + "/o/" + object}
            auth:
                type: OAuth2
            query:
                alt: media
        result: env_file_json_content
    - return_content:
        return: ${env_file_json_content.body}


You call this subworkflow with two parameters: the bucket name, and the object or file name that you want to load. 


Now let’s use it from the main workflow. A first step calls the subworkflow to load a specific file from a specific bucket, and stores the returned JSON content in the env_details variable.


main:
    params: [input]
    steps:
    - load_env_details:
        call: read_env_from_gcs
        args:
            bucket: workflow_environment_info
            object: env-info.json
        result: env_details


Imagine the JSON file contains a JSON object with a SERVICE_URL key pointing at the URL of a service. You can then call that service with the ${env_details.SERVICE_URL} expression, as shown below.


    - call_service:
        call: http.get
        args:
            url: ${env_details.SERVICE_URL}
        result: service_result
    - return_result:
        return: ${service_result.body}
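For reference, the env-info.json file used above could be created and uploaded along these lines; the bucket name comes from the workflow, while the service URL is a made-up placeholder:

cat > env-info.json <<'EOF'
{
  "SERVICE_URL": "https://my-service-xyz-ew.a.run.app"
}
EOF
gsutil cp env-info.json gs://workflow_environment_info/env-info.json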


This is great for avoiding hard-coded values in your workflow definitions. However, for true environment-specific deployments, this is not yet ideal, as you would have to point at a different file in the bucket, or use a different bucket. And that information is currently hard-coded in the definition when you make the call to the subworkflow. But if you follow some naming conventions for the project names and bucket names that map to environments, this can work! (i.e. PROD_bucket vs DEV_bucket, or PROD-env-info.json vs DEV-env-info.json)


Let’s wait for the support of environment variables in Workflows!


 
© 2012 Guillaume Laforge | The views and opinions expressed here are mine and don't reflect the ones from my employer.