Added docstrings

This commit is contained in:
Roger Gonzalez 2024-12-04 19:32:09 -03:00
parent b8dcff0ce2
commit 4b3e89669a
Signed by: rogs
GPG Key ID: C7ECE9C6C36EC2E6
3 changed files with 299 additions and 118 deletions

View File

@ -1,154 +1,193 @@
"""Export Bitwarden items into a KeePass database.
This module provides functionality to export items from a Bitwarden vault into a KeePass database,
including logins (with TOTP seeds, URIs, custom fields, attachments, notes) and secure notes.
"""
import json import json
import logging import logging
import os import os
import re import re
import subprocess import subprocess
from argparse import ArgumentParser from argparse import ArgumentParser
from typing import Dict, List, Optional from typing import Dict, List, Optional
from pykeepass import PyKeePass, create_database from pykeepass import PyKeePass, create_database
from pykeepass.entry import Entry as KPEntry
from pykeepass.exceptions import CredentialsError from pykeepass.exceptions import CredentialsError
from pykeepass.group import Group as KPGroup from pykeepass.group import Group as KPGroup
from pykeepass.entry import Entry as KPEntry
import folder as FolderType import folder as FolderType
from item import Item, ItemType, CustomFieldType from item import CustomFieldType, Item, ItemType
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format='%(asctime)s :: %(levelname)s :: %(message)s', format="%(asctime)s :: %(levelname)s :: %(message)s",
datefmt='%Y-%m-%d %H:%M:%S', datefmt="%Y-%m-%d %H:%M:%S",
) )
kp: Optional[PyKeePass] = None kp: Optional[PyKeePass] = None
def bitwarden_to_keepass(args): def bitwarden_to_keepass(args):
global kp """Convert Bitwarden vault items to KeePass database entries.
Args:
args: ArgumentParser namespace containing configuration options including:
- database_path: Path to KeePass database
- database_password: Password for KeePass database
- database_keyfile: Optional path to key file
- bw_path: Path to Bitwarden CLI executable
- bw_session: Bitwarden session token
Returns:
PyKeePass: The KeePass database instance with imported items
"""
try: try:
kp = PyKeePass(args.database_path, password=args.database_password, keyfile=args.database_keyfile) kp = PyKeePass(args.database_path, password=args.database_password, keyfile=args.database_keyfile)
except FileNotFoundError: except FileNotFoundError:
logging.info('KeePass database does not exist, creating a new one.') logging.info("KeePass database does not exist, creating a new one.")
kp = create_database(args.database_path, password=args.database_password, keyfile=args.database_keyfile) kp = create_database(args.database_path, password=args.database_password, keyfile=args.database_keyfile)
except CredentialsError as e: except CredentialsError as e:
logging.error(f'Wrong password for KeePass database: {e}') logging.error(f"Wrong password for KeePass database: {e}")
return return None
folders = subprocess.check_output([args.bw_path, 'list', 'folders', '--session', args.bw_session], encoding='utf8') folders = subprocess.check_output([args.bw_path, "list", "folders", "--session", args.bw_session], encoding="utf8")
folders = json.loads(folders) folders = json.loads(folders)
groups_by_id = load_folders(folders) groups_by_id = load_folders(kp, folders)
logging.info(f'Folders done ({len(groups_by_id)}).') logging.info(f"Folders done ({len(groups_by_id)}).")
items = subprocess.check_output([args.bw_path, 'list', 'items', '--session', args.bw_session], encoding='utf8') items = subprocess.check_output([args.bw_path, "list", "items", "--session", args.bw_session], encoding="utf8")
items = json.loads(items) items = json.loads(items)
logging.info(f'Starting to process {len(items)} items.') logging.info(f"Starting to process {len(items)} items.")
for item in items: for item in items:
if item['type'] in [ItemType.CARD, ItemType.IDENTITY]: if item["type"] in [ItemType.CARD, ItemType.IDENTITY]:
logging.warning(f'Skipping credit card or identity item "{item["name"]}".') logging.warning(f'Skipping credit card or identity item "{item["name"]}".')
continue continue
bw_item = Item(item) bw_item = Item(item)
is_duplicate_title = False
try: try:
is_duplicate_title = False
while True: while True:
entry_title = bw_item.get_name() if not is_duplicate_title else '{name} - ({item_id}'.format(name=bw_item.get_name(), item_id=bw_item.get_id()) entry_title = (
bw_item.get_name()
if not is_duplicate_title
else "{name} - ({item_id}".format(name=bw_item.get_name(), item_id=bw_item.get_id())
)
try: try:
entry = kp.add_entry( entry = kp.add_entry(
destination_group=groups_by_id[bw_item.get_folder_id()], destination_group=groups_by_id[bw_item.get_folder_id()],
title=entry_title, title=entry_title,
username=bw_item.get_username(), username=bw_item.get_username(),
password=bw_item.get_password(), password=bw_item.get_password(),
notes=bw_item.get_notes() notes=bw_item.get_notes(),
) )
break break
except Exception as e: except Exception as e:
if 'already exists' in str(e): if "already exists" in str(e):
is_duplicate_title = True is_duplicate_title = True
continue continue
raise raise
totp_secret, totp_settings = bw_item.get_totp() totp_secret, totp_settings = bw_item.get_totp()
if totp_secret and totp_settings: if totp_secret and totp_settings:
entry.set_custom_property('TOTP Seed', totp_secret, protect=True) entry.set_custom_property("TOTP Seed", totp_secret, protect=True)
entry.set_custom_property('TOTP Settings', totp_settings) entry.set_custom_property("TOTP Settings", totp_settings)
uris = [uri['uri'] for uri in bw_item.get_uris()] uris = [uri["uri"] for uri in bw_item.get_uris()]
set_kp_entry_urls(entry, uris) set_kp_entry_urls(entry, uris)
for field in bw_item.get_custom_fields(): for field in bw_item.get_custom_fields():
entry.set_custom_property( entry.set_custom_property(
field['name'], field["name"],
field['value'], field["value"],
protect=field['type'] == CustomFieldType.HIDDEN, protect=field["type"] == CustomFieldType.HIDDEN,
) )
for attachment in bw_item.get_attachments(): for attachment in bw_item.get_attachments():
attachment_raw = subprocess.check_output([ attachment_raw = subprocess.check_output(
args.bw_path, 'get', 'attachment', attachment['id'], '--raw', '--itemid', bw_item.get_id(), [
'--session', args.bw_session, args.bw_path,
]) "get",
"attachment",
attachment["id"],
"--raw",
"--itemid",
bw_item.get_id(),
"--session",
args.bw_session,
],
)
attachment_id = kp.add_binary(attachment_raw) attachment_id = kp.add_binary(attachment_raw)
entry.add_attachment(attachment_id, attachment['fileName']) entry.add_attachment(attachment_id, attachment["fileName"])
except Exception as e: except Exception as e:
logging.warning(f'Skipping item named "{item["name"]}" because of this error: {repr(e)}') logging.warning(f'Skipping item named "{item["name"]}" because of this error: {repr(e)}')
continue continue
logging.info('Saving changes to KeePass database.') logging.info("Saving changes to KeePass database.")
kp.save() kp.save()
logging.info('Export completed.') logging.info("Export completed.")
return kp
def set_kp_entry_urls(entry: KPEntry, urls: List[str]) -> None: def set_kp_entry_urls(entry: KPEntry, urls: List[str]) -> None:
"""Store a list of URLs comming from a Bitwarden entry in different """Store a list of URLs from a Bitwarden entry in KeePass entry attributes.
attributes and custom properties of a KeePass entry, depending on whether
it's an identifier for an Android or iOS app or it's a generic URL""" Maps URLs to different KeePass attributes and custom properties based on their type:
- Android app identifiers
- iOS app identifiers
- Generic URLs
Args:
entry: KeePass entry object to store URLs in
urls: List of URL strings from Bitwarden
"""
android_apps = ios_apps = extra_urls = 0 android_apps = ios_apps = extra_urls = 0
for url in urls: for url in urls:
match url.partition('://'): match url.partition("://"):
case ('androidapp', '://', app_id): case ("androidapp", "://", app_id):
# It's an Android app registered by Bitwarden's mobile app
# Store multiple apps in AndroidApp, AndroidApp_1, etc. so that KeePassDX's autofill picks it up
prop_name = "AndroidApp" if android_apps == 0 else f"AndroidApp_{android_apps}" prop_name = "AndroidApp" if android_apps == 0 else f"AndroidApp_{android_apps}"
android_apps += 1 android_apps += 1
entry.set_custom_property(prop_name, app_id) entry.set_custom_property(prop_name, app_id)
case ('iosapp', '://', app_id): case ("iosapp", "://", app_id):
# It's an iOS app registered by Bitwarden's mobile app
# XXX Maybe properly set up autofill for a macOS/iPhone/iPad KeePass-compatible app like StrongBox or Keepassium
ios_apps += 1 ios_apps += 1
entry.set_custom_property(f'iOS app #{ios_apps}', app_id) entry.set_custom_property(f"iOS app #{ios_apps}", app_id)
case _: case _:
# Assume it's a generic URL.
# First one goes to the standard URL attribute and the remaining ones go to URL_1, URL_2 and so on
if entry.url is None: if entry.url is None:
entry.url = url entry.url = url
else: else:
extra_urls += 1 extra_urls += 1
entry.set_custom_property(f'URL_{extra_urls}', url) entry.set_custom_property(f"URL_{extra_urls}", url)
def load_folders(folders) -> Dict[str, KPGroup]: def load_folders(kp: PyKeePass, folders) -> Dict[str, KPGroup]:
# sort folders so that in the case of nested folders, the parents would be guaranteed to show up before the children """Create KeePass folder structure from Bitwarden folders.
folders.sort(key=lambda x: x['name'])
Args:
kp: KeePass database instance
folders: List of folder objects from Bitwarden
Returns:
Dictionary mapping Bitwarden folder IDs to KeePass group objects
"""
folders.sort(key=lambda x: x["name"])
# dict to store mapping of Bitwarden folder id to keepass group
groups_by_id: Dict[str, KPGroup] = {} groups_by_id: Dict[str, KPGroup] = {}
# build up folder tree
folder_root: FolderType.Folder = FolderType.Folder(None) folder_root: FolderType.Folder = FolderType.Folder(None)
folder_root.keepass_group = kp.root_group folder_root.keepass_group = kp.root_group
groups_by_id[None] = kp.root_group groups_by_id[None] = kp.root_group
for folder in folders: for folder in folders:
if folder['id'] is not None: if folder["id"] is not None:
new_folder: FolderType.Folder = FolderType.Folder(folder['id']) new_folder: FolderType.Folder = FolderType.Folder(folder["id"])
# regex lifted from https://github.com/bitwarden/jslib/blob/ecdd08624f61ccff8128b7cb3241f39e664e1c7f/common/src/services/folder.service.ts#L108 folder_name_parts: List[str] = re.sub(r"^\/+|\/+$", "", folder["name"]).split("/")
folder_name_parts: List[str] = re.sub(r'^\/+|\/+$', '', folder['name']).split('/') FolderType.nested_traverse_insert(folder_root, folder_name_parts, new_folder, "/")
FolderType.nested_traverse_insert(folder_root, folder_name_parts, new_folder, '/')
# create keepass groups based off folder tree
def add_keepass_group(folder: FolderType.Folder): def add_keepass_group(folder: FolderType.Folder):
parent_group: KPGroup = folder.parent.keepass_group parent_group: KPGroup = folder.parent.keepass_group
new_group: KPGroup = kp.add_group(parent_group, folder.name) new_group: KPGroup = kp.add_group(parent_group, folder.name)
@ -161,50 +200,64 @@ def load_folders(folders) -> Dict[str, KPGroup]:
def check_args(args): def check_args(args):
if args.database_keyfile: """Validate command line arguments.
if not os.path.isfile(args.database_keyfile) or not os.access(args.database_keyfile, os.R_OK):
logging.error('Key File for KeePass database is not readable.') Args:
return False args: ArgumentParser namespace containing runtime configuration
Returns:
bool: True if arguments are valid, False otherwise
"""
if args.database_keyfile and (
not os.path.isfile(args.database_keyfile) or not os.access(args.database_keyfile, os.R_OK)
):
logging.error("Key File for KeePass database is not readable.")
return False
if not os.path.isfile(args.bw_path) or not os.access(args.bw_path, os.X_OK): if not os.path.isfile(args.bw_path) or not os.access(args.bw_path, os.X_OK):
logging.error('bitwarden-cli was not found or not executable. Did you set correct \'--bw-path\'?') logging.error("bitwarden-cli was not found or not executable. Did you set correct '--bw-path'?")
return False return False
return True return True
def environ_or_required(key): def environ_or_required(key):
return ( """Get argument configuration based on environment variable presence.
{'default': os.environ.get(key)} if os.environ.get(key)
else {'required': True} Args:
) key: Environment variable name to check
Returns:
dict: ArgumentParser argument configuration
"""
return {"default": os.environ.get(key)} if os.environ.get(key) else {"required": True}
parser = ArgumentParser() parser = ArgumentParser()
parser.add_argument( parser.add_argument(
'--bw-session', "--bw-session",
help='Session generated from bitwarden-cli (bw login)', help="Session generated from bitwarden-cli (bw login)",
**environ_or_required('BW_SESSION'), **environ_or_required("BW_SESSION"),
) )
parser.add_argument( parser.add_argument(
'--database-path', "--database-path",
help='Path to KeePass database. If database does not exists it will be created.', help="Path to KeePass database. If database does not exists it will be created.",
**environ_or_required('DATABASE_PATH'), **environ_or_required("DATABASE_PATH"),
) )
parser.add_argument( parser.add_argument(
'--database-password', "--database-password",
help='Password for KeePass database', help="Password for KeePass database",
**environ_or_required('DATABASE_PASSWORD'), **environ_or_required("DATABASE_PASSWORD"),
) )
parser.add_argument( parser.add_argument(
'--database-keyfile', "--database-keyfile",
help='Path to Key File for KeePass database', help="Path to Key File for KeePass database",
default=os.environ.get('DATABASE_KEYFILE', None), default=os.environ.get("DATABASE_KEYFILE", None),
) )
parser.add_argument( parser.add_argument(
'--bw-path', "--bw-path",
help='Path for bw binary', help="Path for bw binary",
default=os.environ.get('BW_PATH', 'bw'), default=os.environ.get("BW_PATH", "bw"),
) )
args = parser.parse_args() args = parser.parse_args()

View File

@ -1,34 +1,70 @@
"""Folder management functionality for Bitwarden to KeePass conversion.
Provides classes and functions for managing folder hierarchies when converting
from Bitwarden's folder structure to KeePass groups.
"""
import collections import collections
from typing import Callable, Deque, List, Optional from typing import Callable, Deque, List, Optional
from pykeepass.group import Group as KPGroup from pykeepass.group import Group as KPGroup
class Folder: class Folder:
"""Represents a folder in the Bitwarden/KeePass hierarchy.
Attributes:
id: Bitwarden folder identifier
name: Display name of the folder
children: List of child folders
parent: Parent folder reference
keepass_group: Associated KeePass group object
"""
id: Optional[str] id: Optional[str]
name: Optional[str] name: Optional[str]
children: List['Folder'] children: List["Folder"]
parent: Optional['Folder'] parent: Optional["Folder"]
keepass_group: Optional[KPGroup] keepass_group: Optional[KPGroup]
def __init__(self, id: Optional[str]): def __init__(self, id_: Optional[str]):
self.id = id """Initialize a new folder.
Args:
folder_id: Bitwarden folder identifier
"""
self.id = id_
self.name = None self.name = None
self.children = [] self.children = []
self.parent = None self.parent = None
self.keepass_group = None self.keepass_group = None
def add_child(self, child: 'Folder'): def add_child(self, child: "Folder"):
"""Add a child folder to this folder.
Args:
child: Folder object to add as a child
"""
self.children.append(child) self.children.append(child)
child.parent = self child.parent = self
# logic was lifted directly from https://github.com/bitwarden/jslib/blob/ecdd08624f61ccff8128b7cb3241f39e664e1c7f/common/src/misc/serviceUtils.ts#L7 # logic was lifted directly from https://github.com/bitwarden/jslib/blob/ecdd08624f61ccff8128b7cb3241f39e664e1c7f/common/src/misc/serviceUtils.ts#L7
def nested_traverse_insert(root: Folder, name_parts: List[str], new_folder: Folder, delimiter: str) -> None: def nested_traverse_insert(root: Folder, name_parts: List[str], new_folder: Folder, delimiter: str) -> None:
"""Insert a new folder into the folder hierarchy.
Args:
root: Root folder to start traversal from
name_parts: Path components for the new folder
new_folder: Folder object to insert
delimiter: Character used to separate path components
"""
if len(name_parts) == 0: if len(name_parts) == 0:
return return
end: bool = len(name_parts) == 1 end: bool = len(name_parts) == 1
part_name: str = name_parts[0] part_name: str = name_parts[0]
for child in root.children: for child in root.children:
if child.name != part_name: if child.name != part_name:
continue continue
@ -50,10 +86,17 @@ def nested_traverse_insert(root: Folder, name_parts: List[str], new_folder: Fold
new_name_parts.extend(name_parts[2:]) new_name_parts.extend(name_parts[2:])
nested_traverse_insert(root, new_name_parts, new_folder, delimiter) nested_traverse_insert(root, new_name_parts, new_folder, delimiter)
def bfs_traverse_execute(root: Folder, callback: Callable[[Folder], None]) -> None: def bfs_traverse_execute(root: Folder, callback: Callable[[Folder], None]) -> None:
"""Execute a callback on each folder in breadth-first order.
Args:
root: Root folder to start traversal from
callback: Function to execute on each folder
"""
queue: Deque[Folder] = collections.deque() queue: Deque[Folder] = collections.deque()
queue.extend(root.children) queue.extend(root.children)
while queue: while queue:
child: Folder = queue.popleft() child: Folder = queue.popleft()
queue.extend(child.children) queue.extend(child.children)
callback(child) callback(child)

145
item.py
View File

@ -1,83 +1,168 @@
"""Item management functionality for Bitwarden to KeePass conversion.
Provides classes and functions for handling different types of Bitwarden items
when converting them to KeePass entries.
"""
from enum import IntEnum from enum import IntEnum
from urllib.parse import urlsplit, parse_qsl from urllib.parse import parse_qsl, urlsplit
class ItemType(IntEnum): class ItemType(IntEnum):
"""Enumeration of Bitwarden item types.
Attributes:
LOGIN: Login credentials
SECURE_NOTE: Secure note
CARD: Credit/debit card
IDENTITY: Identity information
"""
LOGIN = 1 LOGIN = 1
SECURE_NOTE = 2 SECURE_NOTE = 2
CARD = 3 CARD = 3
IDENTITY = 4 IDENTITY = 4
class CustomFieldType(IntEnum): class CustomFieldType(IntEnum):
"""Enumeration of Bitwarden custom field types.
Attributes:
TEXT: Plain text field
HIDDEN: Hidden/masked field
BOOLEAN: Boolean/checkbox field
"""
TEXT = 0 TEXT = 0
HIDDEN = 1 HIDDEN = 1
BOOLEAN = 2 BOOLEAN = 2
class Item: class Item:
"""Wrapper for Bitwarden vault items.
Provides methods to access and format item data for conversion to KeePass.
"""
def __init__(self, item): def __init__(self, item):
"""Initialize a new item wrapper.
Args:
item: Raw Bitwarden item data
"""
self.item = item self.item = item
def get_id(self) -> str: def get_id(self) -> str:
return self.item['id'] """Get item's unique identifier.
Returns:
Bitwarden item ID
"""
return self.item["id"]
def get_name(self) -> str: def get_name(self) -> str:
return self.item['name'] """Get item's display name.
Returns:
Item name/title
"""
return self.item["name"]
def get_folder_id(self) -> str: def get_folder_id(self) -> str:
return self.item['folderId'] """Get ID of folder containing this item.
Returns:
Bitwarden folder ID
"""
return self.item["folderId"]
def get_username(self) -> str: def get_username(self) -> str:
if 'login' not in self.item: """Get username for login items.
return ''
return self.item['login']['username'] if self.item['login']['username'] else '' Returns:
Username string or empty string if not a login
"""
if "login" not in self.item:
return ""
return self.item["login"]["username"] if self.item["login"]["username"] else ""
def get_password(self) -> str: def get_password(self) -> str:
if 'login' not in self.item: """Get password for login items.
return ''
return self.item['login']['password'] if self.item['login']['password'] else '' Returns:
Password string or empty string if not a login
"""
if "login" not in self.item:
return ""
return self.item["login"]["password"] if self.item["login"]["password"] else ""
def get_notes(self): def get_notes(self):
return self.item['notes'] """Get item's notes field.
Returns:
Notes text
"""
return self.item["notes"]
def get_uris(self): def get_uris(self):
if 'login' not in self.item or 'uris' not in self.item['login']: """Get URIs associated with login items.
Returns:
List of URI objects
"""
if "login" not in self.item or "uris" not in self.item["login"]:
return [] return []
for uri in self.item['login']['uris']: for uri in self.item["login"]["uris"]:
uri['uri'] = uri['uri'] if uri['uri'] is not None else '' uri["uri"] = uri["uri"] if uri["uri"] is not None else ""
return self.item['login']['uris'] return self.item["login"]["uris"]
def get_custom_fields(self): def get_custom_fields(self):
if 'fields' not in self.item: """Get item's custom fields.
Returns:
List of custom field objects
"""
if "fields" not in self.item:
return [] return []
for field in self.item['fields']: for field in self.item["fields"]:
field['name'] = field['name'] if field['name'] is not None else '' field["name"] = field["name"] if field["name"] is not None else ""
field['value'] = field['value'] if field['value'] is not None else '' field["value"] = field["value"] if field["value"] is not None else ""
field['type'] = CustomFieldType(field['type']) field["type"] = CustomFieldType(field["type"])
return self.item['fields'] return self.item["fields"]
def get_attachments(self): def get_attachments(self):
if 'attachments' not in self.item: """Get item's attachments.
Returns:
List of attachment objects
"""
if "attachments" not in self.item:
return [] return []
return self.item['attachments'] return self.item["attachments"]
def get_totp(self): def get_totp(self):
if 'login' not in self.item: """Get TOTP configuration for login items.
Returns:
Tuple of (secret, settings) or (None, None) if not configured
"""
if "login" not in self.item:
return None, None return None, None
if not self.item['login']['totp']: if not self.item["login"]["totp"]:
return None, None return None, None
params = urlsplit(self.item['login']['totp']).query params = urlsplit(self.item["login"]["totp"]).query
params = dict(parse_qsl(params)) params = dict(parse_qsl(params))
period = params.get('period', 30) period = params.get("period", 30)
digits = params.get('digits', 6) digits = params.get("digits", 6)
secret = params.get('secret', self.item['login']['totp']) secret = params.get("secret", self.item["login"]["totp"])
return secret, f'{period};{digits}' return secret, f"{period};{digits}"