#!/usr/bin/env python from collections import Counter import sys, re, os, enum, itertools, shlex, io from functools import partial, partialmethod from time import sleep from dataclasses import dataclass, field from operator import itemgetter from defl import log, cl, Dath, Time, Run, sp, ShQu from defl._typing_ import * from defl._typing_ import T, F, N, U from defl._rich_ import * from defl._pydantic_ import * import defl from pathlib import Path as PLPath from defl._path._temp_ import TempPath import pytest def test_jqDump1(): tmpPath = defl.sp.tmp.makeTemp([], autoRemove=T, create=T, direct=F, ext=N) with defl.JqDump(tmpPath, append=T) as jq: for i in range(3): jq.dump({'a': i}) res = tmpPath.readText() assert res == '{"a":0}\n{"a":1}\n{"a":2}\n' def test_jqDump2(): tmpPath = defl.sp.tmp.makeTemp([], autoRemove=T, create=T, direct=F, ext=N) with defl.JqDump(tmpPath, append=T) as jq: jq.dump({'a': 'a'}) sleep(0.2) # | flushing to file is async res = tmpPath.readText() assert res == '{"a":"a"}\n'