Streamline Kubernetes Native Application Development with Velocity
Kubernetes native applications are those that leverage Kubernetes-specific API objects, such as Batch/Jobs, Pods and the like, not only as a means of deploying an application, but also as a means of building the application itself. This architectural approach offers numerous advantages, such as the ability to run application processes asynchronously and to let the Kubernetes scheduler distribute those processes efficiently across your full cluster’s resources.
However, when you develop such an application, your individual services can only be written and debugged effectively in a running Kubernetes cluster, because they rely on the Kubernetes API and on any additional cluster resources your architecture includes.
Historically, this would have required you to update your code locally, commit your changes, wait for any relevant CI process to complete, build and push a container image, and pull the image into your Kubernetes cluster. Only then would you be able to see whether your code runs as expected.
Velocity dramatically simplifies and speeds up the development workflow for Kubernetes native applications by allowing you to simply write and debug code in your local IDE, which it then automatically syncs to your cluster, so you can effectively develop directly in your cluster while working from your local machine.
This post will walk you through the process of developing a Kubernetes native application, and it will also demonstrate how much more easily and efficiently such applications can be developed with Velocity.
What we’re building
Today we'll build a video frame stabilizer written in Python. Processing the videos we upload takes a significant amount of compute time and resources, so to keep the application responsive while that work runs, we'll use Kubernetes Batch/Jobs, which let Kubernetes schedule the processing asynchronously and make full use of our cluster's compute resources.
That is, rather than running the video processing workflow inside our “core” FastAPI application, which would bog the API down under any significant traffic load, the FastAPI application will call the Kubernetes API and request the creation of a Batch/Job that runs the video processing as an independent, isolated workload. This way, the Kubernetes scheduler can place each processing Job on whichever node in the cluster has capacity, and the FastAPI application remains free to handle incoming requests while videos are being processed.
To start, let's spin up a local cluster with Minikube. Because the video processing workflow is a bit resource intensive, let's also increase the default memory and CPUs of our cluster, like so:
minikube config set memory 3500
minikube config set cpus 4
minikube start
minikube addons enable kong
minikube tunnel
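Above, after starting the cluster, we enabled the Kong add-on, which installs an Ingress controller, and started a tunnel session, so that we can later deploy a Kubernetes Ingress and send requests to it from outside the cluster. Note that minikube tunnel keeps running in the foreground, so leave it open in its own terminal.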
Next, we'll create a virtual env locally to make it simple to manage our dependencies as we work.
pip install pipenv
pipenv shell
The FastAPI video upload and play app (Kubernetes Deployment)
Let's create the “core” FastAPI application, which will allow us to upload videos, store them in MongoDB's GridFS, and then stream the videos to the browser. First, we'll install our Python dependencies, including the Kubernetes Python client that we'll use later to create Jobs, and add those packages to our requirements.txt:
pip install fastapi kubernetes motor python-multipart uvicorn
pip freeze > requirements.txt
And now, let's define the API in a file called main.py:
import logging
import os
from fastapi import FastAPI, UploadFile
from fastapi.responses import StreamingResponse
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorGridFSBucket
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
app = FastAPI()
MONGO_HOST = os.environ.get('MONGO_HOST')
MONGO_PORT = os.environ.get('MONGO_PORT')
@app.on_event('startup')
async def get_mongo():
    # Connect to MongoDB once at startup and keep a GridFS bucket on the app.
    video_db = AsyncIOMotorClient(f'mongodb://{MONGO_HOST}:{MONGO_PORT}').video
    app.fs = AsyncIOMotorGridFSBucket(video_db)
@app.post('/api/upload')
async def upload(file: UploadFile):
    if file.filename:
        # Store the uploaded video in GridFS under its original filename.
        grid_in = app.fs.open_upload_stream(
            file.filename, metadata={'contentType': 'video/mp4'})
        data = await file.read()
        await grid_in.write(data)
        await grid_in.close()
        response_text = f"""Successfully uploaded {file.filename}
View it in your browser at http://localhost/api/stream/{file.filename}"""
        return response_text
    return ''
@app.get('/api/stream/{filename}')
async def stream(filename: str):
    grid_out = await app.fs.open_download_stream_by_name(filename)
    async def read():
        # Yield the file one GridFS chunk at a time until it has all been sent.
        while grid_out.tell() < grid_out.length:
            yield await grid_out.readchunk()
    return StreamingResponse(
        read(), media_type='video/mp4', headers={
            'Content-Length': str(grid_out.length)})
Above, we've defined a simple FastAPI application that connects to MongoDB at startup, and that then exposes two HTTP endpoints — upload and stream. The upload endpoint accepts a POST request that contains a video file as form data, which it then writes to MongoDB. The stream endpoint accepts a GET request with the name of a file that has previously been uploaded, and then it streams the file back to the browser.
Notice that to stream the video, we are returning a FastAPI streaming response, and within that, we are reading the data in chunks asynchronously from MongoDB, which allows us to stream data to the browser as it is read from Mongo.
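To make the chunked streaming loop concrete, here is a self-contained sketch that swaps MongoDB for a hypothetical in-memory FakeGridOut class. It exposes the same tell(), length and readchunk() members that the stream endpoint's read() generator relies on. The 255 KiB chunk size mirrors GridFS's default chunk size; everything else is illustrative only.

```python
import asyncio
import io


class FakeGridOut:
    """Hypothetical stand-in for Motor's GridOut, backed by an in-memory buffer."""

    def __init__(self, data: bytes, chunk_size: int = 255 * 1024):
        self._buf = io.BytesIO(data)
        self.length = len(data)
        self.chunk_size = chunk_size

    def tell(self) -> int:
        # Current read position, like GridOut.tell().
        return self._buf.tell()

    async def readchunk(self) -> bytes:
        # Return at most one chunk's worth of bytes from the current position.
        return self._buf.read(self.chunk_size)


async def read(grid_out):
    # Same loop as the /api/stream endpoint: yield chunks until the whole file is sent.
    while grid_out.tell() < grid_out.length:
        yield await grid_out.readchunk()


async def collect(grid_out):
    return [chunk async for chunk in read(grid_out)]


if __name__ == "__main__":
    chunks = asyncio.run(collect(FakeGridOut(bytes(600 * 1024))))
    print(len(chunks), [len(c) for c in chunks])
```

For a 600 KiB file this prints 3 [261120, 261120, 92160]: the browser receives two full chunks and then the remainder, rather than waiting for the whole file to be read first.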
Dockerize the app
Next, we'll need to containerize the application we just wrote above with the following Dockerfile.
FROM python:3.10
COPY . .
RUN pip install -r requirements.txt
CMD uvicorn main:app --host 0.0.0.0 --port 8000
Then we can run the following to build and push the image:
Now, to deploy the app to Kubernetes, we'll need to write two files which will define the MongoDB deployment and service, the FastAPI deployment and service, and the ingress that will route traffic to the FastAPI app.
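The two manifest files themselves are not reproduced here, so the following is a minimal sketch of what they could contain, generated from Python so that it matches the rest of the post. Several details are assumptions rather than the author's files: the hypothetical FastAPI image tag video-api:latest, the mongo:6 image, the resource names, and an IngressClass named kong from the Minikube add-on. The Service name mongo matches the MONGO_HOST value that the Job uses later.

```python
import json
import os

# Assumptions (not from the original post): the FastAPI image was built inside
# Minikube with the hypothetical tag "video-api:latest", MongoDB runs from the
# public "mongo:6" image, and the Kong add-on's IngressClass is named "kong".
API_IMAGE = "video-api:latest"
MONGO_IMAGE = "mongo:6"


def deployment(name, image, port, env=None):
    """A single-replica Deployment whose Pods carry the label app=<name>."""
    container = {
        "name": name,
        "image": image,
        # Use an image already present on the node instead of always pulling.
        "imagePullPolicy": "IfNotPresent",
        "ports": [{"containerPort": port}],
        "env": env or [],
    }
    return {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": name},
        "spec": {
            "replicas": 1,
            "selector": {"matchLabels": {"app": name}},
            "template": {
                "metadata": {"labels": {"app": name}},
                "spec": {"containers": [container]},
            },
        },
    }


def service(name, port):
    """A ClusterIP Service that selects the Pods labeled app=<name>."""
    return {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": name},
        "spec": {"selector": {"app": name},
                 "ports": [{"port": port, "targetPort": port}]},
    }


def mongo_manifests():
    # The Service name "mongo" is the MONGO_HOST used by the API and the Job.
    return [deployment("mongo", MONGO_IMAGE, 27017), service("mongo", 27017)]


def app_manifests():
    env = [{"name": "MONGO_HOST", "value": "mongo"},
           {"name": "MONGO_PORT", "value": "27017"}]
    ingress = {
        "apiVersion": "networking.k8s.io/v1",
        "kind": "Ingress",
        "metadata": {"name": "video-api"},
        "spec": {
            "ingressClassName": "kong",
            # Send every request whose path starts with /api to the FastAPI Service.
            "rules": [{"http": {"paths": [{
                "path": "/api",
                "pathType": "Prefix",
                "backend": {"service": {"name": "video-api",
                                        "port": {"number": 8000}}},
            }]}}],
        },
    }
    return [deployment("video-api", API_IMAGE, 8000, env=env),
            service("video-api", 8000), ingress]


def write_manifests(directory="."):
    # JSON is valid YAML, so kubectl apply -f accepts these files as-is.
    for filename, items in (("mongo.yaml", mongo_manifests()),
                            ("app.yaml", app_manifests())):
        with open(os.path.join(directory, filename), "w") as f:
            json.dump({"apiVersion": "v1", "kind": "List", "items": items},
                      f, indent=2)


if __name__ == "__main__":
    write_manifests()
```

Running it writes mongo.yaml and app.yaml, which could then be applied with kubectl apply -f mongo.yaml -f app.yaml.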
Next, let's write the logic that calls the Kubernetes API. In a file called base_api.py, we'll define a class called CreateProcessVideoJob that uses the Kubernetes Python client to build and submit a Job manifest. That manifest runs the container image batch-process-image:latest, which we'll build in just a bit.
import binascii
import os
from kubernetes import client
class CreateProcessVideoJob:
    def __init__(self, filename):
        self.filename = filename
        # Random 32-character hex string, used as a unique Job name.
        self.job_name = binascii.hexlify(os.urandom(16)).decode('utf-8')
        self.job = None
        self.batch_v1 = client.BatchV1Api()
    def create_job_object(self):
        env_vars = [
            client.V1EnvVar(name="MONGO_HOST", value="mongo"),
            client.V1EnvVar(name="MONGO_PORT", value="27017"),
        ]
        # Configure Pod template container
        container = client.V1Container(
            name=self.job_name,
            image="batch-process-image:latest",  # We will build this image shortly.
            # The image exists only inside Minikube; without this, a ":latest"
            # tag defaults to imagePullPolicy "Always" and the pull would fail.
            image_pull_policy="IfNotPresent",
            command=["python", "main.py", "--filename", self.filename],
            env=env_vars)
        # Create and configure a spec section
        template = client.V1PodTemplateSpec(
            metadata=client.V1ObjectMeta(labels={"app": self.job_name}),
            spec=client.V1PodSpec(restart_policy="Never", containers=[container]))
        # Create the specification of the Job
        spec = client.V1JobSpec(
            template=template,
            backoff_limit=4)
        # Instantiate the job object
        job = client.V1Job(
            api_version="batch/v1",
            kind="Job",
            metadata=client.V1ObjectMeta(name=self.job_name),
            spec=spec)
        self.job = job
    def create_job(self):
        api_response = self.batch_v1.create_namespaced_job(
            body=self.job,
            namespace="default")
        print("Job created. status='%s'" % str(api_response.status))
Notice that above, we've defined all the sections that you would see in a standard YAML manifest for a Batch/Job, but we've done so in Python.
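To see how those Python objects map onto the familiar YAML structure, here is a sketch of the same Job as a plain dictionary. The helper name job_manifest is hypothetical, secrets.token_hex(16) stands in for binascii.hexlify(os.urandom(16)) (both give 32 hex characters), and imagePullPolicy is set explicitly because an image tagged :latest otherwise defaults to Always, which would try to pull an image that only exists inside Minikube.

```python
import secrets


def job_manifest(filename, image="batch-process-image:latest", namespace="default"):
    """Build the video processing Job as a plain dict (hypothetical helper)."""
    # 16 random bytes rendered as 32 lowercase hex characters.
    job_name = secrets.token_hex(16)
    container = {
        "name": job_name,
        "image": image,
        # Don't force a registry pull for an image built inside Minikube.
        "imagePullPolicy": "IfNotPresent",
        "command": ["python", "main.py", "--filename", filename],
        "env": [
            {"name": "MONGO_HOST", "value": "mongo"},
            {"name": "MONGO_PORT", "value": "27017"},
        ],
    }
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": job_name, "namespace": namespace},
        "spec": {
            "backoffLimit": 4,  # retry a failed Pod up to 4 times
            "template": {
                "metadata": {"labels": {"app": job_name}},
                "spec": {"restartPolicy": "Never", "containers": [container]},
            },
        },
    }
```

Printing json.dumps(job_manifest("wobbly.mov"), indent=2) shows the nesting you would write in a YAML manifest: apiVersion, kind, metadata, and a spec that wraps a Pod template. The Kubernetes Python client should also accept a dict like this as the body argument of create_namespaced_job, if you prefer dicts to the typed V1 model classes.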
main.py
Now, let's import the base_api.py module above into main.py and update the upload function so that it instantiates the CreateProcessVideoJob class and uses it to call the Kubernetes API in a FastAPI background task:
import logging
import os
from fastapi import FastAPI, BackgroundTasks, UploadFile
from fastapi.responses import StreamingResponse
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorGridFSBucket
from kubernetes import config
from base_api import CreateProcessVideoJob
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
app = FastAPI()
MONGO_HOST = os.environ.get('MONGO_HOST')
MONGO_PORT = os.environ.get('MONGO_PORT')
# Authenticate to the Kubernetes API with the Pod's Service Account
# (this only works when the app is running inside the cluster).
config.load_incluster_config()
@app.on_event('startup')
async def get_mongo():
    video_db = AsyncIOMotorClient(f'mongodb://{MONGO_HOST}:{MONGO_PORT}').video
    app.fs = AsyncIOMotorGridFSBucket(video_db)
# A plain (sync) function: FastAPI runs sync background tasks in a thread pool,
# so the blocking Kubernetes API call doesn't stall the event loop.
def _process_video(filename):
    create_batch_job = CreateProcessVideoJob(filename)
    create_batch_job.create_job_object()
    create_batch_job.create_job()
@app.post('/api/upload')
async def upload(file: UploadFile, background_tasks: BackgroundTasks):
    if file.filename:
        grid_in = app.fs.open_upload_stream(
            file.filename, metadata={'contentType': 'video/mp4'})
        data = await file.read()
        await grid_in.write(data)
        await grid_in.close()
        # Create the processing Job after the response has been sent.
        background_tasks.add_task(_process_video, file.filename)
        base_filename = os.path.splitext(file.filename)[0]
        output_filename = f"{base_filename}_stabilized.mp4"
        response_text = f"""Successfully uploaded {file.filename}
View it in your browser at http://localhost/api/stream/{file.filename}
Kubernetes batch job is running; when complete,
you can view the processed video at http://localhost/api/stream/{output_filename}"""
        return response_text
    return ''
@app.get('/api/stream/{filename}')
async def stream(filename: str):
    grid_out = await app.fs.open_download_stream_by_name(filename)
    async def read():
        # Yield the file one GridFS chunk at a time.
        while grid_out.tell() < grid_out.length:
            yield await grid_out.readchunk()
    return StreamingResponse(
        read(), media_type='video/mp4', headers={
            'Content-Length': str(grid_out.length)})
Develop the Batch/Job workflow
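Next, we'll need to define the video processing workflow, which will run as the container in the Batch/Job we're creating. In a separate directory, we'll write another file called main.py (the Job's command runs python main.py) that defines a Python class called ProcessVideo, which takes the uploaded file's name as a parameter when we instantiate it. The script downloads the video from GridFS, stabilizes it, and uploads the result back to GridFS.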
import argparse
import asyncio
import os
import cv2
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorGridFSBucket
from vidgear.gears import VideoGear
MONGO_HOST = os.environ.get('MONGO_HOST')
MONGO_PORT = os.environ.get('MONGO_PORT')
class ProcessVideo:
    def __init__(self, filename):
        self.raw_video = None
        self.processed_video = None
        self.filename = filename
        # Same naming rule the FastAPI app uses for the link it returns.
        self.base_filename = os.path.splitext(filename)[0]
        self.output_filename = f"{self.base_filename}_stabilized.mp4"
        self.video_db = AsyncIOMotorClient(
            f'mongodb://{MONGO_HOST}:{MONGO_PORT}').video
        self.fs = AsyncIOMotorGridFSBucket(self.video_db)
    async def download_video(self):
        # Fetch the latest upload with this name from GridFS into a local file.
        grid_out = await self.fs.open_download_stream_by_name(self.filename)
        self.raw_video = await grid_out.read()
        with open('video.mp4', 'wb') as f:
            f.write(self.raw_video)
    def _stream_to_file(self):
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        # Keep the source frame rate; fall back to 30 fps if OpenCV can't read it.
        capture = cv2.VideoCapture('video.mp4')
        fps = capture.get(cv2.CAP_PROP_FPS) or 30.0
        capture.release()
        # Open the VideoWriter lazily, once the first frame's size is known.
        output_writer = None
        while True:
            frame = self.processed_video.read()
            if frame is None:
                break
            if output_writer is None:
                frame_size = (frame.shape[1], frame.shape[0])
                output_writer = cv2.VideoWriter(
                    self.output_filename, fourcc, fps, frame_size)
            output_writer.write(frame)
        self.processed_video.stop()
        if output_writer is not None:
            output_writer.release()
            print(f"Processed frames saved to {self.output_filename}")
        else:
            print("No frames were processed.")
    def process_video(self):
        # VideoGear with stabilize=True returns stabilized frames from read().
        self.processed_video = \
            VideoGear(source='video.mp4', stabilize=True).start()
        self._stream_to_file()
    async def upload_video(self):
        grid_in = self.fs.open_upload_stream(
            self.output_filename, metadata={'contentType': 'video/mp4'})
        with open(self.output_filename, 'rb') as file:
            data = file.read()
        await grid_in.write(data)
        await grid_in.close()
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--filename', dest='filename', type=str, required=True)
    _args = parser.parse_args()
    return _args
async def main(filename):
    pv = ProcessVideo(filename=filename)
    await pv.download_video()
    pv.process_video()
    await pv.upload_video()
if __name__ == '__main__':
    args = parse_args()
    asyncio.run(main(args.filename))
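The FastAPI app and the Job each derive the stabilized file's name with the same os.path.splitext expression, and the second link returned by /api/upload only works if the two agree. One option, a design suggestion rather than part of the original code, is a tiny helper (hypothetically named stabilized_name) copied into both images:

```python
import os


def stabilized_name(filename: str) -> str:
    """Name of the processed output for an uploaded video (hypothetical helper).

    Matches the rule used in both main.py files:
    os.path.splitext(filename)[0] + "_stabilized.mp4".
    """
    base, _ext = os.path.splitext(filename)
    return f"{base}_stabilized.mp4"


if __name__ == "__main__":
    print(stabilized_name("wobbly3.mov"))  # prints wobbly3_stabilized.mp4
```

Note that only the last extension is stripped, so clip.final.mp4 becomes clip.final_stabilized.mp4.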
Build the container image
Create the following Dockerfile:
FROM python:3.10
RUN pip install motor opencv-python vidgear
RUN apt-get update && apt-get install -y libgl1
COPY main.py .
Then, from the directory that contains this Dockerfile and the Job's main.py, build the image inside Minikube:
minikube image build -t batch-process-image:latest .
Upload another video
curl -X POST -F "file=@Documents/wobbly2.mov" http://localhost/api/upload
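If you'd rather script uploads than use curl, the following standard-library sketch builds the same multipart/form-data POST. The helper name build_upload_request and the script name upload.py are hypothetical; the form field is named file to match the upload endpoint's file: UploadFile parameter.

```python
import mimetypes
import os
import sys
import urllib.request
import uuid


def build_upload_request(path, url="http://localhost/api/upload"):
    """Build a multipart/form-data POST like: curl -F "file=@<path>" <url>."""
    # Random delimiter, vanishingly unlikely to occur inside the video bytes.
    boundary = uuid.uuid4().hex
    filename = os.path.basename(path)
    content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"
    with open(path, "rb") as f:
        payload = f.read()
    # One form field named "file", matching the endpoint's UploadFile parameter.
    head = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'
        f"Content-Type: {content_type}\r\n\r\n"
    ).encode()
    tail = f"\r\n--{boundary}--\r\n".encode()
    return urllib.request.Request(
        url,
        data=head + payload + tail,
        method="POST",
        headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
    )


if __name__ == "__main__" and len(sys.argv) > 1:
    with urllib.request.urlopen(build_upload_request(sys.argv[1])) as resp:
        print(resp.read().decode())
```

Saved as upload.py, it would be run as python upload.py Documents/wobbly2.mov and print the same response text as the curl command.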
Now, when you upload a video, you'll see the following error in the Velocity console:
This is the power of Velocity! You can develop Kubernetes native applications and see Kubernetes-specific tracebacks as if you were developing locally!
If you look closely, the above error is a 403 (Forbidden) response from the Kubernetes API. The FastAPI app's request is reaching the Kubernetes API, but the default Kubernetes Service Account, whose credentials the FastAPI pod uses to authenticate, doesn't have permission to create Jobs. To fix this, we'll need to give the default Service Account permission to create Batch/Jobs in the default namespace.
The Kubernetes Role and RoleBinding
To do this, we'll need to create the following Role and RoleBinding. The Role we'll create will be called jobs-creator and it will specify the “default” namespace, the “batch” API group, the “jobs” resource type within that API group, and the actions or “verbs” that this Role will be able to carry out.
Then, we'll associate this Role with the default Service Account by creating a RoleBinding called jobs-creator-binding. Notice that this RoleBinding references both the default Service Account and the Role we just defined, jobs-creator.
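The Role and RoleBinding manifests aren't reproduced here, so this is a sketch of what role.yaml could contain, again generated from Python. The names, namespace, API group and resource come from the description above; the exact verb list is an assumption, since only create is strictly needed for the upload endpoint.

```python
import json

NAMESPACE = "default"

role = {
    "apiVersion": "rbac.authorization.k8s.io/v1",
    "kind": "Role",
    "metadata": {"name": "jobs-creator", "namespace": NAMESPACE},
    "rules": [{
        "apiGroups": ["batch"],
        "resources": ["jobs"],
        # Assumption: "create" is what the upload endpoint needs; the read
        # verbs would let the app inspect Job status later if desired.
        "verbs": ["create", "get", "list", "watch"],
    }],
}

role_binding = {
    "apiVersion": "rbac.authorization.k8s.io/v1",
    "kind": "RoleBinding",
    "metadata": {"name": "jobs-creator-binding", "namespace": NAMESPACE},
    # Grant the Role to the Service Account the FastAPI pod runs as.
    "subjects": [{"kind": "ServiceAccount", "name": "default", "namespace": NAMESPACE}],
    "roleRef": {"apiGroup": "rbac.authorization.k8s.io", "kind": "Role",
                "name": "jobs-creator"},
}

if __name__ == "__main__":
    # JSON is valid YAML, so kubectl can apply this file as role.yaml.
    with open("role.yaml", "w") as f:
        json.dump({"apiVersion": "v1", "kind": "List", "items": [role, role_binding]},
                  f, indent=2)
```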
We can then create both resources with kubectl, just as we did with the other manifests:
kubectl apply -f role.yaml -n default
Upload a Video
Now, upload another video. Because the FastAPI application now has permission to create Batch/Jobs in the default namespace, you should see output similar to the following:
curl -X POST -F "file=@Documents/wobbly3.mov" http://localhost/api/upload
"Successfully uploaded wobbly3.mov
View it in your browser at http://localhost/api/stream/wobbly3.mov
Kubernetes batch job is running; when complete,
you can view the processed video at http://localhost/api/stream/wobbly3_stabilized.mp4"
Check the Status of the Kubernetes Job
Now you can track the progress of the video processing Job in the Minikube dashboard, which you can open by running minikube dashboard.
The running Batch/Job in the above screenshot is our video being processed! When this Batch/Job completes, you can open the second link returned by the curl upload, i.e. http://localhost/api/stream/<video_basename>_stabilized.mp4, to stream the stabilized video!
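If you prefer the command line to the dashboard, the same progress information lives in the Job's status field. The sketch below, with hypothetical helper names job_state and fetch_job, shells out to kubectl get job NAME -o json and reduces the result to one word. Job names are the random hex strings generated by CreateProcessVideoJob, so list them first with kubectl get jobs. Treating "no unfinished Pods and no terminal condition" as Pending is a simplification.

```python
import json
import subprocess
import sys


def job_state(job):
    """Reduce a Job object (parsed from kubectl's JSON output) to one word."""
    status = job.get("status") or {}
    # A finished Job carries a "Complete" or "Failed" condition with status "True".
    for condition in status.get("conditions") or []:
        if condition.get("status") == "True" and condition.get("type") in ("Complete", "Failed"):
            return condition["type"]
    # "active" is the number of the Job's Pods that haven't finished yet.
    if status.get("active"):
        return "Active"
    return "Pending"


def fetch_job(name, namespace="default"):
    """Return the Job as a dict, via kubectl get job NAME -o json."""
    result = subprocess.run(
        ["kubectl", "get", "job", name, "-n", namespace, "-o", "json"],
        check=True, capture_output=True, text=True)
    return json.loads(result.stdout)


if __name__ == "__main__" and len(sys.argv) > 1:
    print(job_state(fetch_job(sys.argv[1])))
```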
Conclusion
Kubernetes native applications are those that are not only deployed in Kubernetes, but also built from Kubernetes API objects. Above, we saw how this application design approach can allow compute-intensive processes to run as independent and asynchronous workflows via Batch/Jobs, and we saw how Velocity can dramatically simplify the process of developing complex workflows such as this within a running Kubernetes cluster.
Without Velocity, we would have had to update our local code, wait for all relevant CI processes to complete, build a container image, and deploy that new image to Kubernetes in order to see if our updated code worked. But with Velocity, we were able to simply develop the service while it was deployed to our cluster, and see our updated code running almost immediately.