Skip to content

Instantly share code, notes, and snippets.

@freol35241
Last active January 4, 2023 19:08
Show Gist options
  • Save freol35241/945ec590fa40610544d8a5cfbac87921 to your computer and use it in GitHub Desktop.
Save freol35241/945ec590fa40610544d8a5cfbac87921 to your computer and use it in GitHub Desktop.
from itertools import zip_longest
SEPARATOR = "/"  # Delimiter between topic levels
SINGLE = "+"  # Wildcard standing in for exactly one level
ALL = "#"  # Wildcard standing in for all remaining levels


def mqtt_match(pattern: str, topic: str) -> bool:
    """Evaluate if a topic matches a pattern

    Both strings are split into levels on ``SEPARATOR`` and compared level
    by level:

    * ``SINGLE`` ('+') matches exactly one existing topic level, whatever
      its content (an empty level counts as a level).
    * ``ALL`` ('#') is only honoured as the last pattern level. There it
      matches every remaining topic level, including none at all, so
      ``foo/#`` also matches ``foo``. Anywhere else it never matches.
    * Any other pattern level must be equal to the topic level.

    Topics starting with '$' receive no special treatment here.

    Args:
        pattern (str): The pattern to match against
        topic (str): The topic to be matched

    Returns:
        bool: Match or no match
    """
    pattern_levels = pattern.split(SEPARATOR)
    topic_levels = topic.split(SEPARATOR)

    last_index = len(pattern_levels) - 1

    # zip_longest pads the shorter list with None, which lets us notice
    # when one side has run out of levels before the other.
    for index, (current_pattern, current_topic) in enumerate(
        zip_longest(pattern_levels, topic_levels)
    ):
        # Only allow '#' wildcard at the end
        if current_pattern == ALL:
            return index == last_index

        # One side is exhausted and the pattern level is not '#': nothing
        # can match. In particular '+' must not match a missing level,
        # otherwise "foo/+/#" would wrongly match "foo".
        if current_pattern is None or current_topic is None:
            return False

        # Pattern and topic should be equal unless pattern is a '+' wildcard
        if current_pattern not in (SINGLE, current_topic):
            return False

    # Every level was present on both sides and matched
    return True
from mqtt_topic_matcher import mqtt_match
def test_mqtt_topic_match():
    """Run mqtt_match over literal, '+' and '#' patterns and check the verdicts."""
    # Each row is (pattern, topic, whether a match is expected).
    # Rows are evaluated top to bottom; the first failing row aborts the test.
    cases = (
        # No wildcards
        ("foo/bar/baz", "foo/bar/baz", True),
        ("foo/bar/baz", "baz/bar/foo", False),
        # With leading slash
        ("/foo/bar", "/foo/bar", True),
        ("/foo/bar", "/bar/foo", False),
        # With leading and trailing slash
        ("/foo/bar/", "/foo/bar/", True),
        ("/foo/bar/", "/bar/foo/", False),
        # Wildcard ALL
        ("#", "foo/bar/baz", True),
        ("foo/#", "foo/bar/baz", True),
        ("foo/bar/#", "foo/bar", True),  # Should match the parent level
        ("foo/bar/#", "foo", False),  # Should not match two levels above
        ("#/bar/baz", "foo/bar/baz", False),
        # Wildcard SINGLE
        ("+/bar/baz", "foo/bar/baz", True),
        ("foo/bar/+", "foo/bar/baz", True),
        ("foo/bar/+", "foo/bar/baz/", False),
        # Wildcard ALL and SINGLE
        ("foo/+/baz", "foo/bar/baz", True),
        ("foo/+/#", "foo/bar/baz", True),
    )

    for pattern, topic, should_match in cases:
        outcome = mqtt_match(pattern, topic)
        if should_match:
            assert outcome
        else:
            assert not outcome
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment