poethepoet.task.base
import re
import sys
from os import environ
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
    Dict,
    Iterator,
    List,
    Mapping,
    NamedTuple,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
)

from ..exceptions import ConfigValidationError, PoeException
from ..options import PoeOptions

if TYPE_CHECKING:
    from ..config import ConfigPartition, PoeConfig
    from ..context import RunContext
    from ..env.manager import EnvVarsManager
    from ..ui import PoeUi
    from .args import PoeTaskArgs


# Pattern that the name of every top level (i.e. parentless) task must match
_TASK_NAME_PATTERN = re.compile(r"^\w[\w\d\-\_\+\:]*$")


class MetaPoeTask(type):
    """
    This metaclass makes all descendants of PoeTask (task types) register themselves on
    declaration and validates that they include the expected class attributes.
    """

    def __init__(cls, *args):
        super().__init__(*args)
        # The PoeTask base class is not itself a task type, so it is not registered
        if cls.__name__ == "PoeTask":
            return

        assert isinstance(getattr(cls, "__key__", None), str)
        assert issubclass(getattr(cls, "TaskOptions", None), PoeOptions)
        # __task_types is a name-mangled private attribute of PoeTask, so it has to
        # be addressed by its mangled name from outside of the PoeTask class body
        PoeTask._PoeTask__task_types[cls.__key__] = cls

        # Give each TaskSpec a reference to its parent PoeTask
        if "TaskSpec" in cls.__dict__:
            cls.TaskSpec.task_type = cls


# The content of a task, i.e. the value found under the task type key
TaskContent = Union[str, Sequence[Union[str, Mapping[str, Any]]]]

# A complete task definition as it may appear in the config
TaskDef = Union[str, Mapping[str, Any], Sequence[Union[str, Mapping[str, Any]]]]


class TaskSpecFactory:
    """
    Creates TaskSpec objects from task definitions, and caches by name the specs that
    were loaded from the config.
    """

    __cache: Dict[str, "PoeTask.TaskSpec"]
    config: "PoeConfig"

    def __init__(self, config: "PoeConfig"):
        self.__cache = {}
        self.config = config

    def __contains__(self, other) -> bool:
        """
        Check whether a spec is already cached under the given name.
        """
        return other in self.__cache

    def get(
        self,
        task_name: str,
        task_def: Optional[TaskDef] = None,
        task_type: Optional[str] = None,
        parent: Optional["PoeTask.TaskSpec"] = None,
    ) -> "PoeTask.TaskSpec":
        """
        Get the TaskSpec for the given task name.

        If both a task_def and a parent are provided then a new spec is created from
        the task_def, without touching the cache. Otherwise the spec is loaded from
        the config into the cache on first access and served from the cache after.
        """
        if task_def and parent:
            # This is probably a subtask and will be cached by the parent task_spec
            if not task_type:
                task_type = PoeTask.resolve_task_type(task_def, self.config)
                assert task_type
            return self.create(
                task_name, task_type, task_def, source=parent.source, parent=parent
            )

        if task_name not in self.__cache:
            self.load(task_name)

        return self.__cache[task_name]

    def create(
        self,
        task_name: str,
        task_type: str,
        task_def: TaskDef,
        source: "ConfigPartition",
        parent: Optional["PoeTask.TaskSpec"] = None,
    ) -> "PoeTask.TaskSpec":
        """
        A parent task should be provided when this task is defined inline within another
        task, for example as part of a sequence.
        """
        # Normalize a bare content value into a table keyed by the task type
        if not isinstance(task_def, dict):
            task_def = {task_type: task_def}

        return PoeTask.lookup_task_spec_cls(task_type)(
            name=task_name,
            task_def=task_def,
            factory=self,
            source=source,
            parent=parent,
        )

    def load_all(self):
        """
        Load the spec of every task named in the config, and return this factory.
        """
        for task_name in self.config.task_names:
            self.load(task_name)

        return self

    def load(self, task_name: str):
        """
        Look up the named task in the config, then create and cache its TaskSpec.

        Raises PoeException if the config has no such task, and ConfigValidationError
        if no task type can be resolved for the task definition.
        """
        task_def, config_partition = self.config.lookup_task(task_name)

        if task_def is None or config_partition is None:
            raise PoeException(f"Cannot instantiate unknown task {task_name!r}")

        task_type = PoeTask.resolve_task_type(task_def, self.config)
        if not task_type:
            raise ConfigValidationError(
                "Task definition must be a string, a list, or a table including exactly"
                " one task key\n"
                f"Available task keys: {set(PoeTask.get_task_types())!r}",
                task_name=task_name,
                filename=(
                    None if config_partition.is_primary else str(config_partition.path)
                ),
            )

        self.__cache[task_name] = self.create(
            task_name, task_type, task_def, source=config_partition
        )

    def __iter__(self):
        """
        Iterate over the cached TaskSpec objects.
        """
        return iter(self.__cache.values())


class TaskContext(NamedTuple):
    """
    Collection of contextual config inherited from a parent task to a child task
    """

    config: "PoeConfig"
    cwd: str
    ui: "PoeUi"
    specs: "TaskSpecFactory"

    @classmethod
    def from_task(cls, parent_task: "PoeTask"):
        """
        Derive the context for a child of the given task. Everything is taken over from
        the context of the parent, except that the cwd option of the parent task takes
        precedence over the cwd of its context.
        """
        return cls(
            config=parent_task.ctx.config,
            cwd=str(parent_task.spec.options.get("cwd", parent_task.ctx.cwd)),
            specs=parent_task.ctx.specs,
            ui=parent_task.ctx.ui,
        )


class PoeTask(metaclass=MetaPoeTask):
    """
    Base class for task types. Subclasses are registered by MetaPoeTask under their
    __key__, and must implement _handle_run.
    """

    __key__: ClassVar[str]
    __content_type__: ClassVar[Type] = str

    class TaskOptions(PoeOptions):
        args: Optional[Union[dict, list]] = None
        capture_stdout: Optional[str] = None
        cwd: Optional[str] = None
        deps: Optional[Sequence[str]] = None
        env: Optional[dict] = None
        envfile: Optional[Union[str, list]] = None
        executor: Optional[dict] = None
        help: Optional[str] = None
        uses: Optional[dict] = None

        def validate(self):
            """
            Validation rules that don't require any extra context go here.
            """

    class TaskSpec:
        name: str
        content: TaskContent
        options: "PoeTask.TaskOptions"
        # Assigned by MetaPoeTask when a task type declares its own TaskSpec
        task_type: Type["PoeTask"]
        source: "ConfigPartition"
        parent: Optional["PoeTask.TaskSpec"] = None

        # Lazily created by the args property
        _args: Optional["PoeTaskArgs"] = None

        def __init__(
            self,
            name: str,
            task_def: Dict[str, Any],
            factory: TaskSpecFactory,
            source: "ConfigPartition",
            parent: Optional["PoeTask.TaskSpec"] = None,
        ):
            self.name = name
            # The content is whatever the task_def holds under the task type key
            self.content = task_def[self.task_type.__key__]
            self.options = self._parse_options(task_def)
            self.source = source
            self.parent = parent

        def _parse_options(self, task_def: Dict[str, Any]):
            """
            Parse the task_def into the TaskOptions of this task type, attaching the
            task name to any resulting ConfigValidationError.
            """
            try:
                return next(
                    self.task_type.TaskOptions.parse(
                        task_def, extra_keys=(self.task_type.__key__,)
                    )
                )
            except ConfigValidationError as error:
                error.task_name = self.name
                raise

        def get_task_env(
            self,
            parent_env: "EnvVarsManager",
            uses_values: Optional[Mapping[str, str]] = None,
        ) -> "EnvVarsManager":
            """
            Resolve the EnvVarsManager for this task, relative to the given parent_env
            """
            task_envfile = self.options.get("envfile")
            task_env = self.options.get("env")

            # Work on a clone so that the parent_env is left untouched
            result = parent_env.clone()

            # Include env vars from outputs of upstream dependencies
            if uses_values:
                result.update(uses_values)

            result.set("POE_CONF_DIR", str(self.source.config_dir))
            result.apply_env_config(
                task_envfile,
                task_env,
                config_dir=self.source.config_dir,
                config_working_dir=self.source.cwd,
            )

            return result

        @property
        def args(self) -> Optional["PoeTaskArgs"]:
            """
            The PoeTaskArgs for the args option of this task, created on first access,
            or None if the task has no args option.
            """
            from .args import PoeTaskArgs

            if not self._args and self.options.args:
                self._args = PoeTaskArgs(self.options.args, self.name)

            return self._args

        def create_task(
            self,
            invocation: Tuple[str, ...],
            ctx: TaskContext,
            capture_stdout: Union[str, bool] = False,
        ) -> "PoeTask":
            """
            Instantiate the task type of this spec for the given invocation and context.
            """
            return self.task_type(
                spec=self,
                invocation=invocation,
                capture_stdout=capture_stdout,
                ctx=ctx,
            )

        def validate(self, config: "PoeConfig", task_specs: TaskSpecFactory):
            """
            Run the base validations followed by the task type specific validations,
            attaching the task name to any resulting ConfigValidationError.
            """
            try:
                self._base_validations(config, task_specs)
                self._task_validations(config, task_specs)
            except ConfigValidationError as error:
                error.task_name = self.name
                raise

        def _base_validations(self, config: "PoeConfig", task_specs: TaskSpecFactory):
            """
            Perform validations on this TaskSpec that apply to all task types
            """
            # The emptiness check comes first so that an empty task name is reported
            # as a config error instead of crashing with an IndexError on name[0]
            if not self.name or not (self.name[0].isalpha() or self.name[0] == "_"):
                raise ConfigValidationError(
                    "Task names must start with a letter or underscore."
                )

            # The full name pattern is only enforced for tasks without a parent
            if not self.parent and not _TASK_NAME_PATTERN.match(self.name):
                raise ConfigValidationError(
                    "Task names characters must be alphanumeric, colon, underscore or "
                    "dash."
                )

            if not isinstance(self.content, self.task_type.__content_type__):
                raise ConfigValidationError(
                    f"Content for {self.task_type.__name__} must be a "
                    f"{self.task_type.__content_type__.__name__}"
                )

            if self.options.deps:
                for dep in self.options.deps:
                    # A dep is a task reference: the task name comes before the
                    # first space
                    dep_task_name = dep.split(" ", 1)[0]
                    if dep_task_name not in task_specs:
                        raise ConfigValidationError(
                            "'deps' option includes reference to unknown task: "
                            f"{dep_task_name!r}"
                        )

                    if task_specs.get(dep_task_name).options.get("use_exec", False):
                        raise ConfigValidationError(
                            f"'deps' option includes reference to task with "
                            f"'use_exec' set to true: {dep_task_name!r}"
                        )

            if self.options.uses:
                from ..helpers import is_valid_env_var

                for key, dep in self.options.uses.items():
                    if not is_valid_env_var(key):
                        raise ConfigValidationError(
                            f"'uses' option includes invalid key: {key!r}"
                        )

                    dep_task_name = dep.split(" ", 1)[0]
                    if dep_task_name not in task_specs:
                        raise ConfigValidationError(
                            "'uses' options includes reference to unknown task: "
                            f"{dep_task_name!r}"
                        )

                    referenced_task = task_specs.get(dep_task_name)
                    if referenced_task.options.get("use_exec", False):
                        raise ConfigValidationError(
                            f"'uses' option references task with 'use_exec' set to "
                            f"true: {dep_task_name!r}"
                        )
                    if referenced_task.options.get("capture_stdout"):
                        raise ConfigValidationError(
                            f"'uses' option references task with 'capture_stdout' "
                            f"option set: {dep_task_name!r}"
                        )

        def _task_validations(self, config: "PoeConfig", task_specs: TaskSpecFactory):
            """
            Perform validations on this TaskSpec that apply to a specific task type
            """

    spec: TaskSpec
    ctx: TaskContext
    # Memoized result of get_parsed_arguments
    _parsed_args: Optional[Tuple[Dict[str, str], Tuple[str, ...]]] = None

    # Registry of task types by key, populated by MetaPoeTask
    __task_types: ClassVar[Dict[str, Type["PoeTask"]]] = {}
    # Memoized result of _get_upstream_invocations
    __upstream_invocations: Optional[
        Dict[str, Union[List[Tuple[str, ...]], Dict[str, Tuple[str, ...]]]]
    ] = None

    def __init__(
        self,
        spec: TaskSpec,
        invocation: Tuple[str, ...],
        ctx: TaskContext,
        capture_stdout: Union[str, bool] = False,
    ):
        self.spec = spec
        self.invocation = invocation
        self.ctx = ctx
        # The capture_stdout option from the spec takes precedence over the argument
        self.capture_stdout = spec.options.capture_stdout or capture_stdout
        self._is_windows = sys.platform == "win32"

    @property
    def name(self):
        """
        The name of this task, as given by its spec.
        """
        return self.spec.name

    @classmethod
    def lookup_task_spec_cls(cls, task_key: str) -> Type[TaskSpec]:
        """
        Get the TaskSpec class of the task type registered under the given key.
        """
        return cls.__task_types[task_key].TaskSpec

    @classmethod
    def resolve_task_type(
        cls,
        task_def: TaskDef,
        config: "PoeConfig",
        array_item: Union[bool, str] = False,
    ) -> Optional[str]:
        """
        Work out the task type key for the given task definition.

        A string resolves to a configured default type (or to array_item itself if that
        is a string), a dict resolves to the one registered task type key that it
        contains, and a list resolves to the configured default array task type.
        Returns None if no task type can be determined.
        """
        if isinstance(task_def, str):
            if array_item:
                return (
                    array_item
                    if isinstance(array_item, str)
                    else config.default_array_item_task_type
                )
            else:
                return config.default_task_type

        elif isinstance(task_def, dict):
            task_type_keys = set(task_def.keys()).intersection(cls.__task_types)
            # Zero or multiple task type keys make the definition ambiguous
            if len(task_type_keys) == 1:
                return next(iter(task_type_keys))

        elif isinstance(task_def, list):
            return config.default_array_task_type

        return None

    def _parse_named_args(
        self, extra_args: Sequence[str], env: "EnvVarsManager"
    ) -> Optional[Dict[str, str]]:
        """
        Parse the given arguments with the args spec of this task, or return None if
        the task has no args spec.
        """
        if task_args := self.spec.args:
            return task_args.parse(extra_args, env, self.ctx.ui.program_name)

        return None

    def get_parsed_arguments(
        self, env: "EnvVarsManager"
    ) -> Tuple[Dict[str, str], Tuple[str, ...]]:
        """
        Get the named argument values and the extra arguments from the invocation.

        If the task declares args then only the arguments before the first "--" are
        parsed as named arguments, and those after it are the extra arguments.
        Otherwise all arguments are extra arguments. The result is memoized.
        """
        if self._parsed_args is None:
            # The first item of the invocation is the task name
            all_args = self.invocation[1:]

            if task_args := self.spec.args:
                try:
                    split_index = all_args.index("--")
                    option_args = all_args[:split_index]
                    extra_args = all_args[split_index + 1 :]
                except ValueError:
                    # There is no "--" separator in the arguments
                    option_args = all_args
                    extra_args = tuple()

                self._parsed_args = (
                    task_args.parse(option_args, env, self.ctx.ui.program_name),
                    extra_args,
                )

            else:
                self._parsed_args = ({}, all_args)

        return self._parsed_args

    def run(
        self,
        context: "RunContext",
        parent_env: Optional["EnvVarsManager"] = None,
    ) -> int:
        """
        Run this task

        On a dry run of a task with a uses option only a notice about the unresolved
        dependency results is printed and 0 is returned. Otherwise the env for the task
        is resolved and the result of _handle_run is returned.
        """

        if environ.get("POE_DEBUG"):
            task_type_key = self.__key__  # type: ignore[attr-defined]
            print(f" * Running     {task_type_key}:{self.name}")
            print(f" . Invocation  {self.invocation!r}")

        upstream_invocations = self._get_upstream_invocations(context)

        if context.dry and upstream_invocations.get("uses", {}):
            self._print_action(
                (
                    "unresolved dependency task results via uses option for task "
                    f"{self.name!r}"
                ),
                dry=True,
                unresolved=True,
            )
            return 0

        task_env = self.spec.get_task_env(
            parent_env or context.env,
            context._get_dep_values(upstream_invocations["uses"]),
        )

        if environ.get("POE_DEBUG"):
            named_arg_values, extra_args = self.get_parsed_arguments(task_env)
            print(f" . Parsed args {named_arg_values!r}")
            print(f" . Extra args  {extra_args!r}")

        return self._handle_run(context, task_env)

    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        This method must be implemented by a subclass and return a single executor
        result.
        """
        raise NotImplementedError

    def _get_executor(self, context: "RunContext", env: "EnvVarsManager"):
        """
        Get an executor from the context, configured for this task.
        """
        return context.get_executor(
            self.invocation,
            env,
            working_dir=self.get_working_dir(env),
            executor_config=self.spec.options.get("executor"),
            capture_stdout=self.capture_stdout,
        )

    def get_working_dir(
        self,
        env: "EnvVarsManager",
    ) -> Path:
        """
        Resolve the working directory for this task from its cwd option, falling back
        to the cwd of the task context. The value is passed through env.fill_template,
        and a relative path is resolved against the project_dir of the config.
        """
        cwd_option = env.fill_template(self.spec.options.get("cwd", self.ctx.cwd))
        working_dir = Path(cwd_option)

        if not working_dir.is_absolute():
            working_dir = self.ctx.config.project_dir / working_dir

        return working_dir

    def iter_upstream_tasks(
        self, context: "RunContext"
    ) -> Iterator[Tuple[str, "PoeTask"]]:
        """
        Iterate over the tasks that this task depends on, as (key, task) pairs. The key
        is an empty string for tasks from the deps option, and the key from the uses
        option otherwise. Tasks from the uses option are set to capture stdout.
        """
        invocations = self._get_upstream_invocations(context)
        for invocation in invocations["deps"]:
            yield ("", self._instantiate_dep(invocation, capture_stdout=False))
        for key, invocation in invocations["uses"].items():
            yield (key, self._instantiate_dep(invocation, capture_stdout=True))

    def _get_upstream_invocations(self, context: "RunContext"):
        """
        NB. this memoization assumes the context (and contained env vars) will be the
        same in all instances for the lifetime of this object. Whilst this should be OK
        for all current usecases is it strictly speaking something that this object
        should not know enough to safely assume. So we probably want to revisit this.
        """
        import shlex

        options = self.spec.options

        if self.__upstream_invocations is None:
            # Task references may be templated with env vars and named arguments
            env = self.spec.get_task_env(context.env)
            env.update(self.get_parsed_arguments(env)[0])

            self.__upstream_invocations = {
                "deps": [
                    tuple(shlex.split(env.fill_template(task_ref)))
                    for task_ref in options.get("deps", tuple())
                ],
                "uses": {
                    key: tuple(shlex.split(env.fill_template(task_ref)))
                    for key, task_ref in options.get("uses", {}).items()
                },
            }

        return self.__upstream_invocations

    def _instantiate_dep(
        self, invocation: Tuple[str, ...], capture_stdout: bool
    ) -> "PoeTask":
        """
        Create the task for the given upstream invocation. The new task gets a context
        with the project_dir of the config as its cwd.
        """
        return self.ctx.specs.get(invocation[0]).create_task(
            invocation=invocation,
            ctx=TaskContext(
                config=self.ctx.config,
                cwd=str(self.ctx.config.project_dir),
                specs=self.ctx.specs,
                ui=self.ctx.ui,
            ),
            capture_stdout=capture_stdout,
        )

    def has_deps(self) -> bool:
        """
        Check whether this task has a deps or uses option set.
        """
        return bool(
            self.spec.options.get("deps", False) or self.spec.options.get("uses", False)
        )

    @classmethod
    def is_task_type(
        cls, task_def_key: str, content_type: Optional[Type] = None
    ) -> bool:
        """
        Checks whether the given key identifies a known task type.
        Optionally also check whether the given content_type matches the type of content
        for this tasks type.
        """
        return task_def_key in cls.__task_types and (
            content_type is None
            or cls.__task_types[task_def_key].__content_type__ is content_type
        )

    @classmethod
    def get_task_types(cls, content_type: Optional[Type] = None) -> Tuple[str, ...]:
        """
        Get the keys of all registered task types, optionally limited to the task types
        with the given content type.
        """
        if content_type:
            return tuple(
                task_type
                for task_type, task_cls in cls.__task_types.items()
                if task_cls.__content_type__ is content_type
            )
        return tuple(cls.__task_types)

    def _print_action(self, action: str, dry: bool, unresolved: bool = False):
        """
        Print the action taken by a task just before executing it.
        """
        min_verbosity = -1 if dry else 0
        arrow = "??" if unresolved else "<=" if self.capture_stdout else "=>"
        self.ctx.ui.print_msg(
            f"<hl>Poe {arrow}</hl> <action>{action}</action>", min_verbosity
        )

    class Error(Exception):
        pass
class MetaPoeTask(type):
    """
    Metaclass through which every subclass of PoeTask (i.e. every task type) gets
    registered at the moment it is declared, after checking that it carries the
    expected class attributes.
    """

    def __init__(cls, *args):
        super().__init__(*args)

        # The PoeTask base class is not a task type itself, so it is not registered
        if cls.__name__ != "PoeTask":
            task_key = getattr(cls, "__key__", None)
            options_cls = getattr(cls, "TaskOptions", None)
            assert isinstance(task_key, str)
            assert issubclass(options_cls, PoeOptions)

            # The registry is a private (name-mangled) attribute of PoeTask
            registry = PoeTask._PoeTask__task_types
            registry[cls.__key__] = cls

            # A TaskSpec declared directly on this task type is pointed back at it
            if "TaskSpec" in vars(cls):
                cls.TaskSpec.task_type = cls
This metaclass makes all descendants of PoeTask (task types) register themselves on declaration and validates that they include the expected class attributes.
def __init__(cls, *args):
    super().__init__(*args)

    # The PoeTask base class is not a task type itself, so it is not registered
    if cls.__name__ != "PoeTask":
        task_key = getattr(cls, "__key__", None)
        options_cls = getattr(cls, "TaskOptions", None)
        assert isinstance(task_key, str)
        assert issubclass(options_cls, PoeOptions)

        # The registry is a private (name-mangled) attribute of PoeTask
        registry = PoeTask._PoeTask__task_types
        registry[cls.__key__] = cls

        # A TaskSpec declared directly on this task type is pointed back at it
        if "TaskSpec" in vars(cls):
            cls.TaskSpec.task_type = cls
Inherited Members
- builtins.type
- mro
class TaskSpecFactory:
    """
    Builds TaskSpec objects from task definitions, keeping a by-name cache of the
    specs that were loaded from the config.
    """

    __cache: Dict[str, "PoeTask.TaskSpec"]
    config: "PoeConfig"

    def __init__(self, config: "PoeConfig"):
        self.config = config
        self.__cache = {}

    def __contains__(self, other) -> bool:
        """
        Tell whether a spec is cached under the given name.
        """
        return other in self.__cache

    def get(
        self,
        task_name: str,
        task_def: Optional[TaskDef] = None,
        task_type: Optional[str] = None,
        parent: Optional["PoeTask.TaskSpec"] = None,
    ) -> "PoeTask.TaskSpec":
        """
        Return the spec for the named task. When both a task_def and a parent are
        given a fresh spec is created from the task_def and the cache is bypassed,
        otherwise the spec is served from the cache, loading it on first access.
        """
        if task_def and parent:
            # This is probably a subtask and will be cached by the parent task_spec
            resolved_type = task_type
            if not resolved_type:
                resolved_type = PoeTask.resolve_task_type(task_def, self.config)
                assert resolved_type
            return self.create(
                task_name,
                resolved_type,
                task_def,
                source=parent.source,
                parent=parent,
            )

        if task_name in self.__cache:
            return self.__cache[task_name]

        self.load(task_name)
        return self.__cache[task_name]

    def create(
        self,
        task_name: str,
        task_type: str,
        task_def: TaskDef,
        source: "ConfigPartition",
        parent: Optional["PoeTask.TaskSpec"] = None,
    ) -> "PoeTask.TaskSpec":
        """
        Create a spec of the given task type. The parent should be passed along for a
        task that is defined inline within another task, e.g. as an item of a sequence.
        """
        # A bare content value is wrapped into a table keyed by the task type
        spec_def = task_def if isinstance(task_def, dict) else {task_type: task_def}
        spec_cls = PoeTask.lookup_task_spec_cls(task_type)
        return spec_cls(
            name=task_name,
            task_def=spec_def,
            factory=self,
            source=source,
            parent=parent,
        )

    def load_all(self):
        """
        Load the spec of every task named in the config, then return this factory.
        """
        for name in self.config.task_names:
            self.load(name)

        return self

    def load(self, task_name: str):
        """
        Look the named task up in the config and cache a newly created spec for it.
        """
        task_def, partition = self.config.lookup_task(task_name)

        if task_def is None or partition is None:
            raise PoeException(f"Cannot instantiate unknown task {task_name!r}")

        task_type = PoeTask.resolve_task_type(task_def, self.config)
        if task_type:
            self.__cache[task_name] = self.create(
                task_name, task_type, task_def, source=partition
            )
            return

        message = (
            "Task definition must be a string, a list, or a table including exactly"
            " one task key\n"
            f"Available task keys: {set(PoeTask.get_task_types())!r}"
        )
        # The config file is only named when it is not the primary one
        filename = None if partition.is_primary else str(partition.path)
        raise ConfigValidationError(message, task_name=task_name, filename=filename)

    def __iter__(self):
        """
        Iterate over the cached specs.
        """
        return iter(self.__cache.values())
def get(
    self,
    task_name: str,
    task_def: Optional[TaskDef] = None,
    task_type: Optional[str] = None,
    parent: Optional["PoeTask.TaskSpec"] = None,
) -> "PoeTask.TaskSpec":
    """
    Return the spec for the named task. When both a task_def and a parent are given
    a fresh spec is created from the task_def and the cache is bypassed, otherwise
    the spec is served from the cache, loading it on first access.
    """
    if task_def and parent:
        # This is probably a subtask and will be cached by the parent task_spec
        resolved_type = task_type
        if not resolved_type:
            resolved_type = PoeTask.resolve_task_type(task_def, self.config)
            assert resolved_type
        return self.create(
            task_name,
            resolved_type,
            task_def,
            source=parent.source,
            parent=parent,
        )

    if task_name in self.__cache:
        return self.__cache[task_name]

    self.load(task_name)
    return self.__cache[task_name]
def create(
    self,
    task_name: str,
    task_type: str,
    task_def: TaskDef,
    source: "ConfigPartition",
    parent: Optional["PoeTask.TaskSpec"] = None,
) -> "PoeTask.TaskSpec":
    """
    Create a spec of the given task type. The parent should be passed along for a
    task that is defined inline within another task, e.g. as an item of a sequence.
    """
    # A bare content value is wrapped into a table keyed by the task type
    spec_def = task_def if isinstance(task_def, dict) else {task_type: task_def}
    spec_cls = PoeTask.lookup_task_spec_cls(task_type)
    return spec_cls(
        name=task_name,
        task_def=spec_def,
        factory=self,
        source=source,
        parent=parent,
    )
A parent task should be provided when this task is defined inline within another task, for example as part of a sequence.
def load(self, task_name: str):
    """
    Look the named task up in the config and cache a newly created spec for it.
    """
    task_def, partition = self.config.lookup_task(task_name)

    if task_def is None or partition is None:
        raise PoeException(f"Cannot instantiate unknown task {task_name!r}")

    task_type = PoeTask.resolve_task_type(task_def, self.config)
    if task_type:
        self.__cache[task_name] = self.create(
            task_name, task_type, task_def, source=partition
        )
        return

    message = (
        "Task definition must be a string, a list, or a table including exactly"
        " one task key\n"
        f"Available task keys: {set(PoeTask.get_task_types())!r}"
    )
    # The config file is only named when it is not the primary one
    filename = None if partition.is_primary else str(partition.path)
    raise ConfigValidationError(message, task_name=task_name, filename=filename)
class TaskContext(NamedTuple):
    """
    The contextual config that a child task inherits from its parent task
    """

    config: "PoeConfig"
    cwd: str
    ui: "PoeUi"
    specs: "TaskSpecFactory"

    @classmethod
    def from_task(cls, parent_task: "PoeTask"):
        """
        Derive the context for a child of the given task. The cwd option of the parent
        task wins over the cwd of its context; all else is taken over as is.
        """
        inherited = parent_task.ctx
        child_cwd = parent_task.spec.options.get("cwd", inherited.cwd)
        return cls(
            config=inherited.config,
            cwd=str(child_cwd),
            ui=inherited.ui,
            specs=inherited.specs,
        )
Collection of contextual config inherited from a parent task to a child task
Create new instance of TaskContext(config, cwd, ui, specs)
Inherited Members
- builtins.tuple
- index
- count
168class PoeTask(metaclass=MetaPoeTask): 169 __key__: ClassVar[str] 170 __content_type__: ClassVar[Type] = str 171 172 class TaskOptions(PoeOptions): 173 args: Optional[Union[dict, list]] = None 174 capture_stdout: Optional[str] = None 175 cwd: Optional[str] = None 176 deps: Optional[Sequence[str]] = None 177 env: Optional[dict] = None 178 envfile: Optional[Union[str, list]] = None 179 executor: Optional[dict] = None 180 help: Optional[str] = None 181 uses: Optional[dict] = None 182 183 def validate(self): 184 """ 185 Validation rules that don't require any extra context go here. 186 """ 187 188 class TaskSpec: 189 name: str 190 content: TaskContent 191 options: "PoeTask.TaskOptions" 192 task_type: Type["PoeTask"] 193 source: "ConfigPartition" 194 parent: Optional["PoeTask.TaskSpec"] = None 195 196 _args: Optional["PoeTaskArgs"] = None 197 198 def __init__( 199 self, 200 name: str, 201 task_def: Dict[str, Any], 202 factory: TaskSpecFactory, 203 source: "ConfigPartition", 204 parent: Optional["PoeTask.TaskSpec"] = None, 205 ): 206 self.name = name 207 self.content = task_def[self.task_type.__key__] 208 self.options = self._parse_options(task_def) 209 self.source = source 210 self.parent = parent 211 212 def _parse_options(self, task_def: Dict[str, Any]): 213 try: 214 return next( 215 self.task_type.TaskOptions.parse( 216 task_def, extra_keys=(self.task_type.__key__,) 217 ) 218 ) 219 except ConfigValidationError as error: 220 error.task_name = self.name 221 raise 222 223 def get_task_env( 224 self, 225 parent_env: "EnvVarsManager", 226 uses_values: Optional[Mapping[str, str]] = None, 227 ) -> "EnvVarsManager": 228 """ 229 Resolve the EnvVarsManager for this task, relative to the given parent_env 230 """ 231 task_envfile = self.options.get("envfile") 232 task_env = self.options.get("env") 233 234 result = parent_env.clone() 235 236 # Include env vars from outputs of upstream dependencies 237 if uses_values: 238 result.update(uses_values) 239 240 
result.set("POE_CONF_DIR", str(self.source.config_dir)) 241 result.apply_env_config( 242 task_envfile, 243 task_env, 244 config_dir=self.source.config_dir, 245 config_working_dir=self.source.cwd, 246 ) 247 248 return result 249 250 @property 251 def args(self) -> Optional["PoeTaskArgs"]: 252 from .args import PoeTaskArgs 253 254 if not self._args and self.options.args: 255 self._args = PoeTaskArgs(self.options.args, self.name) 256 257 return self._args 258 259 def create_task( 260 self, 261 invocation: Tuple[str, ...], 262 ctx: TaskContext, 263 capture_stdout: Union[str, bool] = False, 264 ) -> "PoeTask": 265 return self.task_type( 266 spec=self, 267 invocation=invocation, 268 capture_stdout=capture_stdout, 269 ctx=ctx, 270 ) 271 272 def validate(self, config: "PoeConfig", task_specs: TaskSpecFactory): 273 try: 274 self._base_validations(config, task_specs) 275 self._task_validations(config, task_specs) 276 except ConfigValidationError as error: 277 error.task_name = self.name 278 raise 279 280 def _base_validations(self, config: "PoeConfig", task_specs: TaskSpecFactory): 281 """ 282 Perform validations on this TaskSpec that apply to all task types 283 """ 284 if not (self.name[0].isalpha() or self.name[0] == "_"): 285 raise ConfigValidationError( 286 "Task names must start with a letter or underscore." 287 ) 288 289 if not self.parent and not _TASK_NAME_PATTERN.match(self.name): 290 raise ConfigValidationError( 291 "Task names characters must be alphanumeric, colon, underscore or " 292 "dash." 
293 ) 294 295 if not isinstance(self.content, self.task_type.__content_type__): 296 raise ConfigValidationError( 297 f"Content for {self.task_type.__name__} must be a " 298 f"{self.task_type.__content_type__.__name__}" 299 ) 300 301 if self.options.deps: 302 for dep in self.options.deps: 303 dep_task_name = dep.split(" ", 1)[0] 304 if dep_task_name not in task_specs: 305 raise ConfigValidationError( 306 "'deps' option includes reference to unknown task: " 307 f"{dep_task_name!r}" 308 ) 309 310 if task_specs.get(dep_task_name).options.get("use_exec", False): 311 raise ConfigValidationError( 312 f"'deps' option includes reference to task with " 313 f"'use_exec' set to true: {dep_task_name!r}" 314 ) 315 316 if self.options.uses: 317 from ..helpers import is_valid_env_var 318 319 for key, dep in self.options.uses.items(): 320 if not is_valid_env_var(key): 321 raise ConfigValidationError( 322 f"'uses' option includes invalid key: {key!r}" 323 ) 324 325 dep_task_name = dep.split(" ", 1)[0] 326 if dep_task_name not in task_specs: 327 raise ConfigValidationError( 328 "'uses' options includes reference to unknown task: " 329 f"{dep_task_name!r}" 330 ) 331 332 referenced_task = task_specs.get(dep_task_name) 333 if referenced_task.options.get("use_exec", False): 334 raise ConfigValidationError( 335 f"'uses' option references task with 'use_exec' set to " 336 f"true: {dep_task_name!r}" 337 ) 338 if referenced_task.options.get("capture_stdout"): 339 raise ConfigValidationError( 340 f"'uses' option references task with 'capture_stdout' " 341 f"option set: {dep_task_name!r}" 342 ) 343 344 def _task_validations(self, config: "PoeConfig", task_specs: TaskSpecFactory): 345 """ 346 Perform validations on this TaskSpec that apply to a specific task type 347 """ 348 349 spec: TaskSpec 350 ctx: TaskContext 351 _parsed_args: Optional[Tuple[Dict[str, str], Tuple[str, ...]]] = None 352 353 __task_types: ClassVar[Dict[str, Type["PoeTask"]]] = {} 354 __upstream_invocations: Optional[ 355 
Dict[str, Union[List[Tuple[str, ...]], Dict[str, Tuple[str, ...]]]] 356 ] = None 357 358 def __init__( 359 self, 360 spec: TaskSpec, 361 invocation: Tuple[str, ...], 362 ctx: TaskContext, 363 capture_stdout: Union[str, bool] = False, 364 ): 365 self.spec = spec 366 self.invocation = invocation 367 self.ctx = ctx 368 self.capture_stdout = spec.options.capture_stdout or capture_stdout 369 self._is_windows = sys.platform == "win32" 370 371 @property 372 def name(self): 373 return self.spec.name 374 375 @classmethod 376 def lookup_task_spec_cls(cls, task_key: str) -> Type[TaskSpec]: 377 return cls.__task_types[task_key].TaskSpec 378 379 @classmethod 380 def resolve_task_type( 381 cls, 382 task_def: TaskDef, 383 config: "PoeConfig", 384 array_item: Union[bool, str] = False, 385 ) -> Optional[str]: 386 if isinstance(task_def, str): 387 if array_item: 388 return ( 389 array_item 390 if isinstance(array_item, str) 391 else config.default_array_item_task_type 392 ) 393 else: 394 return config.default_task_type 395 396 elif isinstance(task_def, dict): 397 task_type_keys = set(task_def.keys()).intersection(cls.__task_types) 398 if len(task_type_keys) == 1: 399 return next(iter(task_type_keys)) 400 401 elif isinstance(task_def, list): 402 return config.default_array_task_type 403 404 return None 405 406 def _parse_named_args( 407 self, extra_args: Sequence[str], env: "EnvVarsManager" 408 ) -> Optional[Dict[str, str]]: 409 if task_args := self.spec.args: 410 return task_args.parse(extra_args, env, self.ctx.ui.program_name) 411 412 return None 413 414 def get_parsed_arguments( 415 self, env: "EnvVarsManager" 416 ) -> Tuple[Dict[str, str], Tuple[str, ...]]: 417 if self._parsed_args is None: 418 all_args = self.invocation[1:] 419 420 if task_args := self.spec.args: 421 try: 422 split_index = all_args.index("--") 423 option_args = all_args[:split_index] 424 extra_args = all_args[split_index + 1 :] 425 except ValueError: 426 option_args = all_args 427 extra_args = tuple() 428 429 
self._parsed_args = ( 430 task_args.parse(option_args, env, self.ctx.ui.program_name), 431 extra_args, 432 ) 433 434 else: 435 self._parsed_args = ({}, all_args) 436 437 return self._parsed_args 438 439 def run( 440 self, 441 context: "RunContext", 442 parent_env: Optional["EnvVarsManager"] = None, 443 ) -> int: 444 """ 445 Run this task 446 """ 447 448 if environ.get("POE_DEBUG"): 449 task_type_key = self.__key__ # type: ignore[attr-defined] 450 print(f" * Running {task_type_key}:{self.name}") 451 print(f" . Invocation {self.invocation!r}") 452 453 upstream_invocations = self._get_upstream_invocations(context) 454 455 if context.dry and upstream_invocations.get("uses", {}): 456 self._print_action( 457 ( 458 "unresolved dependency task results via uses option for task " 459 f"{self.name!r}" 460 ), 461 dry=True, 462 unresolved=True, 463 ) 464 return 0 465 466 task_env = self.spec.get_task_env( 467 parent_env or context.env, 468 context._get_dep_values(upstream_invocations["uses"]), 469 ) 470 471 if environ.get("POE_DEBUG"): 472 named_arg_values, extra_args = self.get_parsed_arguments(task_env) 473 print(f" . Parsed args {named_arg_values!r}") 474 print(f" . Extra args {extra_args!r}") 475 476 return self._handle_run(context, task_env) 477 478 def _handle_run( 479 self, 480 context: "RunContext", 481 env: "EnvVarsManager", 482 ) -> int: 483 """ 484 This method must be implemented by a subclass and return a single executor 485 result. 
486 """ 487 raise NotImplementedError 488 489 def _get_executor(self, context: "RunContext", env: "EnvVarsManager"): 490 return context.get_executor( 491 self.invocation, 492 env, 493 working_dir=self.get_working_dir(env), 494 executor_config=self.spec.options.get("executor"), 495 capture_stdout=self.capture_stdout, 496 ) 497 498 def get_working_dir( 499 self, 500 env: "EnvVarsManager", 501 ) -> Path: 502 cwd_option = env.fill_template(self.spec.options.get("cwd", self.ctx.cwd)) 503 working_dir = Path(cwd_option) 504 505 if not working_dir.is_absolute(): 506 working_dir = self.ctx.config.project_dir / working_dir 507 508 return working_dir 509 510 def iter_upstream_tasks( 511 self, context: "RunContext" 512 ) -> Iterator[Tuple[str, "PoeTask"]]: 513 invocations = self._get_upstream_invocations(context) 514 for invocation in invocations["deps"]: 515 yield ("", self._instantiate_dep(invocation, capture_stdout=False)) 516 for key, invocation in invocations["uses"].items(): 517 yield (key, self._instantiate_dep(invocation, capture_stdout=True)) 518 519 def _get_upstream_invocations(self, context: "RunContext"): 520 """ 521 NB. this memoization assumes the context (and contained env vars) will be the 522 same in all instances for the lifetime of this object. Whilst this should be OK 523 for all current usecases is it strictly speaking something that this object 524 should not know enough to safely assume. So we probably want to revisit this. 
525 """ 526 import shlex 527 528 options = self.spec.options 529 530 if self.__upstream_invocations is None: 531 env = self.spec.get_task_env(context.env) 532 env.update(self.get_parsed_arguments(env)[0]) 533 534 self.__upstream_invocations = { 535 "deps": [ 536 tuple(shlex.split(env.fill_template(task_ref))) 537 for task_ref in options.get("deps", tuple()) 538 ], 539 "uses": { 540 key: tuple(shlex.split(env.fill_template(task_ref))) 541 for key, task_ref in options.get("uses", {}).items() 542 }, 543 } 544 545 return self.__upstream_invocations 546 547 def _instantiate_dep( 548 self, invocation: Tuple[str, ...], capture_stdout: bool 549 ) -> "PoeTask": 550 return self.ctx.specs.get(invocation[0]).create_task( 551 invocation=invocation, 552 ctx=TaskContext( 553 config=self.ctx.config, 554 cwd=str(self.ctx.config.project_dir), 555 specs=self.ctx.specs, 556 ui=self.ctx.ui, 557 ), 558 capture_stdout=capture_stdout, 559 ) 560 561 def has_deps(self) -> bool: 562 return bool( 563 self.spec.options.get("deps", False) or self.spec.options.get("uses", False) 564 ) 565 566 @classmethod 567 def is_task_type( 568 cls, task_def_key: str, content_type: Optional[Type] = None 569 ) -> bool: 570 """ 571 Checks whether the given key identifies a known task type. 572 Optionally also check whether the given content_type matches the type of content 573 for this tasks type. 
574 """ 575 return task_def_key in cls.__task_types and ( 576 content_type is None 577 or cls.__task_types[task_def_key].__content_type__ is content_type 578 ) 579 580 @classmethod 581 def get_task_types(cls, content_type: Optional[Type] = None) -> Tuple[str, ...]: 582 if content_type: 583 return tuple( 584 task_type 585 for task_type, task_cls in cls.__task_types.items() 586 if task_cls.__content_type__ is content_type 587 ) 588 return tuple(task_type for task_type in cls.__task_types.keys()) 589 590 def _print_action(self, action: str, dry: bool, unresolved: bool = False): 591 """ 592 Print the action taken by a task just before executing it. 593 """ 594 min_verbosity = -1 if dry else 0 595 arrow = "??" if unresolved else "<=" if self.capture_stdout else "=>" 596 self.ctx.ui.print_msg( 597 f"<hl>Poe {arrow}</hl> <action>{action}</action>", min_verbosity 598 ) 599 600 class Error(Exception): 601 pass
def __init__(
    self,
    spec: TaskSpec,
    invocation: Tuple[str, ...],
    ctx: TaskContext,
    capture_stdout: Union[str, bool] = False,
):
    """
    Bind a task instance to its spec, the invocation that triggered it and the
    context it runs in.

    A truthy ``capture_stdout`` option on the spec takes precedence over the
    ``capture_stdout`` argument.
    """
    self.spec, self.invocation, self.ctx = spec, invocation, ctx

    configured_capture = spec.options.capture_stdout
    self.capture_stdout = configured_capture or capture_stdout

    self._is_windows = sys.platform == "win32"
@classmethod
def resolve_task_type(
    cls,
    task_def: TaskDef,
    config: "PoeConfig",
    array_item: Union[bool, str] = False,
) -> Optional[str]:
    """
    Work out the task type key for a task definition.

    - a string uses the configured default task type, unless it is an array
      item, in which case ``array_item`` itself (when it is a string) or the
      configured default array item task type is used
    - a list uses the configured default array task type
    - a dict yields the task type key it contains, provided exactly one of its
      keys is a registered task type

    Returns None when no task type can be determined.
    """
    if isinstance(task_def, str):
        if not array_item:
            return config.default_task_type
        if isinstance(array_item, str):
            return array_item
        return config.default_array_item_task_type

    if isinstance(task_def, list):
        return config.default_array_task_type

    if isinstance(task_def, dict):
        matching_keys = set(task_def).intersection(cls.__task_types)
        if len(matching_keys) == 1:
            (only_key,) = matching_keys
            return only_key

    return None
def get_parsed_arguments(
    self, env: "EnvVarsManager"
) -> Tuple[Dict[str, str], Tuple[str, ...]]:
    """
    Return ``(named_args, extra_args)`` for this task's invocation.

    The result is computed on first call and cached on the instance, so later
    calls return the cached value regardless of the ``env`` passed in.

    When the spec declares args, everything before the first ``--`` is handed to
    the args parser and everything after it is returned untouched as extra args.
    Without declared args, all invocation arguments are treated as extra args.
    """
    if self._parsed_args is not None:
        return self._parsed_args

    # invocation[0] is the task name, the rest are its arguments
    cli_args = self.invocation[1:]
    arg_parser = self.spec.args

    if not arg_parser:
        self._parsed_args = ({}, cli_args)
        return self._parsed_args

    if "--" in cli_args:
        separator_at = cli_args.index("--")
        named_part = cli_args[:separator_at]
        passthrough = cli_args[separator_at + 1 :]
    else:
        named_part = cli_args
        passthrough = ()

    parsed_named = arg_parser.parse(named_part, env, self.ctx.ui.program_name)
    self._parsed_args = (parsed_named, passthrough)
    return self._parsed_args
def run(
    self,
    context: "RunContext",
    parent_env: Optional["EnvVarsManager"] = None,
) -> int:
    """
    Run this task and return the result of ``_handle_run``.

    On a dry run, a task with ``uses`` dependencies cannot be resolved, so a
    notice is printed and 0 is returned without running anything.

    The task env is derived from ``parent_env`` when given, else ``context.env``.
    """

    if environ.get("POE_DEBUG"):
        task_type_key = self.__key__  # type: ignore[attr-defined]
        print(f" * Running {task_type_key}:{self.name}")
        print(f" . Invocation {self.invocation!r}")

    upstream = self._get_upstream_invocations(context)

    unresolvable = context.dry and upstream.get("uses", {})
    if unresolvable:
        notice = (
            "unresolved dependency task results via uses option for task "
            f"{self.name!r}"
        )
        self._print_action(notice, dry=True, unresolved=True)
        return 0

    base_env = parent_env or context.env
    uses_values = context._get_dep_values(upstream["uses"])
    task_env = self.spec.get_task_env(base_env, uses_values)

    if environ.get("POE_DEBUG"):
        named_arg_values, extra_args = self.get_parsed_arguments(task_env)
        print(f" . Parsed args {named_arg_values!r}")
        print(f" . Extra args {extra_args!r}")

    return self._handle_run(context, task_env)
Run this task
def get_working_dir(
    self,
    env: "EnvVarsManager",
) -> Path:
    """
    Resolve the directory this task should run in.

    Uses the task's ``cwd`` option, falling back to the cwd of the task context,
    after filling env templates in it. A relative result is resolved against the
    project dir.
    """
    raw_cwd = self.spec.options.get("cwd", self.ctx.cwd)
    resolved = Path(env.fill_template(raw_cwd))

    if resolved.is_absolute():
        return resolved

    return self.ctx.config.project_dir / resolved
def iter_upstream_tasks(
    self, context: "RunContext"
) -> Iterator[Tuple[str, "PoeTask"]]:
    """
    Lazily yield ``(key, task)`` pairs for the tasks this task depends on.

    Tasks from ``deps`` come first with an empty key and no stdout capture,
    followed by tasks from ``uses`` keyed by their ``uses`` key and with stdout
    capture enabled.
    """
    upstream = self._get_upstream_invocations(context)

    yield from (
        ("", self._instantiate_dep(dep_invocation, capture_stdout=False))
        for dep_invocation in upstream["deps"]
    )
    yield from (
        (uses_key, self._instantiate_dep(uses_invocation, capture_stdout=True))
        for uses_key, uses_invocation in upstream["uses"].items()
    )
@classmethod
def is_task_type(
    cls, task_def_key: str, content_type: Optional[Type] = None
) -> bool:
    """
    Tell whether the given key names a registered task type.

    When content_type is given, the registered task type must additionally
    declare exactly that type as its content type.
    """
    registry = cls.__task_types

    if task_def_key not in registry:
        return False

    if content_type is None:
        return True

    return registry[task_def_key].__content_type__ is content_type
Checks whether the given key identifies a known task type. Optionally also check whether the given content_type matches the type of content for this task type.
@classmethod
def get_task_types(cls, content_type: Optional[Type] = None) -> Tuple[str, ...]:
    """
    List the keys of the registered task types, in registration order.

    When content_type is given, only task types declaring exactly that content
    type are included.
    """
    registry = cls.__task_types

    if not content_type:
        return tuple(registry)

    return tuple(
        type_key
        for type_key, registered_cls in registry.items()
        if registered_cls.__content_type__ is content_type
    )
class TaskOptions(PoeOptions):
    """
    The options shared by all task types; every option defaults to None.
    """

    # Argument definitions; TaskSpec.args builds a PoeTaskArgs from these
    args: Optional[Union[dict, list]] = None
    # When truthy, takes precedence over the capture_stdout argument given to
    # the task constructor
    capture_stdout: Optional[str] = None
    # Working directory for the task; env templates are filled and a relative
    # path is resolved against the project dir (see get_working_dir)
    cwd: Optional[str] = None
    # Task references (task name plus optional arguments) to instantiate as
    # upstream tasks without stdout capture
    deps: Optional[Sequence[str]] = None
    # Env config applied on top of the parent env in get_task_env
    env: Optional[dict] = None
    # Env file config applied on top of the parent env in get_task_env
    envfile: Optional[Union[str, list]] = None
    # Passed as executor_config when getting the executor from the run context
    executor: Optional[dict] = None
    # NOTE(review): presumably the help text displayed for the task; it is not
    # read anywhere in this part of the module — confirm
    help: Optional[str] = None
    # Maps an env var name to a task reference whose captured stdout is made
    # available to this task; keys must be valid env var names
    uses: Optional[dict] = None

    def validate(self):
        """
        Validation rules that don't require any extra context go here.
        """
A special kind of config object that parses options ...
def validate(self):
    """
    Hook for validation rules that can be checked without any extra context.

    The base implementation checks nothing.
    """
Validation rules that don't require any extra context go here.
Inherited Members
- poethepoet.options.PoeOptions
- PoeOptions
- parse
- normalize
- get
- update
- type_of
- get_annotation
- get_fields
class TaskSpec:
    """
    The parsed definition of one configured task, from which task instances are
    created.

    The ``task_type`` attribute is not set by this class: MetaPoeTask assigns it
    on the TaskSpec class that a task type declares.
    """

    name: str
    content: TaskContent
    options: "PoeTask.TaskOptions"
    task_type: Type["PoeTask"]
    source: "ConfigPartition"
    parent: Optional["PoeTask.TaskSpec"] = None

    # Built on first access by the ``args`` property
    _args: Optional["PoeTaskArgs"] = None

    def __init__(
        self,
        name: str,
        task_def: Dict[str, Any],
        factory: TaskSpecFactory,
        source: "ConfigPartition",
        parent: Optional["PoeTask.TaskSpec"] = None,
    ):
        """
        Build the spec from a normalized task definition.

        The content is read from the key of ``task_def`` that matches the task
        type key, and the options are parsed from ``task_def``. ``factory`` is
        accepted but not used by this base implementation.

        Raises ConfigValidationError if the options fail to parse.
        """
        self.name = name
        self.content = task_def[self.task_type.__key__]
        self.options = self._parse_options(task_def)
        self.source = source
        self.parent = parent

    def _parse_options(self, task_def: Dict[str, Any]):
        """
        Parse the options for this task type from the given task definition,
        allowing the task type key as an extra key.

        Any ConfigValidationError raised is tagged with this task's name before
        being re-raised.
        """
        try:
            return next(
                self.task_type.TaskOptions.parse(
                    task_def, extra_keys=(self.task_type.__key__,)
                )
            )
        except ConfigValidationError as error:
            error.task_name = self.name
            raise

    def get_task_env(
        self,
        parent_env: "EnvVarsManager",
        uses_values: Optional[Mapping[str, str]] = None,
    ) -> "EnvVarsManager":
        """
        Resolve the EnvVarsManager for this task, relative to the given parent_env
        """
        task_envfile = self.options.get("envfile")
        task_env = self.options.get("env")

        # Work on a clone so the parent env is not modified
        result = parent_env.clone()

        # Include env vars from outputs of upstream dependencies
        if uses_values:
            result.update(uses_values)

        result.set("POE_CONF_DIR", str(self.source.config_dir))
        result.apply_env_config(
            task_envfile,
            task_env,
            config_dir=self.source.config_dir,
            config_working_dir=self.source.cwd,
        )

        return result

    @property
    def args(self) -> Optional["PoeTaskArgs"]:
        """
        The PoeTaskArgs for this task, created on first access, or None if the
        task has no ``args`` option.
        """
        from .args import PoeTaskArgs

        if not self._args and self.options.args:
            self._args = PoeTaskArgs(self.options.args, self.name)

        return self._args

    def create_task(
        self,
        invocation: Tuple[str, ...],
        ctx: TaskContext,
        capture_stdout: Union[str, bool] = False,
    ) -> "PoeTask":
        """
        Instantiate the task type of this spec for the given invocation.
        """
        return self.task_type(
            spec=self,
            invocation=invocation,
            capture_stdout=capture_stdout,
            ctx=ctx,
        )

    def validate(self, config: "PoeConfig", task_specs: TaskSpecFactory):
        """
        Run the base validations followed by the task type specific ones.

        Any ConfigValidationError raised is tagged with this task's name before
        being re-raised.
        """
        try:
            self._base_validations(config, task_specs)
            self._task_validations(config, task_specs)
        except ConfigValidationError as error:
            error.task_name = self.name
            raise

    def _base_validations(self, config: "PoeConfig", task_specs: TaskSpecFactory):
        """
        Perform validations on this TaskSpec that apply to all task types

        Raises ConfigValidationError on the first failed rule.
        """
        # Slice rather than index so that an empty task name is reported as a
        # config error instead of raising IndexError
        first_char = self.name[:1]
        if not (first_char.isalpha() or first_char == "_"):
            raise ConfigValidationError(
                "Task names must start with a letter or underscore."
            )

        # The name pattern is only enforced for tasks without a parent
        if not self.parent and not _TASK_NAME_PATTERN.match(self.name):
            raise ConfigValidationError(
                "Task names characters must be alphanumeric, colon, underscore or "
                "dash."
            )

        if not isinstance(self.content, self.task_type.__content_type__):
            raise ConfigValidationError(
                f"Content for {self.task_type.__name__} must be a "
                f"{self.task_type.__content_type__.__name__}"
            )

        if self.options.deps:
            for dep in self.options.deps:
                # A reference is a task name optionally followed by arguments
                dep_task_name = dep.split(" ", 1)[0]
                if dep_task_name not in task_specs:
                    raise ConfigValidationError(
                        "'deps' option includes reference to unknown task: "
                        f"{dep_task_name!r}"
                    )

                if task_specs.get(dep_task_name).options.get("use_exec", False):
                    raise ConfigValidationError(
                        f"'deps' option includes reference to task with "
                        f"'use_exec' set to true: {dep_task_name!r}"
                    )

        if self.options.uses:
            from ..helpers import is_valid_env_var

            for key, dep in self.options.uses.items():
                if not is_valid_env_var(key):
                    raise ConfigValidationError(
                        f"'uses' option includes invalid key: {key!r}"
                    )

                dep_task_name = dep.split(" ", 1)[0]
                if dep_task_name not in task_specs:
                    raise ConfigValidationError(
                        "'uses' options includes reference to unknown task: "
                        f"{dep_task_name!r}"
                    )

                referenced_task = task_specs.get(dep_task_name)
                if referenced_task.options.get("use_exec", False):
                    raise ConfigValidationError(
                        f"'uses' option references task with 'use_exec' set to "
                        f"true: {dep_task_name!r}"
                    )
                if referenced_task.options.get("capture_stdout"):
                    raise ConfigValidationError(
                        f"'uses' option references task with 'capture_stdout' "
                        f"option set: {dep_task_name!r}"
                    )

    def _task_validations(self, config: "PoeConfig", task_specs: TaskSpecFactory):
        """
        Perform validations on this TaskSpec that apply to a specific task type
        """
def __init__(
    self,
    name: str,
    task_def: Dict[str, Any],
    factory: TaskSpecFactory,
    source: "ConfigPartition",
    parent: Optional["PoeTask.TaskSpec"] = None,
):
    """
    Build the spec from a normalized task definition.

    The content is looked up in ``task_def`` under the key of the task type, and
    the options are parsed from ``task_def``. ``factory`` is not used here.
    """
    # The name must be set first: option parsing reads it when reporting errors
    self.name = name

    content_key = self.task_type.__key__
    self.content = task_def[content_key]
    self.options = self._parse_options(task_def)

    self.source, self.parent = source, parent
def get_task_env(
    self,
    parent_env: "EnvVarsManager",
    uses_values: Optional[Mapping[str, str]] = None,
) -> "EnvVarsManager":
    """
    Derive this task's EnvVarsManager from the given parent_env.

    A clone of parent_env is extended, in this order, with the given uses_values
    (if any), POE_CONF_DIR, and the ``envfile`` and ``env`` options of the task.
    """
    options = self.options
    envfile_option = options.get("envfile")
    env_option = options.get("env")

    task_env = parent_env.clone()

    # Results captured from upstream tasks are exposed as env vars
    if uses_values:
        task_env.update(uses_values)

    config_source = self.source
    task_env.set("POE_CONF_DIR", str(config_source.config_dir))
    task_env.apply_env_config(
        envfile_option,
        env_option,
        config_dir=config_source.config_dir,
        config_working_dir=config_source.cwd,
    )

    return task_env
Resolve the EnvVarsManager for this task, relative to the given parent_env
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- add_note
- args