Created
November 3, 2022 13:39
-
-
Save anteverse/a7c24d9df77a2cad6a17b836826ebf1b to your computer and use it in GitHub Desktop.
Example of a custom Pylint checker that verifies task definition names
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class FlowObjectNameChecker(checkers.BaseChecker):
    """Pylint checker that enforces the `_task` suffix on task function names.

    For every function definition that carries a decorator accepted by
    ``is_prefect_task_decorator``, the function name must be accepted by
    ``has_task_valid_suffix``; otherwise ``invalid-task-definition-name``
    (C9002) is emitted on the function node.
    """

    # NOTE(review): `__implements__` / `IAstroidChecker` were deprecated in
    # pylint 2.14 and removed in pylint 3.0 -- confirm the pinned pylint
    # version before upgrading.
    __implements__ = interfaces.IAstroidChecker

    name = "invalid-flow-object-definition-name"
    msgs = {
        "C9002": (
            "The task name is not suffixed by `_task`",
            "invalid-task-definition-name",
            "Emitted when a flow function definition is not suffixed by `_task` as per the naming convention requires",
        ),
    }

    def visit_functiondef(self, node):
        """Report a task-decorated function whose name lacks the suffix.

        Args:
            node: The function-definition node being visited; its
                ``decorators`` and ``name`` attributes are inspected.
        """
        if not node.decorators:
            return
        for decorator in node.decorators.get_children():
            # Decorators without a truthy `func` attribute (presumably bare,
            # non-call decorators) cannot be matched here. Skip them rather
            # than aborting, so decorators listed after them are still
            # inspected.
            func = getattr(decorator, "func", None)
            if not func:
                continue
            if is_prefect_task_decorator(func) and not has_task_valid_suffix(node.name):
                self.add_message("invalid-task-definition-name", node=node)
                # One report per function is enough; stop scanning.
                return
def register(linter):
    """Register a ``FlowObjectNameChecker`` instance on *linter*.

    Presumably the plugin entry point that pylint calls when it loads this
    module -- confirm against the project's pylint configuration.

    Args:
        linter: Object exposing ``register_checker``; it is also passed to
            the checker's constructor.
    """
    checker = FlowObjectNameChecker(linter)
    linter.register_checker(checker)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment