defl/defl/pass_.py
2025-04-28 14:44:03 -04:00

401 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# TODO decrypt key when required not before
# TODO replace everything from /usr/bin/pass
# TODO keep a pid map so files left behind by a shell that exited unexpectedly can be removed
import itertools
import os
import re
import sys
from contextlib import contextmanager
from dataclasses import dataclass, field
from subprocess import PIPE, Popen
from typing import Generator
from ._ansii_ import cl
from ._argsFromObject2_ import amCliEntryPoint
from ._basic_ import dictTryKeys, splitListIntoListOfListByFilter
from ._clipboard_ import Clipboard
from ._logger_ import log
from ._path_ import Dath
from ._pathUtils_ import changeWorkingDir, Find
from ._run_ import Run
from ._time_ import Time
from ._typing_ import *
from ._pydantic_ import *
from ._userInput_ import UserInput, confirmContinue
from ._tty_ import anyNotTty
from .gocryptfs_ import Gocryptfs, GocryptfsConfNotFoundError
from .otp_ import OTP
class PassKeyNotFoundError(Exception):
    """Raised when a requested key is missing from the password store."""

    # | was spelled `__slot__`, which Python ignores
    __slots__ = ()
class PassKeyAlreadyExistsError(Exception):
    """Raised when a key that must not exist yet is already in the store."""

    # | was spelled `__slot__`, which Python ignores
    __slots__ = ()
class PassEditError(Exception):
    """Raised when editing an entry fails or the user rejects the result."""

    # | was spelled `__slot__`, which Python ignores
    __slots__ = ()
class PassDecryptError(Exception):
    """Error type for decryption failures.

    NOTE(review): nothing in the visible part of this module raises it.
    """

    # | was spelled `__slot__`, which Python ignores
    __slots__ = ()
class ContinueError(Exception):
    """Raised when the user declines a "continue?" confirmation."""

    # | was spelled `__slot__`, which Python ignores
    __slots__ = ()
@dataclass(slots=T, kw_only=T, frozen=F)
class PassDBKey:
    """Location of a single entry of the password store.

    `root` is the directory the store lives in and `relative` is the path of
    the entry below it. Both are coerced to `Dath` on construction, and a
    `relative` that lies under `root` is rewritten relative to it.
    """

    root: Dath
    relative: Dath
    # | cache behind the `absolute` property; N until first access
    _absolute: Dath | N = field(default=N, kw_only=T, repr=F)

    def __post_init__(_) -> N:
        """Normalise `root` / `relative` and reject an absolute `relative`."""
        rootPath = Dath(_.root)
        relPath = Dath(_.relative)
        _.root = rootPath
        if relPath.isRelativeTo(rootPath):
            relPath = relPath.relativeTo(rootPath)
        _.relative = relPath
        assert not relPath.isAbsolute()

    @property
    def absolute(_) -> Dath:
        """`root / relative`, computed on first access and then cached."""
        cached = _._absolute
        if cached is N:
            cached = _._absolute = _._absoluteGet()
        return cached

    def _absoluteGet(_) -> Dath:
        """Join `root` and `relative` (uncached)."""
        return _.root / _.relative

    def assertExists(_) -> Self:
        """Return this key, raising `PassKeyNotFoundError` if it is missing."""
        if _.exists():
            return _
        raise PassKeyNotFoundError(_.relative)

    def assertNotExists(_) -> Self:
        """Return this key, raising `PassKeyAlreadyExistsError` if present."""
        if not _.exists():
            return _
        raise PassKeyAlreadyExistsError(_.relative)

    def exists(_):
        """Return whatever `absolute.exists()` reports for the entry path."""
        return _.absolute.exists()
@dataclass(slots=T, kw_only=T, frozen=F)
class PassDB:
    """Password store whose entries are files below a gocryptfs mount.

    `passPath` is handed to `Gocryptfs(path=...)` when no `gocryptfs`
    instance is supplied; `idleMin` is forwarded to `Gocryptfs.mount`.
    """

    # TODO convert to Dant once argsFromObject can handle it. `pass.py` uses plain Python dataclasses only
    passPath: Dath = field(kw_only=F, repr=T)
    idleMin: int = 1  # | presumably idle minutes before auto-unmount -- TODO confirm
    gocryptfs: Gocryptfs = N

    def __post_init__(_) -> N:
        """Coerce a string `passPath` to `Dath` and create the default `Gocryptfs`."""
        storePath = _.passPath
        if inst(storePath, str):
            storePath = _.passPath = Dath(storePath)
        if _.gocryptfs is N:
            _.gocryptfs = Gocryptfs(path=storePath)
# ◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼
# ◼◼ DB management
# ◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼
_clearPath: Dath | N = field(default=N, repr=F)  # | cache for `clearPath`; N until the property is first read

@property
def clearPath(_) -> Dath:
    """Root directory of all keys, resolved once and then cached."""
    cached = _._clearPath
    if cached is N:
        cached = _._clearPath = _._clearPathGet()
    return cached

def _clearPathGet(_) -> Dath:
    """Return `gocryptfs.mountPath` (uncached source of `clearPath`)."""
    mountPoint = _.gocryptfs.mountPath
    return mountPoint
@property
def git(_) -> bool:
    """Tell whether `clearPath` contains a `.git` directory."""
    gitDir = _.clearPath / '.git'
    return gitDir.isdir()
def gitCommit(_) -> bool:
    """Stage and commit everything under `clearPath` when it is a git repo.

    The commit message is the current `Time()` rendered as a string.

    Returns:
        T after both git commands succeeded, F when the store is not under
        git (nothing is run in that case).
    """
    if not _.git:
        return F
    repo = _.clearPath
    Run(['git', '-C', repo, 'add', '.']).run(T, T).assSuc()
    # TODO check dirty
    stamp = f'{Time()}'
    Run(['git', '-C', repo, 'commit', '-m', stamp]).run(T, T).assSuc()
    return T
def getPassKey(_, key: str | Dath | PassDBKey) -> PassDBKey:
    """Mount the store and turn `key` into a `PassDBKey`.

    A `PassDBKey` is passed through unchanged; anything else becomes a key
    rooted at `clearPath`.
    """
    _.mount()
    if not inst(key, PassDBKey):
        key = PassDBKey(root=_.clearPath, relative=key)
    return key
# ◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼
# ◼◼ pass management
# ◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼◼
def isMounted_(_) -> Self:
return _.gocryptfs.isMounted()
def mount(_, fromStdin: bool = F) -> Self:
    """Ask gocryptfs to mount the store and return this object.

    Passes `idleMin` from this instance and `fromStdin` through to
    `Gocryptfs.mount`.
    """
    mountArgs = {'idleMin': _.idleMin, 'fromStdin': fromStdin}
    _.gocryptfs.mount(**mountArgs)
    return _
def _openEditor(
    _,
    path: str | Dath,
    editor: list | str | N = N,
    askContinue: bool = F,
    _runArgs: tuple[Any, Any, Any] | N = N,  # | testing
):
    """Open `path` in an editor and wait for the editor to exit.

    Args:
        path: file handed to the editor.
        editor: N runs vim with backup, write-backup and swap files turned off
            and the swap/undo directories pointed at `/dev/shm/$USER`. A list
            is used as the argv prefix, with `-- path` appended. A str is run
            through `bash -c` after `str.format(path=path)`.
        askContinue: when true, ask `confirmContinue()` after the editor exits.
        _runArgs: positional arguments for `Run.run`; defaults to
            `('so', 'se', 'si')`. Meant for tests.

    Raises:
        PassEditError: the editor exited with a non-zero return code.
        ContinueError: `askContinue` was set and the user declined.
    """
    anyNotTty(throw=T)  # | presumably raises unless attached to a terminal -- TODO confirm
    CLI_STEM = T  # NOTE(review): never read in this function; kept in case tooling inspects it
    if _runArgs is N:
        _runArgs = ('so', 'se', 'si')
    if editor is N:
        user = os.environ['USER']
        # | the two `/dev/shm/{user}` entries previously lacked the `f` prefix, so
        # | vim received the literal text `{user}` instead of the user name
        vimSetup = [
            'set nobackup',
            f'set directory=/dev/shm/{user}',
            'set nowritebackup',
            'set nonumber',
            'set noswapfile',
            f'set undodir=/dev/shm/{user}',
            'startinsert',
        ]
        editor = ['vim', '-c', ' | '.join(vimSetup)]
    if isinstance(editor, str):
        # NOTE(review): `path` is substituted unquoted into a shell command line;
        # a template has to quote `{path}` itself.
        cmd = ['bash', '-c', editor.format(path=path)]
    else:
        cmd = [*editor, '--', path]
    run = Run(cmd).run(*_runArgs).wait()
    if run.rc != 0:
        # | previously referenced the undefined name `popen`, raising NameError
        raise PassEditError(f'code={run.rc}')
    if askContinue and not confirmContinue():
        raise ContinueError()
def edit(_, key: str | Dath, editor: list | str | N = N, askContinue: bool = F, _runArgs: tuple = N) -> Self:
    """Edit the entry `key` through a temporary copy and return this object.

    The copy is provided by `tempEdit`, which writes it back to the store
    when the `with` block ends normally. An exception from the editor
    (`PassEditError`, `ContinueError`) propagates and nothing is written back.

    Args:
        key: entry to edit.
        editor, askContinue, _runArgs: passed straight to `_openEditor`.
    """
    with _.tempEdit(key=key) as path:
        # | do not `return` the editor call: it yields None and would skip `return _`
        _._openEditor(path=path, editor=editor, askContinue=askContinue, _runArgs=_runArgs)
    return _
def insert(_, key: str | Dath, data: str = N, force: bool = F) -> Self:
    """Write `data` as the full content of entry `key` and return this object.

    Args:
        key: entry to create or overwrite.
        data: content to store; read from stdin when N.
        force: skip the check that the entry does not exist yet.

    Raises:
        PassKeyAlreadyExistsError: the entry exists and `force` is false.
    """
    passKey = _.getPassKey(key)
    if not force:
        passKey.assertNotExists()
    payload = sys.stdin.read() if data is N else data
    _.mount()  # | timeout could unmount
    target = passKey.absolute
    target.parent.mkdirForce()
    target.write(payload)
    return _
@contextmanager
def tempEdit(_, key: str) -> Generator[Dath, None, None]:
    """Yield a scratch copy of entry `key` and store it back if it changed.

    The scratch file lives below `/dev/shm/$USER/passdb/`. When the entry
    already exists it is copied there first and its sha256 is remembered.
    After the `with` body finishes, an unchanged hash means nothing is
    written. Otherwise the store is re-mounted, the scratch file is copied
    over the entry and `gitCommit()` runs. If the body raises, none of the
    write-back code runs.
    """
    passKey = _.getPassKey(key)
    shmDir = Dath(f'/dev/shm/{os.environ["USER"]}/passdb/')
    shmDir.mkdirForce()
    stored = passKey.absolute
    stored.parent.mkdirForce()
    # | use temp dir because "When a process has open files or its working directory in the mount, this will keep it not idle indefinitely." -- `man gocryptfs`
    flatName = str(passKey.relative).replace('/', '')
    scratch = shmDir.tempFile(fmt='{d}_{r}/' + flatName, create=F, autoRemove=F)

    def _rm():
        scratch.remove()
        scratch.makeDir()  # | make new folder to prevent accidental save after delete

    scratch.addOnDelFunc(_rm)
    scratch.parent.makeDir()
    before = None
    if stored.exists():
        stored.copy(scratch)
        before = scratch.sha256()
    yield scratch
    if before == scratch.sha256():
        if amCliEntryPoint():
            log.info('File is the same')
        return
    _.mount()
    scratch.copy(stored)
    _.gitCommit()
def passwd(_, key: str | Dath) -> str:
    """Return the first line of entry `key`, which holds the password.

    An empty entry gives `''` (the same default `toDict` uses for
    `password`) instead of leaking `StopIteration` from `next()`. In CLI mode
    the value is also written through `log.info` without a trailing newline.

    Raises:
        PassKeyNotFoundError: the entry does not exist (from `readLines`).
    """
    firstLine = next(_.readLines(key), b'')
    res = firstLine.decode()
    if amCliEntryPoint():
        log.info(res, end='')
    return res
def exists(_, key: str) -> bool:
    """Report whether entry `key` exists.

    In CLI mode the process exits instead of returning: status 0 when the
    entry exists, 1 otherwise.
    """
    found = _.getPassKey(key).absolute.exists()
    if amCliEntryPoint():
        sys.exit(0 if found else 1)
    return found
def readLines(_, key: str, strip: bool = T) -> Generator[bytes, N, N]:
    """Yield the raw lines of entry `key` as bytes.

    Args:
        key: entry to read.
        strip: remove trailing `\\n` bytes from every line when true.

    Raises:
        PassKeyNotFoundError: the entry does not exist (raised on first
            iteration, since this is a generator).
    """
    passKey = _.getPassKey(key).assertExists()
    with passKey.absolute.open('rb') as fp:
        # | iter() with a sentinel stops at the first empty read, i.e. EOF
        for raw in iter(fp.readline, b''):
            yield raw.rstrip(b'\n') if strip else raw
def cat(_, key: str) -> list[str] | N:
    """Return the decoded lines of entry `key`; pretty-print them in CLI mode.

    Outside CLI mode the full list of lines is returned.

    In CLI mode the first line is logged in yellow, then the remaining lines
    with trailing empty lines dropped. Lines containing `:` are logged as
    coloured `name:value` pairs until the first line without a colon; from
    there on lines are logged verbatim. The value returned in CLI mode is that
    remaining list (without the first line), or the empty list for an empty
    entry.
    """
    res = [raw.decode() for raw in _.readLines(key)]
    if not amCliEntryPoint():
        return res
    if not res:
        return res
    log.info(f'{cl.yel}{res[0]}{cl.r}')
    res = res[1:]
    while res and res[-1] == '':
        res.pop()
    inHeader = T
    for entry in res:
        if inHeader and ':' in entry:
            name, value = entry.split(':', maxsplit=1)
            log.info(f'{cl.wht}{name}{cl.r}:{cl.grn}{value}{cl.r}')
            continue
        # | first empty or colon-less line ends the `name:value` section for good
        inHeader = F
        log.info(entry)
    return res
def generatePass(
    _,
    key: str | Dath,
    force: bool = F,
    length: int = 120,
    noLower: bool = T,
    noUpper: bool = T,
    noDigits: bool = T,
    noPunctuation: bool = T,
):
    """Generate a password and store it as the first line of entry `key`.

    A missing entry is created through `insert` with the password as its only
    content. For an existing entry the first line is replaced, trailing blank
    lines are dropped, and the previous password is appended as an
    `oldPass | <time> | <password>` line. In CLI mode the result is then
    opened in the editor; on confirmation the new password is copied to the
    clipboard.

    Args:
        key: entry to create or update.
        force: NOTE(review): not used anywhere in this function.
        length, noLower, noUpper, noDigits, noPunctuation: forwarded by name
            to `PasswordGenerator`. NOTE(review): all four `no*` flags default
            to T -- confirm what `PasswordGenerator` does with them.

    Returns:
        This object (the result of `insert` for a new entry).

    Raises:
        PassEditError: in CLI mode, when the user does not confirm.
    """
    from .crypt_ import PasswordGenerator

    key = _.getPassKey(key)
    # | explicit keywords: `locals()` inside a comprehension sees the comprehension's
    # | own scope before Python 3.12, so the old lookup raised KeyError there
    newPasswd = PasswordGenerator(
        length=length,
        noLower=noLower,
        noUpper=noUpper,
        noDigits=noDigits,
        noPunctuation=noPunctuation,
    ).generateStr()
    if not key.absolute.exists():
        return _.insert(key=key, data=newPasswd)
    with _.tempEdit(key=key) as tmpPath:
        text = _.cat(key)
        if text:
            oldPwd = text[0]
            text[0] = newPasswd
            while text and not text[-1].strip():  # | remove empty end lines
                text.pop()
            if not text[-1].startswith('oldPass | '):
                # | blank separator before the first history line
                text.append('')
            text.append(f'oldPass | {Time()} | {oldPwd}')
            tmpPath.writeText('\n'.join(text))
        else:
            tmpPath.writeText(newPasswd)
        if amCliEntryPoint():
            _._openEditor(tmpPath)
            if confirmContinue():
                Clipboard().copy(newPasswd)
            else:
                # | raising inside the `with` keeps `tempEdit` from writing back
                raise PassEditError()
    return _
def _allKeys(_) -> Generator[Dath, N, N]:
    """Mount the store and yield what `Find` reports below `clearPath`.

    The search is configured with type `'f'`, excludes `_trash/` and `.git/`,
    and is run with `relTo=T`.
    """
    _.mount()
    finder = Find().setType('f').setExclude('_trash/', '.git/')
    yield from finder.run(_.clearPath, relTo=T)
def otpToken(_, key: str | Dath, copy: bool = T):
    """Generate a one-time token from the `otp` / `2fa` field of entry `key`.

    Args:
        key: entry whose parsed fields (see `toDict`) hold the OTP secret.
        copy: NOTE(review): not used anywhere in this function.

    Returns:
        The value of `OTP(cert=...).generate()`, or None when the entry has
        neither field. In CLI mode the token is logged, and a missing field
        logs an error and exits with status 1.
    """
    otp = dictTryKeys(_.toDict(key), 'otp', '2fa')
    # NOTE(review): this hands the OTP secret to debug logging -- confirm that is intended
    log.debug('otp', lambda x: x.otp)
    if not otp:
        if amCliEntryPoint():
            log.error('No "otp" or "2fa" found.')
            sys.exit(1)
        return
    otp = OTP(cert=otp).generate()
    if amCliEntryPoint():
        log.info(otp)
    return otp
def toDict(_, key: str | Dath):
# Parse entry `key` into a flat dict.
# Preset keys: 'key', 'url', 'username', 'password', 'rootPath', 'fullPath'.
# Every `name: value` line of the parsed section is added under the
# lower-cased, stripped name; such a line can overwrite a preset key.
# Raises PassKeyNotFoundError when the entry is missing, or when it starts
# with an empty section and has no second one.
key = _.getPassKey(key)
key.assertExists()
res = {}
res['key'] = key.relative
# | url / username come from the entry path: parent directory name and file name
res['url'] = key.relative.parent.name
res['username'] = key.relative.name
res['password'] = ''
res['rootPath'] = _.clearPath.absolute()
res['fullPath'] = key.absolute
out = _.cat(key=key)
# TODO iterate not splitListIntoListOfListByFilter
# | presumably groups the lines into sections separated by blank lines -- TODO confirm helper semantics
out = splitListIntoListOfListByFilter(out, lambda x: re.search(r'^\s*$', x))
if isinstance(out, list):
if out[0]: # reg pass (first line is pass. All touching lines to json. Stop at empty line)
res['password'] = out[0][0]
out = out[0][1:]
else: # card (first line is empty)
if len(out) == 1:
raise PassKeyNotFoundError(key)
out = out[1]
for line in out:
# | name is everything before the first colon, value is the rest of the line
if toJsonLine := re.findall(r'^([^:]*):(.*)$', line):
res[toJsonLine[0][0].strip().lower()] = toJsonLine[0][1].strip()
for i in ['number']:
if i in res:
res[i] = re.compile(r' ', flags=re.I).sub(r'', res[i]) # remove space
if amCliEntryPoint():
# | the lambda names the local `res`; keep that variable name in sync with it
log.info(lambda jc: jc.res)
return res
def keyPath(_, key: str | Dath):
    """Return the absolute path of the existing entry `key`.

    In CLI mode the path is also logged.

    Raises:
        PassKeyNotFoundError: the entry does not exist.
    """
    passKey = _.getPassKey(key)
    passKey.assertExists()
    location = passKey.absolute
    if amCliEntryPoint():
        log.info(location)
    return location
def rm(_, key: str | Dath) -> Self:
    """Move entry `key` into `_trash/` below `clearPath` and return this object.

    The entry keeps its relative path inside the trash directory; missing
    parent directories are created. In CLI mode the move is logged first.

    Raises:
        PassKeyNotFoundError: the entry does not exist.
    """
    passKey = _.getPassKey(key).assertExists()
    destination = _.clearPath / '_trash' / passKey.relative
    destination.parent.mkdir(exist_ok=T, parents=T)
    if amCliEntryPoint() is T:
        log.info(f'mv {cl.cyn}{passKey.absolute}{cl.r}\n\t{cl.yel}{destination}{cl.r}')
    passKey.absolute.rename(destination)
    return _
@classmethod
def FromStr(cls, string: str) -> tuple[Self, str]:
string = str(string)
if '+' in string:
i = string.index('+')
passPath = string[:i]
key = string[i + 1 :]
else:
passPath = '~/pass/simple/'
key = string
return (cls(passPath), key)