import pickle

import pytest
import srsly
from thinc.api import Linear

import spacy
from spacy import Vocab, load, registry
from spacy.lang.en import English
from spacy.language import Language
from spacy.pipeline import (
    DependencyParser,
    EntityRecognizer,
    EntityRuler,
    SentenceRecognizer,
    Tagger,
    TextCategorizer,
    TrainablePipe,
)
from spacy.pipeline.dep_parser import DEFAULT_PARSER_MODEL
from spacy.pipeline.senter import DEFAULT_SENTER_MODEL
from spacy.pipeline.tagger import DEFAULT_TAGGER_MODEL
from spacy.pipeline.textcat import DEFAULT_SINGLE_TEXTCAT_MODEL
from spacy.tokens import Span
from spacy.util import ensure_path, load_model

from ..util import make_tempdir

# Pipe classes that the tests parametrized over "Parser" run against; each
# test constructs them with a model resolved from DEFAULT_PARSER_MODEL.
test_parsers = [DependencyParser, EntityRecognizer]


@pytest.fixture
def parser(en_vocab):
    """Provide a DependencyParser on `en_vocab` with the "nsubj" label added."""
    resolved = registry.resolve({"model": DEFAULT_PARSER_MODEL}, validate=True)
    dep_parser = DependencyParser(
        en_vocab,
        resolved["model"],
        learn_tokens=False,
        min_action_freq=30,
        update_with_oracle_cut_size=100,
        beam_width=1,
        beam_update_prob=1.0,
        beam_density=0.0,
    )
    dep_parser.add_label("nsubj")
    return dep_parser


@pytest.fixture
def blank_parser(en_vocab):
    """Provide a DependencyParser on `en_vocab` with no labels added."""
    settings = dict(
        learn_tokens=False,
        min_action_freq=30,
        update_with_oracle_cut_size=100,
        beam_width=1,
        beam_update_prob=1.0,
        beam_density=0.0,
    )
    resolved = registry.resolve({"model": DEFAULT_PARSER_MODEL}, validate=True)
    return DependencyParser(en_vocab, resolved["model"], **settings)


@pytest.fixture
def taggers(en_vocab):
    """Provide a pair of Tagger instances built on one shared model object."""
    resolved = registry.resolve({"model": DEFAULT_TAGGER_MODEL}, validate=True)
    shared_model = resolved["model"]
    first = Tagger(en_vocab, shared_model)
    second = Tagger(en_vocab, shared_model)
    return (first, second)


@pytest.mark.issue(3456)
def test_issue3456():
    """Piping a batch that contains an empty text must not crash."""
    # Regression: this crashed because of a padding error in
    # layer.ops.unflatten in thinc.
    pipeline = English()
    pipeline.add_pipe("tagger").add_label("A")
    pipeline.initialize()
    # Exhaust the generator so every text is actually processed.
    for _ in pipeline.pipe(["hi", ""]):
        pass


@pytest.mark.issue(3526)
def test_issue_3526_1(en_vocab):
    """EntityRuler settings and patterns survive to_bytes/from_bytes."""
    patterns = [
        {"label": "HELLO", "pattern": "hello world"},
        {"label": "BYE", "pattern": [{"LOWER": "bye"}, {"LOWER": "bye"}]},
        {"label": "HELLO", "pattern": [{"ORTH": "HELLO"}]},
        {"label": "COMPLEX", "pattern": [{"ORTH": "foo", "OP": "*"}]},
        {"label": "TECH_ORG", "pattern": "Apple", "id": "a1"},
    ]
    # "HELLO" appears twice above, so five patterns yield four labels.
    distinct_labels = 4
    nlp = Language(vocab=en_vocab)
    source = EntityRuler(nlp, patterns=patterns, overwrite_ents=True)
    payload = source.to_bytes()
    assert len(source) == len(patterns)
    assert len(source.labels) == distinct_labels
    assert source.overwrite
    restored = EntityRuler(nlp).from_bytes(payload)
    assert len(restored) == len(source)
    assert len(restored.labels) == distinct_labels
    assert restored.overwrite == source.overwrite
    assert restored.ent_id_sep == source.ent_id_sep


@pytest.mark.issue(3526)
def test_issue_3526_2(en_vocab):
    """from_bytes accepts a bare msgpack dump of the patterns (old style).

    Only the patterns are in that payload, so the freshly constructed ruler's
    `overwrite` value is expected to differ from the source ruler's.
    """
    patterns = [
        {"label": "HELLO", "pattern": "hello world"},
        {"label": "BYE", "pattern": [{"LOWER": "bye"}, {"LOWER": "bye"}]},
        {"label": "HELLO", "pattern": [{"ORTH": "HELLO"}]},
        {"label": "COMPLEX", "pattern": [{"ORTH": "foo", "OP": "*"}]},
        {"label": "TECH_ORG", "pattern": "Apple", "id": "a1"},
    ]
    nlp = Language(vocab=en_vocab)
    source = EntityRuler(nlp, patterns=patterns, overwrite_ents=True)
    legacy_payload = srsly.msgpack_dumps(source.patterns)
    restored = EntityRuler(nlp).from_bytes(legacy_payload)
    assert len(restored) == len(source)
    for expected in source.patterns:
        assert expected in restored.patterns
    assert restored.overwrite is not source.overwrite


@pytest.mark.issue(3526)
def test_issue_3526_3(en_vocab):
    """from_disk can load patterns that were written as a plain .jsonl file.

    The file holds patterns only, so the loaded ruler's `overwrite` value is
    expected to differ from the source ruler's.
    """
    patterns = [
        {"label": "HELLO", "pattern": "hello world"},
        {"label": "BYE", "pattern": [{"LOWER": "bye"}, {"LOWER": "bye"}]},
        {"label": "HELLO", "pattern": [{"ORTH": "HELLO"}]},
        {"label": "COMPLEX", "pattern": [{"ORTH": "foo", "OP": "*"}]},
        {"label": "TECH_ORG", "pattern": "Apple", "id": "a1"},
    ]
    nlp = Language(vocab=en_vocab)
    source = EntityRuler(nlp, patterns=patterns, overwrite_ents=True)
    with make_tempdir() as tmpdir:
        base_path = tmpdir / "entity_ruler"
        jsonl_path = base_path.with_suffix(".jsonl")
        srsly.write_jsonl(jsonl_path, source.patterns)
        # Note: the path handed to from_disk carries no ".jsonl" suffix.
        restored = EntityRuler(nlp).from_disk(base_path)
        for expected in source.patterns:
            assert expected in restored.patterns
        assert len(restored) == len(source)
        assert restored.overwrite is not source.overwrite


@pytest.mark.issue(3526)
def test_issue_3526_4(en_vocab):
    """An entity_ruler keeps patterns and overwrite=True through nlp.to_disk/load."""

    def check_ruler(pipeline):
        # Same expectations for the in-memory and the reloaded pipeline.
        component = pipeline.get_pipe("entity_ruler")
        assert component.patterns == [{"label": "ORG", "pattern": "Apple"}]
        assert component.overwrite is True

    nlp = Language(vocab=en_vocab)
    ruler = nlp.add_pipe("entity_ruler", config={"overwrite_ents": True})
    ruler.add_patterns([{"label": "ORG", "pattern": "Apple"}])
    with make_tempdir() as tmpdir:
        nlp.to_disk(tmpdir)
        check_ruler(nlp)
        check_ruler(load(tmpdir))


@pytest.mark.issue(4042)
def test_issue4042():
    """Test that serialization of an EntityRuler before NER works fine."""
    text = "What do you think about Apple ?"
    nlp = English()
    # Set up an initialized NER component first.
    nlp.add_pipe("ner").add_label("SOME_LABEL")
    nlp.initialize()
    # Insert the entity ruler *before* NER; per the original note, placing it
    # "after" already worked fine.
    ruler = nlp.add_pipe("entity_ruler", before="ner")
    ruler.add_patterns(
        [
            {"label": "MY_ORG", "pattern": "Apple"},
            {
                "label": "MY_GPE",
                "pattern": [{"lower": "san"}, {"lower": "francisco"}],
            },
        ]
    )
    doc_before = nlp(text)
    assert doc_before.ents[0].label_ == "MY_ORG"

    with make_tempdir() as d:
        output_dir = ensure_path(d)
        if not output_dir.exists():
            output_dir.mkdir()
        nlp.to_disk(output_dir)
        reloaded = load_model(output_dir)
        doc_after = reloaded(text)
        assert doc_after.ents[0].label_ == "MY_ORG"


@pytest.mark.issue(4042)
def test_issue4042_bug2():
    """
    Test that serialization of an NER works fine when new labels were added.
    This is the second bug of two bugs underlying the issue 4042.
    """
    nlp1 = English()
    # Set up an initialized NER component with a single label.
    ner1 = nlp1.add_pipe("ner")
    ner1.add_label("SOME_LABEL")
    nlp1.initialize()
    doc1 = nlp1("What do you think about Apple ?")
    assert len(ner1.labels) == 1
    assert "SOME_LABEL" in ner1.labels
    # Annotate the doc with an entity whose label the NER has not seen yet.
    apple_ent = Span(doc1, 5, 6, label="MY_ORG")
    doc1.ents = [*doc1.ents, apple_ent]
    # Add the label explicitly. Previously we didn't require this.
    ner1.add_label("MY_ORG")
    ner1(doc1)
    assert len(ner1.labels) == 2
    for expected_label in ("SOME_LABEL", "MY_ORG"):
        assert expected_label in ner1.labels
    with make_tempdir() as d:
        # Check that the disk round trip keeps both labels.
        output_dir = ensure_path(d)
        if not output_dir.exists():
            output_dir.mkdir()
        ner1.to_disk(output_dir)
        ner2 = nlp1.create_pipe("ner", config={})
        ner2.from_disk(output_dir)
        assert len(ner2.labels) == 2


@pytest.mark.issue(4725)
def test_issue4725_1():
    """Ensure the pickling of the NER goes well"""
    cut_size = 111
    nlp = English(vocab=Vocab(vectors_name="test_vocab_add_vector"))
    ner = nlp.create_pipe("ner", config={"update_with_oracle_cut_size": cut_size})
    with make_tempdir() as tmp_path:
        pickle_path = tmp_path / "ner.pkl"
        with pickle_path.open("wb") as sink:
            pickle.dump(ner, sink)
            # Dumping must leave the component's own config untouched.
            assert ner.cfg["update_with_oracle_cut_size"] == cut_size

        with pickle_path.open("rb") as source:
            restored = pickle.load(source)
            assert restored.cfg["update_with_oracle_cut_size"] == cut_size


@pytest.mark.parametrize("Parser", test_parsers)
def test_serialize_parser_roundtrip_bytes(en_vocab, Parser):
    """A component loaded via from_bytes serializes to the same bytes as its source."""

    def dump(component):
        return component.to_bytes(exclude=["vocab"])

    model = registry.resolve({"model": DEFAULT_PARSER_MODEL}, validate=True)["model"]
    original = Parser(en_vocab, model)
    # Both instances deliberately share the same model object.
    restored = Parser(en_vocab, model)
    restored = restored.from_bytes(dump(original))
    restored_bytes = dump(restored)
    original_bytes = dump(original)
    assert len(restored_bytes) == len(original_bytes)
    assert restored_bytes == original_bytes


@pytest.mark.parametrize("Parser", test_parsers)
def test_serialize_parser_strings(Parser):
    """A label added to one component reaches another vocab's strings via from_bytes."""
    label = "FunnyLabel"
    model = registry.resolve({"model": DEFAULT_PARSER_MODEL}, validate=True)["model"]

    source_vocab = Vocab()
    assert label not in source_vocab.strings
    source = Parser(source_vocab, model)
    source.add_label(label)
    assert label in source.vocab.strings

    target_vocab = Vocab()
    assert label not in target_vocab.strings
    target = Parser(target_vocab, model)
    # The vocab itself is excluded from the payload.
    target = target.from_bytes(source.to_bytes(exclude=["vocab"]))
    assert label in target.vocab.strings


@pytest.mark.parametrize("Parser", test_parsers)
def test_serialize_parser_roundtrip_disk(en_vocab, Parser):
    """A component loaded via from_disk matches its source, model and vocab aside."""

    def dump(component):
        return component.to_bytes(exclude=["model", "vocab"])

    model = registry.resolve({"model": DEFAULT_PARSER_MODEL}, validate=True)["model"]
    original = Parser(en_vocab, model)
    with make_tempdir() as d:
        target = d / "parser"
        original.to_disk(target)
        from_disk = Parser(en_vocab, model)
        from_disk = from_disk.from_disk(target)
        original_bytes = dump(original)
        from_disk_bytes = dump(from_disk)
        assert len(original_bytes) == len(from_disk_bytes)
        assert original_bytes == from_disk_bytes


def test_to_from_bytes(parser, blank_parser):
    """After resizing, from_bytes gives the blank parser the same number of moves."""
    for component in (parser, blank_parser):
        assert component.model is not True
    assert blank_parser.moves.n_moves != parser.moves.n_moves
    serialized = parser.to_bytes(exclude=["vocab"])
    # The blank parser's model has to be resized to the source parser's number
    # of moves before from_bytes can be called.
    resize_output = blank_parser.model.attrs["resize_output"]
    resize_output(blank_parser.model, parser.moves.n_moves)
    blank_parser.from_bytes(serialized)
    assert blank_parser.model is not True
    assert blank_parser.moves.n_moves == parser.moves.n_moves


def test_serialize_tagger_roundtrip_bytes(en_vocab, taggers):
    """Tagger bytes are stable when reloaded in place and into a new Tagger."""
    tagger = taggers[0]
    snapshot = tagger.to_bytes()
    # Reload into the same instance.
    tagger = tagger.from_bytes(snapshot)
    assert tagger.to_bytes() == snapshot
    # Reload into a Tagger built on a freshly resolved model.
    resolved = registry.resolve({"model": DEFAULT_TAGGER_MODEL}, validate=True)
    fresh = Tagger(en_vocab, resolved["model"]).from_bytes(snapshot)
    fresh_bytes = fresh.to_bytes()
    assert len(fresh_bytes) == len(snapshot)
    assert fresh_bytes == snapshot


def test_serialize_tagger_roundtrip_disk(en_vocab, taggers):
    """Two taggers saved to disk and reloaded serialize to identical bytes."""
    with make_tempdir() as d:
        targets = [d / "tagger1", d / "tagger2"]
        for tagger, target in zip(taggers, targets):
            tagger.to_disk(target)
        resolved = registry.resolve({"model": DEFAULT_TAGGER_MODEL}, validate=True)
        model = resolved["model"]
        # Both reloaded taggers are built on the same freshly resolved model.
        reloaded = [Tagger(en_vocab, model).from_disk(target) for target in targets]
        assert reloaded[0].to_bytes() == reloaded[1].to_bytes()


def test_serialize_tagger_strings(en_vocab, de_vocab, taggers):
    """A custom tagger label is restored into another vocab's strings by from_disk."""
    label = "SomeWeirdLabel"
    source = taggers[0]
    # Precondition: none of the vocabs involved knows the label yet.
    for strings in (en_vocab.strings, de_vocab.strings, source.vocab.strings):
        assert label not in strings
    with make_tempdir() as d:
        # check that custom labels are serialized as part of the component's
        # strings.jsonl
        source.add_label(label)
        assert label in source.vocab.strings
        target = d / "tagger1"
        source.to_disk(target)
        # ensure that the custom strings are loaded back in when using the
        # tagger in another pipeline
        resolved = registry.resolve({"model": DEFAULT_TAGGER_MODEL}, validate=True)
        reloaded = Tagger(de_vocab, resolved["model"]).from_disk(target)
        assert label in reloaded.vocab.strings


@pytest.mark.issue(1105)
def test_serialize_textcat_empty(en_vocab):
    """Serializing a TextCategorizer with no labels added must not raise."""
    # See issue #1105
    resolved = registry.resolve(
        {"model": DEFAULT_SINGLE_TEXTCAT_MODEL}, validate=True
    )
    categorizer = TextCategorizer(en_vocab, resolved["model"], threshold=0.5)
    categorizer.to_bytes(exclude=["vocab"])


@pytest.mark.parametrize("Parser", test_parsers)
def test_serialize_pipe_exclude(en_vocab, Parser):
    """Excluding "cfg" on either side of the round trip drops custom cfg keys."""
    model = registry.resolve({"model": DEFAULT_PARSER_MODEL}, validate=True)["model"]
    source = Parser(en_vocab, model)
    source.cfg["foo"] = "bar"

    # Baseline: cfg travels with the payload.
    loaded = Parser(en_vocab, model).from_bytes(source.to_bytes(exclude=["vocab"]))
    assert "foo" in loaded.cfg

    # cfg skipped while reading.
    loaded = Parser(en_vocab, model).from_bytes(
        source.to_bytes(exclude=["vocab"]), exclude=["cfg"]
    )
    assert "foo" not in loaded.cfg

    # cfg skipped while writing.
    loaded = Parser(en_vocab, model).from_bytes(
        source.to_bytes(exclude=["cfg"]), exclude=["vocab"]
    )
    assert "foo" not in loaded.cfg


def test_serialize_sentencerecognizer(en_vocab):
    """SentenceRecognizer bytes are unchanged after a from_bytes round trip."""
    resolved = registry.resolve({"model": DEFAULT_SENTER_MODEL}, validate=True)
    model = resolved["model"]
    source = SentenceRecognizer(en_vocab, model)
    payload = source.to_bytes()
    restored = SentenceRecognizer(en_vocab, model).from_bytes(payload)
    assert source.to_bytes() == restored.to_bytes()


def test_serialize_pipeline_disable_enable():
    """Disabled and excluded components are tracked through config and disk."""
    base = English()
    for component_name in ("ner", "tagger"):
        base.add_pipe(component_name)
    base.disable_pipe("tagger")
    assert base.config["nlp"]["disabled"] == ["tagger"]

    # Rebuild from a copy of the config: the tagger stays disabled.
    from_config = English.from_config(base.config.copy())
    assert from_config.pipe_names == ["ner"]
    assert from_config.component_names == ["ner", "tagger"]
    assert from_config.disabled == ["tagger"]
    assert from_config.config["nlp"]["disabled"] == ["tagger"]

    # Plain disk round trip.
    with make_tempdir() as d:
        from_config.to_disk(d)
        from_disk = spacy.load(d)
    assert from_disk.pipe_names == ["ner"]
    assert from_disk.component_names == ["ner", "tagger"]

    # Disabling "ner" at load time adds to the already disabled tagger.
    with make_tempdir() as d:
        from_disk.to_disk(d)
        extra_disabled = spacy.load(d, disable=["ner"])
    assert extra_disabled.pipe_names == []
    assert extra_disabled.component_names == ["ner", "tagger"]
    assert extra_disabled.disabled == ["ner", "tagger"]

    # Excluding the tagger at load time removes it from the component list.
    with make_tempdir() as d:
        base.to_disk(d)
        tagger_excluded = spacy.load(d, exclude=["tagger"])
    assert tagger_excluded.pipe_names == ["ner"]
    assert tagger_excluded.component_names == ["ner"]
    assert tagger_excluded.disabled == []


def test_serialize_custom_trainable_pipe():
    """Custom TrainablePipe subclasses serialize only once vocab and model are set."""

    class BadCustomPipe1(TrainablePipe):
        # Sets neither vocab nor model.
        def __init__(self, vocab):
            pass

    class BadCustomPipe2(TrainablePipe):
        # Sets a vocab, but the model is None.
        def __init__(self, vocab):
            self.vocab = vocab
            self.model = None

    class CustomPipe(TrainablePipe):
        # Sets both vocab and model.
        def __init__(self, vocab, model):
            self.vocab = vocab
            self.model = model

    # Both incomplete pipes must refuse to serialize, to bytes and to disk.
    for bad_cls in (BadCustomPipe1, BadCustomPipe2):
        bad_pipe = bad_cls(Vocab())
        with pytest.raises(ValueError):
            bad_pipe.to_bytes()
        with make_tempdir() as d:
            with pytest.raises(ValueError):
                bad_pipe.to_disk(d)

    # The complete pipe round-trips through bytes and through disk.
    good_pipe = CustomPipe(Vocab(), Linear())
    expected_bytes = good_pipe.to_bytes()
    via_bytes = CustomPipe(Vocab(), Linear()).from_bytes(expected_bytes)
    assert via_bytes.to_bytes() == expected_bytes
    with make_tempdir() as d:
        good_pipe.to_disk(d)
        via_disk = CustomPipe(Vocab(), Linear()).from_disk(d)
    assert via_disk.to_bytes() == expected_bytes


def test_load_without_strings():
    """Loading with exclude=["strings"] leaves out strings added before saving."""
    nlp = spacy.blank("en")
    baseline_count = len(nlp.vocab.strings)
    rare_word = "unlikely_word_" * 20
    nlp.vocab.strings.add(rare_word)
    assert len(nlp.vocab.strings) == baseline_count + 1
    with make_tempdir() as d:
        nlp.to_disk(d)
        # Default load: the added string comes back.
        with_strings = load(d)
        assert len(nlp.vocab.strings) == len(with_strings.vocab.strings)
        assert rare_word in with_strings.vocab.strings
        # Load with "strings" excluded: back to the baseline count.
        without_strings = load(d, exclude=["strings"])
        assert baseline_count == len(without_strings.vocab.strings)
        assert rare_word not in without_strings.vocab.strings
