
  1import enum
  2import json
  3import os
  4import re
  5import typing as t
  6from collections import abc
  7from collections import deque
  8from random import choice
  9from random import randrange
 10from threading import Lock
 11from types import CodeType
 12from urllib.parse import quote_from_bytes
 14import markupsafe
 17    import typing_extensions as te
 19F = t.TypeVar("F", bound=t.Callable[..., t.Any])
 21# special singleton representing missing values for the runtime
 22missing: t.Any = type("MissingType", (), {"__repr__": lambda x: "missing"})()
 24internal_code: t.MutableSet[CodeType] = set()
 26concat = "".join
 29def pass_context(f: F) -> F:
 30    """Pass the :class:`~jinja2.runtime.Context` as the first argument
 31    to the decorated function when called while rendering a template.
 33    Can be used on functions, filters, and tests.
 35    If only ``Context.eval_context`` is needed, use
 36    :func:`pass_eval_context`. If only ``Context.environment`` is
 37    needed, use :func:`pass_environment`.
 39    .. versionadded:: 3.0.0
 40        Replaces ``contextfunction`` and ``contextfilter``.
 41    """
 42    f.jinja_pass_arg = _PassArg.context  # type: ignore
 43    return f
 46def pass_eval_context(f: F) -> F:
 47    """Pass the :class:`~jinja2.nodes.EvalContext` as the first argument
 48    to the decorated function when called while rendering a template.
 49    See :ref:`eval-context`.
 51    Can be used on functions, filters, and tests.
 53    If only ``EvalContext.environment`` is needed, use
 54    :func:`pass_environment`.
 56    .. versionadded:: 3.0.0
 57        Replaces ``evalcontextfunction`` and ``evalcontextfilter``.
 58    """
 59    f.jinja_pass_arg = _PassArg.eval_context  # type: ignore
 60    return f
 63def pass_environment(f: F) -> F:
 64    """Pass the :class:`~jinja2.Environment` as the first argument to
 65    the decorated function when called while rendering a template.
 67    Can be used on functions, filters, and tests.
 69    .. versionadded:: 3.0.0
 70        Replaces ``environmentfunction`` and ``environmentfilter``.
 71    """
 72    f.jinja_pass_arg = _PassArg.environment  # type: ignore
 73    return f
 76class _PassArg(enum.Enum):
 77    context = enum.auto()
 78    eval_context = enum.auto()
 79    environment = enum.auto()
 81    @classmethod
 82    def from_obj(cls, obj: F) -> t.Optional["_PassArg"]:
 83        if hasattr(obj, "jinja_pass_arg"):
 84            return obj.jinja_pass_arg  # type: ignore
 86        return None
 89def internalcode(f: F) -> F:
 90    """Marks the function as internally used"""
 91    internal_code.add(f.__code__)
 92    return f
 95def is_undefined(obj: t.Any) -> bool:
 96    """Check if the object passed is undefined.  This does nothing more than
 97    performing an instance check against :class:`Undefined` but looks nicer.
 98    This can be used for custom filters or tests that want to react to
 99    undefined variables.  For example a custom default filter can look like
100    this::
102        def default(var, default=''):
103            if is_undefined(var):
104                return default
105            return var
106    """
107    from .runtime import Undefined
109    return isinstance(obj, Undefined)
112def consume(iterable: t.Iterable[t.Any]) -> None:
113    """Consumes an iterable without doing anything with it."""
114    for _ in iterable:
115        pass
118def clear_caches() -> None:
119    """Jinja keeps internal caches for environments and lexers.  These are
120    used so that Jinja doesn't have to recreate environments and lexers all
121    the time.  Normally you don't have to care about that but if you are
122    measuring memory consumption you may want to clean the caches.
123    """
124    from .environment import get_spontaneous_environment
125    from .lexer import _lexer_cache
127    get_spontaneous_environment.cache_clear()
128    _lexer_cache.clear()
131def import_string(import_name: str, silent: bool = False) -> t.Any:
132    """Imports an object based on a string.  This is useful if you want to
133    use import paths as endpoints or something similar.  An import path can
134    be specified either in dotted notation (``xml.sax.saxutils.escape``)
135    or with a colon as object delimiter (``xml.sax.saxutils:escape``).
137    If the `silent` is True the return value will be `None` if the import
138    fails.
140    :return: imported object
141    """
142    try:
143        if ":" in import_name:
144            module, obj = import_name.split(":", 1)
145        elif "." in import_name:
146            module, _, obj = import_name.rpartition(".")
147        else:
148            return __import__(import_name)
149        return getattr(__import__(module, None, None, [obj]), obj)
150    except (ImportError, AttributeError):
151        if not silent:
152            raise
155def open_if_exists(filename: str, mode: str = "rb") -> t.Optional[t.IO[t.Any]]:
156    """Returns a file descriptor for the filename if that file exists,
157    otherwise ``None``.
158    """
159    if not os.path.isfile(filename):
160        return None
162    return open(filename, mode)
165def object_type_repr(obj: t.Any) -> str:
166    """Returns the name of the object's type.  For some recognized
167    singletons the name of the object is returned instead. (For
168    example for `None` and `Ellipsis`).
169    """
170    if obj is None:
171        return "None"
172    elif obj is Ellipsis:
173        return "Ellipsis"
175    cls = type(obj)
177    if cls.__module__ == "builtins":
178        return f"{cls.__name__} object"
180    return f"{cls.__module__}.{cls.__name__} object"
183def pformat(obj: t.Any) -> str:
184    """Format an object using :func:`pprint.pformat`."""
185    from pprint import pformat
187    return pformat(obj)
190_http_re = re.compile(
191    r"""
192    ^
193    (
194        (https?://|www\.)  # scheme or www
195        (([\w%-]+\.)+)?  # subdomain
196        (
197            [a-z]{2,63}  # basic tld
198        |
199            xn--[\w%]{2,59}  # idna tld
200        )
201    |
202        ([\w%-]{2,63}\.)+  # basic domain
203        (com|net|int|edu|gov|org|info|mil)  # basic tld
204    |
205        (https?://)  # scheme
206        (
207            (([\d]{1,3})(\.[\d]{1,3}){3})  # IPv4
208        |
209            (\[([\da-f]{0,4}:){2}([\da-f]{0,4}:?){1,6}])  # IPv6
210        )
211    )
212    (?::[\d]{1,5})?  # port
213    (?:[/?#]\S*)?  # path, query, and fragment
214    $
215    """,
216    re.IGNORECASE | re.VERBOSE,
218_email_re = re.compile(r"^\S+@\w[\w.-]*\.\w+$")
221def urlize(
222    text: str,
223    trim_url_limit: t.Optional[int] = None,
224    rel: t.Optional[str] = None,
225    target: t.Optional[str] = None,
226    extra_schemes: t.Optional[t.Iterable[str]] = None,
227) -> str:
228    """Convert URLs in text into clickable links.
230    This may not recognize links in some situations. Usually, a more
231    comprehensive formatter, such as a Markdown library, is a better
232    choice.
234    Works on ``http://``, ``https://``, ``www.``, ``mailto:``, and email
235    addresses. Links with trailing punctuation (periods, commas, closing
236    parentheses) and leading punctuation (opening parentheses) are
237    recognized excluding the punctuation. Email addresses that include
238    header fields are not recognized (for example,
239    ``mailto:address@example.com?cc=copy@example.com``).
241    :param text: Original text containing URLs to link.
242    :param trim_url_limit: Shorten displayed URL values to this length.
243    :param target: Add the ``target`` attribute to links.
244    :param rel: Add the ``rel`` attribute to links.
245    :param extra_schemes: Recognize URLs that start with these schemes
246        in addition to the default behavior.
248    .. versionchanged:: 3.0
249        The ``extra_schemes`` parameter was added.
251    .. versionchanged:: 3.0
252        Generate ``https://`` links for URLs without a scheme.
254    .. versionchanged:: 3.0
255        The parsing rules were updated. Recognize email addresses with
256        or without the ``mailto:`` scheme. Validate IP addresses. Ignore
257        parentheses and brackets in more cases.
258    """
259    if trim_url_limit is not None:
261        def trim_url(x: str) -> str:
262            if len(x) > trim_url_limit:
263                return f"{x[:trim_url_limit]}..."
265            return x
267    else:
269        def trim_url(x: str) -> str:
270            return x
272    words = re.split(r"(\s+)", str(markupsafe.escape(text)))
273    rel_attr = f' rel="{markupsafe.escape(rel)}"' if rel else ""
274    target_attr = f' target="{markupsafe.escape(target)}"' if target else ""
276    for i, word in enumerate(words):
277        head, middle, tail = "", word, ""
278        match = re.match(r"^([(<]|&lt;)+", middle)
280        if match:
281            head = match.group()
282            middle = middle[match.end() :]
284        # Unlike lead, which is anchored to the start of the string,
285        # need to check that the string ends with any of the characters
286        # before trying to match all of them, to avoid backtracking.
287        if middle.endswith((")", ">", ".", ",", "\n", "&gt;")):
288            match = re.search(r"([)>.,\n]|&gt;)+$", middle)
290            if match:
291                tail = match.group()
292                middle = middle[: match.start()]
294        # Prefer balancing parentheses in URLs instead of ignoring a
295        # trailing character.
296        for start_char, end_char in ("(", ")"), ("<", ">"), ("&lt;", "&gt;"):
297            start_count = middle.count(start_char)
299            if start_count <= middle.count(end_char):
300                # Balanced, or lighter on the left
301                continue
303            # Move as many as possible from the tail to balance
304            for _ in range(min(start_count, tail.count(end_char))):
305                end_index = tail.index(end_char) + len(end_char)
306                # Move anything in the tail before the end char too
307                middle += tail[:end_index]
308                tail = tail[end_index:]
310        if _http_re.match(middle):
311            if middle.startswith("https://") or middle.startswith("http://"):
312                middle = (
313                    f'<a href="{middle}"{rel_attr}{target_attr}>{trim_url(middle)}</a>'
314                )
315            else:
316                middle = (
317                    f'<a href="https://{middle}"{rel_attr}{target_attr}>'
318                    f"{trim_url(middle)}</a>"
319                )
321        elif middle.startswith("mailto:") and _email_re.match(middle[7:]):
322            middle = f'<a href="{middle}">{middle[7:]}</a>'
324        elif (
325            "@" in middle
326            and not middle.startswith("www.")
327            and ":" not in middle
328            and _email_re.match(middle)
329        ):
330            middle = f'<a href="mailto:{middle}">{middle}</a>'
332        elif extra_schemes is not None:
333            for scheme in extra_schemes:
334                if middle != scheme and middle.startswith(scheme):
335                    middle = f'<a href="{middle}"{rel_attr}{target_attr}>{middle}</a>'
337        words[i] = f"{head}{middle}{tail}"
339    return "".join(words)
342def generate_lorem_ipsum(
343    n: int = 5, html: bool = True, min: int = 20, max: int = 100
344) -> str:
345    """Generate some lorem ipsum for the template."""
346    from .constants import LOREM_IPSUM_WORDS
348    words = LOREM_IPSUM_WORDS.split()
349    result = []
351    for _ in range(n):
352        next_capitalized = True
353        last_comma = last_fullstop = 0
354        word = None
355        last = None
356        p = []
358        # each paragraph contains out of 20 to 100 words.
359        for idx, _ in enumerate(range(randrange(min, max))):
360            while True:
361                word = choice(words)
362                if word != last:
363                    last = word
364                    break
365            if next_capitalized:
366                word = word.capitalize()
367                next_capitalized = False
368            # add commas
369            if idx - randrange(3, 8) > last_comma:
370                last_comma = idx
371                last_fullstop += 2
372                word += ","
373            # add end of sentences
374            if idx - randrange(10, 20) > last_fullstop:
375                last_comma = last_fullstop = idx
376                word += "."
377                next_capitalized = True
378            p.append(word)
380        # ensure that the paragraph ends with a dot.
381        p_str = " ".join(p)
383        if p_str.endswith(","):
384            p_str = p_str[:-1] + "."
385        elif not p_str.endswith("."):
386            p_str += "."
388        result.append(p_str)
390    if not html:
391        return "\n\n".join(result)
392    return markupsafe.Markup(
393        "\n".join(f"<p>{markupsafe.escape(x)}</p>" for x in result)
394    )
397def url_quote(obj: t.Any, charset: str = "utf-8", for_qs: bool = False) -> str:
398    """Quote a string for use in a URL using the given charset.
400    :param obj: String or bytes to quote. Other types are converted to
401        string then encoded to bytes using the given charset.
402    :param charset: Encode text to bytes using this charset.
403    :param for_qs: Quote "/" and use "+" for spaces.
404    """
405    if not isinstance(obj, bytes):
406        if not isinstance(obj, str):
407            obj = str(obj)
409        obj = obj.encode(charset)
411    safe = b"" if for_qs else b"/"
412    rv = quote_from_bytes(obj, safe)
414    if for_qs:
415        rv = rv.replace("%20", "+")
417    return rv
421class LRUCache:
422    """A simple LRU Cache implementation."""
424    # this is fast for small capacities (something below 1000) but doesn't
425    # scale.  But as long as it's only used as storage for templates this
426    # won't do any harm.
428    def __init__(self, capacity: int) -> None:
429        self.capacity = capacity
430        self._mapping: t.Dict[t.Any, t.Any] = {}
431        self._queue: "te.Deque[t.Any]" = deque()
432        self._postinit()
434    def _postinit(self) -> None:
435        # alias all queue methods for faster lookup
436        self._popleft = self._queue.popleft
437        self._pop = self._queue.pop
438        self._remove = self._queue.remove
439        self._wlock = Lock()
440        self._append = self._queue.append
442    def __getstate__(self) -> t.Mapping[str, t.Any]:
443        return {
444            "capacity": self.capacity,
445            "_mapping": self._mapping,
446            "_queue": self._queue,
447        }
449    def __setstate__(self, d: t.Mapping[str, t.Any]) -> None:
450        self.__dict__.update(d)
451        self._postinit()
453    def __getnewargs__(self) -> t.Tuple[t.Any, ...]:
454        return (self.capacity,)
456    def copy(self) -> "LRUCache":
457        """Return a shallow copy of the instance."""
458        rv = self.__class__(self.capacity)
459        rv._mapping.update(self._mapping)
460        rv._queue.extend(self._queue)
461        return rv
463    def get(self, key: t.Any, default: t.Any = None) -> t.Any:
464        """Return an item from the cache dict or `default`"""
465        try:
466            return self[key]
467        except KeyError:
468            return default
470    def setdefault(self, key: t.Any, default: t.Any = None) -> t.Any:
471        """Set `default` if the key is not in the cache otherwise
472        leave unchanged. Return the value of this key.
473        """
474        try:
475            return self[key]
476        except KeyError:
477            self[key] = default
478            return default
480    def clear(self) -> None:
481        """Clear the cache."""
482        with self._wlock:
483            self._mapping.clear()
484            self._queue.clear()
486    def __contains__(self, key: t.Any) -> bool:
487        """Check if a key exists in this cache."""
488        return key in self._mapping
490    def __len__(self) -> int:
491        """Return the current size of the cache."""
492        return len(self._mapping)
494    def __repr__(self) -> str:
495        return f"<{type(self).__name__} {self._mapping!r}>"
497    def __getitem__(self, key: t.Any) -> t.Any:
498        """Get an item from the cache. Moves the item up so that it has the
499        highest priority then.
501        Raise a `KeyError` if it does not exist.
502        """
503        with self._wlock:
504            rv = self._mapping[key]
506            if self._queue[-1] != key:
507                try:
508                    self._remove(key)
509                except ValueError:
510                    # if something removed the key from the container
511                    # when we read, ignore the ValueError that we would
512                    # get otherwise.
513                    pass
515                self._append(key)
517            return rv
519    def __setitem__(self, key: t.Any, value: t.Any) -> None:
520        """Sets the value for an item. Moves the item up so that it
521        has the highest priority then.
522        """
523        with self._wlock:
524            if key in self._mapping:
525                self._remove(key)
526            elif len(self._mapping) == self.capacity:
527                del self._mapping[self._popleft()]
529            self._append(key)
530            self._mapping[key] = value
532    def __delitem__(self, key: t.Any) -> None:
533        """Remove an item from the cache dict.
534        Raise a `KeyError` if it does not exist.
535        """
536        with self._wlock:
537            del self._mapping[key]
539            try:
540                self._remove(key)
541            except ValueError:
542                pass
544    def items(self) -> t.Iterable[t.Tuple[t.Any, t.Any]]:
545        """Return a list of items."""
546        result = [(key, self._mapping[key]) for key in list(self._queue)]
547        result.reverse()
548        return result
550    def values(self) -> t.Iterable[t.Any]:
551        """Return a list of all values."""
552        return [x[1] for x in self.items()]
554    def keys(self) -> t.Iterable[t.Any]:
555        """Return a list of all keys ordered by most recent usage."""
556        return list(self)
558    def __iter__(self) -> t.Iterator[t.Any]:
559        return reversed(tuple(self._queue))
561    def __reversed__(self) -> t.Iterator[t.Any]:
562        """Iterate over the keys in the cache dict, oldest items
563        coming first.
564        """
565        return iter(tuple(self._queue))
567    __copy__ = copy
570def select_autoescape(
571    enabled_extensions: t.Collection[str] = ("html", "htm", "xml"),
572    disabled_extensions: t.Collection[str] = (),
573    default_for_string: bool = True,
574    default: bool = False,
575) -> t.Callable[[t.Optional[str]], bool]:
576    """Intelligently sets the initial value of autoescaping based on the
577    filename of the template.  This is the recommended way to configure
578    autoescaping if you do not want to write a custom function yourself.
580    If you want to enable it for all templates created from strings or
581    for all templates with `.html` and `.xml` extensions::
583        from jinja2 import Environment, select_autoescape
584        env = Environment(autoescape=select_autoescape(
585            enabled_extensions=('html', 'xml'),
586            default_for_string=True,
587        ))
589    Example configuration to turn it on at all times except if the template
590    ends with `.txt`::
592        from jinja2 import Environment, select_autoescape
593        env = Environment(autoescape=select_autoescape(
594            disabled_extensions=('txt',),
595            default_for_string=True,
596            default=True,
597        ))
599    The `enabled_extensions` is an iterable of all the extensions that
600    autoescaping should be enabled for.  Likewise `disabled_extensions` is
601    a list of all templates it should be disabled for.  If a template is
602    loaded from a string then the default from `default_for_string` is used.
603    If nothing matches then the initial value of autoescaping is set to the
604    value of `default`.
606    For security reasons this function operates case insensitive.
608    .. versionadded:: 2.9
609    """
610    enabled_patterns = tuple(f".{x.lstrip('.').lower()}" for x in enabled_extensions)
611    disabled_patterns = tuple(f".{x.lstrip('.').lower()}" for x in disabled_extensions)
613    def autoescape(template_name: t.Optional[str]) -> bool:
614        if template_name is None:
615            return default_for_string
616        template_name = template_name.lower()
617        if template_name.endswith(enabled_patterns):
618            return True
619        if template_name.endswith(disabled_patterns):
620            return False
621        return default
623    return autoescape
626def htmlsafe_json_dumps(
627    obj: t.Any, dumps: t.Optional[t.Callable[..., str]] = None, **kwargs: t.Any
628) -> markupsafe.Markup:
629    """Serialize an object to a string of JSON with :func:`json.dumps`,
630    then replace HTML-unsafe characters with Unicode escapes and mark
631    the result safe with :class:`~markupsafe.Markup`.
633    This is available in templates as the ``|tojson`` filter.
635    The following characters are escaped: ``<``, ``>``, ``&``, ``'``.
637    The returned string is safe to render in HTML documents and
638    ``<script>`` tags. The exception is in HTML attributes that are
639    double quoted; either use single quotes or the ``|forceescape``
640    filter.
642    :param obj: The object to serialize to JSON.
643    :param dumps: The ``dumps`` function to use. Defaults to
644        ``env.policies["json.dumps_function"]``, which defaults to
645        :func:`json.dumps`.
646    :param kwargs: Extra arguments to pass to ``dumps``. Merged onto
647        ``env.policies["json.dumps_kwargs"]``.
649    .. versionchanged:: 3.0
650        The ``dumper`` parameter is renamed to ``dumps``.
652    .. versionadded:: 2.9
653    """
654    if dumps is None:
655        dumps = json.dumps
657    return markupsafe.Markup(
658        dumps(obj, **kwargs)
659        .replace("<", "\\u003c")
660        .replace(">", "\\u003e")
661        .replace("&", "\\u0026")
662        .replace("'", "\\u0027")
663    )
666class Cycler:
667    """Cycle through values by yield them one at a time, then restarting
668    once the end is reached. Available as ``cycler`` in templates.
670    Similar to ``loop.cycle``, but can be used outside loops or across
671    multiple loops. For example, render a list of folders and files in a
672    list, alternating giving them "odd" and "even" classes.
674    .. code-block:: html+jinja
676        {% set row_class = cycler("odd", "even") %}
677        <ul class="browser">
678        {% for folder in folders %}
679          <li class="folder {{ row_class.next() }}">{{ folder }}
680        {% endfor %}
681        {% for file in files %}
682          <li class="file {{ row_class.next() }}">{{ file }}
683        {% endfor %}
684        </ul>
686    :param items: Each positional argument will be yielded in the order
687        given for each cycle.
689    .. versionadded:: 2.1
690    """
692    def __init__(self, *items: t.Any) -> None:
693        if not items:
694            raise RuntimeError("at least one item has to be provided")
695        self.items = items
696        self.pos = 0
698    def reset(self) -> None:
699        """Resets the current item to the first item."""
700        self.pos = 0
702    @property
703    def current(self) -> t.Any:
704        """Return the current item. Equivalent to the item that will be
705        returned next time :meth:`next` is called.
706        """
707        return self.items[self.pos]
709    def next(self) -> t.Any:
710        """Return the current item, then advance :attr:`current` to the
711        next item.
712        """
713        rv = self.current
714        self.pos = (self.pos + 1) % len(self.items)
715        return rv
717    __next__ = next
720class Joiner:
721    """A joining helper for templates."""
723    def __init__(self, sep: str = ", ") -> None:
724        self.sep = sep
725        self.used = False
727    def __call__(self) -> str:
728        if not self.used:
729            self.used = True
730            return ""
731        return self.sep
734class Namespace:
735    """A namespace object that can hold arbitrary attributes.  It may be
736    initialized from a dictionary or with keyword arguments."""
738    def __init__(*args: t.Any, **kwargs: t.Any) -> None:  # noqa: B902
739        self, args = args[0], args[1:]
740        self.__attrs = dict(*args, **kwargs)
742    def __getattribute__(self, name: str) -> t.Any:
743        # __class__ is needed for the awaitable check in async mode
744        if name in {"_Namespace__attrs", "__class__"}:
745            return object.__getattribute__(self, name)
746        try:
747            return self.__attrs[name]
748        except KeyError:
749            raise AttributeError(name) from None
751    def __setitem__(self, name: str, value: t.Any) -> None:
752        self.__attrs[name] = value
754    def __repr__(self) -> str:
755        return f"<Namespace {self.__attrs!r}>"
