poethepoet.task
1from .cmd import CmdTask 2from .expr import ExprTask 3from .ref import RefTask 4from .script import ScriptTask 5from .sequence import SequenceTask 6from .shell import ShellTask 7from .switch import SwitchTask 8 9__all__ = [ 10 "CmdTask", 11 "ExprTask", 12 "RefTask", 13 "ScriptTask", 14 "SequenceTask", 15 "ShellTask", 16 "SwitchTask", 17]
class CmdTask(PoeTask):
    """
    A task consisting of a reference to a shell command
    """

    __key__ = "cmd"

    class TaskOptions(PoeTask.TaskOptions):
        use_exec: bool = False

        def validate(self):
            """
            Check option combinations that need no extra context to judge.

            Runs the parent validation first, then rejects a task for which both
            `use_exec` and `capture_stdout` are truthy.
            """
            super().validate()
            if not (self.use_exec and self.capture_stdout):
                return
            raise ConfigValidationError(
                "'use_exec' and 'capture_stdout'"
                " options cannot be both provided on the same task."
            )

    class TaskSpec(PoeTask.TaskSpec):
        content: str
        options: "CmdTask.TaskOptions"

        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
            """
            Validations specific to cmd tasks: the content must not be blank.
            """
            stripped_content = self.content.strip()
            if not stripped_content:
                raise ConfigValidationError("Task has no content")

    spec: TaskSpec

    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        Resolve the command line for this task and pass it to the executor.

        Named argument values are merged into the env before the command is
        resolved, and any extra arguments are appended to the resolved tokens.
        """
        named_arg_values, extra_args = self.get_parsed_arguments(env)
        env.update(named_arg_values)

        resolved_tokens = self._resolve_commandline(context, env)
        cmd = (*resolved_tokens, *extra_args)

        self._print_action(shlex.join(cmd), context.dry)

        executor = self._get_executor(context, env)
        return executor.execute(
            cmd, use_exec=self.spec.options.get("use_exec", False)
        )

    def _resolve_commandline(self, context: "RunContext", env: "EnvVarsManager"):
        """
        Parse the task content into a list of command tokens.

        Raises PoeException if the content cannot be parsed, contains no command
        lines, or contains more than one `;` terminated command line. Tokens
        flagged as glob patterns are expanded relative to the working directory.
        """
        from ..helpers.command import parse_poe_cmd, resolve_command_tokens
        from ..helpers.command.ast_core import ParseError

        try:
            command_lines = parse_poe_cmd(self.spec.content).command_lines
        except ParseError as error:
            raise PoeException(
                f"Couldn't parse command line for task {self.name!r}: {error.args[0]}"
            ) from error

        if not command_lines:
            raise PoeException(
                f"Invalid cmd task {self.name!r} does not include any command lines"
            )

        # lines terminated by a line break or comment are implicitly joined, so
        # only an explicit `;` before the final line counts as a second command
        for line in command_lines[:-1]:
            if line._terminator == ";":
                raise PoeException(
                    f"Invalid cmd task {self.name!r} includes multiple command lines"
                )

        working_dir = self.get_working_dir(env)

        tokens = []
        for token, is_glob_pattern in resolve_command_tokens(command_lines, env):
            if not is_glob_pattern:
                tokens.append(token)
                continue
            # Resolve glob pattern from the working directory
            tokens.extend(str(path) for path in working_dir.glob(token))

        return tokens
A task consisting of a reference to a shell command
class TaskOptions(PoeTask.TaskOptions):
    # Option declared by cmd tasks on top of the inherited task options.
    use_exec: bool = False

    def validate(self):
        """
        Check option combinations that need no extra context to judge.

        Runs the parent validation first, then rejects a task for which both
        `use_exec` and `capture_stdout` are truthy.
        """
        super().validate()
        if not (self.use_exec and self.capture_stdout):
            return
        raise ConfigValidationError(
            "'use_exec' and 'capture_stdout'"
            " options cannot be both provided on the same task."
        )
A special kind of config object that parses options ...
def validate(self):
    """
    Check option combinations that need no extra context to judge.

    Runs the parent validation first, then raises ConfigValidationError when
    both `use_exec` and `capture_stdout` are truthy.
    """
    super().validate()
    conflicting = self.use_exec and self.capture_stdout
    if conflicting:
        raise ConfigValidationError(
            "'use_exec' and 'capture_stdout'"
            " options cannot be both provided on the same task."
        )
Validation rules that don't require any extra context go here.
Inherited Members
- poethepoet.options.PoeOptions
- PoeOptions
- parse
- normalize
- get
- update
- type_of
- get_annotation
- get_fields
class TaskSpec(PoeTask.TaskSpec):
    content: str
    options: "CmdTask.TaskOptions"

    def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
        """
        Validations specific to cmd tasks: the content must not be blank.
        """
        stripped_content = self.content.strip()
        if not stripped_content:
            raise ConfigValidationError("Task has no content")
Inherited Members
class ExprTask(PoeTask):
    """
    A task consisting of a python expression
    """

    content: str

    __key__ = "expr"

    class TaskOptions(PoeTask.TaskOptions):
        imports: Sequence[str] = tuple()
        assert_: Union[bool, int] = False
        use_exec: bool = False

        def validate(self):
            """
            Validation rules that don't require any extra context go here.
            """
            super().validate()
            if self.use_exec and self.capture_stdout:
                raise ConfigValidationError(
                    "'use_exec' and 'capture_stdout'"
                    " options cannot be both provided on the same task."
                )

    class TaskSpec(PoeTask.TaskSpec):
        content: str
        options: "ExprTask.TaskOptions"

        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
            """
            Perform validations on this TaskSpec that apply to a specific task type
            """
            try:
                # ruff: noqa: E501
                self.task_type._substitute_env_vars(self.content.strip(), {})  # type: ignore[attr-defined]
            except (ValueError, ExpressionParseError) as error:
                # Chain the original error so the root cause stays in the traceback
                raise ConfigValidationError(f"Invalid expression: {error}") from error

    spec: TaskSpec

    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        Build a one-line python script that evaluates the expression and prints
        the result, then run it via the executor.

        If the `assert` option resolves to a non-zero int, the script exits with
        that code when the result is falsy.
        """
        from ..helpers.python import format_class

        named_arg_values, extra_args = self.get_parsed_arguments(env)
        env.update(named_arg_values)

        imports = self.spec.options.imports

        expr, env_values = self.parse_content(named_arg_values, env, imports)
        argv = [
            self.spec.name,
            *(env.fill_template(token) for token in self.invocation[1:]),
        ]

        script = [
            f"import sys;" f"sys.path.append('src');" f"sys.argv = {argv!r};",
            (f"import {', '.join(imports)}; " if imports else ""),
            f"{format_class(named_arg_values)}",
            f"{format_class(env_values, classname='__env')}",
            f"result = ({expr});",
            "print(result);",
        ]

        falsy_return_code = int(self.spec.options.get("assert"))
        if falsy_return_code:
            script.append(f"exit(0 if result else {falsy_return_code});")

        # Exactly which python executable to use is usually resolved by the executor
        # It's important that the script contains no line breaks to avoid issues on
        # windows
        cmd = ("python", "-c", "".join(script))

        self._print_action(self.spec.content.strip(), context.dry)
        return self._get_executor(context, env).execute(
            cmd, use_exec=self.spec.options.use_exec
        )

    def parse_content(
        self,
        args: Optional[Dict[str, Any]],
        env: "EnvVarsManager",
        imports: Iterable[str] = (),
    ) -> Tuple[str, Dict[str, str]]:
        """
        Returns the expression to evaluate and the subset of env vars that it references

        Templated references to env vars are resolved before parsing the expression.

        Will raise an exception if the content contains invalid syntax or references
        python variables that are not in scope.
        """

        from ..helpers.python import resolve_expression

        expression, accessed_vars = self._substitute_env_vars(
            self.spec.content.strip(), env.to_dict()
        )

        expression = resolve_expression(
            source=expression,
            arguments=set(args or tuple()),
            call_only=False,
            allowed_vars={"sys", "__env", *imports},
        )
        # Strip out any new lines because they can be problematic on windows
        expression = re.sub(r"((\r\n|\r|\n) | (\r\n|\r|\n))", " ", expression)
        expression = re.sub(r"(\r\n|\r|\n)", " ", expression)

        return expression, accessed_vars

    @classmethod
    def _substitute_env_vars(cls, content: str, env: Mapping[str, str]):
        """
        Substitute ${template} references to env vars with a reference to a python class
        attribute like __env.var, and collect the accessed env vars so we can construct
        that class with the required attributes later.
        """

        from ..env.template import SpyDict, apply_envvars_to_template

        # Spy on access to the env, so that instead of replacing template ${keys} with
        # the corresponding value, replace them with a python name and keep track of
        # referenced env vars.
        accessed_vars: Dict[str, str] = {}

        def getitem_spy(obj: SpyDict, key: str, value: str):
            accessed_vars[key] = value
            return f"__env.{key}"

        expression = apply_envvars_to_template(
            content=content,
            env=SpyDict(env, getitem_spy=getitem_spy),
            require_braces=True,
        )

        return expression, accessed_vars
A task consisting of a python expression
def parse_content(
    self,
    args: Optional[Dict[str, Any]],
    env: "EnvVarsManager",
    imports: Iterable[str] = (),
) -> Tuple[str, Dict[str, str]]:
    """
    Returns the expression to evaluate and the subset of env vars that it references

    Templated references to env vars are resolved before parsing the expression.

    Will raise an exception if the content contains invalid syntax or references
    python variables that are not in scope.

    `imports` lists extra names the expression may reference in addition to
    `sys` and `__env`; it defaults to no extra names.
    """

    from ..helpers.python import resolve_expression

    expression, accessed_vars = self._substitute_env_vars(
        self.spec.content.strip(), env.to_dict()
    )

    expression = resolve_expression(
        source=expression,
        arguments=set(args or tuple()),
        call_only=False,
        allowed_vars={"sys", "__env", *imports},
    )
    # Strip out any new lines because they can be problematic on windows
    expression = re.sub(r"((\r\n|\r|\n) | (\r\n|\r|\n))", " ", expression)
    expression = re.sub(r"(\r\n|\r|\n)", " ", expression)

    return expression, accessed_vars
Returns the expression to evaluate and the subset of env vars that it references
Templated references to env vars are resolved before parsing the expression.
Will raise an exception if the content contains invalid syntax or references python variables that are not in scope.
class TaskOptions(PoeTask.TaskOptions):
    # Options declared by expr tasks on top of the inherited task options.
    imports: Sequence[str] = ()
    assert_: Union[bool, int] = False
    use_exec: bool = False

    def validate(self):
        """
        Check option combinations that need no extra context to judge.

        Runs the parent validation first, then rejects a task for which both
        `use_exec` and `capture_stdout` are truthy.
        """
        super().validate()
        if not (self.use_exec and self.capture_stdout):
            return
        raise ConfigValidationError(
            "'use_exec' and 'capture_stdout'"
            " options cannot be both provided on the same task."
        )
A special kind of config object that parses options ...
def validate(self):
    """
    Check option combinations that need no extra context to judge.

    Runs the parent validation first, then raises ConfigValidationError when
    both `use_exec` and `capture_stdout` are truthy.
    """
    super().validate()
    options_conflict = self.use_exec and self.capture_stdout
    if options_conflict:
        raise ConfigValidationError(
            "'use_exec' and 'capture_stdout'"
            " options cannot be both provided on the same task."
        )
Validation rules that don't require any extra context go here.
Inherited Members
- poethepoet.options.PoeOptions
- PoeOptions
- parse
- normalize
- get
- update
- type_of
- get_annotation
- get_fields
class TaskSpec(PoeTask.TaskSpec):
    content: str
    options: "ExprTask.TaskOptions"

    def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
        """
        Perform validations on this TaskSpec that apply to a specific task type

        Runs the env var template substitution against an empty env to check
        that the expression content can be processed, and reports a failure as
        a ConfigValidationError.
        """
        try:
            # ruff: noqa: E501
            self.task_type._substitute_env_vars(self.content.strip(), {})  # type: ignore[attr-defined]
        except (ValueError, ExpressionParseError) as error:
            # Chain the original error so the root cause stays in the traceback
            raise ConfigValidationError(f"Invalid expression: {error}") from error
Inherited Members
class RefTask(PoeTask):
    """
    A task consisting of a reference to another task
    """

    __key__ = "ref"

    class TaskOptions(PoeTask.TaskOptions):
        def validate(self):
            """
            Validation rules that don't require any extra context go here.
            """
            # Run the inherited rules first, as every other task type does
            super().validate()
            if self.executor:
                raise ConfigValidationError(
                    "Option 'executor' cannot be set on a ref task"
                )
            if self.capture_stdout:
                raise ConfigValidationError(
                    "Option 'capture_stdout' cannot be set on a ref task"
                )

    class TaskSpec(PoeTask.TaskSpec):
        content: str
        options: "RefTask.TaskOptions"

        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
            """
            Perform validations on this TaskSpec that apply to a specific task type

            The first token of the content must name a known task, and that task
            must not have `use_exec` enabled.
            """

            import shlex

            try:
                content_tokens = shlex.split(self.content)
            except ValueError as error:
                # e.g. unbalanced quotes in the task content
                raise ConfigValidationError(
                    f"Invalid ref task content {self.content!r}: {error}"
                ) from error

            if not content_tokens:
                # Blank content would otherwise surface as a bare IndexError
                raise ConfigValidationError("Task has no content")

            task_name_ref = content_tokens[0]

            if task_name_ref not in task_specs:
                raise ConfigValidationError(
                    f"Includes reference to unknown task {task_name_ref!r}"
                )

            if task_specs.get(task_name_ref).options.get("use_exec", False):
                raise ConfigValidationError(
                    f"Illegal reference to task with "
                    f"'use_exec' set to true: {task_name_ref!r}"
                )

    spec: TaskSpec

    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        Lookup and delegate to the referenced task
        """
        import shlex

        named_arg_values, extra_args = self.get_parsed_arguments(env)
        env.update(named_arg_values)

        # Templates are filled on the whole content before splitting, and again
        # on each resulting token
        ref_invocation = (
            *(
                env.fill_template(token)
                for token in shlex.split(env.fill_template(self.spec.content.strip()))
            ),
            *extra_args,
        )

        task = self.ctx.specs.get(ref_invocation[0]).create_task(
            invocation=ref_invocation, ctx=TaskContext.from_task(self)
        )

        if task.has_deps():
            return self._run_task_graph(task, context, env)

        return task.run(context=context, parent_env=env)

    def _run_task_graph(
        self,
        task: "PoeTask",
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        Run the execution plan for the referenced task stage by stage.

        Raises ExecutionError as soon as any task other than the referenced task
        returns a truthy result. The referenced task itself is run with this
        task's env as its parent env, and its result is returned.
        """
        from ..exceptions import ExecutionError
        from .graph import TaskExecutionGraph

        graph = TaskExecutionGraph(task, context)
        plan = graph.get_execution_plan()
        for stage in plan:
            for stage_task in stage:
                if stage_task == task:
                    # The final sink task gets special treatment
                    return task.run(context=context, parent_env=env)

                task_result = stage_task.run(context=context)
                if task_result:
                    raise ExecutionError(
                        f"Task graph aborted after failed task {stage_task.name!r}"
                    )
        return 0
A task consisting of a reference to another task
class TaskOptions(PoeTask.TaskOptions):
    def validate(self):
        """
        Validation rules that don't require any extra context go here.

        Ref tasks reject the `executor` and `capture_stdout` options.
        """
        # Run the inherited rules first, as every other task type does
        super().validate()
        if self.executor:
            raise ConfigValidationError(
                "Option 'executor' cannot be set on a ref task"
            )
        if self.capture_stdout:
            raise ConfigValidationError(
                "Option 'capture_stdout' cannot be set on a ref task"
            )
A special kind of config object that parses options ...
def validate(self):
    """
    Validation rules that don't require any extra context go here.

    Ref tasks reject the `executor` and `capture_stdout` options.
    """
    # Run the inherited rules first, as every other task type does
    super().validate()
    if self.executor:
        raise ConfigValidationError(
            "Option 'executor' cannot be set on a ref task"
        )
    if self.capture_stdout:
        raise ConfigValidationError(
            "Option 'capture_stdout' cannot be set on a ref task"
        )
Validation rules that don't require any extra context go here.
Inherited Members
- poethepoet.options.PoeOptions
- PoeOptions
- parse
- normalize
- get
- update
- type_of
- get_annotation
- get_fields
class TaskSpec(PoeTask.TaskSpec):
    content: str
    options: "RefTask.TaskOptions"

    def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
        """
        Perform validations on this TaskSpec that apply to a specific task type

        The first token of the content must name a known task, and that task
        must not have `use_exec` enabled.
        """

        import shlex

        try:
            content_tokens = shlex.split(self.content)
        except ValueError as error:
            # e.g. unbalanced quotes in the task content
            raise ConfigValidationError(
                f"Invalid ref task content {self.content!r}: {error}"
            ) from error

        if not content_tokens:
            # Blank content would otherwise surface as a bare IndexError
            raise ConfigValidationError("Task has no content")

        task_name_ref = content_tokens[0]

        if task_name_ref not in task_specs:
            raise ConfigValidationError(
                f"Includes reference to unknown task {task_name_ref!r}"
            )

        if task_specs.get(task_name_ref).options.get("use_exec", False):
            raise ConfigValidationError(
                f"Illegal reference to task with "
                f"'use_exec' set to true: {task_name_ref!r}"
            )
Inherited Members
class ScriptTask(PoeTask):
    """
    A task consisting of a reference to a python script
    """

    content: str

    __key__ = "script"

    class TaskOptions(PoeTask.TaskOptions):
        use_exec: bool = False
        print_result: bool = False

        def validate(self):
            """
            Validation rules that don't require any extra context go here.
            """
            super().validate()
            if self.use_exec and self.capture_stdout:
                raise ConfigValidationError(
                    "'use_exec' and 'capture_stdout'"
                    " options cannot be both provided on the same task."
                )

    class TaskSpec(PoeTask.TaskSpec):
        content: str
        options: "ScriptTask.TaskOptions"

        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
            """
            Perform validations on this TaskSpec that apply to a specific task type
            """
            from ..helpers.python import parse_and_validate

            try:
                # Strip first, matching how parse_content reads the content at
                # run time, so surrounding whitespace is not rejected here
                target_module, target_ref = self.content.strip().split(":", 1)
                if not target_ref.isidentifier():
                    parse_and_validate(target_ref, call_only=True)
            except (ValueError, ExpressionParseError) as error:
                raise ConfigValidationError(
                    f"Invalid callable reference {self.content!r}\n"
                    "(expected something like `module:callable` or `module:callable()`)"
                ) from error

    spec: TaskSpec

    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        Build a one-line python script that imports the target module and calls
        the target function, then run it via the executor.

        Functions detected as coroutine functions are run with `asyncio.run`.
        The result is printed only if the `print_result` option is set and the
        result is not None.
        """
        from ..helpers.python import format_class

        named_arg_values, extra_args = self.get_parsed_arguments(env)
        env.update(named_arg_values)

        # TODO: do something about extra_args, error?

        target_module, function_call = self.parse_content(named_arg_values)
        function_ref = function_call[: function_call.index("(")]

        argv = [
            self.name,
            *(env.fill_template(token) for token in self.invocation[1:]),
        ]

        # TODO: check whether the project really does use src layout, and don't do
        #       sys.path.append('src') if it doesn't

        script = [
            "import asyncio,os,sys;",
            "from inspect import iscoroutinefunction as _c;",
            "from os import environ;",
            "from importlib import import_module as _i;",
            f"sys.argv = {argv!r}; sys.path.append('src');",
            f"{format_class(named_arg_values)}",
            f"_m = _i('{target_module}');",
            f"_r = asyncio.run(_m.{function_call}) if _c(_m.{function_ref})",
            f" else _m.{function_call};",
        ]

        if self.spec.options.get("print_result"):
            script.append("_r is not None and print(_r);")

        # Exactly which python executable to use is usually resolved by the executor
        # It's important that the script contains no line breaks to avoid issues on
        # windows
        cmd = ("python", "-c", "".join(script))

        self._print_action(shlex.join(argv), context.dry)
        return self._get_executor(context, env).execute(
            cmd, use_exec=self.spec.options.get("use_exec", False)
        )

    def parse_content(self, args: Optional[Dict[str, Any]]) -> Tuple[str, str]:
        """
        Returns the module to load, and the function call to execute.

        Will raise an exception if the function call contains invalid syntax or
        references variables that are not in scope.
        """

        try:
            target_module, target_ref = self.spec.content.strip().split(":", 1)
        except ValueError as error:
            raise ExpressionParseError(
                f"Invalid task content: {self.spec.content.strip()!r}"
            ) from error

        # A bare identifier is called directly, with the args as keyword arguments
        if target_ref.isidentifier():
            if args:
                return target_module, f"{target_ref}(**({args}))"
            return target_module, f"{target_ref}()"

        from ..helpers.python import resolve_expression

        function_call = resolve_expression(
            target_ref,
            set(args or tuple()),
            call_only=True,
            allowed_vars={"sys", "os", "environ"},
        )
        # Strip out any new lines because they can be problematic on windows
        function_call = re.sub(r"((\r\n|\r|\n) | (\r\n|\r|\n))", " ", function_call)
        function_call = re.sub(r"(\r\n|\r|\n)", " ", function_call)

        return target_module, function_call
A task consisting of a reference to a python script
def parse_content(self, args: Optional[Dict[str, Any]]) -> Tuple[str, str]:
    """
    Returns the module to load, and the function call to execute.

    Will raise an exception if the function call contains invalid syntax or
    references variables that are not in scope.
    """

    try:
        target_module, target_ref = self.spec.content.strip().split(":", 1)
    except ValueError as error:
        # Chain the original error so the root cause stays in the traceback
        raise ExpressionParseError(
            f"Invalid task content: {self.spec.content.strip()!r}"
        ) from error

    # A bare identifier is called directly, with the args as keyword arguments
    if target_ref.isidentifier():
        if args:
            return target_module, f"{target_ref}(**({args}))"
        return target_module, f"{target_ref}()"

    # Only the expression branch needs the resolver, so import it here
    from ..helpers.python import resolve_expression

    function_call = resolve_expression(
        target_ref,
        set(args or tuple()),
        call_only=True,
        allowed_vars={"sys", "os", "environ"},
    )
    # Strip out any new lines because they can be problematic on windows
    function_call = re.sub(r"((\r\n|\r|\n) | (\r\n|\r|\n))", " ", function_call)
    function_call = re.sub(r"(\r\n|\r|\n)", " ", function_call)

    return target_module, function_call
Returns the module to load, and the function call to execute.
Will raise an exception if the function call contains invalid syntax or references variables that are not in scope.
class TaskOptions(PoeTask.TaskOptions):
    # Options declared by script tasks on top of the inherited task options.
    use_exec: bool = False
    print_result: bool = False

    def validate(self):
        """
        Check option combinations that need no extra context to judge.

        Runs the parent validation first, then rejects a task for which both
        `use_exec` and `capture_stdout` are truthy.
        """
        super().validate()
        if not (self.use_exec and self.capture_stdout):
            return
        raise ConfigValidationError(
            "'use_exec' and 'capture_stdout'"
            " options cannot be both provided on the same task."
        )
A special kind of config object that parses options ...
def validate(self):
    """
    Check option combinations that need no extra context to judge.

    Runs the parent validation first, then raises ConfigValidationError when
    both `use_exec` and `capture_stdout` are truthy.
    """
    super().validate()
    exec_with_capture = self.use_exec and self.capture_stdout
    if exec_with_capture:
        raise ConfigValidationError(
            "'use_exec' and 'capture_stdout'"
            " options cannot be both provided on the same task."
        )
Validation rules that don't require any extra context go here.
Inherited Members
- poethepoet.options.PoeOptions
- PoeOptions
- parse
- normalize
- get
- update
- type_of
- get_annotation
- get_fields
class TaskSpec(PoeTask.TaskSpec):
    content: str
    options: "ScriptTask.TaskOptions"

    def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
        """
        Perform validations on this TaskSpec that apply to a specific task type

        The content must have the form `module:callable`; anything after the
        colon that is not a plain identifier is checked with parse_and_validate.
        """
        from ..helpers.python import parse_and_validate

        try:
            # Strip surrounding whitespace so that padded content is not
            # rejected merely because the padded reference is not an identifier
            target_module, target_ref = self.content.strip().split(":", 1)
            if not target_ref.isidentifier():
                parse_and_validate(target_ref, call_only=True)
        except (ValueError, ExpressionParseError) as error:
            raise ConfigValidationError(
                f"Invalid callable reference {self.content!r}\n"
                "(expected something like `module:callable` or `module:callable()`)"
            ) from error
Inherited Members
class SequenceTask(PoeTask):
    """
    A task consisting of a sequence of other tasks
    """

    content: List[Union[str, Dict[str, Any]]]

    __key__ = "sequence"
    __content_type__: ClassVar[Type] = list

    class TaskOptions(PoeTask.TaskOptions):
        ignore_fail: Literal[True, False, "return_zero", "return_non_zero"] = False
        default_item_type: Optional[str] = None

        def validate(self):
            """
            Validation rules that don't require any extra context go here.
            """
            super().validate()
            if self.default_item_type is not None and not PoeTask.is_task_type(
                self.default_item_type, content_type=str
            ):
                raise ConfigValidationError(
                    "Unsupported value for option `default_item_type`,\n"
                    f"Expected one of {PoeTask.get_task_types(content_type=str)}"
                )

    class TaskSpec(PoeTask.TaskSpec):
        content: list
        options: "SequenceTask.TaskOptions"
        subtasks: Sequence[PoeTask.TaskSpec]

        def __init__(
            self,
            name: str,
            task_def: Dict[str, Any],
            factory: "TaskSpecFactory",
            source: "ConfigPartition",
            parent: Optional["PoeTask.TaskSpec"] = None,
        ):
            """
            Build the spec for the sequence and one child spec per sequence item.

            Raises ConfigValidationError if an item is not a str, dict or list,
            or if the factory fails to produce a spec for it.
            """
            super().__init__(name, task_def, factory, source, parent)

            self.subtasks = []
            for index, sub_task_def in enumerate(task_def[SequenceTask.__key__]):
                if not isinstance(sub_task_def, (str, dict, list)):
                    raise ConfigValidationError(
                        f"Item #{index} in sequence task should be a value of "
                        "type: str | dict | list",
                        task_name=self.name,
                    )

                # A string item starting with a letter or underscore is used as
                # the subtask name. Slicing rather than indexing keeps an empty
                # string from raising IndexError here.
                is_named_item = isinstance(sub_task_def, str) and (
                    sub_task_def[:1].isalpha() or sub_task_def[:1] == "_"
                )
                subtask_name = (
                    sub_task_def
                    if is_named_item
                    else SequenceTask._subtask_name(name, index)
                )
                task_type_key = self.task_type.resolve_task_type(
                    sub_task_def,
                    factory.config,
                    array_item=task_def.get("default_item_type", True),
                )

                try:
                    self.subtasks.append(
                        factory.get(
                            subtask_name, sub_task_def, task_type_key, parent=self
                        )
                    )
                except PoeException as error:
                    # Chain the original error so its details are not lost
                    raise ConfigValidationError(
                        f"Failed to interpret subtask #{index} in sequence",
                        task_name=self.name,
                    ) from error

        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
            """
            Perform validations on this TaskSpec that apply to a specific task type

            Subtasks may not declare `args`, and each subtask is validated in turn.
            """
            for subtask in self.subtasks:
                if subtask.args:
                    raise ConfigValidationError(
                        "Unsupported option 'args' for task declared inside sequence"
                    )

                subtask.validate(config, task_specs)

    spec: TaskSpec

    def __init__(
        self,
        spec: TaskSpec,
        invocation: Tuple[str, ...],
        ctx: TaskContext,
        capture_stdout: bool = False,
    ):
        """
        Create the sequence task along with a task instance for every subtask spec.
        """
        assert capture_stdout in (False, None)
        super().__init__(spec, invocation, ctx)
        self.subtasks = [
            task_spec.create_task(
                invocation=(self._subtask_name(task_spec.name, index),),
                ctx=TaskContext.from_task(self),
            )
            for index, task_spec in enumerate(spec.subtasks)
        ]

    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        Run the subtasks in order.

        Raises ExecutionError on the first failing subtask unless `ignore_fail`
        is set. With `ignore_fail` set to "return_non_zero" all subtasks are run
        and an ExecutionError listing the failed ones is raised afterwards.
        Otherwise returns 0.
        """
        named_arg_values, extra_args = self.get_parsed_arguments(env)
        env.update(named_arg_values)

        if not named_arg_values and any(arg.strip() for arg in self.invocation[1:]):
            raise PoeException(f"Sequence task {self.name!r} does not accept arguments")

        if len(self.subtasks) > 1:
            # Indicate on the global context that there are multiple stages
            context.multistage = True

        ignore_fail = self.spec.options.ignore_fail
        non_zero_subtasks: List[str] = []
        for subtask in self.subtasks:
            task_result = subtask.run(context=context, parent_env=env)
            if task_result and not ignore_fail:
                raise ExecutionError(
                    f"Sequence aborted after failed subtask {subtask.name!r}"
                )
            if task_result:
                non_zero_subtasks.append(subtask.name)

        if non_zero_subtasks and ignore_fail == "return_non_zero":
            raise ExecutionError(
                f"Subtasks {', '.join(non_zero_subtasks)} returned non-zero exit status"
            )
        return 0

    @classmethod
    def _subtask_name(cls, task_name: str, index: int):
        """
        Return the generated name for the subtask at `index` of `task_name`.
        """
        return f"{task_name}[{index}]"
A task consisting of a sequence of other tasks
def __init__(
    self,
    spec: TaskSpec,
    invocation: Tuple[str, ...],
    ctx: TaskContext,
    capture_stdout: bool = False,
):
    """
    Create the sequence task along with a task instance for every subtask spec.

    `capture_stdout` must be False or None; it is not passed on to the parent
    initializer.
    """
    assert capture_stdout in (False, None)
    super().__init__(spec, invocation, ctx)

    created_subtasks = []
    for position, subtask_spec in enumerate(spec.subtasks):
        subtask_invocation = (self._subtask_name(subtask_spec.name, position),)
        created_subtasks.append(
            subtask_spec.create_task(
                invocation=subtask_invocation,
                ctx=TaskContext.from_task(self),
            )
        )
    self.subtasks = created_subtasks
class TaskOptions(PoeTask.TaskOptions):
    # Options declared by sequence tasks on top of the inherited task options.
    ignore_fail: Literal[True, False, "return_zero", "return_non_zero"] = False
    default_item_type: Optional[str] = None

    def validate(self):
        """
        Check option values that need no extra context to judge.

        Runs the parent validation first, then requires `default_item_type`,
        when given, to be a task type that accepts str content.
        """
        super().validate()
        if self.default_item_type is None:
            return
        if PoeTask.is_task_type(self.default_item_type, content_type=str):
            return
        raise ConfigValidationError(
            "Unsupported value for option `default_item_type`,\n"
            f"Expected one of {PoeTask.get_task_types(content_type=str)}"
        )
A special kind of config object that parses options ...
def validate(self):
    """
    Check option values that need no extra context to judge.

    Runs the parent validation first, then raises ConfigValidationError if
    `default_item_type` is given but is not a task type accepting str content.
    """
    super().validate()
    if self.default_item_type is None:
        return
    if PoeTask.is_task_type(self.default_item_type, content_type=str):
        return
    raise ConfigValidationError(
        "Unsupported value for option `default_item_type`,\n"
        f"Expected one of {PoeTask.get_task_types(content_type=str)}"
    )
Validation rules that don't require any extra context go here.
Inherited Members
- poethepoet.options.PoeOptions
- PoeOptions
- parse
- normalize
- get
- update
- type_of
- get_annotation
- get_fields
class TaskSpec(PoeTask.TaskSpec):
    content: list
    options: "SequenceTask.TaskOptions"
    subtasks: Sequence[PoeTask.TaskSpec]

    def __init__(
        self,
        name: str,
        task_def: Dict[str, Any],
        factory: "TaskSpecFactory",
        source: "ConfigPartition",
        parent: Optional["PoeTask.TaskSpec"] = None,
    ):
        """
        Build the spec for the sequence and one child spec per sequence item.

        Raises ConfigValidationError if an item is not a str, dict or list, or
        if the factory fails to produce a spec for it.
        """
        super().__init__(name, task_def, factory, source, parent)

        self.subtasks = []
        for index, sub_task_def in enumerate(task_def[SequenceTask.__key__]):
            if not isinstance(sub_task_def, (str, dict, list)):
                raise ConfigValidationError(
                    f"Item #{index} in sequence task should be a value of "
                    "type: str | dict | list",
                    task_name=self.name,
                )

            # A string item starting with a letter or underscore is used as the
            # subtask name. Slicing rather than indexing keeps an empty string
            # from raising IndexError here.
            is_named_item = isinstance(sub_task_def, str) and (
                sub_task_def[:1].isalpha() or sub_task_def[:1] == "_"
            )
            subtask_name = (
                sub_task_def
                if is_named_item
                else SequenceTask._subtask_name(name, index)
            )
            task_type_key = self.task_type.resolve_task_type(
                sub_task_def,
                factory.config,
                array_item=task_def.get("default_item_type", True),
            )

            try:
                self.subtasks.append(
                    factory.get(
                        subtask_name, sub_task_def, task_type_key, parent=self
                    )
                )
            except PoeException as error:
                # Chain the original error so its details are not lost
                raise ConfigValidationError(
                    f"Failed to interpret subtask #{index} in sequence",
                    task_name=self.name,
                ) from error

    def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
        """
        Perform validations on this TaskSpec that apply to a specific task type

        Subtasks may not declare `args`, and each subtask is validated in turn.
        """
        for subtask in self.subtasks:
            if subtask.args:
                raise ConfigValidationError(
                    "Unsupported option 'args' for task declared inside sequence"
                )

            subtask.validate(config, task_specs)
def __init__(
    self,
    name: str,
    task_def: Dict[str, Any],
    factory: "TaskSpecFactory",
    source: "ConfigPartition",
    parent: Optional["PoeTask.TaskSpec"] = None,
):
    """
    Build the spec for the sequence and one child spec per sequence item.

    Raises ConfigValidationError if an item is not a str, dict or list, or if
    the factory fails to produce a spec for it.
    """
    super().__init__(name, task_def, factory, source, parent)

    self.subtasks = []
    for index, sub_task_def in enumerate(task_def[SequenceTask.__key__]):
        if not isinstance(sub_task_def, (str, dict, list)):
            raise ConfigValidationError(
                f"Item #{index} in sequence task should be a value of "
                "type: str | dict | list",
                task_name=self.name,
            )

        # A string item starting with a letter or underscore is used as the
        # subtask name. Slicing rather than indexing keeps an empty string from
        # raising IndexError here.
        is_named_item = isinstance(sub_task_def, str) and (
            sub_task_def[:1].isalpha() or sub_task_def[:1] == "_"
        )
        subtask_name = (
            sub_task_def
            if is_named_item
            else SequenceTask._subtask_name(name, index)
        )
        task_type_key = self.task_type.resolve_task_type(
            sub_task_def,
            factory.config,
            array_item=task_def.get("default_item_type", True),
        )

        try:
            self.subtasks.append(
                factory.get(
                    subtask_name, sub_task_def, task_type_key, parent=self
                )
            )
        except PoeException as error:
            # Chain the original error so its details are not lost
            raise ConfigValidationError(
                f"Failed to interpret subtask #{index} in sequence",
                task_name=self.name,
            ) from error
Inherited Members
class ShellTask(PoeTask):
    """
    A task consisting of a script that is passed as input to a shell interpreter
    """

    content: str

    __key__ = "shell"

    class TaskOptions(PoeTask.TaskOptions):
        # Either one interpreter name or a list of names; when a list is given
        # the first one that can be located is used (see resolve_interpreter_cmd)
        interpreter: Optional[Union[str, list]] = None

        def validate(self):
            """
            Check that the interpreter option, if set, only names known
            interpreters and is not an empty list.

            Raises ConfigValidationError otherwise.
            """
            super().validate()

            from ..config import KNOWN_SHELL_INTERPRETERS as VALID_INTERPRETERS

            if (
                isinstance(self.interpreter, str)
                and self.interpreter not in VALID_INTERPRETERS
            ):
                raise ConfigValidationError(
                    "Invalid value for option 'interpreter',\n"
                    f"Expected one of {VALID_INTERPRETERS}"
                )

            if isinstance(self.interpreter, list):
                if len(self.interpreter) == 0:
                    raise ConfigValidationError(
                        "Invalid value for option 'interpreter',\n"
                        "Expected at least one item in list."
                    )
                for item in self.interpreter:
                    if item not in VALID_INTERPRETERS:
                        raise ConfigValidationError(
                            f"Invalid item {item!r} in option 'interpreter',\n"
                            f"Expected one of {VALID_INTERPRETERS!r}"
                        )

    class TaskSpec(PoeTask.TaskSpec):
        content: str
        options: "ShellTask.TaskOptions"

    spec: TaskSpec

    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        Run the task content through the first configured interpreter that can
        be located, and return the result of the executor's execute call.

        Raises PoeException if arguments were passed on the invocation but no
        named argument values were parsed, or if none of the configured
        interpreters could be located.
        """
        named_arg_values, extra_args = self.get_parsed_arguments(env)
        env.update(named_arg_values)

        # Any non-blank argument is rejected unless it was parsed as a named arg
        if not named_arg_values and any(arg.strip() for arg in self.invocation[1:]):
            raise PoeException(
                f"Shell task {self.spec.name!r} does not accept arguments"
            )

        interpreter_cmd = self.resolve_interpreter_cmd()
        if not interpreter_cmd:
            config_value = self._get_interpreter_config()
            message = (
                f"Couldn't locate interpreter executable for {config_value!r} to run "
                "shell task. "
            )
            # Tailor the hint when only posix/bash were requested on windows
            if self._is_windows and set(config_value).issubset({"posix", "bash"}):
                message += "Installing Git Bash or using WSL should fix this."
            else:
                message += "Some dependencies may be missing from your system."
            raise PoeException(message)

        content = _unindent_code(self.spec.content).rstrip()

        self._print_action(content, context.dry)

        # The script is handed to the interpreter command as encoded input
        return self._get_executor(context, env).execute(
            interpreter_cmd, input=content.encode()
        )

    def _get_interpreter_config(self) -> Tuple[str, ...]:
        """
        Return the configured interpreter names as a tuple, taken from the task's
        interpreter option or else from the config's shell_interpreter value.
        """
        result: Union[str, Tuple[str, ...]] = self.spec.options.get(
            "interpreter", self.ctx.config.shell_interpreter
        )
        if isinstance(result, str):
            return (result,)
        return tuple(result)

    def resolve_interpreter_cmd(self) -> Optional[List[str]]:
        """
        Return a formatted command for the first specified interpreter that can be
        located.
        """
        for item in self._get_interpreter_config():
            executable = self._locate_interpreter(item)
            if executable is None:
                continue

            # PowerShell variants get extra flags, ending with "-" as the
            # -Command value
            if item in ("pwsh", "powershell"):
                return [executable, "-NoLogo", "-Command", "-"]

            return [executable]

        return None

    def _locate_interpreter(self, interpreter: str) -> Optional[str]:
        """
        Return an executable path (or command name) for the given interpreter
        name, or None if it is not recognised or could not be found.
        """
        from shutil import which

        result = None
        prog_files = environ.get("PROGRAMFILES", "C:\\Program Files")

        # Try use $SHELL from the environment as a hint
        # It is only used if `which` resolves it to exactly the same path
        shell_var = environ.get("SHELL", "")
        if shell_var.endswith(f"/{interpreter}") and which(shell_var) == shell_var:
            result = shell_var

        elif interpreter == "posix":
            # look for any known posix shell
            result = (
                self._locate_interpreter("sh")
                or self._locate_interpreter("bash")
                or self._locate_interpreter("zsh")
            )

        elif interpreter == "sh":
            result = which("sh") or which("/bin/sh")

            # Specifically look for git sh on windows
            if result is None and self._is_windows:
                result = which(f"{prog_files}\\Git\\bin\\sh.exe")

        elif interpreter == "bash":
            if self._is_windows:
                # Specifically look for git bash on windows as the preferred option
                # Don't trust bash from the path because it might be a useless decoy
                result = (
                    which(f"{prog_files}\\Git\\bin\\bash.exe")
                    or which("/bin/bash")
                    or which("bash")
                )
            else:
                result = which("bash") or which("/bin/bash")

        elif interpreter == "zsh":
            result = which("zsh") or which("/bin/zsh")

        elif interpreter == "fish":
            result = which("fish") or which("/bin/fish")

        elif interpreter in ("pwsh", "powershell"):
            # Look for the pwsh executable on the path, then under the
            # PowerShell 7 and 6 directories in Program Files
            result = (
                which("pwsh")
                or which(f"{prog_files}\\PowerShell\\7\\pwsh.exe")
                or which(f"{prog_files}\\PowerShell\\6\\pwsh.exe")
            )

            if result is None and interpreter == "powershell" and self._is_windows:
                # Look for older versions of powershell
                result = which("powershell") or which(
                    environ.get("WINDIR", "C:\\Windows")
                    + "\\System32\\WindowsPowerShell\\v1.0\\powershell.EXE"
                )

        elif interpreter == "python":
            # Exactly which python executable to use is usually resolved by the executor
            result = "python"

        return result
A task consisting of a script that is passed as input to a shell interpreter
def resolve_interpreter_cmd(self) -> Optional[List[str]]:
    """
    Build the command for the first configured interpreter that can be located.

    Interpreters are tried in configured order. PowerShell variants get extra
    flags appended after the executable. Returns None when no configured
    interpreter could be located.
    """
    for interpreter_name in self._get_interpreter_config():
        located = self._locate_interpreter(interpreter_name)
        if located is not None:
            command = [located]
            if interpreter_name in ("pwsh", "powershell"):
                command.extend(("-NoLogo", "-Command", "-"))
            return command

    return None
Return a formatted command for the first specified interpreter that can be located.
class TaskOptions(PoeTask.TaskOptions):
    # Either one interpreter name or a list of interpreter names
    interpreter: Optional[Union[str, list]] = None

    def validate(self):
        """
        Reject an interpreter option that names an unknown interpreter or that
        is an empty list.
        """
        super().validate()

        from ..config import KNOWN_SHELL_INTERPRETERS as VALID_INTERPRETERS

        chosen = self.interpreter

        if isinstance(chosen, str):
            if chosen not in VALID_INTERPRETERS:
                raise ConfigValidationError(
                    "Invalid value for option 'interpreter',\n"
                    f"Expected one of {VALID_INTERPRETERS}"
                )
        elif isinstance(chosen, list):
            if not chosen:
                raise ConfigValidationError(
                    "Invalid value for option 'interpreter',\n"
                    "Expected at least one item in list."
                )
            for candidate in chosen:
                if candidate in VALID_INTERPRETERS:
                    continue
                raise ConfigValidationError(
                    f"Invalid item {candidate!r} in option 'interpreter',\n"
                    f"Expected one of {VALID_INTERPRETERS!r}"
                )
A special kind of config object that parses options ...
def validate(self):
    """
    Validate the interpreter option without needing any extra context.

    A string value must be a known interpreter. A list value must be non-empty
    and contain only known interpreters. Raises ConfigValidationError otherwise.
    """
    super().validate()

    from ..config import KNOWN_SHELL_INTERPRETERS as VALID_INTERPRETERS

    configured = self.interpreter

    if isinstance(configured, str):
        if configured not in VALID_INTERPRETERS:
            raise ConfigValidationError(
                "Invalid value for option 'interpreter',\n"
                f"Expected one of {VALID_INTERPRETERS}"
            )
        return

    if not isinstance(configured, list):
        return

    if not configured:
        raise ConfigValidationError(
            "Invalid value for option 'interpreter',\n"
            "Expected at least one item in list."
        )

    for entry in configured:
        if entry not in VALID_INTERPRETERS:
            raise ConfigValidationError(
                f"Invalid item {entry!r} in option 'interpreter',\n"
                f"Expected one of {VALID_INTERPRETERS!r}"
            )
Validation rules that don't require any extra context go here.
Inherited Members
- poethepoet.options.PoeOptions
- PoeOptions
- parse
- normalize
- get
- update
- type_of
- get_annotation
- get_fields
Inherited Members
class SwitchTask(PoeTask):
    """
    A task that runs one of several `case` subtasks depending on the output of a
    `control` subtask.
    """

    __key__ = "switch"
    __content_type__: ClassVar[Type] = list

    class TaskOptions(PoeTask.TaskOptions):
        # Definition of the task whose output selects the case to run
        control: Union[str, dict]
        # What to do when no case matches and there is no default case
        default: Literal["pass", "fail"] = "fail"

        @classmethod
        def normalize(
            cls,
            config: Any,
            strict: bool = True,
        ):
            """
            Perform validations that require access to the raw config.

            Raises ConfigValidationError if any item of the switch array declares
            an option listed in SUBTASK_OPTIONS_BLOCKLIST.
            """
            if strict and isinstance(config, dict):
                # Subtasks may not declare certain options
                for subtask_def in config.get("switch", tuple()):
                    for banned_option in SUBTASK_OPTIONS_BLOCKLIST:
                        if banned_option in subtask_def:
                            if "case" not in subtask_def:
                                raise ConfigValidationError(
                                    "Default case includes incompatible option "
                                    f"{banned_option!r}"
                                )
                            raise ConfigValidationError(
                                f"Case {subtask_def.get('case')!r} includes "
                                f"incompatible option {banned_option!r}"
                            )

            return super().normalize(config, strict)

    class TaskSpec(PoeTask.TaskSpec):
        control_task_spec: PoeTask.TaskSpec
        # Pairs of (case keys, spec of the task to run for those keys)
        case_task_specs: Tuple[Tuple[Tuple[Any, ...], PoeTask.TaskSpec], ...]
        options: "SwitchTask.TaskOptions"

        def __init__(
            self,
            name: str,
            task_def: Dict[str, Any],
            factory: "TaskSpecFactory",
            source: "ConfigPartition",
            parent: Optional["PoeTask.TaskSpec"] = None,
        ):
            """
            Build the spec for a switch task, including a spec for the control
            task and one for each item in the switch array.
            """
            super().__init__(name, task_def, factory, source, parent)

            switch_args = task_def.get("args")
            control_task_def = task_def["control"]

            if switch_args:
                # The control task inherits the args declared on the switch task
                if isinstance(control_task_def, str):
                    control_task_def = {
                        factory.config.default_task_type: control_task_def
                    }
                control_task_def = dict(control_task_def, args=switch_args)

            self.control_task_spec = factory.get(
                task_name=f"{name}[__control__]", task_def=control_task_def, parent=self
            )

            case_task_specs = []
            for switch_item in task_def["switch"]:
                case_task_def = dict(switch_item, args=switch_args)
                # An item without a case key is the default case
                case = case_task_def.pop("case", DEFAULT_CASE)
                case_tuple = tuple(case) if isinstance(case, list) else (case,)
                case_task_index = ",".join(str(value) for value in case_tuple)
                case_task_specs.append(
                    (
                        case_tuple,
                        factory.get(
                            task_name=f"{name}[{case_task_index}]",
                            task_def=case_task_def,
                            parent=self,
                        ),
                    )
                )

            self.case_task_specs = tuple(case_task_specs)

        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
            """
            Check the control task type, reject duplicated or conflicting cases,
            then validate the control and case task specs.
            """
            from collections import defaultdict

            allowed_control_task_types = ("expr", "cmd", "script")
            if (
                self.control_task_spec.task_type.__key__
                not in allowed_control_task_types
            ):
                raise ConfigValidationError(
                    "Control task must have a type that is one of "
                    f"{allowed_control_task_types!r}"
                )

            cases: MutableMapping[Any, int] = defaultdict(int)
            for case_keys, _ in self.case_task_specs:
                for case_key in case_keys:
                    cases[case_key] += 1

            # Ensure case keys don't overlap (and only one default case)
            for case, count in cases.items():
                if count > 1:
                    if case is DEFAULT_CASE:
                        raise ConfigValidationError(
                            "Switch array includes more than one default case"
                        )
                    raise ConfigValidationError(
                        f"Switch array includes more than one case for {case!r}"
                    )

            if self.options.default != "fail" and DEFAULT_CASE in cases:
                raise ConfigValidationError(
                    "switch tasks should not declare both a default case and the "
                    "'default' option"
                )

            # Validate subtask specs
            self.control_task_spec.validate(config, task_specs)
            for _, case_task_spec in self.case_task_specs:
                case_task_spec.validate(config, task_specs)

    spec: TaskSpec
    control_task: PoeTask
    switch_tasks: Dict[str, PoeTask]

    def __init__(
        self,
        spec: TaskSpec,
        invocation: Tuple[str, ...],
        ctx: TaskContext,
        capture_stdout: bool = False,
    ):
        """
        Create the control task and one task per case from the given spec.

        When the switch task declares args, the arguments from its invocation
        are forwarded to the control task and to every case task.
        """
        super().__init__(spec, invocation, ctx, capture_stdout)

        control_task_name = f"{spec.name}[__control__]"
        control_invocation: Tuple[str, ...] = (control_task_name,)
        options = self.spec.options
        if options.get("args"):
            control_invocation = (*control_invocation, *invocation[1:])

        # The control task's output is always captured because it selects the case
        self.control_task = self.spec.control_task_spec.create_task(
            invocation=control_invocation,
            ctx=TaskContext.from_task(self),
            capture_stdout=True,
        )

        self.switch_tasks = {}
        for case_keys, case_spec in spec.case_task_specs:
            # Case keys are typed as Any, so stringify them before joining, the
            # same way TaskSpec.__init__ does when naming the case task specs
            case_label = ",".join(str(case_key) for case_key in case_keys)
            task_invocation: Tuple[str, ...] = (f"{spec.name}[{case_label}]",)
            if options.get("args"):
                task_invocation = (*task_invocation, *invocation[1:])

            case_task = case_spec.create_task(
                invocation=task_invocation,
                ctx=TaskContext.from_task(self),
                capture_stdout=self.capture_stdout,
            )
            # Every key of a multi-key case maps to the same task instance
            for case_key in case_keys:
                self.switch_tasks[case_key] = case_task

    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        Run the control task, then the case task matching its output.

        Returns 0 on a dry run, or when nothing matches and the default option
        is "pass"; otherwise returns the result of the selected case task.

        Raises PoeException if arguments were passed but no named argument
        values were parsed, and ExecutionError if the control task fails or no
        case matches its output.
        """
        named_arg_values, extra_args = self.get_parsed_arguments(env)
        env.update(named_arg_values)

        if not named_arg_values and any(arg.strip() for arg in self.invocation[1:]):
            raise PoeException(f"Switch task {self.name!r} does not accept arguments")

        # Indicate on the global context that there are multiple stages to this task
        context.multistage = True

        task_result = self.control_task.run(context=context, parent_env=env)
        if task_result:
            raise ExecutionError(
                f"Switch task {self.name!r} aborted after failed control task"
            )

        if context.dry:
            self._print_action(
                "unresolved case for switch task", dry=True, unresolved=True
            )
            return 0

        # Fall back to the default case (if any) when no case key matches
        control_task_output = context.get_task_output(self.control_task.invocation)
        case_task = self.switch_tasks.get(
            control_task_output, self.switch_tasks.get(DEFAULT_CASE)
        )

        if case_task is None:
            if self.spec.options.default == "pass":
                return 0
            raise ExecutionError(
                f"Control value {control_task_output!r} did not match any cases in "
                f"switch task {self.name!r}."
            )

        result = case_task.run(context=context, parent_env=env)

        if self.capture_stdout is True:
            # The executor saved output for the case task, but we need it to be
            # registered for this switch task as well
            context.save_task_output(
                self.invocation, context.get_task_output(case_task.invocation).encode()
            )

        return result
A task that runs one of several case subtasks depending on the output of a
control subtask.
def __init__(
    self,
    spec: TaskSpec,
    invocation: Tuple[str, ...],
    ctx: TaskContext,
    capture_stdout: bool = False,
):
    """
    Create the control task and one task per case from the given spec.

    When the switch task declares args, the arguments from its invocation are
    forwarded to the control task and to every case task.
    """
    super().__init__(spec, invocation, ctx, capture_stdout)

    control_task_name = f"{spec.name}[__control__]"
    control_invocation: Tuple[str, ...] = (control_task_name,)
    options = self.spec.options
    if options.get("args"):
        control_invocation = (*control_invocation, *invocation[1:])

    # The control task's output is always captured because it selects the case
    self.control_task = self.spec.control_task_spec.create_task(
        invocation=control_invocation,
        ctx=TaskContext.from_task(self),
        capture_stdout=True,
    )

    self.switch_tasks = {}
    for case_keys, case_spec in spec.case_task_specs:
        # Case keys are typed as Any, so stringify them before joining; a bare
        # str.join raises TypeError on any non-string key
        case_label = ",".join(str(case_key) for case_key in case_keys)
        task_invocation: Tuple[str, ...] = (f"{spec.name}[{case_label}]",)
        if options.get("args"):
            task_invocation = (*task_invocation, *invocation[1:])

        case_task = case_spec.create_task(
            invocation=task_invocation,
            ctx=TaskContext.from_task(self),
            capture_stdout=self.capture_stdout,
        )
        # Every key of a multi-key case maps to the same task instance
        for case_key in case_keys:
            self.switch_tasks[case_key] = case_task
class TaskOptions(PoeTask.TaskOptions):
    # Definition of the control task
    control: Union[str, dict]
    default: Literal["pass", "fail"] = "fail"

    @classmethod
    def normalize(
        cls,
        config: Any,
        strict: bool = True,
    ):
        """
        Check the raw config for switch items that declare a blocked option,
        then defer to the parent class's normalize.
        """
        inspect_items = strict and isinstance(config, dict)
        switch_items = config.get("switch", tuple()) if inspect_items else ()

        # Subtasks may not declare certain options
        for item_def in switch_items:
            for option_name in SUBTASK_OPTIONS_BLOCKLIST:
                if option_name not in item_def:
                    continue
                if "case" in item_def:
                    raise ConfigValidationError(
                        f"Case {item_def.get('case')!r} includes "
                        f"incompatible option {option_name!r}"
                    )
                raise ConfigValidationError(
                    "Default case includes incompatible option "
                    f"{option_name!r}"
                )

        return super().normalize(config, strict)
A special kind of config object that parses options ...
@classmethod
def normalize(
    cls,
    config: Any,
    strict: bool = True,
):
    """
    Run the checks that need the raw config, then defer to the parent class.

    In strict mode with a dict config, every item of the "switch" array is
    checked against SUBTASK_OPTIONS_BLOCKLIST and a ConfigValidationError is
    raised for the first blocked option found.
    """
    should_check = strict and isinstance(config, dict)
    subtask_defs = config.get("switch", tuple()) if should_check else ()

    for subtask_def in subtask_defs:
        for option_name in SUBTASK_OPTIONS_BLOCKLIST:
            if option_name not in subtask_def:
                continue

            # An item without a "case" key is reported as the default case
            if "case" in subtask_def:
                message = (
                    f"Case {subtask_def.get('case')!r} includes "
                    f"incompatible option {option_name!r}"
                )
            else:
                message = (
                    "Default case includes incompatible option "
                    f"{option_name!r}"
                )
            raise ConfigValidationError(message)

    return super().normalize(config, strict)
Perform validations that require access to the raw config.
class TaskSpec(PoeTask.TaskSpec):
    control_task_spec: PoeTask.TaskSpec
    # Pairs of (case keys, spec of the task to run for those keys)
    case_task_specs: Tuple[Tuple[Tuple[Any, ...], PoeTask.TaskSpec], ...]
    options: "SwitchTask.TaskOptions"

    def __init__(
        self,
        name: str,
        task_def: Dict[str, Any],
        factory: "TaskSpecFactory",
        source: "ConfigPartition",
        parent: Optional["PoeTask.TaskSpec"] = None,
    ):
        """
        Create specs for the control task and for each item of the switch array.
        """
        super().__init__(name, task_def, factory, source, parent)

        shared_args = task_def.get("args")
        control_def = task_def["control"]

        if shared_args:
            # A bare string control definition is wrapped using the default
            # task type so the switch args can be attached to it
            base_def = (
                {factory.config.default_task_type: control_def}
                if isinstance(control_def, str)
                else control_def
            )
            control_def = dict(base_def, args=shared_args)

        self.control_task_spec = factory.get(
            task_name=f"{name}[__control__]", task_def=control_def, parent=self
        )

        collected = []
        for item_def in task_def["switch"]:
            case_def = dict(item_def, args=shared_args)
            case_value = case_def.pop("case", DEFAULT_CASE)
            if isinstance(case_value, list):
                case_keys = tuple(case_value)
            else:
                case_keys = (case_value,)
            label = ",".join(map(str, case_keys))
            case_spec = factory.get(
                task_name=f"{name}[{label}]",
                task_def=case_def,
                parent=self,
            )
            collected.append((case_keys, case_spec))

        self.case_task_specs = tuple(collected)

    def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
        """
        Validate the control task type and the set of cases, then validate each
        subtask spec.
        """
        permitted_types = ("expr", "cmd", "script")
        control_type = self.control_task_spec.task_type.__key__
        if control_type not in permitted_types:
            raise ConfigValidationError(
                "Control task must have a type that is one of "
                f"{permitted_types!r}"
            )

        # Count how many times each case key is declared across all cases
        tally: MutableMapping[Any, int] = {}
        for keys, _ in self.case_task_specs:
            for key in keys:
                tally[key] = tally.get(key, 0) + 1

        # Ensure case keys don't overlap (and only one default case)
        repeated = [key for key, seen in tally.items() if seen > 1]
        if repeated:
            first_repeated = repeated[0]
            if first_repeated is DEFAULT_CASE:
                raise ConfigValidationError(
                    "Switch array includes more than one default case"
                )
            raise ConfigValidationError(
                f"Switch array includes more than one case for {first_repeated!r}"
            )

        if DEFAULT_CASE in tally and self.options.default != "fail":
            raise ConfigValidationError(
                "switch tasks should not declare both a default case and the "
                "'default' option"
            )

        # Validate subtask specs
        self.control_task_spec.validate(config, task_specs)
        for _, case_spec in self.case_task_specs:
            case_spec.validate(config, task_specs)
def __init__(
    self,
    name: str,
    task_def: Dict[str, Any],
    factory: "TaskSpecFactory",
    source: "ConfigPartition",
    parent: Optional["PoeTask.TaskSpec"] = None,
):
    """
    Create specs for the control task and for each item of the switch array.

    The args declared on the switch task are copied onto the control task
    definition (when set) and onto every case task definition.
    """
    super().__init__(name, task_def, factory, source, parent)

    inherited_args = task_def.get("args")
    control_def = task_def["control"]

    if inherited_args:
        if isinstance(control_def, str):
            # Wrap a bare string using the default task type so args can be added
            control_def = {factory.config.default_task_type: control_def}
        control_def = dict(control_def, args=inherited_args)

    self.control_task_spec = factory.get(
        task_name=f"{name}[__control__]", task_def=control_def, parent=self
    )

    specs_by_case = []
    for raw_case_def in task_def["switch"]:
        case_def = dict(raw_case_def, args=inherited_args)
        # An item without a case key is the default case
        declared_case = case_def.pop("case", DEFAULT_CASE)
        if isinstance(declared_case, list):
            keys = tuple(declared_case)
        else:
            keys = (declared_case,)
        joined_keys = ",".join(map(str, keys))
        case_spec = factory.get(
            task_name=f"{name}[{joined_keys}]",
            task_def=case_def,
            parent=self,
        )
        specs_by_case.append((keys, case_spec))

    self.case_task_specs = tuple(specs_by_case)