Do not download attachments to tmpfs, instead save them directly to KeePass database

This commit is contained in:
David Němec 2022-06-11 15:52:33 +02:00
parent 5aff563599
commit 76604fcbe6
No known key found for this signature in database
GPG Key ID: B1064EFFFD11AA75

View File

@ -6,7 +6,6 @@ import subprocess
from argparse import ArgumentParser from argparse import ArgumentParser
from typing import Dict, List, Optional from typing import Dict, List, Optional
from shlex import quote
from pykeepass import PyKeePass, create_database from pykeepass import PyKeePass, create_database
from pykeepass.exceptions import CredentialsError from pykeepass.exceptions import CredentialsError
@ -34,12 +33,12 @@ def bitwarden_to_keepass(args):
logging.error(f'Wrong password for KeePass database: {e}') logging.error(f'Wrong password for KeePass database: {e}')
return return
folders = subprocess.check_output(f'{quote(args.bw_path)} list folders --session {quote(args.bw_session)}', shell=True, encoding='utf8') folders = subprocess.check_output([args.bw_path, 'list', 'folders', '--session', args.bw_session], encoding='utf8')
folders = json.loads(folders) folders = json.loads(folders)
groups_by_id = load_folders(folders) groups_by_id = load_folders(folders)
logging.info(f'Folders done ({len(groups_by_id)}).') logging.info(f'Folders done ({len(groups_by_id)}).')
items = subprocess.check_output(f'{quote(args.bw_path)} list items --session {quote(args.bw_session)}', shell=True, encoding='utf8') items = subprocess.check_output([args.bw_path, 'list', 'items', '--session', args.bw_session], encoding='utf8')
items = json.loads(items) items = json.loads(items)
logging.info(f'Starting to process {len(items)} items.') logging.info(f'Starting to process {len(items)} items.')
for item in items: for item in items:
@ -81,14 +80,12 @@ def bitwarden_to_keepass(args):
entry.set_custom_property(field['name'], field['value']) entry.set_custom_property(field['name'], field['value'])
for attachment in bw_item.get_attachments(): for attachment in bw_item.get_attachments():
attachment_tmp_path = f'/tmp/attachment/{attachment["fileName"]}' attachment_raw = subprocess.check_output([
attachment_path = subprocess.check_output(f'{quote(args.bw_path)} get attachment' args.bw_path, 'get', 'attachment', attachment['id'], '--raw', '--itemid', bw_item.get_id(),
f' --raw {quote(attachment["id"])} ' '--session', args.bw_session,
f'--itemid {quote(bw_item.get_id())} ' ])
f'--output {quote(attachment_tmp_path)} --session {quote(args.bw_session)}', shell=True, encoding='utf8').rstrip() attachment_id = kp.add_binary(attachment_raw)
attachment_id = kp.add_binary(open(attachment_path, 'rb').read())
entry.add_attachment(attachment_id, attachment['fileName']) entry.add_attachment(attachment_id, attachment['fileName'])
os.remove(attachment_path)
except Exception as e: except Exception as e:
logging.warning(f'Skipping item named "{item["name"]}" because of this error: {repr(e)}') logging.warning(f'Skipping item named "{item["name"]}" because of this error: {repr(e)}')
@ -128,6 +125,7 @@ def load_folders(folders) -> Dict[str, KPGroup]:
return groups_by_id return groups_by_id
def check_args(args): def check_args(args):
if args.database_keyfile: if args.database_keyfile:
if not os.path.isfile(args.database_keyfile) or not os.access(args.database_keyfile, os.R_OK): if not os.path.isfile(args.database_keyfile) or not os.access(args.database_keyfile, os.R_OK):