Skip to content

Instantly share code, notes, and snippets.

@hugotkk
Created May 8, 2023 00:33
Show Gist options
  • Save hugotkk/9438baf731a53fc56451ed0d861c9686 to your computer and use it in GitHub Desktop.
Save hugotkk/9438baf731a53fc56451ed0d861c9686 to your computer and use it in GitHub Desktop.
simple rpc - server and client
import time
import grpc
import video_service_pb2
import video_service_pb2_grpc
def generate_messages():
for i in range(5):
print("Get Video " + str(i))
yield video_service_pb2.VideoID(id=str(i))
print("Sleeping for 5 seconds")
time.sleep(5)
def send_messages(stub):
request_iterator = generate_messages()
for response in stub.GetVideo(request_iterator):
print("NEW Response:")
print(response)
def add_video():
with grpc.insecure_channel('localhost:50051') as channel:
stub = video_service_pb2_grpc.VideoServiceStub(channel)
for i in range(5):
video = video_service_pb2.Video(
id=str(i),
title="Sample Video " + str(i),
author="John Doe",
url="https://example.com/sample.mp4",
duration=120
)
i+=1
response = stub.AddVideo(video)
print("Video added successfully:", response)
def run():
with grpc.insecure_channel('localhost:50051') as channel:
stub = video_service_pb2_grpc.VideoServiceStub(channel)
send_messages(stub)
if __name__ == '__main__':
add_video()
run()
from concurrent import futures
import grpc
import video_service_pb2
import video_service_pb2_grpc
import google.protobuf.empty_pb2
from grpc_reflection.v1alpha import reflection
import logging
class VideoService(video_service_pb2_grpc.VideoServiceServicer):
def __init__(self):
self.videos = {}
def AddVideo(self, request, context):
video = video_service_pb2.Video(
id=request.id,
title=request.title,
author=request.author,
url=request.url,
duration=request.duration
)
self.videos[video.id] = video
return video
def RemoveVideo(self, request, context):
if request.id in self.videos:
del self.videos[request.id]
return google.protobuf.empty_pb2.Empty()
else:
context.abort(grpc.StatusCode.NOT_FOUND, "Video not found")
def ListVideos(self, request, context):
video_list = video_service_pb2.VideoList(videos=self.videos.values())
return video_list
def GetVideo(self, request_iterator, context):
for request in request_iterator:
if request.id in self.videos:
yield self.videos[request.id]
else:
context.abort(grpc.StatusCode.NOT_FOUND, "Video not found")
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
video_service_pb2_grpc.add_VideoServiceServicer_to_server(VideoService(), server)
server.add_insecure_port('[::]:50051')
SERVICE_NAMES = (
video_service_pb2.DESCRIPTOR.services_by_name['VideoService'].full_name,
reflection.SERVICE_NAME,
)
reflection.enable_server_reflection(SERVICE_NAMES, server)
logging.basicConfig(level=logging.DEBUG)
# Start the server
server.start()
try:
server.wait_for_termination()
except KeyboardInterrupt:
logger.info('Server stopped by keyboard interrupt')
if __name__ == '__main__':
serve()
syntax = "proto3";
import "google/protobuf/empty.proto";
package video_service;
service VideoService {
rpc AddVideo(Video) returns (Video) {}
rpc RemoveVideo(VideoID) returns (google.protobuf.Empty) {}
rpc ListVideos(google.protobuf.Empty) returns (VideoList) {}
rpc GetVideo(stream VideoID) returns (stream Video) {}
}
message Video {
string id = 1;
string title = 2;
string author = 3;
string url = 4;
int32 duration = 5;
}
message VideoID {
string id = 1;
}
message VideoList {
repeated Video videos = 1;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment