
Async Notifications in Go and React with DragonflyDB

Jeff Vincent
November 24, 2023

In this post, we walk through building an async notification service in a Go microservice-based application that uses websockets, React, and DragonflyDB, all deployed in Kubernetes. Then, once the project is running in our cluster, we'll take a look at the streamlined, in-cluster development that Velocity makes possible straight from your local IDE.


DragonflyDB is a drop-in replacement for Redis designed to supercharge your Redis workloads. It works with any Redis SDK and supports the same use cases (caching, task queues, you name it) with a fraction of the compute overhead. Because of this, you can handle much higher throughput without needing to scale your Redis deployment, so you can reduce your cloud footprint and, with it, your overall deployment costs.

Today, we'll demonstrate this with a microservice app, written in Go with a React frontend, that leverages DragonflyDB to send async status notifications to the browser, as shown in the diagram below.

What we're building

Specifically, the React frontend will accept a file upload, which will be sent to the Web-API service, and it will also open a websocket connection with a separate Notifications service. When the Web-API service receives the file, it creates a task queue in DragonflyDB for a given userID with the status “processing.” It then sends the file to a File-Processing service, which simulates a file-processing workflow, and then sends a status of “complete” to the same userID-based task queue in DragonflyDB.

As this is happening, the Notifications service is listening to the various task queues in DragonflyDB, and — via the existing websocket connection(s) with the browser — it relays the file processing status in real time to the user.

The full project is available on GitHub.


The Go services

Web-API


package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
	"mime/multipart"
	"net/http"
	"os"

	"github.com/gin-contrib/cors"
	"github.com/gin-gonic/gin"
	"github.com/go-redis/redis"
)

var (
	dragonflyQueuePrefix = "fileStatusQueue:"
	dragonflyStatus      = "processing"
	dragonflyHost        = os.Getenv("DRAGONFLYDB_HOST")
	dragonflyPort        = os.Getenv("DRAGONFLYDB_PORT")
	dragonflyAddr        = fmt.Sprintf("%s:%s", dragonflyHost, dragonflyPort)
	uploadAPIURL         = os.Getenv("UPLOAD_API_URL")
)

func main() {
	// Initialize Redis client
	client := redis.NewClient(&redis.Options{
		Addr:     dragonflyAddr,
		DB:       0,
	})

	// Initialize Gin router
	router := gin.Default()

	// Apply CORS middleware
	router.Use(cors.Default())

	// API route to handle file uploads
	router.POST("/api/upload", func(c *gin.Context) {
		// Get the file from the form data
		file, err := c.FormFile("file")
		if err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
			return
		}

		// Get the userID from the form data
		userID := c.PostForm("userID")

		// Create the user-specific Redis queue key
		queueKey := dragonflyQueuePrefix + userID

		// Push the file upload task to the user-specific Redis queue
		err = client.RPush(queueKey, file.Filename).Err()
		if err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue file"})
			log.Println("Failed to enqueue file:", err)
			return
		}

		// Publish the "processing" status to the user-specific Redis queue
		err = client.RPush(queueKey, dragonflyStatus).Err()
		if err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to publish status"})
			log.Println("Failed to publish status:", err)
			return
		}

		// Send the file to the separate API
		go sendFileToAPI(file, userID)

		c.JSON(http.StatusOK, gin.H{"message": "File uploaded and processing"})
	})

	// Run the Gin server
	err := router.Run(":8000")
	if err != nil {
		log.Fatal(err)
	}
}

// Helper function to send the file to the separate API
func sendFileToAPI(file *multipart.FileHeader, userID string) {
	// The body of this function has been omitted for brevity,
	// but is available in the repo.
	// ...

	log.Println("File sent to API:", file.Filename)
}

In the file above, we connect to DragonflyDB and define our Gin router. We then create a single endpoint that accepts a file upload and a userID as form data. The handler parses the incoming file and sends it, along with the userID, to the File-Processing service via an HTTP POST request. It also creates a DragonflyDB task queue whose key is specific to the incoming userID.

We then send a “processing” status to the task queue, and log the success or failure of each of the above.

Notifications


package main

import (
	"fmt"
	"github.com/gin-gonic/gin"
	"github.com/go-redis/redis"
	"github.com/gorilla/websocket"
	"log"
	"net/http"
	"os"
	"sync"
)

var (
	dragonflyQueuePrefix = "fileStatusQueue:"
	dragonflyHost        = os.Getenv("DRAGONFLYDB_HOST")
	dragonflyPort        = os.Getenv("DRAGONFLYDB_PORT")
	dragonflyAddr        = fmt.Sprintf("%s:%s", dragonflyHost, dragonflyPort)
)

var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}

var connections sync.Map

func main() {
	// Initialize DragonflyDB client
	client := redis.NewClient(&redis.Options{
		Addr:     dragonflyAddr,
		DB:       0,
	})

	// Initialize Gin router
	router := gin.Default()

	// WebSocket route to handle file status updates
	router.GET("/notifications/ws/:userID", func(c *gin.Context) {
		userID := c.Param("userID")
		log.Println(userID)

		conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
		if err != nil {
			log.Println("WebSocket upgrade error:", err)
			return
		}
		// Store the WebSocket connection
		connections.Store(userID, conn)

		// Close the WebSocket connection and remove it from the map when the client disconnects
		defer func() {
			conn.Close()
			connections.Delete(userID)

			// Delete the user's DragonflyDB queue
			queueKey := dragonflyQueuePrefix + userID
			client.Del(queueKey)
		}()

		// Create the user-specific DragonflyDB queue
		queueKey := dragonflyQueuePrefix + userID

		// Read messages from DragonflyDB queue and send file status updates to the WebSocket client
		for {
			result, err := client.BLPop(0, queueKey).Result()
			log.Println(result)
			if err != nil {
				log.Println("Error while popping item from DragonflyDB queue:", err)
				return
			}

			fileStatus := result[1]
			log.Println(fileStatus)

			// Send the file status update to the WebSocket client
			err = conn.WriteMessage(websocket.TextMessage, []byte(fileStatus))
			if err != nil {
				log.Println("WebSocket send error:", err)
				return
			}

			// Check if the sent file status update indicates completion
			// If yes, break the loop and close the WebSocket connection
			if isCompletionStatus(fileStatus) {
				break
			}
		}
	})

	// Run the Gin server
	err := router.Run(":8000")
	if err != nil {
		log.Fatal(err)
	}
}

// Helper function to check if the file status update indicates completion
func isCompletionStatus(fileStatus string) bool {
	// Customize this logic based on your file status update format
	// Return true if the status indicates completion, false otherwise
	return fileStatus == "completed"
}

Above, we again connect to DragonflyDB and define a Gin API with a single endpoint. This time, though, the endpoint handles an incoming websocket request from the React frontend. Once the websocket connection is established, the handler enters a loop in which it listens to the DragonflyDB task queue that was created when the Web-API service received the file upload. When it detects a message in the queue, it “pops” it, i.e. removes the first item in the queue in a first-in, first-out fashion.

This “popped” item is then relayed to the frontend via the existing websocket connection. These items include a “processing” status, which is sent by the Web-API, and a second “completed” or “failed” status, which is sent by the File-Processing service.
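To make the queue semantics concrete, here is a small, self-contained Go sketch (illustrative only, not part of the repo) that mimics RPUSH and LPOP with a slice: items are appended at the tail and consumed from the head, first in, first out.

```go
package main

import "fmt"

// queue mimics the FIFO semantics of the DragonflyDB list used above:
// RPush appends at the tail, LPop removes from the head.
type queue struct{ items []string }

func (q *queue) RPush(v string) { q.items = append(q.items, v) }

// LPop returns the oldest item, or false if the queue is empty.
// (The real BLPOP blocks until an item arrives instead of returning immediately.)
func (q *queue) LPop() (string, bool) {
	if len(q.items) == 0 {
		return "", false
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v, true
}

func main() {
	q := &queue{}
	q.RPush("processing") // pushed by the Web-API
	q.RPush("completed")  // pushed by the File-Processing service
	for {
		v, ok := q.LPop()
		if !ok {
			break
		}
		fmt.Println(v) // prints "processing", then "completed"
	}
}
```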

File-Processing


package main

import (
	"fmt"
	"log"
	"net/http"

	"github.com/gin-gonic/gin"
	"github.com/go-redis/redis"
	"os"
	"time"
)

var (
	dragonflyQueuePrefix = "fileStatusQueue:"
	dragonflyStatus      = "completed"
	dragonflyHost        = os.Getenv("DRAGONFLYDB_HOST")
	dragonflyPort        = os.Getenv("DRAGONFLYDB_PORT")
	dragonflyAddr        = fmt.Sprintf("%s:%s", dragonflyHost, dragonflyPort)
)

func main() {
	// Initialize DragonflyDB client
	client := redis.NewClient(&redis.Options{
		Addr:     dragonflyAddr,
		Password: "", // Add password if required
		DB:       0,  // Select appropriate DragonflyDB database
	})

	// Initialize Gin router
	router := gin.Default()

	// API route to handle file uploads
	router.POST("/upload", func(c *gin.Context) {
		// Get the file from the form data
		file, err := c.FormFile("file")
		if err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
			return
		}

		// Simulate file processing time
		time.Sleep(time.Second * 10)

		// Get the userID from the form data
		userID := c.PostForm("userID")

		// Create the user-specific DragonflyDB queue key
		queueKey := dragonflyQueuePrefix + userID

		// Push the file upload task to the user-specific DragonflyDB queue
		err = client.RPush(queueKey, file.Filename).Err()
		if err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue file"})
			return
		}

		// Publish the "completed" status to the user-specific DragonflyDB queue
		err = client.RPush(queueKey, dragonflyStatus).Err()
		if err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to publish status"})
			return
		}

		c.JSON(http.StatusOK, gin.H{"message": "File processed"})
	})
	// Run the Gin server
	err := router.Run(":8000")
	if err != nil {
		log.Fatal(err)
	}
}

Finally, we have the File-Processing service, which again connects to DragonflyDB and exposes a file-upload endpoint that receives the file from the Web-API. After simulating the file-processing workflow, it pushes a status update to the user's DragonflyDB task queue. It is worth noting that any async process could be executed here; for example, the file could just as easily be uploaded to an S3 bucket.

The React frontend

The frontend is a simple React app with a single component — fileUpload.js. When the upload button is clicked, the file and userID are sent as form data to the Web-API, and a websocket connection is opened with the Notifications service for that userID so status updates can be streamed back.

fileUpload.js


import React, { useState } from 'react';


const UploadComponent = () => {
   const [file, setFile] = useState(null);
   const [userID, setUserID] = useState('');
   const [responses, setResponses] = useState([]);
   const [componentKey, setComponentKey] = useState(0); // Unique key for the component


   const handleFileChange = (event) => {
       setFile(event.target.files[0]);
   };


   const handleUserIDChange = (event) => {
       setUserID(event.target.value);
   };


   const handleUpload = async () => {
       try {
           const formData = new FormData();
           formData.append('file', file);
           formData.append('userID', userID);


           // Send file and userID to the upload API
           await fetch('http://localhost/api/upload', {
               method: 'POST',
               body: formData,
           });


           // Clear the file input and userID field
           setFile(null);
           setUserID('');


           // Increment the component key to force a reload
           setComponentKey((prevKey) => prevKey + 1);
       } catch (error) {
           console.error('Error occurred during file upload:', error);
       }


       // Set up WebSocket connection when uploading completes
       const ws = new WebSocket(`ws://localhost/notifications/ws/${userID}`);


       // Handle WebSocket messages; close the connection once a
       // terminal status arrives
       ws.onmessage = (event) => {
           const response = event.data;
           setResponses((prevResponses) => [...prevResponses, response]);
           if (response === 'completed' || response === 'failed') {
               ws.close();
           }
       };
   };


   return (
      <div key={componentKey} style={containerStyle}>
           <input
               type="text"
               placeholder="User ID"
               value={userID}
               onChange={handleUserIDChange}
               style={inputStyle}
           />
           <br />
           <h2>File Upload</h2>
           <input type="file" onChange={handleFileChange} style={inputStyle} />
           <br />
           <button onClick={handleUpload} style={buttonStyle}>
               Upload
           </button>
           <br />
           <h2>Responses:</h2>
           <div style={responseContainerStyle}>
               {responses.map((response, index) => (
                   <p key={index} style={responseStyle}>
                       {response}
                   </p>
               ))}
           </div>
       </div>
   );
};


// Styling
const containerStyle = {
   fontFamily: 'Arial, sans-serif',
   maxWidth: '400px',
   margin: '0 auto',
   padding: '20px',
};


const inputStyle = {
   padding: '5px',
   marginBottom: '10px',
};


const buttonStyle = {
   padding: '10px',
   backgroundColor: '#007bff',
   color: '#fff',
   border: 'none',
   borderRadius: '5px',
   cursor: 'pointer',
};


const responseContainerStyle = {
   border: '1px solid #ccc',
   padding: '10px',
};


const responseStyle = {
   margin: '5px 0',
};


export default UploadComponent;

App.js


import UploadComponent from './components/fileUpload'

function App() {
  return (
    <div className="App">
      <UploadComponent/>
    </div>
  );
}

export default App;


Containerizing the services

To deploy our application in Kubernetes, we'll first need to containerize it. For this we'll use Docker, which will require a Dockerfile for each service. We'll build and then push the resulting image from each Dockerfile to a remote registry, so that they can then be pulled by Kubernetes when the various services start up.

Note that Dockerfiles for all of the above services are available in the full project on GitHub.

The Frontend

The frontend container will consist of an Nginx base image into which we copy the build artifacts generated by running `npm run build` locally, along with an `nginx.conf` file that configures the Nginx web server to serve our React app.

Dockerfile


# Use an official Nginx image as the base
FROM nginx:alpine

# Remove default Nginx configuration
RUN rm -rf /etc/nginx/conf.d

# Copy custom Nginx configuration
COPY nginx.conf /etc/nginx/conf.d/default.conf

# Copy the built React app from the local machine to the container
COPY build /usr/share/nginx/html

# Expose a port for the container
EXPOSE 80

# Start the Nginx web server
CMD ["nginx", "-g", "daemon off;"]

The backend services

As each of the backend services is built similarly, the following example is representative of all three. It is a multi-stage build in which the Go binary is compiled in a “builder” stage and then copied into an Alpine container to be run directly. Add this file to the root directory of each of the Web-API, Notifications, and File-Processing services.

Dockerfile


FROM golang:1.18 as builder

# first (build) stage

WORKDIR /app
COPY . .
RUN go mod download
RUN CGO_ENABLED=0 go build -v -o app .

# final (target) stage

FROM alpine:latest
WORKDIR /root/
COPY --from=builder /app ./
CMD ["./app"]

Deploying to Kubernetes

To deploy the application to Kubernetes, we'll use Helm, a popular package manager for K8s.

You can read more about Helm Charts in the Helm docs, but the gist of it is that there are two core components involved. There are templates, and there is a values.yaml file.

The templates are fundamentally standard Kubernetes resource definitions, but they are templates in the sense that they dynamically resolve to include values defined in a separate values file — the values.yaml file. This way, you can change the configuration values associated with a given application via the values file, rather than having to update the resource definitions directly.
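For example, the frontend deployment template in this project references `.Values.frontend.envVars.HOST`. A minimal `values.yaml` fragment supplying that value might look like the following (the hostname is a placeholder):

```yaml
# values.yaml (fragment; the hostname below is a placeholder)
frontend:
  envVars:
    HOST: "frontend.example.local"
```

At render time, `{{ .Values.frontend.envVars.HOST | quote }}` in the template resolves to `"frontend.example.local"`, so changing the hostname later only requires editing the values file.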

Kubernetes Manifests

Below is an example of a Kubernetes manifest for the frontend service. It contains definitions for three Kubernetes resources: a deployment, a service, and an ingress. Each of the other microservices — i.e. the Web-API, Notifications, and File-Processing services — will require only a Kubernetes deployment and service definition, very similar to those shown below. Again, the full application resources are available on GitHub.


---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: frontend
  labels:
    app: frontend
spec:
  selector:
    matchLabels:
      app: frontend
  replicas: 1
  template:
    metadata:
      labels:
        app: frontend
    spec:
      containers:
        - name: frontend
          image: jdvincent/dragonfly-frontend:latest
          imagePullPolicy: Always
          env:
            - name: HOST
              value: {{ .Values.frontend.envVars.HOST | quote }}
          ports:
            - name: frontend
              containerPort: 80
              protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
  name: frontend
spec:
  ports:
    - port: 80
      targetPort: 80
      name: frontend
  selector:
    app: frontend
  type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: frontend
spec:
  ingressClassName: kong
  rules:
  - http:
      paths:
        - path: /
          pathType: Prefix
          backend:
            service:
              name: frontend
              port:
                number: 80
        - path: /api
          pathType: Prefix
          backend:
            service:
              name: web-api
              port:
                number: 8000
        - path: /notifications
          pathType: Prefix
          backend:
            service:
              name: notifications
              port:
                number: 8000

Because the manifest above contains Helm template expressions (e.g. {{ .Values.frontend.envVars.HOST | quote }}), we deploy it with Helm rather than with kubectl apply directly. The release name and chart path below are placeholders:


helm install <release-name> <path-to-chart>

Developing with Velocity

Currently, the File-Processing service just sleeps for 10 seconds to simulate a file-processing workflow. Historically, updating it would have meant repeating most of the deployment steps above — building and pushing the images and redeploying to Kubernetes — in order to update the running microservice.

But with Velocity, we can simply start a development session, update our local source code and the changes will be reflected in the cluster.

As an example, let's update the File-Processing service to parse a CSV file that we upload and print the lines of the file to our Velocity console.

To do so, simply start a Velocity development session as follows:

In your GoLand IDE, navigate to the JetBrains Marketplace, search for Velocity, and click “Install.”

Next, click “Login” to log in with either a Google or a GitHub account. Finally, click the debug icon with the default run configuration, “Setup Velocity.”

Check to make sure that the auto populated fields are correct, click “Next” and then click “Create.”

Next, with the Velocity development session running, update the File-Processing service with the following changes:


func main(){
...
		// Simulate file processing time
		//time.Sleep(time.Second * 10)
		r := parseCSV(file)
		if !r {
			dragonflyStatus = "failed"
		} else {
			dragonflyStatus = "completed"
		}
...
}

// Note: this requires adding "encoding/csv" to the service's imports
func parseCSV(file *multipart.FileHeader) bool {
	srcFile, err := file.Open()
	if err != nil {
		log.Println("Failed to open file:", err)
		return false
	}
	defer srcFile.Close()

	reader := csv.NewReader(srcFile)
	records, err := reader.ReadAll()
	if err != nil {
		log.Println("Error reading CSV:", err)
		return false
	}

	// Print each record
	for _, record := range records {
		fmt.Println(record)
	}
	return true
}
Then, save your changes and upload a CSV file, and you'll see each record of the file printed in your Velocity console.

Conclusion

DragonflyDB is a high-performance, drop-in replacement for Redis. It works with any Redis SDK, and it does everything Redis does, such as caching, task queues and more. Above, we looked at one use case for DragonflyDB — an async notifications service to relay status messages to the frontend for async backend processes, such as file upload processing.

We also saw how Velocity can dramatically simplify and accelerate development and debugging of microservice applications running in Kubernetes! Before Velocity, in order to update the File-Processing service, we would have had to update our local source code, delete and then rebuild the file-processing image in Minikube, delete the file-processing Kubernetes deployment, and then redeploy our updated code.

But with Velocity, we were able to simply update our code as needed, and our changes were immediately reflected in the remote environment!
