Skip to content

Instantly share code, notes, and snippets.

@RJ
Created May 11, 2018 15:29
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save RJ/7acba5b06a03c9b521601e08d0327d56 to your computer and use it in GitHub Desktop.
Save RJ/7acba5b06a03c9b521601e08d0327d56 to your computer and use it in GitHub Desktop.
Python script to control a cheap USB relay from Amazon (QinHeng Electronics HL-340 USB-serial adapter)
# To run with pyusb debugging:
#
# PYUSB_DEBUG=debug python relay.py
#
# Grab the vendor and product codes from syslog when plugging in the relay:
#
# usb 3-1: New USB device found, idVendor=1a86, idProduct=7523
#
import time
import usb.core
import usb.util
# Look the relay board up by the vendor/product IDs reported in syslog.
dev = usb.core.find(idVendor=0x1a86, idProduct=0x7523)
if dev is None:
    raise ValueError("Device not found")

# If a kernel driver is bound to interface 0, detach it before using the
# device directly.
if dev.is_kernel_driver_active(0):
    dev.detach_kernel_driver(0)

cfg = dev.get_active_configuration()
intf = cfg[(0, 0)]

ep = usb.util.find_descriptor(
    intf,
    # match the first OUT endpoint
    custom_match=lambda e: (
        usb.util.endpoint_direction(e.bEndpointAddress)
        == usb.util.ENDPOINT_OUT
    ),
)
if ep is None:
    # Explicit raise rather than assert: asserts are removed under "python -O",
    # which would let a missing endpoint surface later as an AttributeError.
    raise ValueError("No OUT endpoint found on the relay interface")
# 4-byte command frames. The last byte equals the sum of the first three
# (0xA0 + 0x01 + 0x01 == 0xA2 and 0xA0 + 0x01 + 0x00 == 0xA1); the third byte
# is the only other one that differs between "close" (0x01) and "open" (0x00).
close_relay_cmd = [0xA0, 0x01, 0x01, 0xA2]
open_relay_cmd = [0xA0, 0x01, 0x00, 0xA1]

# Since I'm using this to send a signal to a gate controller, I'm simulating just
# pressing a button to make the circuit for 2 seconds, then releasing:
ep.write(close_relay_cmd)
try:
    time.sleep(2)
finally:
    # Always release the relay, even if the sleep is interrupted (e.g. by
    # Ctrl-C), so the simulated button is never left held down.
    ep.write(open_relay_cmd)
@StarTestTM
Copy link

Hello. I tried to run your code, but I get the following error message:

Traceback (most recent call last):
File "D:\Projects\Python\Phytec\WBRD300-REV5\Python Scripts\Modules\RelayControl.py", line 20, in
if dev.is_kernel_driver_active(0):
File "C:\Python\Python39-32\lib\site-packages\usb\core.py", line 989, in is_kernel_driver_active
self._ctx.managed_open()
File "C:\Python\Python39-32\lib\site-packages\usb\core.py", line 105, in managed_open
self.handle = self.backend.open_device(self.dev)
File "C:\Python\Python39-32\lib\site-packages\usb\backend\libusb1.py", line 722, in open_device
return _DeviceHandle(dev)
File "C:\Python\Python39-32\lib\site-packages\usb\backend\libusb1.py", line 600, in init
_check(_lib.libusb_open(self.devid, byref(self.handle)))
File "C:\Python\Python39-32\lib\site-packages\usb\backend\libusb1.py", line 550, in _check
raise NotImplementedError(_strerror(ret))
NotImplementedError: Operation not supported or unimplemented on this platform

What is wrong?

@RJ
Copy link
Author

RJ commented May 16, 2022

for future-me, here's how it's wired up to mqtt:

#!/usr/bin/python
# To run with pyusb debugging:
#
#   PYUSB_DEBUG=debug python relay.py
#
# Grab the vendor and product codes from syslog when plugging in the relay:
#
#   usb 3-1: New USB device found, idVendor=1a86, idProduct=7523
#
import time
import usb.core
import usb.util
import paho.mqtt.subscribe as subscribe

# Look the relay board up by the vendor/product IDs reported in syslog.
dev = usb.core.find(idVendor=0x1a86, idProduct=0x7523)

if dev is None:
    raise ValueError("Device not found")

# If a kernel driver is bound to interface 0, detach it before using the
# device directly.
if dev.is_kernel_driver_active(0):
    dev.detach_kernel_driver(0)

cfg = dev.get_active_configuration()
intf = cfg[(0, 0)]

ep = usb.util.find_descriptor(
    intf,
    # match the first OUT endpoint
    custom_match=lambda e: (
        usb.util.endpoint_direction(e.bEndpointAddress)
        == usb.util.ENDPOINT_OUT
    ),
)

if ep is None:
    # Explicit raise rather than assert: asserts are removed under "python -O",
    # which would let a missing endpoint surface later as an AttributeError.
    raise ValueError("No OUT endpoint found on the relay interface")

# 4-byte command frames. The last byte equals the sum of the first three
# (0xA0 + 0x01 + 0x01 == 0xA2 and 0xA0 + 0x01 + 0x00 == 0xA1); the third byte
# is the only other one that differs between "close" (0x01) and "open" (0x00).
close_relay_cmd = [0xA0, 0x01, 0x01, 0xA2]
open_relay_cmd = [0xA0, 0x01, 0x00, 0xA1]

def handle_mqtt(client, userdata, message):
    """Drive the relay from an MQTT message and publish the resulting state.

    A payload of "ON" writes ``close_relay_cmd`` to the USB endpoint and a
    payload of "OFF" writes ``open_relay_cmd``. In both cases the same value
    is then published, retained, to "hass/front_gate/state". Any other
    payload is only reported as unhandled.

    Args:
        client: MQTT client handed to the callback; used to publish the state.
        userdata: Unused.
        message: Received message; its ``topic`` and ``payload`` are read.
    """
    payload = message.payload
    # paho-mqtt delivers the payload as bytes. On Python 3 bytes never compare
    # equal to a str literal, so decode first. On Python 2 the payload already
    # is a str and is left untouched.
    if not isinstance(payload, str):
        payload = payload.decode("utf-8", "replace")

    print("topic='%s' payload='%s'" % (message.topic, payload))

    if payload == "ON":
        print("Closing relay (gate -> open)")
        ep.write(close_relay_cmd)
        client.publish("hass/front_gate/state", payload="ON", retain=True)
        return

    if payload == "OFF":
        print("Opening relay (gate -> close)")
        ep.write(open_relay_cmd)
        client.publish("hass/front_gate/state", payload="OFF", retain=True)
        return

    print("Unhandled msg")

# Broker credentials, passed to paho as the ``auth`` mapping.
auth_data = dict(username="gaterelay", password="xxxxxxxxxxx")

# Subscribe to the command topic and route every received message to
# handle_mqtt.
subscribe.callback(
    handle_mqtt,
    "hass/front_gate",
    hostname="zorro",
    port=1883,
    client_id="gates",
    auth=auth_data,
)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment