Module search_me.osint

Expand source code
# -*- coding: utf-8 -*-
import asyncio
import logging
import time
from functools import cached_property
import jmespath
from numpy import where
from search_me.storage import SafeStorage as Storage
from search_me.tools import validate_api_key, get_current_dir


# Empty export list: a star-import of this module brings in no names.
__all__ = ( )

# Module-level logger; the NullHandler means this module attaches no output
# handler of its own and leaves logging configuration to the application.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())


class DocMixin:
    """Mixin exposing the documentation table of the host object.

    The host is expected to provide a ``df_doc`` attribute that offers a
    ``to_markdown`` method.
    """

    def doc(self):
        """Render ``self.df_doc`` as a grid-formatted markdown table.

        Returns
        -------
        str
            Result of ``df_doc.to_markdown`` with the index column omitted
        """
        render_options = {"tablefmt": "grid", "index": False}
        return self.df_doc.to_markdown(**render_options)


class OSINT:
    """Base class for OSINT lookups driven by stored request descriptions.

    ``load`` fills ``df_main`` and ``df_swap`` (plus ``df_doc`` for classes
    that list ``DocMixin`` as a direct base).  The lowercase class name
    (``name``) selects the rows of those tables that belong to the concrete
    subclass, and ``search`` issues one HTTP request per selected row.

    NOTE(review): ``functools.cached_property`` stores its value in the
    instance ``__dict__``.  This class declares ``__slots__`` without
    ``__dict__``, so the cached properties only work on subclasses that do
    not declare ``__slots__`` themselves (true for every subclass in this
    module); accessing them on a bare ``OSINT`` instance raises ``TypeError``.
    """

    __slots__ = ("df_main", "df_swap", "df_doc")

    def __init__(self, api_key):
        """Init

        Parameters
        ----------
        api_key : tuple
            API KEY, forwarded unchanged to ``load``
        """
        self.load(api_key)

    def __repr__(self):
        """Repr

        Returns
        -------
        str
            Grid-formatted markdown table of the ``df_main`` rows whose
            ``name`` column equals ``self.name``
        """
        df = self.df_main
        return (
            df
            .iloc[where(df.name.values == self.name)]
            .to_markdown(tablefmt="grid", index=False)
        )

    @cached_property
    def name(self):
        """Name

        Returns
        -------
        str
            Lowercase name of the concrete class
        """
        return self.__class__.__name__.lower()

    @cached_property
    def data_main(self):
        """Data main

        Returns
        -------
        list
            One dict per ``df_main`` row whose ``name`` equals ``self.name``;
            each dict is used as a request description by ``search``
        """
        df = self.df_main
        return df.iloc[where(df.name.values == self.name)].to_dict("records")

    @cached_property
    def data_swap(self):
        """Data swap

        Returns
        -------
        list
            ``swap`` column values of the ``df_swap`` rows whose ``name``
            equals ``self.name``
        """
        df = self.df_swap
        return df.iloc[where(df.name.values == self.name)].swap.tolist()

    @validate_api_key
    def load(self, api_key):
        """Load data

        Parameters
        ----------
        api_key : tuple
            API KEY; unpacked into two values that are passed to
            ``Storage.load``
        """
        workdir = get_current_dir() / ".osint"
        fp_s, fp_p = api_key
        # The send()/next() pairing follows the loader protocol defined in
        # search_me.storage (not visible here); keep the call order as is.
        with Storage.load(fp_s, fp_p) as loader:
            self.df_main = loader.send(workdir / ".main")
            next(loader)
            self.df_swap = loader.send(workdir / ".swap")
            next(loader)
            # Only classes listing DocMixin as a direct base get a doc table.
            if DocMixin in self.__class__.__bases__:
                self.df_doc = loader.send(workdir / f".{self.name}")
                next(loader)

    def __swap_values(self, **kwargs):
        """Swap values

        Copies keyword values to alias keys: for every ``source -> aliases``
        entry of the first swap mapping, ``kwargs[alias] = kwargs[source]``.

        Returns
        -------
        dict
            Swapped values

        Raises
        ------
        KeyError
            If a source key of the swap mapping is missing from ``kwargs``
        """
        swp = self.data_swap
        if swp:
            # Only the first swap record is used.
            swp = swp[0]
            for k1, k_arr in swp.items():
                for k2 in k_arr:
                    kwargs[k2] = kwargs[k1]
        return kwargs

    async def search(self, session, **kwargs):
        """Osint search

        Parameters
        ----------
        session : aiohttp.ClientSession
            Session
        **kwargs
            Values used to format request URIs and to fill request parameters

        Returns
        -------
        generator
            Results, one per request description in ``data_main``; failed
            requests are represented by their exception object
        """
        # These sources use the recursive (nested / file-capable) request path.
        xl_names = {"gov", "brand", "code", "rutube"}
        call_f = self.__call_xl if self.name in xl_names else self.__call
        return (
            await self.__call_batch(
                session,
                call_f,
                **self.__swap_values(**kwargs)
                )
            )

    @staticmethod
    def __build_request(d, kwargs):
        """Build the URI and the keyword arguments for ``session.request``.

        Parameters
        ----------
        d : dict
            Request description
        kwargs : dict
            Values used to format the URI and to fill request parameters

        Returns
        -------
        tuple
            ``(uri, kw)`` where ``kw`` holds the optional payload (stored
            under the key named by ``d["in_format"]``) and optional headers
        """
        uri = d["uri"].format(**kwargs)
        kw = {}
        params = d.get("params", None)
        if params:
            # ``params`` is a comma-separated list of accepted kwarg names.
            wanted = set(params.split(","))
            payload = {k: v for k, v in kwargs.items() if k in wanted}
            default_params = d.get("default_params", None)
            if default_params:
                # Caller-supplied values win over the defaults.
                payload = {**default_params, **payload}
            kw[d["in_format"]] = payload
        default_headers = d.get("default_headers", None)
        if default_headers:
            kw["headers"] = default_headers
        return uri, kw

    @staticmethod
    async def __read_json(resp, d):
        """Decode a JSON response and apply the optional JMESPath expression.

        Parameters
        ----------
        resp : aiohttp.ClientResponse
            Response
        d : dict
            Request description

        Returns
        -------
        Any
            Decoded JSON, reduced by ``d["out_xpath"]`` when that is set
        """
        content_type = d.get("out_content_type", None)
        if content_type:
            content = await resp.json(content_type=content_type)
        else:
            content = await resp.json()
        out_xpath = d.get("out_xpath", None)
        if out_xpath:
            return jmespath.search(out_xpath, content)
        return content

    async def __call(self, session, d, **kwargs):
        """Http request

        Parameters
        ----------
        session : aiohttp.ClientSession
            Session
        d : dict
            Data

        Returns
        -------
        Any
            Response content for ``"json"`` output, otherwise the tuple
            ``(uri, status)``
        """
        uri, kw = self.__build_request(d, kwargs)
        logger.debug("%s", (uri, d, kw))
        async with session.request(d["method"], uri, **kw) as resp:
            logger.info("%s", (uri, resp.status))
            if d["out_format"] == "json":
                return await self.__read_json(resp, d)
            return uri, resp.status

    async def __call_xl(self, session, d, **kwargs):
        """Recursive http request

        Performs the request described by ``d`` and, when ``d["nested"]`` is
        set, follows up with the nested request(s) using the response content
        as input.

        Parameters
        ----------
        session : aiohttp.ClientSession
            Session
        d : dict
            Data

        Returns
        -------
        Any
            Response content (decoded JSON, saved file name or ``None``), or
            the result(s) of the nested request(s)
        """
        uri, kw = self.__build_request(d, kwargs)
        logger.debug("%s", (uri, d, kw))
        async with session.request(d["method"], uri, **kw) as resp:
            logger.info("%s", (uri, resp.status))
            out_format = d.get("out_format", None)
            if out_format == "json":
                content = await self.__read_json(resp, d)
            elif out_format == "file":
                # NOTE(review): the file name comes straight from the response
                # header and is not sanitised (quotes, directory parts) --
                # confirm the sources are trusted or restrict the target path.
                fp = resp.headers["Content-Disposition"].split(";")[-1].split("=")[-1]
                with open(fp, "wb") as f:
                    async for chunk in resp.content.iter_chunked(2 ** 10):
                        f.write(chunk)
                content = fp
            else:
                content = None
            delay = d.get("delay", 0)
            if delay:
                # Non-blocking pause: a blocking sleep here would stall every
                # other request running on the same event loop.
                await asyncio.sleep(delay)
            nested_context = d.get("nested", None)
            if not nested_context:
                return content
            trace_key = d.get("trace_key", "")
            if d.get("batch", False):
                # One nested request per item of the response content.
                nested_results = []
                for c in content:
                    r = await self.__call_xl(
                        session, nested_context, **{**nested_context, **{trace_key: c}}
                        )
                    nested_results.append(r)
                return nested_results
            # Single nested request: the response content supplies the values,
            # the nested description overrides them.  The traced value travels
            # in the per-call kwargs only, so the shared (cached) description
            # is never mutated.
            nested_kwargs = {**content, **nested_context}
            trace_data = kwargs.get(trace_key, None)
            if trace_data:
                nested_kwargs[trace_key] = trace_data
            return await self.__call_xl(
                session, nested_context, **nested_kwargs
                )

    async def __call_batch(self, session, call_f, **kwargs):
        """Batch http requests

        Runs ``call_f`` concurrently for every request description in
        ``data_main``.

        Parameters
        ----------
        session : aiohttp.ClientSession
            Session
        call_f : function
            Http request function

        Returns
        -------
        generator
            Results in ``data_main`` order; an exception raised by a request
            is yielded as a value instead of being propagated
        """
        results = await asyncio.gather(
            *(call_f(session, d, **kwargs) for d in self.data_main),
            return_exceptions=True
            )
        return (r for r in results)


class Domain(OSINT):
    """OSINT source ``domain``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"domain"``.
    """


class Ip(OSINT):
    """OSINT source ``ip``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"ip"``.
    """


class Url(OSINT):
    """OSINT source ``url``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"url"``.
    """


class Email(OSINT):
    """OSINT source ``email``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"email"``.
    """


class Zipcode(OSINT):
    """OSINT source ``zipcode``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"zipcode"``.
    """


class Geo(OSINT):
    """OSINT source ``geo``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"geo"``.
    """


class GeoRoute(OSINT):
    """OSINT source ``georoute``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"georoute"``.
    """


class WebCam(OSINT):
    """OSINT source ``webcam``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"webcam"``.
    """


class WebCamGeo(OSINT):
    """OSINT source ``webcamgeo``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"webcamgeo"``.
    """


class Gov(OSINT, DocMixin):
    """OSINT source ``gov``.

    The lowercase class name selects the rows of the loaded tables whose
    ``name`` column equals ``"gov"``.  ``"gov"`` is listed in ``xl_names``
    inside ``OSINT.search``, so requests go through the recursive request
    path.  Because DocMixin is a direct base, ``OSINT.load`` also loads the
    ``.gov`` documentation table that ``doc()`` renders.
    """


class Academic(OSINT):
    """OSINT source ``academic``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"academic"``.
    """


class Text(OSINT):
    """OSINT source ``text``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"text"``.
    """


class Dataset(OSINT):
    """OSINT source ``dataset``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"dataset"``.
    """


class News(OSINT):
    """OSINT source ``news``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"news"``.
    """


class Btc(OSINT):
    """OSINT source ``btc``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"btc"``.
    """


class Username(OSINT):
    """OSINT source ``username``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"username"``.
    """


class Telegram(OSINT):
    """OSINT source ``telegram``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"telegram"``.
    """


class Github(OSINT):
    """OSINT source ``github``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"github"``.
    """


class Vk(OSINT):
    """OSINT source ``vk``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"vk"``.
    """


class Associations(OSINT):
    """OSINT source ``associations``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"associations"``.
    """


class Suggestions(OSINT):
    """OSINT source ``suggestions``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"suggestions"``.
    """


class Code(OSINT, DocMixin):
    """OSINT source ``code``.

    The lowercase class name selects the rows of the loaded tables whose
    ``name`` column equals ``"code"``.  ``"code"`` is listed in ``xl_names``
    inside ``OSINT.search``, so requests go through the recursive request
    path.  Because DocMixin is a direct base, ``OSINT.load`` also loads the
    ``.code`` documentation table that ``doc()`` renders.
    """


class Brand(OSINT):
    """OSINT source ``brand``.

    The lowercase class name selects the rows of the loaded tables whose
    ``name`` column equals ``"brand"``.  ``"brand"`` is listed in
    ``xl_names`` inside ``OSINT.search``, so requests go through the
    recursive request path.
    """


class Youtube(OSINT):
    """OSINT source ``youtube``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"youtube"``.
    """


class Rutube(OSINT):
    """OSINT source ``rutube``.

    The lowercase class name selects the rows of the loaded tables whose
    ``name`` column equals ``"rutube"``.  ``"rutube"`` is listed in
    ``xl_names`` inside ``OSINT.search``, so requests go through the
    recursive request path.
    """


class Work(OSINT):
    """OSINT source ``work``.

    Adds nothing to OSINT; the lowercase class name selects the rows of the
    loaded tables whose ``name`` column equals ``"work"``.
    """