diff --git a/bitwarden-to-keepass.py b/bitwarden-to-keepass.py index bd1bdc9..6377c02 100644 --- a/bitwarden-to-keepass.py +++ b/bitwarden-to-keepass.py @@ -1,154 +1,193 @@ +"""Export Bitwarden items into a KeePass database. + +This module provides functionality to export items from a Bitwarden vault into a KeePass database, +including logins (with TOTP seeds, URIs, custom fields, attachments, notes) and secure notes. +""" + import json import logging import os import re import subprocess - from argparse import ArgumentParser from typing import Dict, List, Optional from pykeepass import PyKeePass, create_database +from pykeepass.entry import Entry as KPEntry from pykeepass.exceptions import CredentialsError from pykeepass.group import Group as KPGroup -from pykeepass.entry import Entry as KPEntry import folder as FolderType -from item import Item, ItemType, CustomFieldType +from item import CustomFieldType, Item, ItemType logging.basicConfig( level=logging.INFO, - format='%(asctime)s :: %(levelname)s :: %(message)s', - datefmt='%Y-%m-%d %H:%M:%S', + format="%(asctime)s :: %(levelname)s :: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", ) kp: Optional[PyKeePass] = None + def bitwarden_to_keepass(args): - global kp + """Convert Bitwarden vault items to KeePass database entries. + + Args: + args: ArgumentParser namespace containing configuration options including: + - database_path: Path to KeePass database + - database_password: Password for KeePass database + - database_keyfile: Optional path to key file + - bw_path: Path to Bitwarden CLI executable + - bw_session: Bitwarden session token + + Returns: + PyKeePass: The KeePass database instance with imported items + """ try: kp = PyKeePass(args.database_path, password=args.database_password, keyfile=args.database_keyfile) except FileNotFoundError: - logging.info('KeePass database does not exist, creating a new one.') + logging.info("KeePass database does not exist, creating a new one.") kp = create_database(args.database_path, password=args.database_password, keyfile=args.database_keyfile) except CredentialsError as e: - logging.error(f'Wrong password for KeePass database: {e}') - return + logging.error(f"Wrong password for KeePass database: {e}") + return None - folders = subprocess.check_output([args.bw_path, 'list', 'folders', '--session', args.bw_session], encoding='utf8') + folders = subprocess.check_output([args.bw_path, "list", "folders", "--session", args.bw_session], encoding="utf8") folders = json.loads(folders) - groups_by_id = load_folders(folders) - logging.info(f'Folders done ({len(groups_by_id)}).') + groups_by_id = load_folders(kp, folders) + logging.info(f"Folders done ({len(groups_by_id)}).") - items = subprocess.check_output([args.bw_path, 'list', 'items', '--session', args.bw_session], encoding='utf8') + items = subprocess.check_output([args.bw_path, "list", "items", "--session", args.bw_session], encoding="utf8") items = json.loads(items) - logging.info(f'Starting to process {len(items)} items.') + logging.info(f"Starting to process {len(items)} items.") + for item in items: - if item['type'] in [ItemType.CARD, ItemType.IDENTITY]: + if item["type"] in [ItemType.CARD, ItemType.IDENTITY]: logging.warning(f'Skipping credit card or identity item "{item["name"]}".') continue bw_item = Item(item) - is_duplicate_title = False try: + is_duplicate_title = False while True: - entry_title = bw_item.get_name() if not is_duplicate_title else '{name} - ({item_id}'.format(name=bw_item.get_name(), item_id=bw_item.get_id()) + entry_title = ( + bw_item.get_name() + if not is_duplicate_title + else "{name} - ({item_id}".format(name=bw_item.get_name(), item_id=bw_item.get_id()) + ) try: entry = kp.add_entry( destination_group=groups_by_id[bw_item.get_folder_id()], title=entry_title, username=bw_item.get_username(), password=bw_item.get_password(), - notes=bw_item.get_notes() + notes=bw_item.get_notes(), ) break except Exception as e: - if 'already exists' in str(e): + if "already exists" in str(e): is_duplicate_title = True continue raise totp_secret, totp_settings = bw_item.get_totp() if totp_secret and totp_settings: - entry.set_custom_property('TOTP Seed', totp_secret, protect=True) - entry.set_custom_property('TOTP Settings', totp_settings) + entry.set_custom_property("TOTP Seed", totp_secret, protect=True) + entry.set_custom_property("TOTP Settings", totp_settings) - uris = [uri['uri'] for uri in bw_item.get_uris()] + uris = [uri["uri"] for uri in bw_item.get_uris()] set_kp_entry_urls(entry, uris) for field in bw_item.get_custom_fields(): entry.set_custom_property( - field['name'], - field['value'], - protect=field['type'] == CustomFieldType.HIDDEN, + field["name"], + field["value"], + protect=field["type"] == CustomFieldType.HIDDEN, ) for attachment in bw_item.get_attachments(): - attachment_raw = subprocess.check_output([ - args.bw_path, 'get', 'attachment', attachment['id'], '--raw', '--itemid', bw_item.get_id(), - '--session', args.bw_session, - ]) + attachment_raw = subprocess.check_output( + [ + args.bw_path, + "get", + "attachment", + attachment["id"], + "--raw", + "--itemid", + bw_item.get_id(), + "--session", + args.bw_session, + ], + ) attachment_id = kp.add_binary(attachment_raw) - entry.add_attachment(attachment_id, attachment['fileName']) + entry.add_attachment(attachment_id, attachment["fileName"]) except Exception as e: logging.warning(f'Skipping item named "{item["name"]}" because of this error: {repr(e)}') continue - logging.info('Saving changes to KeePass database.') + logging.info("Saving changes to KeePass database.") kp.save() - logging.info('Export completed.') + logging.info("Export completed.") + return kp def set_kp_entry_urls(entry: KPEntry, urls: List[str]) -> None: - """Store a list of URLs comming from a Bitwarden entry in different - attributes and custom properties of a KeePass entry, depending on whether - it's an identifier for an Android or iOS app or it's a generic URL""" + """Store a list of URLs from a Bitwarden entry in KeePass entry attributes. + + Maps URLs to different KeePass attributes and custom properties based on their type: + - Android app identifiers + - iOS app identifiers + - Generic URLs + + Args: + entry: KeePass entry object to store URLs in + urls: List of URL strings from Bitwarden + """ android_apps = ios_apps = extra_urls = 0 for url in urls: - match url.partition('://'): - case ('androidapp', '://', app_id): - # It's an Android app registered by Bitwarden's mobile app - # Store multiple apps in AndroidApp, AndroidApp_1, etc. so that KeePassDX's autofill picks it up + match url.partition("://"): + case ("androidapp", "://", app_id): prop_name = "AndroidApp" if android_apps == 0 else f"AndroidApp_{android_apps}" android_apps += 1 entry.set_custom_property(prop_name, app_id) - case ('iosapp', '://', app_id): - # It's an iOS app registered by Bitwarden's mobile app - # XXX Maybe properly set up autofill for a macOS/iPhone/iPad KeePass-compatible app like StrongBox or Keepassium + case ("iosapp", "://", app_id): ios_apps += 1 - entry.set_custom_property(f'iOS app #{ios_apps}', app_id) + entry.set_custom_property(f"iOS app #{ios_apps}", app_id) case _: - # Assume it's a generic URL. - # First one goes to the standard URL attribute and the remaining ones go to URL_1, URL_2 and so on if entry.url is None: entry.url = url else: extra_urls += 1 - entry.set_custom_property(f'URL_{extra_urls}', url) + entry.set_custom_property(f"URL_{extra_urls}", url) -def load_folders(folders) -> Dict[str, KPGroup]: - # sort folders so that in the case of nested folders, the parents would be guaranteed to show up before the children - folders.sort(key=lambda x: x['name']) +def load_folders(kp: PyKeePass, folders) -> Dict[str, KPGroup]: + """Create KeePass folder structure from Bitwarden folders. + + Args: + kp: KeePass database instance + folders: List of folder objects from Bitwarden + + Returns: + Dictionary mapping Bitwarden folder IDs to KeePass group objects + """ + folders.sort(key=lambda x: x["name"]) - # dict to store mapping of Bitwarden folder id to keepass group groups_by_id: Dict[str, KPGroup] = {} - # build up folder tree folder_root: FolderType.Folder = FolderType.Folder(None) folder_root.keepass_group = kp.root_group groups_by_id[None] = kp.root_group for folder in folders: - if folder['id'] is not None: - new_folder: FolderType.Folder = FolderType.Folder(folder['id']) - # regex lifted from https://github.com/bitwarden/jslib/blob/ecdd08624f61ccff8128b7cb3241f39e664e1c7f/common/src/services/folder.service.ts#L108 - folder_name_parts: List[str] = re.sub(r'^\/+|\/+$', '', folder['name']).split('/') - FolderType.nested_traverse_insert(folder_root, folder_name_parts, new_folder, '/') + if folder["id"] is not None: + new_folder: FolderType.Folder = FolderType.Folder(folder["id"]) + folder_name_parts: List[str] = re.sub(r"^\/+|\/+$", "", folder["name"]).split("/") + FolderType.nested_traverse_insert(folder_root, folder_name_parts, new_folder, "/") - # create keepass groups based off folder tree def add_keepass_group(folder: FolderType.Folder): parent_group: KPGroup = folder.parent.keepass_group new_group: KPGroup = kp.add_group(parent_group, folder.name) @@ -161,50 +200,64 @@ def load_folders(folders) -> Dict[str, KPGroup]: def check_args(args): - if args.database_keyfile: - if not os.path.isfile(args.database_keyfile) or not os.access(args.database_keyfile, os.R_OK): - logging.error('Key File for KeePass database is not readable.') - return False + """Validate command line arguments. + + Args: + args: ArgumentParser namespace containing runtime configuration + + Returns: + bool: True if arguments are valid, False otherwise + """ + if args.database_keyfile and ( + not os.path.isfile(args.database_keyfile) or not os.access(args.database_keyfile, os.R_OK) + ): + logging.error("Key File for KeePass database is not readable.") + return False if not os.path.isfile(args.bw_path) or not os.access(args.bw_path, os.X_OK): - logging.error('bitwarden-cli was not found or not executable. Did you set correct \'--bw-path\'?') + logging.error("bitwarden-cli was not found or not executable. Did you set correct '--bw-path'?") return False return True def environ_or_required(key): - return ( - {'default': os.environ.get(key)} if os.environ.get(key) - else {'required': True} - ) + """Get argument configuration based on environment variable presence. + + Args: + key: Environment variable name to check + + Returns: + dict: ArgumentParser argument configuration + """ + return {"default": os.environ.get(key)} if os.environ.get(key) else {"required": True} parser = ArgumentParser() parser.add_argument( - '--bw-session', - help='Session generated from bitwarden-cli (bw login)', - **environ_or_required('BW_SESSION'), + "--bw-session", + help="Session generated from bitwarden-cli (bw login)", + **environ_or_required("BW_SESSION"), ) parser.add_argument( - '--database-path', - help='Path to KeePass database. If database does not exists it will be created.', - **environ_or_required('DATABASE_PATH'), + "--database-path", + help="Path to KeePass database. If database does not exists it will be created.", + **environ_or_required("DATABASE_PATH"), ) parser.add_argument( - '--database-password', - help='Password for KeePass database', - **environ_or_required('DATABASE_PASSWORD'), + "--database-password", + help="Password for KeePass database", + **environ_or_required("DATABASE_PASSWORD"), ) parser.add_argument( - '--database-keyfile', - help='Path to Key File for KeePass database', - default=os.environ.get('DATABASE_KEYFILE', None), + "--database-keyfile", + help="Path to Key File for KeePass database", + default=os.environ.get("DATABASE_KEYFILE", None), ) parser.add_argument( - '--bw-path', - help='Path for bw binary', - default=os.environ.get('BW_PATH', 'bw'), + "--bw-path", + help="Path for bw binary", + default=os.environ.get("BW_PATH", "bw"), ) args = parser.parse_args() diff --git a/folder.py b/folder.py index 34eb9d5..5fac7ea 100644 --- a/folder.py +++ b/folder.py @@ -1,34 +1,70 @@ +"""Folder management functionality for Bitwarden to KeePass conversion. + +Provides classes and functions for managing folder hierarchies when converting +from Bitwarden's folder structure to KeePass groups. +""" + import collections from typing import Callable, Deque, List, Optional from pykeepass.group import Group as KPGroup + class Folder: + """Represents a folder in the Bitwarden/KeePass hierarchy. + + Attributes: + id: Bitwarden folder identifier + name: Display name of the folder + children: List of child folders + parent: Parent folder reference + keepass_group: Associated KeePass group object + """ + id: Optional[str] name: Optional[str] - children: List['Folder'] - parent: Optional['Folder'] + children: List["Folder"] + parent: Optional["Folder"] keepass_group: Optional[KPGroup] - def __init__(self, id: Optional[str]): - self.id = id + def __init__(self, id_: Optional[str]): + """Initialize a new folder. + + Args: + folder_id: Bitwarden folder identifier + """ + self.id = id_ self.name = None self.children = [] self.parent = None self.keepass_group = None - def add_child(self, child: 'Folder'): + def add_child(self, child: "Folder"): + """Add a child folder to this folder. + + Args: + child: Folder object to add as a child + """ self.children.append(child) child.parent = self + # logic was lifted directly from https://github.com/bitwarden/jslib/blob/ecdd08624f61ccff8128b7cb3241f39e664e1c7f/common/src/misc/serviceUtils.ts#L7 def nested_traverse_insert(root: Folder, name_parts: List[str], new_folder: Folder, delimiter: str) -> None: + """Insert a new folder into the folder hierarchy. + + Args: + root: Root folder to start traversal from + name_parts: Path components for the new folder + new_folder: Folder object to insert + delimiter: Character used to separate path components + """ if len(name_parts) == 0: return end: bool = len(name_parts) == 1 part_name: str = name_parts[0] - + for child in root.children: if child.name != part_name: continue @@ -50,10 +86,17 @@ def nested_traverse_insert(root: Folder, name_parts: List[str], new_folder: Fold new_name_parts.extend(name_parts[2:]) nested_traverse_insert(root, new_name_parts, new_folder, delimiter) + def bfs_traverse_execute(root: Folder, callback: Callable[[Folder], None]) -> None: + """Execute a callback on each folder in breadth-first order. + + Args: + root: Root folder to start traversal from + callback: Function to execute on each folder + """ queue: Deque[Folder] = collections.deque() queue.extend(root.children) while queue: child: Folder = queue.popleft() queue.extend(child.children) - callback(child) \ No newline at end of file + callback(child) diff --git a/item.py b/item.py index 1a24f65..f10d3c0 100644 --- a/item.py +++ b/item.py @@ -1,83 +1,168 @@ +"""Item management functionality for Bitwarden to KeePass conversion. + +Provides classes and functions for handling different types of Bitwarden items +when converting them to KeePass entries. +""" + from enum import IntEnum -from urllib.parse import urlsplit, parse_qsl +from urllib.parse import parse_qsl, urlsplit class ItemType(IntEnum): + """Enumeration of Bitwarden item types. + + Attributes: + LOGIN: Login credentials + SECURE_NOTE: Secure note + CARD: Credit/debit card + IDENTITY: Identity information + """ + LOGIN = 1 SECURE_NOTE = 2 CARD = 3 IDENTITY = 4 + class CustomFieldType(IntEnum): + """Enumeration of Bitwarden custom field types. + + Attributes: + TEXT: Plain text field + HIDDEN: Hidden/masked field + BOOLEAN: Boolean/checkbox field + """ + TEXT = 0 HIDDEN = 1 BOOLEAN = 2 + class Item: + """Wrapper for Bitwarden vault items. + + Provides methods to access and format item data for conversion to KeePass. + """ + def __init__(self, item): + """Initialize a new item wrapper. + + Args: + item: Raw Bitwarden item data + """ self.item = item def get_id(self) -> str: - return self.item['id'] + """Get item's unique identifier. + + Returns: + Bitwarden item ID + """ + return self.item["id"] def get_name(self) -> str: - return self.item['name'] + """Get item's display name. + + Returns: + Item name/title + """ + return self.item["name"] def get_folder_id(self) -> str: - return self.item['folderId'] + """Get ID of folder containing this item. + + Returns: + Bitwarden folder ID + """ + return self.item["folderId"] def get_username(self) -> str: - if 'login' not in self.item: - return '' + """Get username for login items. - return self.item['login']['username'] if self.item['login']['username'] else '' + Returns: + Username string or empty string if not a login + """ + if "login" not in self.item: + return "" + + return self.item["login"]["username"] if self.item["login"]["username"] else "" def get_password(self) -> str: - if 'login' not in self.item: - return '' + """Get password for login items. - return self.item['login']['password'] if self.item['login']['password'] else '' + Returns: + Password string or empty string if not a login + """ + if "login" not in self.item: + return "" + + return self.item["login"]["password"] if self.item["login"]["password"] else "" def get_notes(self): - return self.item['notes'] + """Get item's notes field. + + Returns: + Notes text + """ + return self.item["notes"] def get_uris(self): - if 'login' not in self.item or 'uris' not in self.item['login']: + """Get URIs associated with login items. + + Returns: + List of URI objects + """ + if "login" not in self.item or "uris" not in self.item["login"]: return [] - for uri in self.item['login']['uris']: - uri['uri'] = uri['uri'] if uri['uri'] is not None else '' + for uri in self.item["login"]["uris"]: + uri["uri"] = uri["uri"] if uri["uri"] is not None else "" - return self.item['login']['uris'] + return self.item["login"]["uris"] def get_custom_fields(self): - if 'fields' not in self.item: + """Get item's custom fields. + + Returns: + List of custom field objects + """ + if "fields" not in self.item: return [] - for field in self.item['fields']: - field['name'] = field['name'] if field['name'] is not None else '' - field['value'] = field['value'] if field['value'] is not None else '' - field['type'] = CustomFieldType(field['type']) + for field in self.item["fields"]: + field["name"] = field["name"] if field["name"] is not None else "" + field["value"] = field["value"] if field["value"] is not None else "" + field["type"] = CustomFieldType(field["type"]) - return self.item['fields'] + return self.item["fields"] def get_attachments(self): - if 'attachments' not in self.item: + """Get item's attachments. + + Returns: + List of attachment objects + """ + if "attachments" not in self.item: return [] - return self.item['attachments'] + return self.item["attachments"] def get_totp(self): - if 'login' not in self.item: + """Get TOTP configuration for login items. + + Returns: + Tuple of (secret, settings) or (None, None) if not configured + """ + if "login" not in self.item: return None, None - if not self.item['login']['totp']: + if not self.item["login"]["totp"]: return None, None - params = urlsplit(self.item['login']['totp']).query + params = urlsplit(self.item["login"]["totp"]).query params = dict(parse_qsl(params)) - period = params.get('period', 30) - digits = params.get('digits', 6) - secret = params.get('secret', self.item['login']['totp']) + period = params.get("period", 30) + digits = params.get("digits", 6) + secret = params.get("secret", self.item["login"]["totp"]) - return secret, f'{period};{digits}' + return secret, f"{period};{digits}"