defl/defl/_userInput_.py
2024-09-11 11:14:03 -04:00

328 lines
9.8 KiB
Python
Executable File

#!/usr/bin/env python
import atexit
import functools
import itertools
import os
import re
import select
import signal
import sys
import time
import sys
from io import TextIOWrapper, BufferedReader, BufferedWriter
import select
import toml
from ._stringAssembly_ import StringAssembly
from ._ansii_ import cl
from ._except_ import TryCM, logExceptAndExit
from ._logger_ import log
from ._table_ import printCondenseList
from ._timeUtils_ import TimeoutCM
from ._tty_ import TtyNoBufferCM, TtyNoEchoCM
from ._typing_ import *
from ._pipe_ import AutoReader
from ._math_ import BaseNumberSystemMaker
from ._typeCheck_ import IterableType, isIterableType
# Digit alphabet handed to BaseNumberSystemMaker in userSelectItem to build the
# short selection keys shown next to sequence items.
_inputStringNumSysTokens = '1234567890qwertyuiopasdfghjklzxcvbnm'
class UserInputTimeoutError(Exception):
    """Raised by UserInput.read when the wait for input times out and ``raiseOnTimeout`` is set."""

    # Was misspelled ``__slot__``, which is just an inert class attribute.
    __slots__ = ()
class InvalidUserIn(Exception):
    """Raised when the value read from the user does not match ``UserInput.assertRegex``."""

    # Was misspelled ``__slot__``, which is just an inert class attribute.
    __slots__ = ()
class UserInException(Exception):
    """Raised by userSelectItem when the entered key is not one of the offered choices."""

    # Was misspelled ``__slot__``, which is just an inert class attribute.
    __slots__ = ()
@dataclass(slots=T, kw_only=T, frozen=F)
class UserInput:
    """Prompt on ``_outputLog`` and read raw user input from ``src`` through an ``AutoReader``.

    The public entry points are ``readLine``, ``readChars`` and ``readGather``;
    all of them funnel into ``read``, which is wrapped by ``readWrap`` to add
    prompting, TTY mode switching, decoding and regex validation.

    NOTE(review): the indentation of this class was reconstructed from a
    flattened listing; nesting that could not be determined unambiguously is
    marked with NOTE(review) comments below.
    """

    # Prompt text written before reading; the only field accepted positionally.
    message: str = field(default='', kw_only=F)
    # Binary stream the input is read from.
    src: BufferedReader = sys.stdin.buffer
    # When true, a timed-out wait raises UserInputTimeoutError instead of returning N.
    raiseOnTimeout: bool = F
    # Pattern every returned value must match; a str is compiled (case-insensitive) in __post_init__.
    assertRegex: re.Pattern | str | N = N
    # When true, the prompt and the bytes typed so far are redrawn after every read attempt.
    echo: bool = T
    # Passed straight to AutoReader.pipeReady(timeout=...); presumably seconds, N meaning "no limit" - TODO confirm.
    timeout: int | N = N
    # Codec name, T (meaning 'utf8'), or a falsy value to return the raw bytes undecoded.
    decode: str | N | bool = 'utf8'
    # True: return a single value. False: return a list of lines (line mode) or a raw slice (char mode).
    singleLineMode: bool = T
    # When true, the AutoReader buffer is cleared before prompting.
    preClearInputBuffer: bool = T
    # Binary stream the prompt and the echo are written to.
    _outputLog: BufferedWriter = sys.stderr.buffer
    # Created in __post_init__ around ``src``.
    _autoReader: AutoReader = N
    # Guard flag set while a wrapped read is in progress, to reject nested reads.
    _isReDispatched: bool = F

    @property
    def ar(_):
        """Return the underlying AutoReader."""
        return _._autoReader

    @property
    def a(_):
        """Return ``_autoReader.a`` (presumably the reader's byte buffer - TODO confirm)."""
        return _._autoReader.a

    def __post_init__(_):
        """Compile a string ``assertRegex`` and attach a non-threaded AutoReader to ``src``."""
        if isinstance(_.assertRegex, str):
            _.assertRegex = re.compile(_.assertRegex, flags=re.I)
        _._autoReader = AutoReader(_.src, threadMode=F)

    def readGather(_, wait: float) -> bytearray | list[bytearray]:
        """Wait for input, pause ``wait`` seconds for more to arrive, then return what was collected."""
        assert wait > 0
        return _.read(chars=2**20, gather=wait)

    def readLine(_) -> bytearray | list[bytearray]:
        """Read until a complete line is available."""
        return _.read(chars=0, gather=0)

    def readChars(_, chars: int) -> bytearray | list[bytearray]:
        """Read until ``chars`` characters (or a complete line) are available."""
        return _.read(chars=chars, gather=0)

    def readWrap(func) -> Any:
        """Decorate a raw read method with prompting, TTY setup, decoding and validation.

        The wrapped call:
          * refuses to run while another wrapped read on the same instance is active
            (raises ValueError),
          * optionally clears the input buffer and writes ``message``,
          * switches a TTY ``src`` to unbuffered, no-echo mode for the duration,
          * decodes the result according to ``decode`` and checks it against
            ``assertRegex`` (raises InvalidUserIn on mismatch),
          * returns N unchanged when the raw read returned N.
        """

        @functools.wraps(func)
        def inner(_, *args, **kargs) -> Any:
            if _._isReDispatched:
                raise ValueError('Nested user input')

            def decodeOne(raw):
                # ``decode`` is a codec name, T (utf8), or falsy (keep the raw bytes).
                if isinstance(_.decode, str):
                    return raw.decode(_.decode)
                if _.decode is T:
                    return raw.decode('utf8')
                return raw

            def run():
                if _.preClearInputBuffer:
                    _._autoReader.clear()
                if _.message:
                    _._outputLog.write(f'{cl.startOfLine}{_.message}'.encode())
                    try:
                        _._outputLog.flush()
                    except TypeError:
                        # Deliberate best effort: some writers reject flush(); the prompt is not critical.
                        ...
                res = func(_, *args, **kargs)
                if res is N:
                    return N
                # ``read`` returns a list in multi-line mode. The previous code called
                # ``.decode`` and ``assertRegex.search`` on the list itself, which raised
                # AttributeError/TypeError; handle each element instead.
                isMulti = isinstance(res, list)
                res = [decodeOne(x) for x in res] if isMulti else decodeOne(res)
                if _.assertRegex:
                    for item in (res if isMulti else [res]):
                        if not _.assertRegex.search(item):
                            raise InvalidUserIn(f'"{item}" not regex "{_.assertRegex}"')
                return res

            try:
                _._isReDispatched = T
                if _.src.isatty():
                    # Character-at-a-time input without terminal echo; echoing is done by ``read``.
                    with TtyNoBufferCM(_.src), TtyNoEchoCM(_.src):
                        res = run()
                else:
                    res = run()
            finally:
                _._isReDispatched = F
            return res

        return inner

    @readWrap
    def read(_, chars: int, gather: float) -> bytearray | list[bytearray]:
        """Poll the AutoReader until the requested amount of input is available.

        Args:
            chars: 0 selects line mode (return once a line is ready); a positive
                value returns once that many bytes, or a full line, are buffered.
            gather: when non-zero, sleep this long after the first wake-up and
                then return whatever has been buffered on the second pass.

        Returns:
            The raw data from the AutoReader, or N when the wait timed out and
            ``raiseOnTimeout`` is false. ``readWrap`` post-processes this value.

        Raises:
            UserInputTimeoutError: the wait timed out and ``raiseOnTimeout`` is true.
        """
        assert chars >= 0 and isinstance(chars, int)
        assert gather >= 0 and isinstance(gather, float | int)
        i = 0
        # ``i`` counts passes (1, 2, ...); the gather logic keys off pass 1 and pass 2.
        while (i := i + 1) > 0:
            # == line
            if chars == 0:
                if (gather and i == 2) or _.a.lineReady():
                    if _.singleLineMode:
                        return _.a.readLine()
                    else:
                        return list(_.a.readLines())
            # == char
            else:
                if len(_.ar.a) >= chars or _.a.lineReady() or (gather and i == 2):
                    if _.singleLineMode:
                        return _.a.readChar(count=chars, stripNewLine=T)
                    else:
                        return _.a.read(end=chars, start=chars)
            #| Reset the timer every time a new character is entered
            # print('ready', _.ar.ready(), _.src)
            if not _.ar.pipeReady(timeout=_.timeout):
                if _.raiseOnTimeout:
                    raise UserInputTimeoutError()
                return N
            if gather and i == 1:
                # Give the rest of the input time to arrive before collecting it.
                time.sleep(gather)
            _._autoReader.readTry()
            if _.echo:
                # NOTE(review): the redraw below is assumed to be guarded by ``echo``;
                # the original nesting was lost - confirm against the upstream file.
                # TODO use cl.underline to show length of count. try anssi to save char location then `cl.ul + (count*" ")`
                _._outputLog.write(f'{cl.startOfLine}{cl.clearLine}{_.message}{cl.r}'.encode() + _.a.bytz)
                try:
                    _._outputLog.flush()
                except TypeError:
                    # Deliberate best effort, same as for the prompt.
                    ...
class QuitError(Exception):
    """Raised by userSelectItem when the user enters the quit sequence and ``quitSysExits`` is false."""

    # Was misspelled ``__slot__``, which is just an inert class attribute.
    __slots__ = ()
# Matches input consisting solely of one or more "!" characters; userSelectItem
# treats such input as a request to quit and forbids it as a selection key.
_quitReg = re.compile(r'^\!+$', flags=re.I)
def userSelectItem(
    items: Iterable | Mapping,
    /,
    autoSelectSingle: bool = F,
    condenseTextPrint: bool = T,
    loop: bool = F,
    reversePrint: bool = F,
    header: int = 0,
    quitSysExits=T,
    raiseExcept=T,
    timeout: int = 0,
) -> tuple[N, N] | tuple[Any, Any]:
    """Print a menu of ``items`` and let the user pick one by typing its key.

    Mappings are offered under their own keys. Sequences are offered under
    short generated keys built from ``_inputStringNumSysTokens``.

    Args:
        items: mapping or sequence of choices.
        autoSelectSingle: when true, return ``(N, N)`` for an empty menu and
            pick the only entry without prompting when there is exactly one.
        condenseTextPrint: render the menu with ``printCondenseList`` instead
            of one right-aligned entry per line.
        loop: keep prompting until a valid key is entered.
        reversePrint: print the menu lines in reverse order.
        header: number of leading entries to drop from the menu.
        quitSysExits: on the quit sequence (only "!" characters), exit the
            process with status 1; otherwise raise QuitError.
        raiseExcept: when not looping, raise UserInException on an invalid key;
            otherwise return ``(N, N)``.
        timeout: input timeout handed to UserInput; 0 disables it.

    Returns:
        ``(key, value)`` for a mapping, ``(index, value)`` for a sequence (the
        index is decoded from the generated key with ``choiceNumSys.toInt``),
        or ``(N, N)`` when nothing was selected.

    Raises:
        TypeError: ``items`` is neither a mapping nor a sequence.
        ValueError: a key collides with the quit sequence.
        QuitError: the user quit and ``quitSysExits`` is false.
        UserInException: invalid key with ``loop`` false and ``raiseExcept`` true.
        UserInputTimeoutError: propagated from UserInput when ``timeout`` is set.

    NOTE(review): indentation was reconstructed from a flattened listing.
    """
    iterType = isIterableType(items)
    if iterType is IterableType.Map:
        ...
    elif iterType is IterableType.Seq:
        choiceNumSys = BaseNumberSystemMaker(_inputStringNumSysTokens)
        choice = [''.join(choiceNumSys.fromInt(x)) for x in range(len(items))]
        items = dict(zip(choice, items))
    else:
        raise TypeError(type(items))

    # Drop the leading ``header`` entries exactly once (the offset used to be
    # applied a second time further down, removing 2*header entries).
    keys = list(items.keys())[header:]
    values = list(items.values())[header:]
    keysStr = [str(x) for x in keys]
    # ``default=0`` keeps an empty menu from raising before the empty check below.
    maxWidth = max((len(k) for k in keysStr), default=0)
    for k in keysStr:
        if _quitReg.search(k):
            raise ValueError(f'Can not use {_quitReg} in your keys. Used for quitting.')

    if condenseTextPrint:
        string = printCondenseList([f'[{cl.mag}{k}{cl.r} {cl.cyn}{v}{cl.r}]' for k, v in zip(keys, values)],
                                   sortLen=F, left=T)
    else:
        string = '\n'.join([
            f'{cl.mag}{k:>{maxWidth}}{cl.r} {cl.cyn}{v}{cl.r}' for k, v in zip(keys, values)
        ])
    if reversePrint:
        string = '\n'.join(string.split('\n')[::-1])
    log.info(string, tgt='err')

    inp = N
    startLine = f'{cl.wht + " " * (maxWidth + 3) + cl.startOfLine} > {cl.r}'
    if autoSelectSingle:
        if len(values) == 0:
            return (N, N)
        elif len(values) == 1:
            inp = keysStr[0]
            log.info(startLine + inp, tgt='err', end='')

    while inp is N or inp not in keysStr:
        userIn = UserInput(
            message=startLine,
            timeout=timeout if timeout else N,
            raiseOnTimeout=(T if timeout else F),
        )
        inp = userIn.readChars(chars=maxWidth)
        # The read can yield N; guard the regex so that case reaches the
        # invalid-input handling below instead of raising TypeError.
        if inp is not N and _quitReg.search(inp):
            if quitSysExits:
                log.error('USER EXIT', '')
                sys.exit(1)
            raise QuitError()
        if inp is N or inp not in keysStr:
            log.error(f'"{inp}" not in', keys, tgt='err')
            inp = N
            if not loop:
                if raiseExcept:
                    raise UserInException()
                # Not looping and not raising: report "nothing selected"
                # rather than prompting again forever.
                return (N, N)

    log.info(start='')
    index = keysStr.index(inp)
    if iterType is IterableType.Seq:
        i = choiceNumSys.toInt(list(keys[index]))
    else:
        i = keys[index]
    return (i, values[index])
@dataclass(slots=T, kw_only=T, frozen=F)
class ConfirmContinue:
    """Two-way confirmation prompt: one character means success, another means failure.

    ``str(instance)`` renders ``msgFmt`` with the placeholders ``q`` (question),
    ``sc``/``sd`` (success character/detail) and ``fc``/``fd`` (failure
    character/detail). Because ``msgFmt`` goes through ``str.format``, literal
    braces in it must be doubled.

    NOTE(review): indentation was reconstructed from a flattened listing.
    """

    # Character that answers "yes" / continue.
    successChar: str = 'y'
    # Character that answers "no" / quit.
    failChar: str = 'q'
    # Human-readable meaning of successChar, shown in the prompt.
    successDetail: str = 'continue'
    # Human-readable meaning of failChar, shown in the prompt.
    failDetail: str = 'quit'
    # Optional question text placed at the start of the prompt.
    question: str = ''
    # Template for the prompt; see the class docstring for the placeholders.
    msgFmt: str = f"{cl.wht}{{q}}{cl.r}{cl.grn}{{sc}}={{sd}}{cl.r} {cl.red}{{fc}}={{fd}}{cl.r} {cl.cyn}>{cl.r} "
    # When false, no prompt text is shown and no trailing newline is logged.
    showMsg: bool = T

    def __str__(_) -> str:
        """Return the rendered prompt text."""
        return _.msgFmt.format(
            sc=_.successChar, fc=_.failChar, sd=_.successDetail, fd=_.failDetail, q=_.question
        )

    def prompt(_, **kargs) -> bool:
        """Ask until the user answers with the success or failure character.

        Args:
            **kargs: overrides for the keyword arguments passed to UserInput
                (defaults: echo on, input buffer pre-cleared, no timeout,
                raise on timeout).

        Returns:
            T for ``successChar``, F for ``failChar`` (compared against the
            lower-cased input).
        """
        res = N
        # An empty string (not N) when the message is hidden: UserInput formats
        # ``message`` into its echo line, where N would be rendered as "None".
        args = dict(
            message=str(_) if _.showMsg else '', echo=T, preClearInputBuffer=T, timeout=N, raiseOnTimeout=T
        ) | kargs
        while res is N:
            # NOTE(review): if a caller overrides raiseOnTimeout/timeout so that the
            # read can return N, ``uin.lower()`` below will fail - confirm intent.
            uin = UserInput(**args).readGather(.1)
            if uin.lower() == _.successChar:
                res = T
            elif uin.lower() == _.failChar:
                res = F
        # NOTE(review): assumed to run once after the loop (to end the prompt line).
        if _.showMsg:
            log.info(start='', tgt='err')
        return res
def yesNo(question: str = '', **kargs):
    """Ask ``question`` as a y/n prompt and return the ConfirmContinue.prompt result.

    ``kargs`` are forwarded to ``ConfirmContinue.prompt``.
    """
    promptFormat = f"{cl.wht}{{q}}{cl.r} ({cl.grn}{{sc}}{cl.r}/{cl.red}{{fc}}{cl.r}) {cl.cyn}>{cl.r} "
    asker = ConfirmContinue(failChar='n', question=question, msgFmt=promptFormat)
    return asker.prompt(**kargs)
def confirmContinue(
    msg: str = f"{cl.grn}y=continue{cl.r} {cl.red}q=quit{cl.r} {cl.cyn}>{cl.r} ",
    **kargs,
) -> bool:
    """Show ``msg`` and wait for the default continue/quit answer.

    ``msg`` is used as the ConfirmContinue message format, so it is passed
    through ``str.format``; ``kargs`` are forwarded to ``prompt``.
    """
    prompter = ConfirmContinue(msgFmt=msg)
    answer = prompter.prompt(**kargs)
    return answer
def readLinesFromAnywhere(inList: list[str] = N, /, *args, ttyReader: UserInput = N) -> Generator[str, N, N]:
    """Yield lines from the first available source.

    Sources are tried in this order: a non-empty ``inList``, the extra
    positional ``args``, an interactive terminal on stdin (read through an
    AutoReader and decoded as utf8), and finally plain ``sys.stdin`` lines.
    ``ttyReader`` is accepted but not used.
    """
    if inList:
        yield from inList
        return
    if args:
        yield from args
        return
    if sys.stdin.isatty():
        interactive = AutoReader(sys.stdin.buffer, threadMode=T)
        for raw in interactive.keepReading():
            yield raw.decode('utf8')
        return
    # Piped or redirected stdin: stop at the first falsy read (end of input).
    while True:
        text = sys.stdin.readline()
        if not text:
            break
        yield text
'''
# type(sys.stdin.buffer)
io.BufferedReader
# type(io.BufferedReader(io.BytesIO(b'')))
_io.BufferedReader
# sys.stdout.fileno()
1
# io.BufferedReader(io.BytesIO(b'')).fileno()
UnsupportedOperation: fileno
'''