Skip to content

Instantly share code, notes, and snippets.

@hehehwang
Last active October 29, 2021 06:46
Show Gist options
  • Save hehehwang/9e75c254a57d2c8c820e7b49bbea266c to your computer and use it in GitHub Desktop.
Save hehehwang/9e75c254a57d2c8c820e7b49bbea266c to your computer and use it in GitHub Desktop.
rpi-comm
import socket
from time import time
def client():
    """Benchmark request/response round-trips against the pose server.

    Connects to the hard-coded server address and then repeatedly prompts
    for an iteration count. For every iteration one request is sent, one
    reply is read, and the reply is printed together with its round-trip
    time; after each batch the average round-trip time is printed.

    An empty answer to the prompt sends ``"close"`` to the server and ends
    the session. A non-numeric answer is reported and the prompt repeats.
    The socket is always closed on exit, including on errors.
    """
    serverHost = "192.168.0.26"
    serverPort = 5689
    # The request never changes, so encode it once instead of per iteration.
    request = "send me landscape".encode()

    with socket.socket() as clientSocket:
        clientSocket.connect((serverHost, serverPort))
        print("connection successful")
        while True:
            howMany = input("iterations?")
            if not howMany:
                clientSocket.sendall("close".encode())
                break
            try:
                iterations = int(howMany)
            except ValueError:
                print(f"Not a whole number: {howMany!r}")
                continue

            times = []
            for _ in range(iterations):
                timer = time()
                # sendall() keeps writing until the whole request is out;
                # plain send() is allowed to transmit only part of it.
                clientSocket.sendall(request)
                # NOTE(review): a single recv() returns at most 4096 bytes, so
                # a longer reply would be split across reads - confirm the
                # server's reply size stays below that.
                reply = clientSocket.recv(4096)
                duration = time() - timer
                if not reply:
                    # Zero bytes means the peer closed the connection.
                    print("server closed the connection")
                    return
                print(f"{reply.decode()} : {duration:.5f} sec")
                times.append(duration)

            # Zero (or negative) iterations leave nothing to average.
            if times:
                print(f"Average duration per request: {sum(times)/len(times):.3f}")
def main():
    """Script entry point: run the interactive benchmark client."""
    client()


# Only start the client when this file is executed directly, not on import.
if __name__ == "__main__":
    main()
import socket
import cv2
import mediapipe as mp
def capturePose():
    """Yield one MediaPipe pose result per frame read from camera 0.

    Each frame is mirrored, run through ``mp.solutions.pose.Pose``, drawn
    with the detected landmarks into a preview window titled
    "MediaPipe Pose", and then the raw result object is yielded to the
    caller. Frames that fail to read are skipped.

    The generator ends when the capture device reports it is no longer
    open. The camera and the preview window are released when the generator
    finishes, is closed, or raises.
    """
    mp_drawing = mp.solutions.drawing_utils
    mp_drawing_styles = mp.solutions.drawing_styles
    mp_pose = mp.solutions.pose
    cap = cv2.VideoCapture(0)
    try:
        with mp_pose.Pose(
            min_detection_confidence=0.5, min_tracking_confidence=0.5
        ) as pose:
            while cap.isOpened():
                success, image = cap.read()
                if not success:
                    print("Ignoring empty camera frame.")
                    # If loading a video, use 'break' instead of 'continue'.
                    continue
                # Flip the image horizontally for a later selfie-view display,
                # and convert the BGR image to RGB.
                image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
                # To improve performance, optionally mark the image as not
                # writeable to pass by reference.
                image.flags.writeable = False
                results = pose.process(image)
                # Draw the pose annotation on the image.
                image.flags.writeable = True
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
                mp_drawing.draw_landmarks(
                    image,
                    results.pose_landmarks,
                    mp_pose.POSE_CONNECTIONS,
                    landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style(),
                )
                cv2.imshow("MediaPipe Pose", image)
                cv2.waitKey(5)
                yield results
    finally:
        # Runs on normal exhaustion, on generator.close(), and on errors, so
        # the camera handle and the preview window are never left dangling.
        cap.release()
        cv2.destroyAllWindows()
def server():
    """Serve pose world landmarks to a single TCP client.

    Binds to the hard-coded address, waits up to 30 seconds for one client
    to connect, then starts the ``capturePose`` generator. For every message
    received from the client the next pose result is taken from the
    generator and the string form of its ``pose_world_landmarks`` is sent
    back as UTF-8.

    The session ends when the client sends ``"close"``, when the client
    disconnects, or when the pose generator stops producing frames. Both
    sockets and the pose generator are closed on every exit path.
    """
    HOST = "192.168.0.26"
    PORT = 5689
    with socket.socket() as server_socket:
        print(HOST)
        server_socket.bind((HOST, PORT))
        server_socket.listen()
        # accept() raises a timeout error if nobody connects within 30 s.
        server_socket.settimeout(30)
        print("waiting for connection")
        conn, address = server_socket.accept()
        with conn:
            print("connection successful, loading mediapipe")
            poseGen = capturePose()
            try:
                # Pull one frame up front so the first client request does
                # not pay the start-up cost of the pose pipeline.
                if next(poseGen, None) is None:
                    print("camera stream unavailable")
                    return
                print("loaded mediapipe successful")
                while True:
                    clientMessage = conn.recv(4096).decode()
                    print(f"{address} :: {clientMessage}")
                    # An empty read means the client closed the connection;
                    # treat it like an explicit "close" instead of looping.
                    if not clientMessage or clientMessage == "close":
                        break
                    result = next(poseGen, None)
                    if result is None:
                        print("camera stream ended")
                        break
                    landmarks = result.pose_world_landmarks
                    # Always reply, even with "None": the client blocks
                    # waiting for exactly one response per request.
                    conn.sendall(bytes(str(landmarks), "utf-8"))
                    # Check the landmarks field itself; the result object is
                    # a container, so testing it says nothing about whether
                    # a pose was found.
                    if landmarks is not None:
                        print("pose world landmark sent successfully")
                    else:
                        print("No landmarks available")
            finally:
                # Closing the generator lets it run its own cleanup.
                poseGen.close()
# Start the pose server only when this file is executed directly.
if __name__ == "__main__":
    server()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment