From 526421e340b3601111d6f92b3ea92bc9400b3004 Mon Sep 17 00:00:00 2001 From: Roger Gonzalez Date: Wed, 4 Dec 2024 20:06:17 -0300 Subject: [PATCH] Refactor Bitwarden to KeePass conversion logic --- bitwarden-to-keepass.py | 217 +++++++++++++++++++++++++++------------- 1 file changed, 148 insertions(+), 69 deletions(-) diff --git a/bitwarden-to-keepass.py b/bitwarden-to-keepass.py index 6377c02..0ad3c88 100644 --- a/bitwarden-to-keepass.py +++ b/bitwarden-to-keepass.py @@ -29,6 +29,144 @@ logging.basicConfig( kp: Optional[PyKeePass] = None +def initialize_keepass_db(database_path: str, password: str, keyfile: Optional[str] = None) -> Optional[PyKeePass]: + """Initialize or open a KeePass database. + + Args: + database_path: Path to KeePass database file + password: Password for KeePass database + keyfile: Optional path to key file + + Returns: + PyKeePass instance if successful, None if failed + """ + try: + kp = PyKeePass(database_path, password=password, keyfile=keyfile) + except FileNotFoundError: + logging.info("KeePass database does not exist, creating a new one.") + kp = create_database(database_path, password=password, keyfile=keyfile) + except CredentialsError as e: + logging.error(f"Wrong password for KeePass database: {e}") + return None + return kp + + +def fetch_bitwarden_data(bw_path: str, bw_session: str) -> tuple[list, list]: + """Fetch folders and items from Bitwarden vault. + + Args: + bw_path: Path to Bitwarden CLI executable + bw_session: Bitwarden session token + + Returns: + Tuple containing (folders, items) lists from Bitwarden + """ + folders = subprocess.check_output([bw_path, "list", "folders", "--session", bw_session], encoding="utf8") + folders = json.loads(folders) + + items = subprocess.check_output([bw_path, "list", "items", "--session", bw_session], encoding="utf8") + items = json.loads(items) + + return folders, items + + +def process_entry_title(kp: PyKeePass, group: KPGroup, title: str, item_id: str) -> str: + """Generate a unique title for a KeePass entry. + + Args: + kp: KeePass database instance + group: KeePass group to check for duplicates + title: Desired entry title + item_id: Bitwarden item ID for fallback + + Returns: + Unique entry title + """ + if not kp.find_entries(title=title, group=group, first=True): + return title + return f"{title} - ({item_id})" + + +def create_keepass_entry(kp: PyKeePass, bw_item: Item, group: KPGroup) -> Optional[KPEntry]: + """Create a new KeePass entry from a Bitwarden item. + + Args: + kp: KeePass database instance + bw_item: Bitwarden item wrapper + group: KeePass group to add entry to + + Returns: + Created KeePass entry or None if creation failed + """ + try: + entry_title = process_entry_title(kp, group, bw_item.get_name(), bw_item.get_id()) + return kp.add_entry( + destination_group=group, + title=entry_title, + username=bw_item.get_username(), + password=bw_item.get_password(), + notes=bw_item.get_notes(), + ) + except Exception as e: + logging.warning(f'Failed to create entry "{bw_item.get_name()}": {repr(e)}') + return None + + +def add_totp_to_entry(entry: KPEntry, bw_item: Item) -> None: + """Add TOTP configuration to KeePass entry. + + Args: + entry: KeePass entry to modify + bw_item: Bitwarden item containing TOTP data + """ + totp_secret, totp_settings = bw_item.get_totp() + if totp_secret and totp_settings: + entry.set_custom_property("TOTP Seed", totp_secret, protect=True) + entry.set_custom_property("TOTP Settings", totp_settings) + + +def add_custom_fields_to_entry(entry: KPEntry, bw_item: Item) -> None: + """Add custom fields from Bitwarden item to KeePass entry. + + Args: + entry: KeePass entry to modify + bw_item: Bitwarden item containing custom fields + """ + for field in bw_item.get_custom_fields(): + entry.set_custom_property( + field["name"], + field["value"], + protect=field["type"] == CustomFieldType.HIDDEN, + ) + + +def add_attachments_to_entry(entry: KPEntry, bw_item: Item, bw_path: str, bw_session: str) -> None: + """Add attachments from Bitwarden item to KeePass entry. + + Args: + entry: KeePass entry to modify + bw_item: Bitwarden item containing attachments + bw_path: Path to Bitwarden CLI executable + bw_session: Bitwarden session token + """ + for attachment in bw_item.get_attachments(): + attachment_raw = subprocess.check_output( + [ + bw_path, + "get", + "attachment", + attachment["id"], + "--raw", + "--itemid", + bw_item.get_id(), + "--session", + bw_session, + ], + ) + attachment_id = entry._kp.add_binary(attachment_raw) + entry.add_attachment(attachment_id, attachment["fileName"]) + + def bitwarden_to_keepass(args): """Convert Bitwarden vault items to KeePass database entries. @@ -43,22 +181,13 @@ def bitwarden_to_keepass(args): Returns: PyKeePass: The KeePass database instance with imported items """ - try: - kp = PyKeePass(args.database_path, password=args.database_password, keyfile=args.database_keyfile) - except FileNotFoundError: - logging.info("KeePass database does not exist, creating a new one.") - kp = create_database(args.database_path, password=args.database_password, keyfile=args.database_keyfile) - except CredentialsError as e: - logging.error(f"Wrong password for KeePass database: {e}") + kp = initialize_keepass_db(args.database_path, args.database_password, args.database_keyfile) + if not kp: return None - folders = subprocess.check_output([args.bw_path, "list", "folders", "--session", args.bw_session], encoding="utf8") - folders = json.loads(folders) + folders, items = fetch_bitwarden_data(args.bw_path, args.bw_session) groups_by_id = load_folders(kp, folders) logging.info(f"Folders done ({len(groups_by_id)}).") - - items = subprocess.check_output([args.bw_path, "list", "items", "--session", args.bw_session], encoding="utf8") - items = json.loads(items) logging.info(f"Starting to process {len(items)} items.") for item in items: @@ -67,66 +196,16 @@ def bitwarden_to_keepass(args): continue bw_item = Item(item) + entry = create_keepass_entry(kp, bw_item, groups_by_id[bw_item.get_folder_id()]) - try: - is_duplicate_title = False - while True: - entry_title = ( - bw_item.get_name() - if not is_duplicate_title - else "{name} - ({item_id}".format(name=bw_item.get_name(), item_id=bw_item.get_id()) - ) - try: - entry = kp.add_entry( - destination_group=groups_by_id[bw_item.get_folder_id()], - title=entry_title, - username=bw_item.get_username(), - password=bw_item.get_password(), - notes=bw_item.get_notes(), - ) - break - except Exception as e: - if "already exists" in str(e): - is_duplicate_title = True - continue - raise - - totp_secret, totp_settings = bw_item.get_totp() - if totp_secret and totp_settings: - entry.set_custom_property("TOTP Seed", totp_secret, protect=True) - entry.set_custom_property("TOTP Settings", totp_settings) - - uris = [uri["uri"] for uri in bw_item.get_uris()] - set_kp_entry_urls(entry, uris) - - for field in bw_item.get_custom_fields(): - entry.set_custom_property( - field["name"], - field["value"], - protect=field["type"] == CustomFieldType.HIDDEN, - ) - - for attachment in bw_item.get_attachments(): - attachment_raw = subprocess.check_output( - [ - args.bw_path, - "get", - "attachment", - attachment["id"], - "--raw", - "--itemid", - bw_item.get_id(), - "--session", - args.bw_session, - ], - ) - attachment_id = kp.add_binary(attachment_raw) - entry.add_attachment(attachment_id, attachment["fileName"]) - - except Exception as e: - logging.warning(f'Skipping item named "{item["name"]}" because of this error: {repr(e)}') + if not entry: continue + add_totp_to_entry(entry, bw_item) + set_kp_entry_urls(entry, [uri["uri"] for uri in bw_item.get_uris()]) + add_custom_fields_to_entry(entry, bw_item) + add_attachments_to_entry(entry, bw_item, args.bw_path, args.bw_session) + logging.info("Saving changes to KeePass database.") kp.save() logging.info("Export completed.")