poethepoet.task

 1from .cmd import CmdTask
 2from .expr import ExprTask
 3from .ref import RefTask
 4from .script import ScriptTask
 5from .sequence import SequenceTask
 6from .shell import ShellTask
 7from .switch import SwitchTask
 8
 9__all__ = [
10    "CmdTask",
11    "ExprTask",
12    "RefTask",
13    "ScriptTask",
14    "SequenceTask",
15    "ShellTask",
16    "SwitchTask",
17]
class CmdTask(poethepoet.task.base.PoeTask):
class CmdTask(PoeTask):
    """
    Task type whose content is a single command line to execute
    """

    __key__ = "cmd"

    class TaskOptions(PoeTask.TaskOptions):
        use_exec: bool = False

        def validate(self):
            """
            Context-free option checks: run the inherited validation, then reject
            the combination of 'use_exec' with 'capture_stdout'.
            """
            super().validate()
            if not (self.use_exec and self.capture_stdout):
                return
            raise ConfigValidationError(
                "'use_exec' and 'capture_stdout'"
                " options cannot be both provided on the same task."
            )

    class TaskSpec(PoeTask.TaskSpec):
        content: str
        options: "CmdTask.TaskOptions"

        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
            """
            Checks specific to cmd tasks: content that is blank after stripping
            whitespace is a configuration error.
            """
            stripped_content = self.content.strip()
            if stripped_content:
                return
            raise ConfigValidationError("Task has no content")

    spec: TaskSpec

    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        Resolve the command line, announce it, and hand it to the executor.

        Returns whatever the executor's execute call returns.
        """
        named_args, passthrough_args = self.get_parsed_arguments(env)
        env.update(named_args)

        resolved_tokens = self._resolve_commandline(context, env)
        command = (*resolved_tokens, *passthrough_args)

        self._print_action(shlex.join(command), context.dry)

        executor = self._get_executor(context, env)
        should_exec = self.spec.options.get("use_exec", False)
        return executor.execute(command, use_exec=should_exec)

    def _resolve_commandline(self, context: "RunContext", env: "EnvVarsManager"):
        """
        Turn the task content into the list of tokens that make up the command.

        Raises PoeException if the content cannot be parsed, contains no command
        line, or contains more than one ";"-separated command line.
        """
        from ..helpers.command import parse_poe_cmd, resolve_command_tokens
        from ..helpers.command.ast_core import ParseError

        try:
            parsed_lines = parse_poe_cmd(self.spec.content).command_lines
        except ParseError as error:
            raise PoeException(
                f"Couldn't parse command line for task {self.name!r}: {error.args[0]}"
            ) from error

        if not parsed_lines:
            raise PoeException(
                f"Invalid cmd task {self.name!r} does not include any command lines"
            )

        # Only an explicit ";" before the last line counts as a second command;
        # lines ended by a line break or comment are implicitly joined
        for line in parsed_lines[:-1]:
            if line._terminator == ";":
                raise PoeException(
                    f"Invalid cmd task {self.name!r} includes multiple command lines"
                )

        base_dir = self.get_working_dir(env)

        tokens = []
        for token, is_glob in resolve_command_tokens(parsed_lines, env):
            if not is_glob:
                tokens.append(token)
                continue
            # Glob patterns are expanded relative to the working directory
            tokens.extend(str(path) for path in base_dir.glob(token))

        return tokens

A task consisting of a reference to a shell command

class CmdTask.TaskOptions(poethepoet.task.base.PoeTask.TaskOptions):
class TaskOptions(PoeTask.TaskOptions):
    use_exec: bool = False

    def validate(self):
        """
        Context-free option checks: run the inherited validation, then reject
        the combination of 'use_exec' with 'capture_stdout'.
        """
        super().validate()
        if not (self.use_exec and self.capture_stdout):
            return
        raise ConfigValidationError(
            "'use_exec' and 'capture_stdout'"
            " options cannot be both provided on the same task."
        )

A special kind of config object that parses options ...

use_exec: bool = False
def validate(self):
def validate(self):
    """
    Run the inherited checks, then refuse 'use_exec' together with
    'capture_stdout'. No extra context is needed for these rules.
    """
    super().validate()
    conflicting = self.use_exec and self.capture_stdout
    if not conflicting:
        return
    raise ConfigValidationError(
        "'use_exec' and 'capture_stdout'"
        " options cannot be both provided on the same task."
    )

Validation rules that don't require any extra context go here.

Inherited Members
poethepoet.options.PoeOptions
PoeOptions
parse
normalize
get
update
type_of
get_annotation
get_fields
poethepoet.task.base.PoeTask.TaskOptions
args
capture_stdout
cwd
deps
env
envfile
executor
help
uses
class CmdTask.TaskSpec(poethepoet.task.base.PoeTask.TaskSpec):
class TaskSpec(PoeTask.TaskSpec):
    content: str
    options: "CmdTask.TaskOptions"

    def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
        """
        Type-specific check for cmd tasks: content that is blank after
        stripping whitespace is a configuration error.
        """
        has_content = bool(self.content.strip())
        if has_content:
            return
        raise ConfigValidationError("Task has no content")
content: str
task_type: Type[poethepoet.task.base.PoeTask] = <class 'CmdTask'>
class ExprTask(poethepoet.task.base.PoeTask):
 class ExprTask(PoeTask):
     """
     A task consisting of a python expression
     """

     content: str

     __key__ = "expr"

     class TaskOptions(PoeTask.TaskOptions):
         # Names joined into a single `import` statement ahead of the expression
         imports: Sequence[str] = tuple()
         # Converted to int at run time; a non-zero value becomes the exit code
         # used when the expression result is falsy
         assert_: Union[bool, int] = False
         use_exec: bool = False

         def validate(self):
             """
             Validation rules that don't require any extra context go here.

             Raises ConfigValidationError if both 'use_exec' and 'capture_stdout'
             are set.
             """
             super().validate()
             if self.use_exec and self.capture_stdout:
                 raise ConfigValidationError(
                     "'use_exec' and 'capture_stdout'"
                     " options cannot be both provided on the same task."
                 )

     class TaskSpec(PoeTask.TaskSpec):
         content: str
         options: "ExprTask.TaskOptions"

         def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
             """
             Perform validations on this TaskSpec that apply to a specific task type

             Runs the env var template substitution against an empty env and
             reports any failure as a ConfigValidationError.
             """
             try:
                 # ruff: noqa: E501
                 self.task_type._substitute_env_vars(self.content.strip(), {})  # type: ignore[attr-defined]
             except (ValueError, ExpressionParseError) as error:
                 # Chain explicitly so the original parse failure stays attached
                 raise ConfigValidationError(f"Invalid expression: {error}") from error

     spec: TaskSpec

     def _handle_run(
         self,
         context: "RunContext",
         env: "EnvVarsManager",
     ) -> int:
         """
         Build a one-line python script that evaluates the expression and prints
         the result, then run it through the executor.

         Returns whatever the executor's execute call returns.
         """
         from ..helpers.python import format_class

         named_arg_values, extra_args = self.get_parsed_arguments(env)
         env.update(named_arg_values)

         imports = self.spec.options.imports

         expr, env_values = self.parse_content(named_arg_values, env, imports)
         argv = [
             self.spec.name,
             *(env.fill_template(token) for token in self.invocation[1:]),
         ]

         script = [
             f"import sys;sys.path.append('src');sys.argv = {argv!r};",
             (f"import {', '.join(imports)}; " if imports else ""),
             f"{format_class(named_arg_values)}",
             f"{format_class(env_values, classname='__env')}",
             f"result = ({expr});",
             "print(result);",
         ]

         # NOTE(review): the option field is declared as `assert_`; presumably the
         # options object maps the key "assert" onto it. Confirm against PoeOptions.
         falsy_return_code = int(self.spec.options.get("assert"))
         if falsy_return_code:
             script.append(f"exit(0 if result else {falsy_return_code});")

         # Exactly which python executable to use is usually resolved by the executor
         # It's important that the script contains no line breaks to avoid issues on
         # windows
         cmd = ("python", "-c", "".join(script))

         self._print_action(self.spec.content.strip(), context.dry)
         return self._get_executor(context, env).execute(
             cmd, use_exec=self.spec.options.use_exec
         )

     def parse_content(
         self,
         args: Optional[Dict[str, Any]],
         env: "EnvVarsManager",
         imports: Iterable[str] = (),
     ) -> Tuple[str, Dict[str, str]]:
         """
         Returns the expression to evaluate and the subset of env vars that it references

         Templated references to env vars are resolved before parsing the expression.

         `imports` lists extra names the expression may reference, in addition to
         `sys` and `__env`. It defaults to no extra names.

         Will raise an exception if the content contains invalid syntax or references
         python variables that are not in scope.
         """

         from ..helpers.python import resolve_expression

         expression, accessed_vars = self._substitute_env_vars(
             self.spec.content.strip(), env.to_dict()
         )

         expression = resolve_expression(
             source=expression,
             arguments=set(args or tuple()),
             call_only=False,
             allowed_vars={"sys", "__env", *imports},
         )
         # Strip out any new lines because they can be problematic on windows.
         # The first pass absorbs one space adjacent to a line break so that the
         # second pass doesn't leave a doubled space behind.
         expression = re.sub(r"((\r\n|\r|\n) | (\r\n|\r|\n))", " ", expression)
         expression = re.sub(r"(\r\n|\r|\n)", " ", expression)

         return expression, accessed_vars

     @classmethod
     def _substitute_env_vars(cls, content: str, env: Mapping[str, str]):
         """
         Substitute ${template} references to env vars with a reference to a python class
         attribute like __env.var, and collect the accessed env vars so we can construct
         that class with the required attributes later.

         Returns a tuple of the rewritten expression and a dict of the accessed
         env vars.
         """

         from ..env.template import SpyDict, apply_envvars_to_template

         # Spy on access to the env, so that instead of replacing template ${keys} with
         # the corresponding value, replace them with a python name and keep track of
         # referenced env vars.
         accessed_vars: Dict[str, str] = {}

         def getitem_spy(obj: SpyDict, key: str, value: str):
             accessed_vars[key] = value
             return f"__env.{key}"

         expression = apply_envvars_to_template(
             content=content,
             env=SpyDict(env, getitem_spy=getitem_spy),
             require_braces=True,
         )

         return expression, accessed_vars

A task consisting of a python expression

content: str
def parse_content( self, args: Optional[Dict[str, Any]], env: poethepoet.env.manager.EnvVarsManager, imports=typing.Iterable[str]) -> Tuple[str, Dict[str, str]]:
def parse_content(
    self,
    args: Optional[Dict[str, Any]],
    env: "EnvVarsManager",
    imports: Iterable[str] = (),
) -> Tuple[str, Dict[str, str]]:
    """
    Returns the expression to evaluate and the subset of env vars that it references

    Templated references to env vars are resolved before parsing the expression.

    `imports` lists extra names the expression may reference, in addition to
    `sys` and `__env`. It defaults to no extra names.

    Will raise an exception if the content contains invalid syntax or references
    python variables that are not in scope.
    """

    from ..helpers.python import resolve_expression

    expression, accessed_vars = self._substitute_env_vars(
        self.spec.content.strip(), env.to_dict()
    )

    expression = resolve_expression(
        source=expression,
        arguments=set(args or tuple()),
        call_only=False,
        allowed_vars={"sys", "__env", *imports},
    )
    # Strip out any new lines because they can be problematic on windows.
    # The first pass absorbs one space adjacent to a line break so that the
    # second pass doesn't leave a doubled space behind.
    expression = re.sub(r"((\r\n|\r|\n) | (\r\n|\r|\n))", " ", expression)
    expression = re.sub(r"(\r\n|\r|\n)", " ", expression)

    return expression, accessed_vars

Returns the expression to evaluate and the subset of env vars that it references

Templated references to env vars are resolved before parsing the expression.

Will raise an exception if the content contains invalid syntax or references python variables that are not in scope.

class ExprTask.TaskOptions(poethepoet.task.base.PoeTask.TaskOptions):
class TaskOptions(PoeTask.TaskOptions):
    imports: Sequence[str] = tuple()
    assert_: Union[bool, int] = False
    use_exec: bool = False

    def validate(self):
        """
        Run the inherited checks, then refuse 'use_exec' together with
        'capture_stdout'.
        """
        super().validate()
        if not (self.use_exec and self.capture_stdout):
            return
        raise ConfigValidationError(
            "'use_exec' and 'capture_stdout'"
            " options cannot be both provided on the same task."
        )

A special kind of config object that parses options ...

imports: Sequence[str] = ()
assert_: Union[bool, int] = False
use_exec: bool = False
def validate(self):
def validate(self):
    """
    Run the inherited checks, then refuse 'use_exec' together with
    'capture_stdout'.
    """
    super().validate()
    conflicting = self.use_exec and self.capture_stdout
    if not conflicting:
        return
    raise ConfigValidationError(
        "'use_exec' and 'capture_stdout'"
        " options cannot be both provided on the same task."
    )

Validation rules that don't require any extra context go here.

Inherited Members
poethepoet.options.PoeOptions
PoeOptions
parse
normalize
get
update
type_of
get_annotation
get_fields
poethepoet.task.base.PoeTask.TaskOptions
args
capture_stdout
cwd
deps
env
envfile
executor
help
uses
class ExprTask.TaskSpec(poethepoet.task.base.PoeTask.TaskSpec):
class TaskSpec(PoeTask.TaskSpec):
    content: str
    options: "ExprTask.TaskOptions"

    def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
        """
        Perform validations on this TaskSpec that apply to a specific task type

        Runs the env var template substitution against an empty env and
        reports any failure as a ConfigValidationError.
        """
        try:
            # ruff: noqa: E501
            self.task_type._substitute_env_vars(self.content.strip(), {})  # type: ignore[attr-defined]
        except (ValueError, ExpressionParseError) as error:
            # Chain explicitly so the original parse failure stays attached
            raise ConfigValidationError(f"Invalid expression: {error}") from error
content: str
task_type: Type[poethepoet.task.base.PoeTask] = <class 'ExprTask'>
class RefTask(poethepoet.task.base.PoeTask):
 class RefTask(PoeTask):
     """
     A task consisting of a reference to another task
     """

     __key__ = "ref"

     class TaskOptions(PoeTask.TaskOptions):
         def validate(self):
             """
             Validation rules that don't require any extra context go here.

             Raises ConfigValidationError if 'executor' or 'capture_stdout' is set.
             """
             # NOTE(review): unlike the other task types visible here, this does
             # not call super().validate(). Confirm whether that is deliberate.
             if self.executor:
                 raise ConfigValidationError(
                     "Option 'executor' cannot be set on a ref task"
                 )
             if self.capture_stdout:
                 raise ConfigValidationError(
                     "Option 'capture_stdout' cannot be set on a ref task"
                 )

     class TaskSpec(PoeTask.TaskSpec):
         content: str
         options: "RefTask.TaskOptions"

         def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
             """
             Perform validations on this TaskSpec that apply to a specific task type

             Raises ConfigValidationError if the content cannot be tokenized, is
             blank, names an unknown task, or names a task with 'use_exec' set.
             """

             import shlex

             try:
                 ref_tokens = shlex.split(self.content)
             except ValueError as error:
                 # e.g. unbalanced quotes: report as a config problem, not a crash
                 raise ConfigValidationError(
                     f"Invalid ref task content: {error}"
                 ) from error

             if not ref_tokens:
                 # Blank content would otherwise fail with IndexError below
                 raise ConfigValidationError("Task has no content")

             task_name_ref = ref_tokens[0]

             if task_name_ref not in task_specs:
                 raise ConfigValidationError(
                     f"Includes reference to unknown task {task_name_ref!r}"
                 )

             if task_specs.get(task_name_ref).options.get("use_exec", False):
                 raise ConfigValidationError(
                     f"Illegal reference to task with "
                     f"'use_exec' set to true: {task_name_ref!r}"
                 )

     spec: TaskSpec

     def _handle_run(
         self,
         context: "RunContext",
         env: "EnvVarsManager",
     ) -> int:
         """
         Lookup and delegate to the referenced task

         The content is template-filled and tokenized, then any extra arguments
         are appended to form the invocation of the referenced task. If that
         task has deps it is run via a task graph, otherwise it is run directly.
         """
         import shlex

         named_arg_values, extra_args = self.get_parsed_arguments(env)
         env.update(named_arg_values)

         ref_invocation = (
             *(
                 env.fill_template(token)
                 for token in shlex.split(env.fill_template(self.spec.content.strip()))
             ),
             *extra_args,
         )

         task = self.ctx.specs.get(ref_invocation[0]).create_task(
             invocation=ref_invocation, ctx=TaskContext.from_task(self)
         )

         if task.has_deps():
             return self._run_task_graph(task, context, env)

         return task.run(context=context, parent_env=env)

     def _run_task_graph(
         self,
         task: "PoeTask",
         context: "RunContext",
         env: "EnvVarsManager",
     ) -> int:
         """
         Run the execution plan for `task` stage by stage.

         Tasks other than `task` are run without a parent env, and a truthy
         result from any of them raises ExecutionError. When `task` itself is
         reached it is run with `env` as parent env and its result is returned.
         Returns 0 if the plan finishes without reaching `task`.
         """
         from ..exceptions import ExecutionError
         from .graph import TaskExecutionGraph

         graph = TaskExecutionGraph(task, context)
         plan = graph.get_execution_plan()
         for stage in plan:
             for stage_task in stage:
                 if stage_task == task:
                     # The final sink task gets special treatment
                     return task.run(context=context, parent_env=env)

                 task_result = stage_task.run(context=context)
                 if task_result:
                     raise ExecutionError(
                         f"Task graph aborted after failed task {stage_task.name!r}"
                     )
         return 0

A task consisting of a reference to another task

class RefTask.TaskOptions(poethepoet.task.base.PoeTask.TaskOptions):
class TaskOptions(PoeTask.TaskOptions):
    def validate(self):
        """
        Context-free checks for ref tasks: neither 'executor' nor
        'capture_stdout' may be set.
        """
        if self.executor:
            problem = "Option 'executor' cannot be set on a ref task"
        elif self.capture_stdout:
            problem = "Option 'capture_stdout' cannot be set on a ref task"
        else:
            return
        raise ConfigValidationError(problem)

A special kind of config object that parses options ...

def validate(self):
def validate(self):
    """
    Context-free checks for ref tasks: neither 'executor' nor
    'capture_stdout' may be set.
    """
    if self.executor:
        problem = "Option 'executor' cannot be set on a ref task"
    elif self.capture_stdout:
        problem = "Option 'capture_stdout' cannot be set on a ref task"
    else:
        return
    raise ConfigValidationError(problem)

Validation rules that don't require any extra context go here.

Inherited Members
poethepoet.options.PoeOptions
PoeOptions
parse
normalize
get
update
type_of
get_annotation
get_fields
poethepoet.task.base.PoeTask.TaskOptions
args
capture_stdout
cwd
deps
env
envfile
executor
help
uses
class RefTask.TaskSpec(poethepoet.task.base.PoeTask.TaskSpec):
class TaskSpec(PoeTask.TaskSpec):
    content: str
    options: "RefTask.TaskOptions"

    def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
        """
        Perform validations on this TaskSpec that apply to a specific task type

        Raises ConfigValidationError if the content cannot be tokenized, is
        blank, names an unknown task, or names a task with 'use_exec' set.
        """

        import shlex

        try:
            ref_tokens = shlex.split(self.content)
        except ValueError as error:
            # e.g. unbalanced quotes: report as a config problem, not a crash
            raise ConfigValidationError(
                f"Invalid ref task content: {error}"
            ) from error

        if not ref_tokens:
            # Blank content would otherwise fail with IndexError below
            raise ConfigValidationError("Task has no content")

        task_name_ref = ref_tokens[0]

        if task_name_ref not in task_specs:
            raise ConfigValidationError(
                f"Includes reference to unknown task {task_name_ref!r}"
            )

        if task_specs.get(task_name_ref).options.get("use_exec", False):
            raise ConfigValidationError(
                f"Illegal reference to task with "
                f"'use_exec' set to true: {task_name_ref!r}"
            )
content: str
task_type: Type[poethepoet.task.base.PoeTask] = <class 'RefTask'>
class ScriptTask(poethepoet.task.base.PoeTask):
 class ScriptTask(PoeTask):
     """
     A task consisting of a reference to a python script
     """

     content: str

     __key__ = "script"

     class TaskOptions(PoeTask.TaskOptions):
         use_exec: bool = False
         # When set, a non-None return value of the target call is printed
         print_result: bool = False

         def validate(self):
             """
             Validation rules that don't require any extra context go here.

             Raises ConfigValidationError if both 'use_exec' and 'capture_stdout'
             are set.
             """
             super().validate()
             if self.use_exec and self.capture_stdout:
                 raise ConfigValidationError(
                     "'use_exec' and 'capture_stdout'"
                     " options cannot be both provided on the same task."
                 )

     class TaskSpec(PoeTask.TaskSpec):
         content: str
         options: "ScriptTask.TaskOptions"

         def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
             """
             Perform validations on this TaskSpec that apply to a specific task type

             The content must look like `module:callable` or `module:<call
             expression>`; anything else raises ConfigValidationError.
             """
             from ..helpers.python import parse_and_validate

             try:
                 # Strip first, as ScriptTask.parse_content does at run time, so
                 # surrounding whitespace is treated the same in both places
                 target_module, target_ref = self.content.strip().split(":", 1)
                 if not target_ref.isidentifier():
                     parse_and_validate(target_ref, call_only=True)
             except (ValueError, ExpressionParseError) as error:
                 # Chain explicitly so the underlying failure stays attached
                 raise ConfigValidationError(
                     f"Invalid callable reference {self.content!r}\n"
                     "(expected something like `module:callable` or `module:callable()`)"
                 ) from error

     spec: TaskSpec

     def _handle_run(
         self,
         context: "RunContext",
         env: "EnvVarsManager",
     ) -> int:
         """
         Build a one-line python script that imports the target module and
         calls the target, then run it through the executor.

         The call is wrapped in asyncio.run when the target is a coroutine
         function. Returns whatever the executor's execute call returns.
         """
         from ..helpers.python import format_class

         named_arg_values, extra_args = self.get_parsed_arguments(env)
         env.update(named_arg_values)

         # TODO: do something about extra_args, error?

         target_module, function_call = self.parse_content(named_arg_values)
         # Everything before the first "(" is the reference to the callable
         function_ref = function_call[: function_call.index("(")]

         argv = [
             self.name,
             *(env.fill_template(token) for token in self.invocation[1:]),
         ]

         # TODO: check whether the project really does use src layout, and don't do
         #       sys.path.append('src') if it doesn't

         script = [
             "import asyncio,os,sys;",
             "from inspect import iscoroutinefunction as _c;",
             "from os import environ;",
             "from importlib import import_module as _i;",
             f"sys.argv = {argv!r}; sys.path.append('src');",
             f"{format_class(named_arg_values)}",
             f"_m = _i('{target_module}');",
             f"_r = asyncio.run(_m.{function_call}) if _c(_m.{function_ref})",
             f" else _m.{function_call};",
         ]

         if self.spec.options.get("print_result"):
             script.append("_r is not None and print(_r);")

         # Exactly which python executable to use is usually resolved by the executor
         # It's important that the script contains no line breaks to avoid issues on
         # windows
         cmd = ("python", "-c", "".join(script))

         self._print_action(shlex.join(argv), context.dry)
         return self._get_executor(context, env).execute(
             cmd, use_exec=self.spec.options.get("use_exec", False)
         )

     def parse_content(self, args: Optional[Dict[str, Any]]) -> Tuple[str, str]:
         """
         Returns the module to load, and the function call to execute.

         When the target is a bare identifier a call is synthesized, passing
         `args` as keyword arguments if there are any.

         Will raise an exception if the function call contains invalid syntax or
         references variables that are not in scope.
         """

         from ..helpers.python import resolve_expression

         try:
             target_module, target_ref = self.spec.content.strip().split(":", 1)
         except ValueError as error:
             # No ":" separator in the content
             raise ExpressionParseError(
                 f"Invalid task content: {self.spec.content.strip()!r}"
             ) from error

         if target_ref.isidentifier():
             if args:
                 return target_module, f"{target_ref}(**({args}))"
             return target_module, f"{target_ref}()"

         function_call = resolve_expression(
             target_ref,
             set(args or tuple()),
             call_only=True,
             allowed_vars={"sys", "os", "environ"},
         )
         # Strip out any new lines because they can be problematic on windows.
         # The first pass absorbs one space adjacent to a line break so that the
         # second pass doesn't leave a doubled space behind.
         function_call = re.sub(r"((\r\n|\r|\n) | (\r\n|\r|\n))", " ", function_call)
         function_call = re.sub(r"(\r\n|\r|\n)", " ", function_call)

         return target_module, function_call

A task consisting of a reference to a python script

content: str
def parse_content(self, args: Optional[Dict[str, Any]]) -> Tuple[str, str]:
def parse_content(self, args: Optional[Dict[str, Any]]) -> Tuple[str, str]:
    """
    Returns the module to load, and the function call to execute.

    When the target is a bare identifier a call is synthesized, passing
    `args` as keyword arguments if there are any.

    Will raise an exception if the function call contains invalid syntax or
    references variables that are not in scope.
    """

    from ..helpers.python import resolve_expression

    try:
        target_module, target_ref = self.spec.content.strip().split(":", 1)
    except ValueError as error:
        # No ":" separator in the content
        raise ExpressionParseError(
            f"Invalid task content: {self.spec.content.strip()!r}"
        ) from error

    if target_ref.isidentifier():
        if args:
            return target_module, f"{target_ref}(**({args}))"
        return target_module, f"{target_ref}()"

    function_call = resolve_expression(
        target_ref,
        set(args or tuple()),
        call_only=True,
        allowed_vars={"sys", "os", "environ"},
    )
    # Strip out any new lines because they can be problematic on windows.
    # The first pass absorbs one space adjacent to a line break so that the
    # second pass doesn't leave a doubled space behind.
    function_call = re.sub(r"((\r\n|\r|\n) | (\r\n|\r|\n))", " ", function_call)
    function_call = re.sub(r"(\r\n|\r|\n)", " ", function_call)

    return target_module, function_call

Returns the module to load, and the function call to execute.

Will raise an exception if the function call contains invalid syntax or references variables that are not in scope.

class ScriptTask.TaskOptions(poethepoet.task.base.PoeTask.TaskOptions):
class TaskOptions(PoeTask.TaskOptions):
    use_exec: bool = False
    print_result: bool = False

    def validate(self):
        """
        Run the inherited checks, then refuse 'use_exec' together with
        'capture_stdout'.
        """
        super().validate()
        if not (self.use_exec and self.capture_stdout):
            return
        raise ConfigValidationError(
            "'use_exec' and 'capture_stdout'"
            " options cannot be both provided on the same task."
        )

A special kind of config object that parses options ...

use_exec: bool = False
print_result: bool = False
def validate(self):
def validate(self):
    """
    Run the inherited checks, then refuse 'use_exec' together with
    'capture_stdout'.
    """
    super().validate()
    conflicting = self.use_exec and self.capture_stdout
    if not conflicting:
        return
    raise ConfigValidationError(
        "'use_exec' and 'capture_stdout'"
        " options cannot be both provided on the same task."
    )

Validation rules that don't require any extra context go here.

Inherited Members
poethepoet.options.PoeOptions
PoeOptions
parse
normalize
get
update
type_of
get_annotation
get_fields
poethepoet.task.base.PoeTask.TaskOptions
args
capture_stdout
cwd
deps
env
envfile
executor
help
uses
class ScriptTask.TaskSpec(poethepoet.task.base.PoeTask.TaskSpec):
class TaskSpec(PoeTask.TaskSpec):
    content: str
    options: "ScriptTask.TaskOptions"

    def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
        """
        Perform validations on this TaskSpec that apply to a specific task type

        The content must look like `module:callable` or `module:<call
        expression>`; anything else raises ConfigValidationError.
        """
        from ..helpers.python import parse_and_validate

        try:
            # Strip first, as ScriptTask.parse_content does at run time, so
            # surrounding whitespace is treated the same in both places
            target_module, target_ref = self.content.strip().split(":", 1)
            if not target_ref.isidentifier():
                parse_and_validate(target_ref, call_only=True)
        except (ValueError, ExpressionParseError) as error:
            # Chain explicitly so the underlying failure stays attached
            raise ConfigValidationError(
                f"Invalid callable reference {self.content!r}\n"
                "(expected something like `module:callable` or `module:callable()`)"
            ) from error
content: str
task_type: Type[poethepoet.task.base.PoeTask] = <class 'ScriptTask'>
class SequenceTask(poethepoet.task.base.PoeTask):
 26class SequenceTask(PoeTask):
 27    """
 28    A task consisting of a sequence of other tasks
 29    """
 30
 31    content: List[Union[str, Dict[str, Any]]]
 32
 33    __key__ = "sequence"
 34    __content_type__: ClassVar[Type] = list
 35
 36    class TaskOptions(PoeTask.TaskOptions):
 37        ignore_fail: Literal[True, False, "return_zero", "return_non_zero"] = False
 38        default_item_type: Optional[str] = None
 39
 40        def validate(self):
 41            """
 42            Validation rules that don't require any extra context go here.
 43            """
 44            super().validate()
 45            if self.default_item_type is not None and not PoeTask.is_task_type(
 46                self.default_item_type, content_type=str
 47            ):
 48                raise ConfigValidationError(
 49                    "Unsupported value for option `default_item_type`,\n"
 50                    f"Expected one of {PoeTask.get_task_types(content_type=str)}"
 51                )
 52
 53    class TaskSpec(PoeTask.TaskSpec):
 54        content: list
 55        options: "SequenceTask.TaskOptions"
 56        subtasks: Sequence[PoeTask.TaskSpec]
 57
 58        def __init__(
 59            self,
 60            name: str,
 61            task_def: Dict[str, Any],
 62            factory: "TaskSpecFactory",
 63            source: "ConfigPartition",
 64            parent: Optional["PoeTask.TaskSpec"] = None,
 65        ):
 66            super().__init__(name, task_def, factory, source, parent)
 67
 68            self.subtasks = []
 69            for index, sub_task_def in enumerate(task_def[SequenceTask.__key__]):
 70                if not isinstance(sub_task_def, (str, dict, list)):
 71                    raise ConfigValidationError(
 72                        f"Item #{index} in sequence task should be a value of "
 73                        "type: str | dict | list",
 74                        task_name=self.name,
 75                    )
 76
 77                subtask_name = (
 78                    sub_task_def
 79                    if (
 80                        isinstance(sub_task_def, str)
 81                        and (sub_task_def[0].isalpha() or sub_task_def[0] == "_")
 82                    )
 83                    else SequenceTask._subtask_name(name, index)
 84                )
 85                task_type_key = self.task_type.resolve_task_type(
 86                    sub_task_def,
 87                    factory.config,
 88                    array_item=task_def.get("default_item_type", True),
 89                )
 90
 91                try:
 92                    self.subtasks.append(
 93                        factory.get(
 94                            subtask_name, sub_task_def, task_type_key, parent=self
 95                        )
 96                    )
 97                except PoeException:
 98                    raise ConfigValidationError(
 99                        f"Failed to interpret subtask #{index} in sequence",
100                        task_name=self.name,
101                    )
102
103        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
104            """
105            Perform validations on this TaskSpec that apply to a specific task type
106            """
107            for index, subtask in enumerate(self.subtasks):
108                if subtask.args:
109                    raise ConfigValidationError(
110                        "Unsupported option 'args' for task declared inside sequence"
111                    )
112
113                subtask.validate(config, task_specs)
114
115    spec: TaskSpec
116
117    def __init__(
118        self,
119        spec: TaskSpec,
120        invocation: Tuple[str, ...],
121        ctx: TaskContext,
122        capture_stdout: bool = False,
123    ):
124        assert capture_stdout in (False, None)
125        super().__init__(spec, invocation, ctx)
126        self.subtasks = [
127            task_spec.create_task(
128                invocation=(self._subtask_name(task_spec.name, index),),
129                ctx=TaskContext.from_task(self),
130            )
131            for index, task_spec in enumerate(spec.subtasks)
132        ]
133
134    def _handle_run(
135        self,
136        context: "RunContext",
137        env: "EnvVarsManager",
138    ) -> int:
139        named_arg_values, extra_args = self.get_parsed_arguments(env)
140        env.update(named_arg_values)
141
142        if not named_arg_values and any(arg.strip() for arg in self.invocation[1:]):
143            raise PoeException(f"Sequence task {self.name!r} does not accept arguments")
144
145        if len(self.subtasks) > 1:
146            # Indicate on the global context that there are multiple stages
147            context.multistage = True
148
149        ignore_fail = self.spec.options.ignore_fail
150        non_zero_subtasks: List[str] = list()
151        for subtask in self.subtasks:
152            task_result = subtask.run(context=context, parent_env=env)
153            if task_result and not ignore_fail:
154                raise ExecutionError(
155                    f"Sequence aborted after failed subtask {subtask.name!r}"
156                )
157            if task_result:
158                non_zero_subtasks.append(subtask.name)
159
160        if non_zero_subtasks and ignore_fail == "return_non_zero":
161            raise ExecutionError(
162                f"Subtasks {', '.join(non_zero_subtasks)} returned non-zero exit status"
163            )
164        return 0
165
166    @classmethod
167    def _subtask_name(cls, task_name: str, index: int):
168        return f"{task_name}[{index}]"

A task consisting of a sequence of other tasks

SequenceTask( spec: SequenceTask.TaskSpec, invocation: Tuple[str, ...], ctx: poethepoet.task.base.TaskContext, capture_stdout: bool = False)
117    def __init__(
118        self,
119        spec: TaskSpec,
120        invocation: Tuple[str, ...],
121        ctx: TaskContext,
122        capture_stdout: bool = False,
123    ):
124        assert capture_stdout in (False, None)
125        super().__init__(spec, invocation, ctx)
126        self.subtasks = [
127            task_spec.create_task(
128                invocation=(self._subtask_name(task_spec.name, index),),
129                ctx=TaskContext.from_task(self),
130            )
131            for index, task_spec in enumerate(spec.subtasks)
132        ]
content: List[Union[str, Dict[str, Any]]]
subtasks
class SequenceTask.TaskOptions(poethepoet.task.base.PoeTask.TaskOptions):
36    class TaskOptions(PoeTask.TaskOptions):
37        ignore_fail: Literal[True, False, "return_zero", "return_non_zero"] = False
38        default_item_type: Optional[str] = None
39
40        def validate(self):
41            """
42            Validation rules that don't require any extra context go here.
43            """
44            super().validate()
45            if self.default_item_type is not None and not PoeTask.is_task_type(
46                self.default_item_type, content_type=str
47            ):
48                raise ConfigValidationError(
49                    "Unsupported value for option `default_item_type`,\n"
50                    f"Expected one of {PoeTask.get_task_types(content_type=str)}"
51                )

A special kind of config object that parses options ...

ignore_fail: Literal[True, False, 'return_zero', 'return_non_zero'] = False
default_item_type: Optional[str] = None
def validate(self):
40        def validate(self):
41            """
42            Validation rules that don't require any extra context go here.
43            """
44            super().validate()
45            if self.default_item_type is not None and not PoeTask.is_task_type(
46                self.default_item_type, content_type=str
47            ):
48                raise ConfigValidationError(
49                    "Unsupported value for option `default_item_type`,\n"
50                    f"Expected one of {PoeTask.get_task_types(content_type=str)}"
51                )

Validation rules that don't require any extra context go here.

Inherited Members
poethepoet.options.PoeOptions
PoeOptions
parse
normalize
get
update
type_of
get_annotation
get_fields
poethepoet.task.base.PoeTask.TaskOptions
args
capture_stdout
cwd
deps
env
envfile
executor
help
uses
class SequenceTask.TaskSpec(poethepoet.task.base.PoeTask.TaskSpec):
 53    class TaskSpec(PoeTask.TaskSpec):
 54        content: list
 55        options: "SequenceTask.TaskOptions"
 56        subtasks: Sequence[PoeTask.TaskSpec]
 57
 58        def __init__(
 59            self,
 60            name: str,
 61            task_def: Dict[str, Any],
 62            factory: "TaskSpecFactory",
 63            source: "ConfigPartition",
 64            parent: Optional["PoeTask.TaskSpec"] = None,
 65        ):
 66            super().__init__(name, task_def, factory, source, parent)
 67
 68            self.subtasks = []
 69            for index, sub_task_def in enumerate(task_def[SequenceTask.__key__]):
 70                if not isinstance(sub_task_def, (str, dict, list)):
 71                    raise ConfigValidationError(
 72                        f"Item #{index} in sequence task should be a value of "
 73                        "type: str | dict | list",
 74                        task_name=self.name,
 75                    )
 76
 77                subtask_name = (
 78                    sub_task_def
 79                    if (
 80                        isinstance(sub_task_def, str)
 81                        and (sub_task_def[0].isalpha() or sub_task_def[0] == "_")
 82                    )
 83                    else SequenceTask._subtask_name(name, index)
 84                )
 85                task_type_key = self.task_type.resolve_task_type(
 86                    sub_task_def,
 87                    factory.config,
 88                    array_item=task_def.get("default_item_type", True),
 89                )
 90
 91                try:
 92                    self.subtasks.append(
 93                        factory.get(
 94                            subtask_name, sub_task_def, task_type_key, parent=self
 95                        )
 96                    )
 97                except PoeException:
 98                    raise ConfigValidationError(
 99                        f"Failed to interpret subtask #{index} in sequence",
100                        task_name=self.name,
101                    )
102
103        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
104            """
105            Perform validations on this TaskSpec that apply to a specific task type
106            """
107            for index, subtask in enumerate(self.subtasks):
108                if subtask.args:
109                    raise ConfigValidationError(
110                        "Unsupported option 'args' for task declared inside sequence"
111                    )
112
113                subtask.validate(config, task_specs)
SequenceTask.TaskSpec( name: str, task_def: Dict[str, Any], factory: poethepoet.task.base.TaskSpecFactory, source: poethepoet.config.ConfigPartition, parent: Optional[poethepoet.task.base.PoeTask.TaskSpec] = None)
 58        def __init__(
 59            self,
 60            name: str,
 61            task_def: Dict[str, Any],
 62            factory: "TaskSpecFactory",
 63            source: "ConfigPartition",
 64            parent: Optional["PoeTask.TaskSpec"] = None,
 65        ):
 66            super().__init__(name, task_def, factory, source, parent)
 67
 68            self.subtasks = []
 69            for index, sub_task_def in enumerate(task_def[SequenceTask.__key__]):
 70                if not isinstance(sub_task_def, (str, dict, list)):
 71                    raise ConfigValidationError(
 72                        f"Item #{index} in sequence task should be a value of "
 73                        "type: str | dict | list",
 74                        task_name=self.name,
 75                    )
 76
 77                subtask_name = (
 78                    sub_task_def
 79                    if (
 80                        isinstance(sub_task_def, str)
 81                        and (sub_task_def[0].isalpha() or sub_task_def[0] == "_")
 82                    )
 83                    else SequenceTask._subtask_name(name, index)
 84                )
 85                task_type_key = self.task_type.resolve_task_type(
 86                    sub_task_def,
 87                    factory.config,
 88                    array_item=task_def.get("default_item_type", True),
 89                )
 90
 91                try:
 92                    self.subtasks.append(
 93                        factory.get(
 94                            subtask_name, sub_task_def, task_type_key, parent=self
 95                        )
 96                    )
 97                except PoeException:
 98                    raise ConfigValidationError(
 99                        f"Failed to interpret subtask #{index} in sequence",
100                        task_name=self.name,
101                    )
content: list
task_type: Type[poethepoet.task.base.PoeTask] = <class 'SequenceTask'>
class ShellTask(poethepoet.task.base.PoeTask):
 20class ShellTask(PoeTask):
 21    """
 22    A task consisting of a reference to a shell command
 23    """
 24
 25    content: str
 26
 27    __key__ = "shell"
 28
 29    class TaskOptions(PoeTask.TaskOptions):
 30        interpreter: Optional[Union[str, list]] = None
 31
 32        def validate(self):
 33            super().validate()
 34
 35            from ..config import KNOWN_SHELL_INTERPRETERS as VALID_INTERPRETERS
 36
 37            if (
 38                isinstance(self.interpreter, str)
 39                and self.interpreter not in VALID_INTERPRETERS
 40            ):
 41                raise ConfigValidationError(
 42                    "Invalid value for option 'interpreter',\n"
 43                    f"Expected one of {VALID_INTERPRETERS}"
 44                )
 45
 46            if isinstance(self.interpreter, list):
 47                if len(self.interpreter) == 0:
 48                    raise ConfigValidationError(
 49                        "Invalid value for option 'interpreter',\n"
 50                        "Expected at least one item in list."
 51                    )
 52                for item in self.interpreter:
 53                    if item not in VALID_INTERPRETERS:
 54                        raise ConfigValidationError(
 55                            f"Invalid item {item!r} in option 'interpreter',\n"
 56                            f"Expected one of {VALID_INTERPRETERS!r}"
 57                        )
 58
 59    class TaskSpec(PoeTask.TaskSpec):
 60        content: str
 61        options: "ShellTask.TaskOptions"
 62
 63    spec: TaskSpec
 64
 65    def _handle_run(
 66        self,
 67        context: "RunContext",
 68        env: "EnvVarsManager",
 69    ) -> int:
 70        named_arg_values, extra_args = self.get_parsed_arguments(env)
 71        env.update(named_arg_values)
 72
 73        if not named_arg_values and any(arg.strip() for arg in self.invocation[1:]):
 74            raise PoeException(
 75                f"Shell task {self.spec.name!r} does not accept arguments"
 76            )
 77
 78        interpreter_cmd = self.resolve_interpreter_cmd()
 79        if not interpreter_cmd:
 80            config_value = self._get_interpreter_config()
 81            message = (
 82                f"Couldn't locate interpreter executable for {config_value!r} to run "
 83                "shell task. "
 84            )
 85            if self._is_windows and set(config_value).issubset({"posix", "bash"}):
 86                message += "Installing Git Bash or using WSL should fix this."
 87            else:
 88                message += "Some dependencies may be missing from your system."
 89            raise PoeException(message)
 90
 91        content = _unindent_code(self.spec.content).rstrip()
 92
 93        self._print_action(content, context.dry)
 94
 95        return self._get_executor(context, env).execute(
 96            interpreter_cmd, input=content.encode()
 97        )
 98
 99    def _get_interpreter_config(self) -> Tuple[str, ...]:
100        result: Union[str, Tuple[str, ...]] = self.spec.options.get(
101            "interpreter", self.ctx.config.shell_interpreter
102        )
103        if isinstance(result, str):
104            return (result,)
105        return tuple(result)
106
107    def resolve_interpreter_cmd(self) -> Optional[List[str]]:
108        """
109        Return a formatted command for the first specified interpreter that can be
110        located.
111        """
112        for item in self._get_interpreter_config():
113            executable = self._locate_interpreter(item)
114            if executable is None:
115                continue
116
117            if item in ("pwsh", "powershell"):
118                return [executable, "-NoLogo", "-Command", "-"]
119
120            return [executable]
121
122        return None
123
124    def _locate_interpreter(self, interpreter: str) -> Optional[str]:
125        from shutil import which
126
127        result = None
128        prog_files = environ.get("PROGRAMFILES", "C:\\Program Files")
129
130        # Try use $SHELL from the environment as a hint
131        shell_var = environ.get("SHELL", "")
132        if shell_var.endswith(f"/{interpreter}") and which(shell_var) == shell_var:
133            result = shell_var
134
135        elif interpreter == "posix":
136            # look for any known posix shell
137            result = (
138                self._locate_interpreter("sh")
139                or self._locate_interpreter("bash")
140                or self._locate_interpreter("zsh")
141            )
142
143        elif interpreter == "sh":
144            result = which("sh") or which("/bin/sh")
145
146            # Specifically look for git sh on windows
147            if result is None and self._is_windows:
148                result = which(f"{prog_files}\\Git\\bin\\sh.exe")
149
150        elif interpreter == "bash":
151            if self._is_windows:
152                # Specifically look for git bash on windows as the preferred option
153                # Don't trust bash from the path because it might be a useless decoy
154                result = (
155                    which(f"{prog_files}\\Git\\bin\\bash.exe")
156                    or which("/bin/bash")
157                    or which("bash")
158                )
159            else:
160                result = which("bash") or which("/bin/bash")
161
162        elif interpreter == "zsh":
163            result = which("zsh") or which("/bin/zsh")
164
165        elif interpreter == "fish":
166            result = which("fish") or which("/bin/fish")
167
168        elif interpreter in ("pwsh", "powershell"):
169            # Look for the pwsh executable and verify the version matches
170            result = (
171                which("pwsh")
172                or which(f"{prog_files}\\PowerShell\\7\\pwsh.exe")
173                or which(f"{prog_files}\\PowerShell\\6\\pwsh.exe")
174            )
175
176            if result is None and interpreter == "powershell" and self._is_windows:
177                # Look for older versions of powershell
178                result = which("powershell") or which(
179                    environ.get("WINDIR", "C:\\Windows")
180                    + "\\System32\\WindowsPowerShell\\v1.0\\powershell.EXE"
181                )
182
183        elif interpreter == "python":
184            # Exactly which python executable to use is usually resolved by the executor
185            result = "python"
186
187        return result

A task consisting of a shell script that is passed as input to an interpreter command

content: str
def resolve_interpreter_cmd(self) -> Optional[List[str]]:
107    def resolve_interpreter_cmd(self) -> Optional[List[str]]:
108        """
109        Return a formatted command for the first specified interpreter that can be
110        located.
111        """
112        for item in self._get_interpreter_config():
113            executable = self._locate_interpreter(item)
114            if executable is None:
115                continue
116
117            if item in ("pwsh", "powershell"):
118                return [executable, "-NoLogo", "-Command", "-"]
119
120            return [executable]
121
122        return None

Return a formatted command for the first specified interpreter that can be located.

class ShellTask.TaskOptions(poethepoet.task.base.PoeTask.TaskOptions):
29    class TaskOptions(PoeTask.TaskOptions):
30        interpreter: Optional[Union[str, list]] = None
31
32        def validate(self):
33            super().validate()
34
35            from ..config import KNOWN_SHELL_INTERPRETERS as VALID_INTERPRETERS
36
37            if (
38                isinstance(self.interpreter, str)
39                and self.interpreter not in VALID_INTERPRETERS
40            ):
41                raise ConfigValidationError(
42                    "Invalid value for option 'interpreter',\n"
43                    f"Expected one of {VALID_INTERPRETERS}"
44                )
45
46            if isinstance(self.interpreter, list):
47                if len(self.interpreter) == 0:
48                    raise ConfigValidationError(
49                        "Invalid value for option 'interpreter',\n"
50                        "Expected at least one item in list."
51                    )
52                for item in self.interpreter:
53                    if item not in VALID_INTERPRETERS:
54                        raise ConfigValidationError(
55                            f"Invalid item {item!r} in option 'interpreter',\n"
56                            f"Expected one of {VALID_INTERPRETERS!r}"
57                        )

A special kind of config object that parses options ...

interpreter: Union[str, list, NoneType] = None
def validate(self):
32        def validate(self):
33            super().validate()
34
35            from ..config import KNOWN_SHELL_INTERPRETERS as VALID_INTERPRETERS
36
37            if (
38                isinstance(self.interpreter, str)
39                and self.interpreter not in VALID_INTERPRETERS
40            ):
41                raise ConfigValidationError(
42                    "Invalid value for option 'interpreter',\n"
43                    f"Expected one of {VALID_INTERPRETERS}"
44                )
45
46            if isinstance(self.interpreter, list):
47                if len(self.interpreter) == 0:
48                    raise ConfigValidationError(
49                        "Invalid value for option 'interpreter',\n"
50                        "Expected at least one item in list."
51                    )
52                for item in self.interpreter:
53                    if item not in VALID_INTERPRETERS:
54                        raise ConfigValidationError(
55                            f"Invalid item {item!r} in option 'interpreter',\n"
56                            f"Expected one of {VALID_INTERPRETERS!r}"
57                        )

Validation rules that don't require any extra context go here.

Inherited Members
poethepoet.options.PoeOptions
PoeOptions
parse
normalize
get
update
type_of
get_annotation
get_fields
poethepoet.task.base.PoeTask.TaskOptions
args
capture_stdout
cwd
deps
env
envfile
executor
help
uses
class ShellTask.TaskSpec(poethepoet.task.base.PoeTask.TaskSpec):
59    class TaskSpec(PoeTask.TaskSpec):
60        content: str
61        options: "ShellTask.TaskOptions"
content: str
task_type: Type[poethepoet.task.base.PoeTask] = <class 'ShellTask'>
class SwitchTask(poethepoet.task.base.PoeTask):
 29class SwitchTask(PoeTask):
 30    """
 31    A task that runs one of several `case` subtasks depending on the output of a
 32    `switch` subtask.
 33    """
 34
 35    __key__ = "switch"
 36    __content_type__: ClassVar[Type] = list
 37
 38    class TaskOptions(PoeTask.TaskOptions):
 39        control: Union[str, dict]
 40        default: Literal["pass", "fail"] = "fail"
 41
 42        @classmethod
 43        def normalize(
 44            cls,
 45            config: Any,
 46            strict: bool = True,
 47        ):
 48            """
 49            Perform validations that require access to to the raw config.
 50            """
 51            if strict and isinstance(config, dict):
 52                # Subtasks may not declare certain options
 53                for subtask_def in config.get("switch", tuple()):
 54                    for banned_option in SUBTASK_OPTIONS_BLOCKLIST:
 55                        if banned_option in subtask_def:
 56                            if "case" not in subtask_def:
 57                                raise ConfigValidationError(
 58                                    "Default case includes incompatible option "
 59                                    f"{banned_option!r}"
 60                                )
 61                            raise ConfigValidationError(
 62                                f"Case {subtask_def.get('case')!r} includes "
 63                                f"incompatible option {banned_option!r}"
 64                            )
 65
 66            return super().normalize(config, strict)
 67
 68    class TaskSpec(PoeTask.TaskSpec):
 69        control_task_spec: PoeTask.TaskSpec
 70        case_task_specs: Tuple[Tuple[Tuple[Any, ...], PoeTask.TaskSpec], ...]
 71        options: "SwitchTask.TaskOptions"
 72
 73        def __init__(
 74            self,
 75            name: str,
 76            task_def: Dict[str, Any],
 77            factory: "TaskSpecFactory",
 78            source: "ConfigPartition",
 79            parent: Optional["PoeTask.TaskSpec"] = None,
 80        ):
 81            super().__init__(name, task_def, factory, source, parent)
 82
 83            switch_args = task_def.get("args")
 84            control_task_def = task_def["control"]
 85
 86            if switch_args:
 87                if isinstance(control_task_def, str):
 88                    control_task_def = {
 89                        factory.config.default_task_type: control_task_def
 90                    }
 91                control_task_def = dict(control_task_def, args=switch_args)
 92
 93            self.control_task_spec = factory.get(
 94                task_name=f"{name}[__control__]", task_def=control_task_def, parent=self
 95            )
 96
 97            case_task_specs = []
 98            for switch_item in task_def["switch"]:
 99                case_task_def = dict(switch_item, args=switch_args)
100                case = case_task_def.pop("case", DEFAULT_CASE)
101                case_tuple = tuple(case) if isinstance(case, list) else (case,)
102                case_task_index = ",".join(str(value) for value in case_tuple)
103                case_task_specs.append(
104                    (
105                        case_tuple,
106                        factory.get(
107                            task_name=f"{name}[{case_task_index}]",
108                            task_def=case_task_def,
109                            parent=self,
110                        ),
111                    )
112                )
113
114            self.case_task_specs = tuple(case_task_specs)
115
116        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
117            from collections import defaultdict
118
119            allowed_control_task_types = ("expr", "cmd", "script")
120            if (
121                self.control_task_spec.task_type.__key__
122                not in allowed_control_task_types
123            ):
124                raise ConfigValidationError(
125                    f"Control task must have a type that is one of "
126                    f"{allowed_control_task_types!r}"
127                )
128
129            cases: MutableMapping[Any, int] = defaultdict(int)
130            for case_keys, case_task_spec in self.case_task_specs:
131                for case_key in case_keys:
132                    cases[case_key] += 1
133
134            # Ensure case keys don't overlap (and only one default case)
135            for case, count in cases.items():
136                if count > 1:
137                    if case is DEFAULT_CASE:
138                        raise ConfigValidationError(
139                            "Switch array includes more than one default case"
140                        )
141                    raise ConfigValidationError(
142                        f"Switch array includes more than one case for {case!r}"
143                    )
144
145            if self.options.default != "fail" and DEFAULT_CASE in cases:
146                raise ConfigValidationError(
147                    "switch tasks should not declare both a default case and the "
148                    "'default' option"
149                )
150
151            # Validate subtask specs
152            self.control_task_spec.validate(config, task_specs)
153            for _, case_task_spec in self.case_task_specs:
154                case_task_spec.validate(config, task_specs)
155
156    spec: TaskSpec
157    control_task: PoeTask
158    switch_tasks: Dict[str, PoeTask]
159
160    def __init__(
161        self,
162        spec: TaskSpec,
163        invocation: Tuple[str, ...],
164        ctx: TaskContext,
165        capture_stdout: bool = False,
166    ):
167        super().__init__(spec, invocation, ctx, capture_stdout)
168
169        control_task_name = f"{spec.name}[__control__]"
170        control_invocation: Tuple[str, ...] = (control_task_name,)
171        options = self.spec.options
172        if options.get("args"):
173            control_invocation = (*control_invocation, *invocation[1:])
174
175        self.control_task = self.spec.control_task_spec.create_task(
176            invocation=control_invocation,
177            ctx=TaskContext.from_task(self),
178            capture_stdout=True,
179        )
180
181        self.switch_tasks = {}
182        for case_keys, case_spec in spec.case_task_specs:
183            task_invocation: Tuple[str, ...] = (f"{spec.name}[{','.join(case_keys)}]",)
184            if options.get("args"):
185                task_invocation = (*task_invocation, *invocation[1:])
186
187            case_task = case_spec.create_task(
188                invocation=task_invocation,
189                ctx=TaskContext.from_task(self),
190                capture_stdout=self.capture_stdout,
191            )
192            for case_key in case_keys:
193                self.switch_tasks[case_key] = case_task
194
195    def _handle_run(
196        self,
197        context: "RunContext",
198        env: "EnvVarsManager",
199    ) -> int:
200        named_arg_values, extra_args = self.get_parsed_arguments(env)
201        env.update(named_arg_values)
202
203        if not named_arg_values and any(arg.strip() for arg in self.invocation[1:]):
204            raise PoeException(f"Switch task {self.name!r} does not accept arguments")
205
206        # Indicate on the global context that there are multiple stages to this task
207        context.multistage = True
208
209        task_result = self.control_task.run(context=context, parent_env=env)
210        if task_result:
211            raise ExecutionError(
212                f"Switch task {self.name!r} aborted after failed control task"
213            )
214
215        if context.dry:
216            self._print_action(
217                "unresolved case for switch task", dry=True, unresolved=True
218            )
219            return 0
220
221        control_task_output = context.get_task_output(self.control_task.invocation)
222        case_task = self.switch_tasks.get(
223            control_task_output, self.switch_tasks.get(DEFAULT_CASE)
224        )
225
226        if case_task is None:
227            if self.spec.options.default == "pass":
228                return 0
229            raise ExecutionError(
230                f"Control value {control_task_output!r} did not match any cases in "
231                f"switch task {self.name!r}."
232            )
233
234        result = case_task.run(context=context, parent_env=env)
235
236        if self.capture_stdout is True:
237            # The executor saved output for the case task, but we need it to be
238            # registered for this switch task as well
239            context.save_task_output(
240                self.invocation, context.get_task_output(case_task.invocation).encode()
241            )
242
243        return result

A task that runs one of several case subtasks depending on the output of a control subtask.

SwitchTask( spec: SwitchTask.TaskSpec, invocation: Tuple[str, ...], ctx: poethepoet.task.base.TaskContext, capture_stdout: bool = False)
160    def __init__(
161        self,
162        spec: TaskSpec,
163        invocation: Tuple[str, ...],
164        ctx: TaskContext,
165        capture_stdout: bool = False,
166    ):
167        super().__init__(spec, invocation, ctx, capture_stdout)
168
169        control_task_name = f"{spec.name}[__control__]"
170        control_invocation: Tuple[str, ...] = (control_task_name,)
171        options = self.spec.options
172        if options.get("args"):
173            control_invocation = (*control_invocation, *invocation[1:])
174
175        self.control_task = self.spec.control_task_spec.create_task(
176            invocation=control_invocation,
177            ctx=TaskContext.from_task(self),
178            capture_stdout=True,
179        )
180
181        self.switch_tasks = {}
182        for case_keys, case_spec in spec.case_task_specs:
183            task_invocation: Tuple[str, ...] = (f"{spec.name}[{','.join(case_keys)}]",)
184            if options.get("args"):
185                task_invocation = (*task_invocation, *invocation[1:])
186
187            case_task = case_spec.create_task(
188                invocation=task_invocation,
189                ctx=TaskContext.from_task(self),
190                capture_stdout=self.capture_stdout,
191            )
192            for case_key in case_keys:
193                self.switch_tasks[case_key] = case_task
switch_tasks: Dict[str, poethepoet.task.base.PoeTask]
class SwitchTask.TaskOptions(poethepoet.task.base.PoeTask.TaskOptions):
38    class TaskOptions(PoeTask.TaskOptions):
39        control: Union[str, dict]
40        default: Literal["pass", "fail"] = "fail"
41
42        @classmethod
43        def normalize(
44            cls,
45            config: Any,
46            strict: bool = True,
47        ):
48            """
49            Perform validations that require access to to the raw config.
50            """
51            if strict and isinstance(config, dict):
52                # Subtasks may not declare certain options
53                for subtask_def in config.get("switch", tuple()):
54                    for banned_option in SUBTASK_OPTIONS_BLOCKLIST:
55                        if banned_option in subtask_def:
56                            if "case" not in subtask_def:
57                                raise ConfigValidationError(
58                                    "Default case includes incompatible option "
59                                    f"{banned_option!r}"
60                                )
61                            raise ConfigValidationError(
62                                f"Case {subtask_def.get('case')!r} includes "
63                                f"incompatible option {banned_option!r}"
64                            )
65
66            return super().normalize(config, strict)

A special kind of config object that parses options ...

control: Union[str, dict]
default: Literal['pass', 'fail'] = 'fail'
@classmethod
def normalize(cls, config: Any, strict: bool = True):
42        @classmethod
43        def normalize(
44            cls,
45            config: Any,
46            strict: bool = True,
47        ):
48            """
49            Perform validations that require access to to the raw config.
50            """
51            if strict and isinstance(config, dict):
52                # Subtasks may not declare certain options
53                for subtask_def in config.get("switch", tuple()):
54                    for banned_option in SUBTASK_OPTIONS_BLOCKLIST:
55                        if banned_option in subtask_def:
56                            if "case" not in subtask_def:
57                                raise ConfigValidationError(
58                                    "Default case includes incompatible option "
59                                    f"{banned_option!r}"
60                                )
61                            raise ConfigValidationError(
62                                f"Case {subtask_def.get('case')!r} includes "
63                                f"incompatible option {banned_option!r}"
64                            )
65
66            return super().normalize(config, strict)

Perform validations that require access to the raw config.

Inherited Members
poethepoet.options.PoeOptions
PoeOptions
parse
get
update
type_of
get_annotation
get_fields
poethepoet.task.base.PoeTask.TaskOptions
args
capture_stdout
cwd
deps
env
envfile
executor
help
uses
validate
class SwitchTask.TaskSpec(poethepoet.task.base.PoeTask.TaskSpec):
 68    class TaskSpec(PoeTask.TaskSpec):
 69        control_task_spec: PoeTask.TaskSpec
 70        case_task_specs: Tuple[Tuple[Tuple[Any, ...], PoeTask.TaskSpec], ...]
 71        options: "SwitchTask.TaskOptions"
 72
 73        def __init__(
 74            self,
 75            name: str,
 76            task_def: Dict[str, Any],
 77            factory: "TaskSpecFactory",
 78            source: "ConfigPartition",
 79            parent: Optional["PoeTask.TaskSpec"] = None,
 80        ):
 81            super().__init__(name, task_def, factory, source, parent)
 82
 83            switch_args = task_def.get("args")
 84            control_task_def = task_def["control"]
 85
 86            if switch_args:
 87                if isinstance(control_task_def, str):
 88                    control_task_def = {
 89                        factory.config.default_task_type: control_task_def
 90                    }
 91                control_task_def = dict(control_task_def, args=switch_args)
 92
 93            self.control_task_spec = factory.get(
 94                task_name=f"{name}[__control__]", task_def=control_task_def, parent=self
 95            )
 96
 97            case_task_specs = []
 98            for switch_item in task_def["switch"]:
 99                case_task_def = dict(switch_item, args=switch_args)
100                case = case_task_def.pop("case", DEFAULT_CASE)
101                case_tuple = tuple(case) if isinstance(case, list) else (case,)
102                case_task_index = ",".join(str(value) for value in case_tuple)
103                case_task_specs.append(
104                    (
105                        case_tuple,
106                        factory.get(
107                            task_name=f"{name}[{case_task_index}]",
108                            task_def=case_task_def,
109                            parent=self,
110                        ),
111                    )
112                )
113
114            self.case_task_specs = tuple(case_task_specs)
115
116        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
117            from collections import defaultdict
118
119            allowed_control_task_types = ("expr", "cmd", "script")
120            if (
121                self.control_task_spec.task_type.__key__
122                not in allowed_control_task_types
123            ):
124                raise ConfigValidationError(
125                    f"Control task must have a type that is one of "
126                    f"{allowed_control_task_types!r}"
127                )
128
129            cases: MutableMapping[Any, int] = defaultdict(int)
130            for case_keys, case_task_spec in self.case_task_specs:
131                for case_key in case_keys:
132                    cases[case_key] += 1
133
134            # Ensure case keys don't overlap (and only one default case)
135            for case, count in cases.items():
136                if count > 1:
137                    if case is DEFAULT_CASE:
138                        raise ConfigValidationError(
139                            "Switch array includes more than one default case"
140                        )
141                    raise ConfigValidationError(
142                        f"Switch array includes more than one case for {case!r}"
143                    )
144
145            if self.options.default != "fail" and DEFAULT_CASE in cases:
146                raise ConfigValidationError(
147                    "switch tasks should not declare both a default case and the "
148                    "'default' option"
149                )
150
151            # Validate subtask specs
152            self.control_task_spec.validate(config, task_specs)
153            for _, case_task_spec in self.case_task_specs:
154                case_task_spec.validate(config, task_specs)
SwitchTask.TaskSpec( name: str, task_def: Dict[str, Any], factory: poethepoet.task.base.TaskSpecFactory, source: poethepoet.config.ConfigPartition, parent: Optional[poethepoet.task.base.PoeTask.TaskSpec] = None)
 73        def __init__(
 74            self,
 75            name: str,
 76            task_def: Dict[str, Any],
 77            factory: "TaskSpecFactory",
 78            source: "ConfigPartition",
 79            parent: Optional["PoeTask.TaskSpec"] = None,
 80        ):
 81            super().__init__(name, task_def, factory, source, parent)
 82
 83            switch_args = task_def.get("args")
 84            control_task_def = task_def["control"]
 85
 86            if switch_args:
 87                if isinstance(control_task_def, str):
 88                    control_task_def = {
 89                        factory.config.default_task_type: control_task_def
 90                    }
 91                control_task_def = dict(control_task_def, args=switch_args)
 92
 93            self.control_task_spec = factory.get(
 94                task_name=f"{name}[__control__]", task_def=control_task_def, parent=self
 95            )
 96
 97            case_task_specs = []
 98            for switch_item in task_def["switch"]:
 99                case_task_def = dict(switch_item, args=switch_args)
100                case = case_task_def.pop("case", DEFAULT_CASE)
101                case_tuple = tuple(case) if isinstance(case, list) else (case,)
102                case_task_index = ",".join(str(value) for value in case_tuple)
103                case_task_specs.append(
104                    (
105                        case_tuple,
106                        factory.get(
107                            task_name=f"{name}[{case_task_index}]",
108                            task_def=case_task_def,
109                            parent=self,
110                        ),
111                    )
112                )
113
114            self.case_task_specs = tuple(case_task_specs)
case_task_specs: Tuple[Tuple[Tuple[Any, ...], poethepoet.task.base.PoeTask.TaskSpec], ...]
task_type: Type[poethepoet.task.base.PoeTask] = <class 'SwitchTask'>