diff --git a/tests/test_kin_091_regression.py b/tests/test_kin_091_regression.py index d23f22b..8806a51 100644 --- a/tests/test_kin_091_regression.py +++ b/tests/test_kin_091_regression.py @@ -417,10 +417,10 @@ class TestSpecDrivenRoute: assert "spec_driven" in data.get("routes", {}) def test_spec_driven_route_steps_order(self): - """spec_driven route: шаги [constitution, spec, architect, task_decomposer].""" + """spec_driven route: шаги [constitution, spec, architect, constitutional_validator, task_decomposer].""" data = self._load_specialists() steps = data["routes"]["spec_driven"]["steps"] - assert steps == ["constitution", "spec", "architect", "task_decomposer"] + assert steps == ["constitution", "spec", "architect", "constitutional_validator", "task_decomposer"] def test_spec_driven_all_roles_exist(self): """Все роли в spec_driven route должны быть объявлены в specialists.""" diff --git a/tests/test_kin_111_regression.py b/tests/test_kin_111_regression.py index e0db27b..eb2e707 100644 --- a/tests/test_kin_111_regression.py +++ b/tests/test_kin_111_regression.py @@ -1838,3 +1838,159 @@ class TestMultipleJsonColumnsEmptyArraySingleRow: f"Expected title to be str, got {type(task['title'])}: {task['title']!r}" ) assert task["title"] == "Normal title" + + +# --------------------------------------------------------------------------- +# KIN-P1-001 deeper revision — update_task with additional JSON columns as [] +# +# Previous update_task tests covered brief= and labels=. +# This section covers the remaining JSON columns in the tasks table: +# pending_steps, spec (via update), smoke_test_result, review. +# All are in _JSON_COLUMNS and must round-trip correctly. +# --------------------------------------------------------------------------- + + +class TestUpdateTaskAdditionalJsonColumnsEmptyList: + """update_task with remaining JSON columns set to [] — all must round-trip correctly.""" + + def test_update_task_pending_steps_to_empty_list(self, conn): + """update_task(pending_steps=[]) → get_task returns pending_steps as [] list.""" + models.update_task(conn, "PROJ-001", pending_steps=[]) + task = models.get_task(conn, "PROJ-001") + assert isinstance(task["pending_steps"], list), ( + f"Expected pending_steps=[] after update, got {type(task['pending_steps'])}: " + f"{task['pending_steps']!r}" + ) + assert task["pending_steps"] == [] + + def test_update_task_spec_to_empty_list(self, conn): + """update_task(spec=[]) on existing task → get_task returns spec as [] list.""" + models.update_task(conn, "PROJ-001", spec=[]) + task = models.get_task(conn, "PROJ-001") + assert isinstance(task["spec"], list), ( + f"Expected spec=[] after update_task, got {type(task['spec'])}: {task['spec']!r}" + ) + assert task["spec"] == [] + + def test_update_task_smoke_test_result_to_empty_list(self, conn): + """update_task(smoke_test_result=[]) → get_task returns smoke_test_result as [] list.""" + models.update_task(conn, "PROJ-001", smoke_test_result=[]) + task = models.get_task(conn, "PROJ-001") + assert isinstance(task["smoke_test_result"], list), ( + f"Expected smoke_test_result=[] after update, got {type(task['smoke_test_result'])}" + ) + assert task["smoke_test_result"] == [] + + def test_update_task_review_to_empty_list(self, conn): + """update_task(review=[]) → get_task returns review as [] list.""" + models.update_task(conn, "PROJ-001", review=[]) + task = models.get_task(conn, "PROJ-001") + assert isinstance(task["review"], list), ( + f"Expected review=[] after update, got {type(task['review'])}: {task['review']!r}" + ) + assert task["review"] == [] + + +# --------------------------------------------------------------------------- +# KIN-P1-001 deeper revision — generate_followups on non-existent / missing task +# +# generate_followups must return an empty result (not crash) when: +# (a) task_id refers to a task that does not exist in DB +# (b) task_id refers to a task whose project has been deleted +# --------------------------------------------------------------------------- + + +class TestGenerateFollowupsNonExistentTask: + """generate_followups with invalid task_id — must return empty result, not crash.""" + + def test_nonexistent_task_returns_empty_result(self, conn): + """generate_followups(conn, 'NO-SUCH-TASK') → {created: [], pending_actions: []}.""" + from core.followup import generate_followups + result = generate_followups(conn, "NO-SUCH-TASK-9999") + assert result == {"created": [], "pending_actions": []}, ( + f"Expected empty result for non-existent task, got: {result}" + ) + + def test_nonexistent_task_creates_no_tasks_in_db(self, conn): + """generate_followups on non-existent task must not create any tasks in DB.""" + from core.followup import generate_followups + tasks_before = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0] + generate_followups(conn, "PHANTOM-TASK") + tasks_after = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0] + assert tasks_after == tasks_before, ( + f"Task count changed from {tasks_before} to {tasks_after}. " + "generate_followups must not create tasks for a non-existent task_id." + ) + + +# --------------------------------------------------------------------------- +# KIN-P1-001 deeper revision — auto_resolve_pending_actions with empty list +# +# auto_resolve_pending_actions(conn, task_id, []) must return [] immediately +# without side effects. Previously untested. +# --------------------------------------------------------------------------- + + +class TestAutoResolvePendingActionsEmptyList: + """auto_resolve_pending_actions with empty pending_actions list.""" + + def test_empty_pending_actions_returns_empty_list(self, conn): + """auto_resolve_pending_actions(conn, task_id, []) → [] (nothing to resolve).""" + from core.followup import auto_resolve_pending_actions + result = auto_resolve_pending_actions(conn, "PROJ-001", []) + assert result == [], ( + f"Expected [] for empty pending_actions input, got: {result}" + ) + + def test_empty_pending_actions_creates_no_tasks_in_db(self, conn): + """auto_resolve_pending_actions(conn, task_id, []) → no tasks created.""" + from core.followup import auto_resolve_pending_actions + tasks_before = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0] + auto_resolve_pending_actions(conn, "PROJ-001", []) + tasks_after = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0] + assert tasks_after == tasks_before, ( + f"Task count changed from {tasks_before} to {tasks_after} for empty pending_actions" + ) + + def test_empty_pending_actions_does_not_mutate_task_status(self, conn): + """auto_resolve_pending_actions(conn, task_id, []) → task status unchanged.""" + from core.followup import auto_resolve_pending_actions + before = models.get_task(conn, "PROJ-001")["status"] + auto_resolve_pending_actions(conn, "PROJ-001", []) + after = models.get_task(conn, "PROJ-001")["status"] + assert after == before, ( + f"Task status changed from '{before}' to '{after}' for empty pending_actions" + ) + + +# --------------------------------------------------------------------------- +# KIN-P1-001 deeper revision — brief with nested empty list [[]] +# +# brief=[[ ]] is a valid JSON value (list containing empty list). +# _json_encode must serialize it; _row_to_dict must decode it back identically. +# --------------------------------------------------------------------------- + + +class TestTaskBriefNestedEmptyListRoundTrip: + """create_task with brief=[[]] — nested empty list must round-trip correctly.""" + + def test_brief_nested_empty_list_round_trips(self, conn): + """create_task(brief=[[]]) → get_task returns brief as [[]] (nested empty list).""" + models.create_task(conn, "PROJ-NESTED", "proj", "Nested empty brief", brief=[[]]) + task = models.get_task(conn, "PROJ-NESTED") + assert isinstance(task["brief"], list), ( + f"Expected brief to be list, got {type(task['brief'])}: {task['brief']!r}" + ) + assert task["brief"] == [[]], ( + f"Expected brief=[[]], got: {task['brief']!r}. " + "Nested empty list must survive JSON encode/decode round-trip." + ) + + def test_try_parse_json_nested_empty_list(self): + """_try_parse_json('[[]]') → [[]] (list containing empty list).""" + from agents.runner import _try_parse_json + result = _try_parse_json("[[]]") + assert result == [[]], f"Expected [[]], got: {result!r}" + assert isinstance(result, list) + assert isinstance(result[0], list) + assert result[0] == []