Last active
October 29, 2021 06:46
-
-
Save hehehwang/9e75c254a57d2c8c820e7b49bbea266c to your computer and use it in GitHub Desktop.
rpi-comm
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
from time import time | |
def client(): | |
clientSocket = socket.socket() | |
serverHost = "192.168.0.26" | |
serverPort = 5689 | |
clientSocket.connect((serverHost, serverPort)) | |
print("connection successful") | |
while 1: | |
times = [] | |
howMany = input("iterations?") | |
if not howMany: | |
clientSocket.sendall("close".encode()) | |
break | |
for _ in range(int(howMany)): | |
message = "send me landscape" | |
timer = time() | |
clientSocket.send(message.encode()) | |
data = clientSocket.recv(4096).decode() | |
duration = time() - timer | |
print(f"{data} : {duration:.5f} sec") | |
times.append(duration) | |
print(f"Average duration per request: {sum(times)/len(times):.3f}") | |
def main(): | |
client() | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import cv2 | |
import mediapipe as mp | |
def capturePose(): | |
mp_drawing = mp.solutions.drawing_utils | |
mp_drawing_styles = mp.solutions.drawing_styles | |
mp_pose = mp.solutions.pose | |
cap = cv2.VideoCapture(0) | |
with mp_pose.Pose( | |
min_detection_confidence=0.5, min_tracking_confidence=0.5 | |
) as pose: | |
while cap.isOpened(): | |
success, image = cap.read() | |
if not success: | |
print("Ignoring empty camera frame.") | |
# If loading a video, use 'break' instead of 'continue'. | |
continue | |
# Flip the image horizontally for a later selfie-view display, and convert | |
# the BGR image to RGB. | |
image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB) | |
# To improve performance, optionally mark the image as not writeable to | |
# pass by reference. | |
image.flags.writeable = False | |
results = pose.process(image) | |
# Draw the pose annotation on the image. | |
image.flags.writeable = True | |
image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) | |
mp_drawing.draw_landmarks( | |
image, | |
results.pose_landmarks, | |
mp_pose.POSE_CONNECTIONS, | |
landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style(), | |
) | |
cv2.imshow("MediaPipe Pose", image) | |
cv2.waitKey(5) | |
yield results | |
def server(): | |
HOST = "192.168.0.26" | |
PORT = 5689 | |
server_socket = socket.socket() | |
print(HOST) | |
server_socket.bind((HOST, PORT)) | |
server_socket.listen() | |
server_socket.settimeout(30) | |
print("waiting for connection") | |
conn, address = server_socket.accept() | |
print("connection successful, loading mediapipe") | |
poseGen = capturePose() | |
next(poseGen) | |
print("loaded mediapipe successful") | |
while 1: | |
clientMessage = conn.recv(4096).decode() | |
print(f"{address} :: {clientMessage}") | |
if clientMessage == "close": | |
conn.close() | |
break | |
result = next(poseGen) | |
if result: | |
print("pose world landmark sent successfully") | |
else: | |
print("No landmarks available") | |
conn.sendall(bytes(str(result.pose_world_landmarks), "utf-8")) | |
if __name__ == "__main__": | |
server() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment