In this post, we walk through the process of building an async notification service in a Golang microservice-based application that includes websockets, React and DragonflyDB and is deployed in Kubernetes. Then, once we have the project deployed in our cluster, we'll take a look at the streamlined, in-cluster development that Velocity makes possible straight from your local IDE.
DragonflyDB is a drop-in replacement for supercharging your Redis workloads. It works with any Redis SDK, and works just like Redis — caching, task queues, you name it — with a fraction of the compute overhead. Because of this, you can handle much higher volumes of throughput without the need to scale your Redis deployment, so you can reduce your cloud footprint and, with it, your overall deployment costs.
Today, we'll demonstrate this with a microservice app written in Go with a React frontend that leverages DragonflyDB to send async status notifications to the React frontend as shown in the diagram below.
What we're building
Specifically, the React frontend will accept a file upload, which will be sent to the Web-API service, and it will also open a websocket connection with a separate Notifications service. When the Web-API service receives the file, it creates a task queue in DragonflyDB for a given userID with the status “processing.” It then sends the file to a File-Processing service, which simulates a file-processing workflow, and then sends a status of “complete” to the same userID-based task queue in DragonflyDB.
As this is happening, the Notifications service is listening to the various task queues in DragonflyDB, and — via the existing websocket connection(s) with the browser — it relays the file processing status in real time to the user.
package main
import (
"bytes"
"fmt"
"io"
"log"
"mime/multipart"
"net/http"
"os"
"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
"github.com/go-redis/redis"
)
var (
dragonflyQueuePrefix = "fileStatusQueue:"
dragonflyStatus = "processing"
dragonflyHost = os.Getenv("DRAGONFLYDB_HOST")
dragonflyPort = os.Getenv("DRAGONFLYDB_PORT")
dragonflyAddr = fmt.Sprintf("%s:%s", dragonflyHost, dragonflyPort)
uploadAPIURL = os.Getenv("UPLOAD_API_URL")
)
func main() {
// Initialize Redis client
client := redis.NewClient(&redis.Options{
Addr: dragonflyAddr,
DB: 0,
})
// Initialize Gin router
router := gin.Default()
// Apply CORS middleware
router.Use(cors.Default())
// API route to handle file uploads
router.POST("/api/upload", func(c *gin.Context) {
// Get the file from the form data
file, err := c.FormFile("file")
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// Get the userID from the form data
userID := c.PostForm("userID")
// Create the user-specific Redis queue key
queueKey := dragonflyQueuePrefix + userID
// Push the file upload task to the user-specific Redis queue
err = client.RPush(queueKey, file.Filename).Err()
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue file"})
log.Println("Failed to enqueue file:", err)
return
}
// Publish the "processing" status to the user-specific Redis queue
err = client.RPush(queueKey, dragonflyStatus).Err()
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to publish status"})
log.Println("Failed to publish status:", err)
return
}
// Send the file to the separate API
go sendFileToAPI(file, userID)
c.JSON(http.StatusOK, gin.H{"message": "File uploaded and processing"})
})
// Run the Gin server
err := router.Run(":8000")
if err != nil {
log.Fatal(err)
}
}
// Helper function to send the file to the separate API
func sendFileToAPI(file *multipart.FileHeader, userID string) {
// The body of this function has been omitted for brevity,
// but is available in the repo
...
log.Println("File sent to API:", file.Filename)
}
In the above file, we connect to DragonflyDB, and define our Gin router. Then we create a single endpoint to handle a file upload and a userID as form data. From that, we parse the incoming file, and send it, along with the userID, to the file-processing service via an HTTP POST request. We also create a DragonflyDB task queue — the key of which is specific to the incoming userID.
We then send a “processing” status to the task queue, and log the success or failure of each of the above.
Notifications
package main
import (
"fmt"
"github.com/gin-gonic/gin"
"github.com/go-redis/redis"
"github.com/gorilla/websocket"
"log"
"net/http"
"os"
"sync"
)
var (
dragonflyQueuePrefix = "fileStatusQueue:"
dragonflyHost = os.Getenv("DRAGONFLYDB_HOST")
dragonflyPort = os.Getenv("DRAGONFLYDB_PORT")
dragonflyAddr = fmt.Sprintf("%s:%s", dragonflyHost, dragonflyPort)
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
var connections sync.Map
func main() {
// Initialize DragonflyDB client
client := redis.NewClient(&redis.Options{
Addr: dragonflyAddr,
DB: 0,
})
// Initialize Gin router
router := gin.Default()
// WebSocket route to handle file status updates
router.GET("/notifications/ws/:userID", func(c *gin.Context) {
userID := c.Param("userID")
log.Println(userID)
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Println("WebSocket upgrade error:", err)
return
}
// Store the WebSocket connection
connections.Store(userID, conn)
// Close the WebSocket connection and remove it from the map when the client disconnects
defer func() {
conn.Close()
connections.Delete(userID)
// Remove the DragonflyDB queue channel
queueKey := dragonflyQueuePrefix + userID
client.Del(queueKey)
}()
// Create the user-specific DragonflyDB queue
queueKey := dragonflyQueuePrefix + userID
// Read messages from DragonflyDB queue and send file status updates to the WebSocket client
for {
result, err := client.BLPop(0, queueKey).Result()
log.Println(result)
if err != nil {
log.Println("Error while popping item from DragonflyDB queue:", err)
return
}
fileStatus := result[1]
log.Println(fileStatus)
// Send the file status update to the WebSocket client
err = conn.WriteMessage(websocket.TextMessage, []byte(fileStatus))
if err != nil {
log.Println("WebSocket send error:", err)
return
}
// Check if the sent file status update indicates completion
// If yes, break the loop and close the WebSocket connection
if isCompletionStatus(fileStatus) {
break
}
}
})
// Run the Gin server
err = router.Run(":8000")
if err != nil {
log.Fatal(err)
}
}
// Helper function to check if the file status update indicates completion
func isCompletionStatus(fileStatus string) bool {
// Customize this logic based on your file status update format
// Return true if the status indicates completion, false otherwise
return fileStatus == "completed"
}
Above, we again connect to DragonflyDB and define a Gin API with a single endpoint. This time, though, it is written to handle an incoming websocket request from the React frontend. Once the websocket connection is established, the Notifications service then starts a Go routine — a parallel process — in which it listens to the DragonflyDB task queue that was created when the Web-API service received the file upload. When it detects a message in the queue, it “pops” it from the queue, i.e. removes the first item in the queue in a first in, first out fashion.
This “popped” item is then relayed to the frontend via the existing websocket connection. These items include a “processing” status, which is sent by the Web-API and a second “completed” or “failed” status, which is sent by the File-Processing service.
File-Processing
package main
import (
"fmt"
"log"
"net/http"
"github.com/gin-gonic/gin"
"github.com/go-redis/redis"
"os"
"time"
)
var (
dragonflyQueuePrefix = "fileStatusQueue:"
dragonflyStatus = "completed"
dragonflyHost = os.Getenv("DRAGONFLYDB_HOST")
dragonflyPort = os.Getenv("DRAGONFLYDB_PORT")
dragonflyAddr = fmt.Sprintf("%s:%s", dragonflyHost, dragonflyPort)
)
func main() {
// Initialize DragonflyDB client
client := redis.NewClient(&redis.Options{
Addr: dragonflyAddr,
Password: "", // Add password if required
DB: 0, // Select appropriate DragonflyDB database
})
// Initialize Gin router
router := gin.Default()
// API route to handle file uploads
router.POST("/upload", func(c *gin.Context) {
// Get the file from the form data
file, err := c.FormFile("file")
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// Simulate file processing time
time.Sleep(time.Second * 10)
// Get the userID from the form data
userID := c.PostForm("userID")
// Create the user-specific DragonflyDB queue key
queueKey := dragonflyQueuePrefix + userID
// Push the file upload task to the user-specific DragonflyDB queue
err = client.RPush(queueKey, file.Filename).Err()
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue file"})
return
}
// Publish the "processing" status to the user-specific DragonflyDB queue
err = client.RPush(queueKey, dragonflyStatus).Err()
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to publish status"})
return
}
})
// Run the Gin server
err := router.Run(":8000")
if err != nil {
log.Fatal(err)
}
}
Finally, we have the File-Processing service, which again connects to DragonflyDB, and exposes a file upload endpoint, which receives the file from the Web-API. After simulating the file processing workflow, it sends a status update to the DragonflyDB task queue. It is worth noting that any async process could be executed here — for example, a file could just as easily be uploaded to an S3 bucket.
The React frontend
The Frontend is a simple React app with a single component — fileUpload.js. When the userID is entered via the UI, the websocket request is sent to the Notifications service. Likewise, when the upload button is clicked, the file and userID are sent as form data to the Web-API.
import UploadComponent from './components/fileUpload'
function App() {
return (
<div className="App">
<UploadComponent/>
</div>
);
}
export default App;
Containerizing the services
To deploy our application in Kubernetes, we'll first need to containerize it. For this we'll use Docker, which will require a Dockerfile for each service. We'll build and then push the resulting image from each Dockerfile to a remote registry, so that they can then be pulled by Kubernetes when the various services start up.
The frontend container will consist of a Nginx base image into which we copy the build artifacts generated by running `npm run build` locally, and it will also include a `nginx.conf` file which will allow the Nginx web server to route traffic to our React app.
Dockerfile
# Use an official Nginx image as the base
FROM nginx:alpine
# Remove default Nginx configuration
RUN rm -rf /etc/nginx/conf.d
# Copy custom Nginx configuration
COPY nginx.conf /etc/nginx/conf.d/default.conf
# Copy the built React app from the local machine to the container
COPY build /usr/share/nginx/html
# Expose a port for the container
EXPOSE 80
# Start the Nginx web server
CMD ["nginx", "-g", "daemon off;"]
The backend services
As each of the backend services are built similarly, the following example is representative of all three. It is a multi-stage build in which the Go binary is built in a “builder” stage, and is then copied into an Alpine container to be run directly as a binary. Add this file to the root directories of the Web-API, Notifications and File-Processing services.
Dockerfile
FROM golang:1.18 as builder
# first (build) stage
WORKDIR /app
COPY . .
RUN go mod download
RUN CGO_ENABLED=0 go build -v -o app .
# final (target) stage
FROM alpine:latest
WORKDIR /root/
COPY --from=builder /app ./
CMD ["./app"]
Deploying to Kubernetes
To deploy the application to Kubernetes, we'll use Helm, a popular package manager for K8s.
You can read more about Helm Charts in the Helm docs, but the gist of it is that there are two core components involved. There are templates, and there is a values.yaml file.
The templates are fundamentally standard Kubernetes resource definitions, but they are templates in the sense that they dynamically resolve to include values defined in a separate values file — the values.yaml file. This way, you can change the configuration values associated with a given application via the values file, rather than having to update the resource definitions directly.
Kubernetes Manifests
Next, we will deploy the resources we defined above. Below, we have an example of a Kubernetes manifest for the frontend service. It contains definitions for three Kubernetes resources — a deployment, a service, and an ingress. Each of the other microservices — i.e. the Web-API, Notifications and File-Processing services — will require only a Kubernetes deployment and Kubernetes service definition, which will be very similar to that shown below. Again, the full application resources are available for download in GitHub.
And we can deploy these resources to the cluster withe the following command:
kubectl apply -f frontend.yaml
Developing with Velocity
Currently, the File-Processing service just sleeps for 10 seconds to simulate a file processing workflow, and to update it we would historically have needed to go through most of the above deployment steps — building and pushing the images and deploying to Kubernetes — again in order to update the running microservice.
But with Velocity, we can simply start a development session, update our local source code and the changes will be reflected in the cluster.
As an example, let's update the File-Processing service to parse a CSV file that we upload and print the lines of the file to our Velocity console.
To do so, simply start a Velocity development session as follows:
In your GoLand IDE, navigate to the JetBrains Marketplace, each for Velocity, and click “Install.”
Next, click “Login” to log in with either a Google or a GitHub account. Finally, click the debug icon with the default run configuration, “Setup Velocity.”
Check to make sure that the auto populated fields are correct, click “Next” and then click “Create.”
Next, with the Velocity development session running, update the File-Processing service with the following changes:
func main(){
...
// Simulate file processing time
//time.Sleep(time.Second * 10)
r := parseCSV(file)
if !r {
dragonflyStatus = "failed"
} else {
dragonflyStatus = "completed"
}
...
}
func parseCSV(file *multipart.FileHeader) bool {
srcFile, err := file.Open()
if err != nil {
log.Println("Failed to open file:", err)
return false
}
reader := csv.NewReader(srcFile)
records, err := reader.ReadAll()
if err != nil {
fmt.Println("Error reading CSV:", err)
return false
}
// Print each record
for _, record := range records {
fmt.Println(record)
}
return true
Then, save your changes, and upload a CSV file, and you'll see output like the following in your Velocity console!
Conclusion
DragonflyDB is a high-performance, drop-in replacement for Redis. It works with any Redis SDK, and it does everything Redis does, such as caching, task queues and more. Above, we looked at one use case for DragonflyDB — an async notifications service to relay status messages to the frontend for async backend processes, such as file upload processing.
And we also saw how Velocity can dramatically simplify and accelerate development and debugging of micro service applications running in Kubernetes! Before Velocity, in order to update the file processing service, we would have had to update our local source code, delete and then rebuild the file-processing image in Minikube, delete the file-processing Kubernetes deployment, and then redeploy our updated code.
But with Velocity, we were able to simply update our code as needed, and our changes were immediately reflected in the remote environment!