import asyncio
import os
import shutil
import subprocess
from argparse import ArgumentParser
from glob import iglob
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union, cast

from llama_index.core import (
    Settings,
    SimpleDirectoryReader,
    VectorStoreIndex,
)
from llama_index.core.base.embeddings.base import BaseEmbedding
from llama_index.core.base.response.schema import (
    Response,
    StreamingResponse,
)
from llama_index.core.bridge.pydantic import BaseModel, Field, field_validator
from llama_index.core.chat_engine import CondenseQuestionChatEngine
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.llms import LLM
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.readers.base import BaseReader
from llama_index.core.response_synthesizers import CompactAndRefine
from llama_index.core.utils import get_cache_dir


def _try_load_openai_llm():
    try:
        from llama_index.llms.openai import OpenAI  # pants: no-infer-dep

        return OpenAI(model="gpt-3.5-turbo", streaming=True)
    except ImportError:
        raise ImportError(
            "`llama-index-llms-openai` package not found, "
            "please run `pip install llama-index-llms-openai`"
        )


# Name of the file, stored inside the persist directory, to which every
# `--files` argument is appended after ingestion; `--create-llama` reads it
# back to find the data path to hand to create-llama.
RAG_HISTORY_FILE_NAME = "files_history.txt"


def default_ragcli_persist_dir() -> str:
    """
    Return the default persist directory: ``rag_cli`` under the cache directory.
    """
    cache_root = Path(get_cache_dir())
    return str(cache_root.joinpath("rag_cli"))


def query_input(query_str: Optional[str] = None) -> str:
    """
    Return ``query_str`` unchanged, or an empty string when it is None or empty.
    """
    if query_str:
        return query_str
    return ""


class RagCLI(BaseModel):
    """
    CLI tool for chatting with the output of an IngestionPipeline via a RetrieverQueryEngine.

    Documents passed with ``--files`` are run through ``ingestion_pipeline`` and
    persisted under ``persist_dir``. Questions and the chat REPL are answered by
    ``chat_engine``, which is derived from the pipeline's vector store when it
    is not supplied explicitly.
    """

    ingestion_pipeline: IngestionPipeline = Field(
        description="Ingestion pipeline to run for RAG ingestion."
    )
    verbose: bool = Field(
        description="Whether to print out verbose information during execution.",
        default=False,
    )
    persist_dir: str = Field(
        description="Directory to persist ingestion pipeline.",
        default_factory=default_ragcli_persist_dir,
    )
    llm: LLM = Field(
        description="Language model to use for response generation.",
        default_factory=lambda: _try_load_openai_llm(),
    )
    # validate_default=True makes the "before" validator below run even when
    # chat_engine is omitted; without it pydantic skips validators for default
    # values and the engine would never be derived from the pipeline.
    chat_engine: Optional[CondenseQuestionChatEngine] = Field(
        description="Chat engine to use for chatting.",
        default=None,
        validate_default=True,
    )
    file_extractor: Optional[Dict[str, BaseReader]] = Field(
        description="File extractor to use for extracting text from files.",
        default=None,
    )

    class Config:
        arbitrary_types_allowed = True

    @field_validator("chat_engine", mode="before")
    @classmethod
    def chat_engine_from_ingestion_pipeline(
        cls, chat_engine: Any, values: Any
    ) -> Optional[CondenseQuestionChatEngine]:
        """
        If chat_engine is not provided, create one from ingestion_pipeline.

        Args:
            chat_engine: The value supplied for the field, or None.
            values: The validation info object handed over by pydantic (the
                already-validated fields are read from its ``data`` attribute),
                or a plain mapping of field values.

        Returns:
            The supplied chat engine, a newly built one, or None when the
            pipeline has no vector store (or the fields it depends on are
            unavailable).
        """
        if chat_engine is not None:
            return chat_engine

        # A validation info object exposes earlier fields via `.data`; a plain
        # mapping is used as-is.
        data = getattr(values, "data", values)

        # Fields that failed their own validation are absent from `data`;
        # return None and let pydantic report the underlying error instead of
        # masking it with a KeyError raised from here.
        ingestion_pipeline = cast(
            Optional[IngestionPipeline], data.get("ingestion_pipeline")
        )
        llm = cast(Optional[LLM], data.get("llm"))
        if ingestion_pipeline is None or llm is None:
            return None
        if ingestion_pipeline.vector_store is None:
            return None

        verbose = cast(bool, data.get("verbose", False))

        # get embed_model from transformations if possible
        embed_model = next(
            (
                transformation
                for transformation in (ingestion_pipeline.transformations or [])
                if isinstance(transformation, BaseEmbedding)
            ),
            None,
        )

        # These assignments change the global Settings object.
        Settings.llm = llm
        Settings.embed_model = embed_model

        retriever = VectorStoreIndex.from_vector_store(
            ingestion_pipeline.vector_store, embed_model=embed_model
        ).as_retriever(similarity_top_k=8)

        response_synthesizer = CompactAndRefine(
            streaming=True, llm=llm, verbose=verbose
        )

        query_engine = RetrieverQueryEngine(
            retriever=retriever, response_synthesizer=response_synthesizer
        )

        return CondenseQuestionChatEngine.from_defaults(
            query_engine=query_engine, llm=llm, verbose=verbose
        )

    def _history_file_path(self) -> Path:
        """
        Return the path of the history file inside the persist directory.
        """
        return Path(self.persist_dir) / RAG_HISTORY_FILE_NAME

    def _clear_persist_dir(self) -> bool:
        """
        Delete the persist directory after asking the user for confirmation.

        Returns:
            True when the directory is gone (or never existed), False when the
            user declined or the deletion failed.
        """
        if os.path.exists(self.persist_dir):
            # Ask for confirmation
            response = input(
                f"Are you sure you want to delete data within {self.persist_dir}? [y/N] "
            )
            if response.strip().lower() != "y":
                print("Aborted.")
                return False
            try:
                shutil.rmtree(self.persist_dir)
            except OSError as e:
                print(f"Error clearing {self.persist_dir}: {e}")
                return False
        print(f"Successfully cleared {self.persist_dir}")
        return True

    async def _ingest_files(self, files: List[str]) -> None:
        """
        Load the given files/directories/globs, run the pipeline and persist it.

        The raw ``files`` arguments (not the expanded paths) are appended to
        the history file afterwards.
        """
        ingestion_pipeline = cast(IngestionPipeline, self.ingestion_pipeline)

        expanded_files: List[str] = []
        for pattern in files:
            expanded_files.extend(iglob(pattern, recursive=True))

        documents = []
        for matched in expanded_files:
            path = os.path.abspath(matched)
            # A directory is read as a whole; anything else as a single file.
            if os.path.isdir(path):
                reader = SimpleDirectoryReader(
                    input_dir=path,
                    filename_as_id=True,
                    file_extractor=self.file_extractor,
                )
            else:
                reader = SimpleDirectoryReader(
                    input_files=[path],
                    filename_as_id=True,
                    file_extractor=self.file_extractor,
                )
            documents.extend(reader.load_data(show_progress=self.verbose))

        await ingestion_pipeline.arun(show_progress=self.verbose, documents=documents)
        ingestion_pipeline.persist(persist_dir=self.persist_dir)

        # Safeguard so the append below cannot fail on a missing directory;
        # presumably persist() already created it.
        os.makedirs(self.persist_dir, exist_ok=True)

        # Append the `--files` argument to the history file
        with open(self._history_file_path(), "a") as f:
            f.writelines(f"{file}\n" for file in files)

    def _create_llama_app(self) -> None:
        """
        Call ``npx create-llama`` with the single path recorded in the history file.

        Prints an explanatory message and returns without calling create-llama
        when ``npx`` is missing, nothing was ingested, more than one distinct
        entry was recorded, the entry contains ``*``, or the path no longer
        exists.

        Raises:
            subprocess.CalledProcessError: If the create-llama command exits
                with a non-zero status.
        """
        if shutil.which("npx") is None:
            print(
                "`npx` is not installed. Please install it by calling `npm install -g npx`"
            )
            return

        history_file_path = self._history_file_path()
        stored_paths = set()
        if history_file_path.exists():
            with open(history_file_path) as f:
                stored_paths = {line.strip() for line in f if line.strip()}

        if not stored_paths:
            print(
                "No data has been ingested, "
                "please specify `--files` to create llama dataset."
            )
            return

        if len(stored_paths) > 1:
            print(
                "Multiple files or folders were ingested, which is not supported by create-llama. "
                "Please call `llamaindex-cli rag --clear` to clear the cache first, "
                "then call `llamaindex-cli rag --files` again with a single folder or file"
            )
            return

        (path,) = stored_paths
        if "*" in path:
            print(
                "Glob pattern is not supported by create-llama. "
                "Please call `llamaindex-cli rag --clear` to clear the cache first, "
                "then call `llamaindex-cli rag --files` again with a single folder or file."
            )
            return

        if not os.path.exists(path):
            print(
                f"The path {path} does not exist. "
                "Please call `llamaindex-cli rag --clear` to clear the cache first, "
                "then call `llamaindex-cli rag --files` again with a single folder or file."
            )
            return

        print(f"Calling create-llama using data from {path} ...")
        command_args = [
            "npx",
            "create-llama@latest",
            "--frontend",
            "--template",
            "streaming",
            "--framework",
            "fastapi",
            "--ui",
            "shadcn",
            "--vector-db",
            "none",
            "--engine",
            "context",
            "--files",
            path,
        ]
        subprocess.run(command_args, check=True)

    async def handle_cli(
        self,
        files: Optional[List[str]] = None,
        question: Optional[str] = None,
        chat: bool = False,
        verbose: bool = False,
        clear: bool = False,
        create_llama: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Entrypoint for local document RAG CLI tool.

        The requested actions run in a fixed order: clear, ingest, create-llama,
        answer a single question, open the chat REPL.

        Args:
            files: Files, directories or glob patterns to ingest.
            question: A single question to answer.
            chat: Whether to open the chat REPL.
            verbose: Whether to print verbose information; stored on the instance.
            clear: Whether to delete the persist directory first.
            create_llama: Whether to call create-llama with the ingested data.
            **kwargs: Extra parsed arguments (e.g. ``command`` and ``func``
                forwarded by the argument parser); ignored.
        """
        # An aborted or failed clear stops the whole invocation.
        if clear and not self._clear_persist_dir():
            return

        self.verbose = verbose
        if self.verbose:
            print("Saving/Loading from persist_dir: ", self.persist_dir)

        if files is not None:
            await self._ingest_files(files)

        if create_llama:
            self._create_llama_app()

        if question is not None:
            await self.handle_question(question)
        if chat:
            await self.start_chat_repl()

    async def handle_question(self, question: str) -> None:
        """
        Send a single question to the chat engine and print the response.

        Raises:
            ValueError: If no chat engine is available.
        """
        if self.chat_engine is None:
            raise ValueError("chat_engine is not defined.")

        chat_engine = cast(CondenseQuestionChatEngine, self.chat_engine)
        response = chat_engine.chat(question)

        if isinstance(response, StreamingResponse):
            response.print_response_stream()
        else:
            response = cast(Response, response)
            print(response)

    async def start_chat_repl(self) -> None:
        """
        Start a REPL for chatting with the agent.

        Raises:
            ValueError: If no chat engine is available.
        """
        if self.chat_engine is None:
            raise ValueError("chat_engine is not defined.")

        chat_engine = cast(CondenseQuestionChatEngine, self.chat_engine)
        chat_engine.streaming_chat_repl()

    @classmethod
    def add_parser_args(
        cls,
        parser: Union[ArgumentParser, Any],
        instance_generator: Optional[Callable[[], "RagCLI"]],
    ) -> None:
        """
        Register the ``rag`` command-line arguments on ``parser``.

        Nothing is registered when ``instance_generator`` is falsy. Otherwise
        the parser's ``func`` default is set to a callable that creates an
        instance and runs ``handle_cli`` with the parsed arguments.
        """
        if not instance_generator:
            return

        parser.add_argument(
            "-q",
            "--question",
            type=str,
            help="The question you want to ask.",
            required=False,
        )
        parser.add_argument(
            "-f",
            "--files",
            type=str,
            nargs="+",
            help=(
                "The name of the file(s) or directory you want to ask a question about, "
                'such as "file.pdf". Supports globs like "*.py".'
            ),
        )
        parser.add_argument(
            "-c",
            "--chat",
            help="If flag is present, opens a chat REPL.",
            action="store_true",
        )
        parser.add_argument(
            "-v",
            "--verbose",
            help="Whether to print out verbose information during execution.",
            action="store_true",
        )
        parser.add_argument(
            "--clear",
            help="Clears out all currently embedded data.",
            action="store_true",
        )
        parser.add_argument(
            "--create-llama",
            help="Create a LlamaIndex application with your embedded data.",
            required=False,
            action="store_true",
        )
        # The instance is created lazily, only when the command is dispatched.
        parser.set_defaults(
            func=lambda args: asyncio.run(
                instance_generator().handle_cli(**vars(args))
            )
        )

    def cli(self) -> None:
        """
        Entrypoint for CLI tool.

        Builds a parser with a single required ``rag`` sub-command bound to
        this instance, parses the command line and dispatches to it.
        """
        parser = ArgumentParser(description="LlamaIndex RAG Q&A tool.")
        subparsers = parser.add_subparsers(
            title="commands", dest="command", required=True
        )
        llamarag_parser = subparsers.add_parser(
            "rag", help="Ask a question to a document / a directory of documents."
        )
        self.add_parser_args(llamarag_parser, lambda: self)
        # Parse the command-line arguments
        args = parser.parse_args()

        # Call the appropriate function based on the command
        args.func(args)
