"""
Copyright 2023-present fretgfr
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import datetime
from itertools import islice
from operator import attrgetter
from typing import Any, AsyncIterable, Coroutine, Generator, Iterable, Optional, Tuple, TypeVar, Union, overload
__all__ = (
"as_chunks",
"get",
"utcnow",
)
T = TypeVar("T")
_Iter = Union[Iterable[T], AsyncIterable[T]]
Coro = Coroutine[Any, Any, T]
def parse_iso_timestamp(iso_str: str, /) -> datetime.datetime:
"""Parses an iso string to an aware UTC datetime."""
return datetime.datetime.strptime(iso_str, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=datetime.timezone.utc)
def to_iso_format(dt: datetime.datetime, /) -> str:
return dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
[docs]
def utcnow() -> datetime.datetime:
"""Returns an aware UTC datetime representing the current time.
Returns
--------
:class:`datetime.datetime`
The current aware datetime in UTC.
"""
return datetime.datetime.now(datetime.timezone.utc)
# From the python docs for 3.12s `itertools.batched`
[docs]
def as_chunks(iterable: Iterable[T], n: int) -> Generator[Tuple[T, ...], None, None]:
"""Batches an iterable into chunks of up to size n.
Parameters
----------
iterable : :class:`collections.abc.Iterable`
The iterable to batch
n : :class:`int`
The number of elements per generated tuple.
Raises
------
:class:`ValueError`
At least one result must be returned per group.
"""
if n < 1:
raise ValueError("n must be at least one")
it = iter(iterable)
while batch := tuple(islice(it, n)):
yield batch
# The following get utility was sourced from discord.py under the MIT license.
def _get(iterable: Iterable[T], /, **attrs: Any) -> Optional[T]:
# global -> local
_all = all
attrget = attrgetter
# Special case the single element call
if len(attrs) == 1:
k, v = attrs.popitem()
pred = attrget(k.replace("__", "."))
return next((elem for elem in iterable if pred(elem) == v), None)
converted = [(attrget(attr.replace("__", ".")), value) for attr, value in attrs.items()]
for elem in iterable:
if _all(pred(elem) == value for pred, value in converted):
return elem
return None
async def _aget(iterable: AsyncIterable[T], /, **attrs: Any) -> Optional[T]:
# global -> local
_all = all
attrget = attrgetter
# Special case the single element call
if len(attrs) == 1:
k, v = attrs.popitem()
pred = attrget(k.replace("__", "."))
async for elem in iterable:
if pred(elem) == v:
return elem
return None
converted = [(attrget(attr.replace("__", ".")), value) for attr, value in attrs.items()]
async for elem in iterable:
if _all(pred(elem) == value for pred, value in converted):
return elem
return None
@overload
def get(iterable: AsyncIterable[T], /, **attrs: Any) -> Coro[Optional[T]]: ...
@overload
def get(iterable: Iterable[T], /, **attrs: Any) -> Optional[T]: ...
[docs]
def get(iterable: _Iter[T], /, **attrs: Any) -> Union[Optional[T], Coro[Optional[T]]]:
r"""A helper that returns the first element in the iterable that meets
all the traits passed in ``attrs``.
When multiple attributes are specified, they are checked using
logical AND, not logical OR. Meaning they have to meet every
attribute passed in and not one of them.
To have a nested attribute search (i.e. search by ``x.y``) then
pass in ``x__y`` as the keyword argument.
If nothing is found that matches the attributes passed, then
``None`` is returned.
Parameters
-----------
iterable: Union[:class:`collections.abc.Iterable`, :class:`collections.abc.AsyncIterable`]
The iterable to search through. Using a :class:`collections.abc.AsyncIterable`,
makes this function return a coroutine.
\*\*attrs
Keyword arguments that denote attributes to search with.
"""
return (
_aget(iterable, **attrs) # type: ignore
if hasattr(iterable, "__aiter__") # isinstance(iterable, collections.abc.AsyncIterable) is too slow
else _get(iterable, **attrs) # type: ignore
)