Skip to content

Instantly share code, notes, and snippets.

@jmhobbs
Last active July 15, 2022 18:15
Show Gist options
  • Star 41 You must be signed in to star a gist
  • Fork 14 You must be signed in to fork a gist
  • Save jmhobbs/11276249 to your computer and use it in GitHub Desktop.
Save jmhobbs/11276249 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import socket
import os
print("Connecting...")
if os.path.exists("/tmp/python_unix_sockets_example"):
client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
client.connect("/tmp/python_unix_sockets_example")
print("Ready.")
print("Ctrl-C to quit.")
print("Sending 'DONE' shuts down the server and quits.")
while True:
try:
x = input("> ")
if "" != x:
print("SEND:", x)
client.send(x.encode('utf-8'))
if "DONE" == x:
print("Shutting down.")
break
except KeyboardInterrupt as k:
print("Shutting down.")
client.close()
break
else:
print("Couldn't Connect!")
print("Done")
# -*- coding: utf-8 -*-
import socket
import os
if os.path.exists("/tmp/python_unix_sockets_example"):
os.remove("/tmp/python_unix_sockets_example")
print("Opening socket...")
server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
server.bind("/tmp/python_unix_sockets_example")
print("Listening...")
while True:
datagram = server.recv(1024)
if not datagram:
break
else:
print("-" * 20)
print(datagram.decode('utf-8'))
if "DONE" == datagram.decode('utf-8'):
break
print("-" * 20)
print("Shutting down...")
server.close()
os.remove("/tmp/python_unix_sockets_example")
print("Done")
@jmhobbs
Copy link
Author

jmhobbs commented Apr 25, 2014

Example run:

jmhobbs@Cordelia:~/Desktop/p3 ✪ python3 server.py 
Opening socket...
Listening...
--------------------
Hello World!
--------------------
DONE
--------------------
Shutting down...
Done
jmhobbs@Cordelia:~/Desktop/p3 ✪
jmhobbs@Cordelia:~/Desktop/p3 ✪ python3 client.py 
Connecting...
Ready.
Ctrl-C to quit.
Sending 'DONE' shuts down the server and quits.
> Hello World!
SEND: Hello World!
> DONE
SEND: DONE
Shutting down.
jmhobbs@Cordelia:~/Desktop/p3 ✪

@DtxdF
Copy link

DtxdF commented Dec 15, 2019

<3

@crabvk
Copy link

crabvk commented Feb 13, 2020

And for echo server

import os
import socket
import time

socket_path = os.path.join(os.environ['XDG_RUNTIME_DIR'], 'echo.socket')

if os.path.exists(socket_path):
    os.remove(socket_path)

with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
    sock.bind(socket_path)
    sock.listen()
    while True:
        conn, _addr = sock.accept()
        data = conn.recv(1024)
        time.sleep(1) # do some work
        conn.send(data)
        conn.close()

tested with socat

echo hello | socat -t 2 $XDG_RUNTIME_DIR/echo.socket -

NOTE: don't run this server in interactive mode, it won't work properly.

@ron-vertiv
Copy link

@crabvk, can you provide a python client that sends the data and receives the response?

@jmhobbs
Copy link
Author

jmhobbs commented Jun 18, 2021

@ron-vertiv The client from the gist can be modified slightly to work:

# -*- coding: utf-8 -*-
import socket
import os

socket_path = os.path.join(os.environ['XDG_RUNTIME_DIR'], 'echo.socket')

print("Connecting...")
if os.path.exists(socket_path):
    client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    client.connect(socket_path)
    print("Ready.")
    print("Ctrl-C to quit.")
    print("Sending 'DONE' shuts down the server and quits.")
    while True:
        try:
            x = input("> ")
            if "" != x:
                print("SEND:", x)
                client.send(x.encode('utf-8'))
                if "DONE" == x:
                    print("Shutting down.")
                    break
                data = client.recv(1024)
                print("RECV:", data)
        except KeyboardInterrupt as k:
            print("Shutting down.")
            client.close()
            break
else:
    print("Couldn't Connect!")
    print("Done")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment