Refactor Bitwarden to KeePass conversion logic
This commit is contained in:
parent
965c3af264
commit
526421e340
@ -29,6 +29,144 @@ logging.basicConfig(
|
||||
kp: Optional[PyKeePass] = None
|
||||
|
||||
|
||||
def initialize_keepass_db(database_path: str, password: str, keyfile: Optional[str] = None) -> Optional[PyKeePass]:
|
||||
"""Initialize or open a KeePass database.
|
||||
|
||||
Args:
|
||||
database_path: Path to KeePass database file
|
||||
password: Password for KeePass database
|
||||
keyfile: Optional path to key file
|
||||
|
||||
Returns:
|
||||
PyKeePass instance if successful, None if failed
|
||||
"""
|
||||
try:
|
||||
kp = PyKeePass(database_path, password=password, keyfile=keyfile)
|
||||
except FileNotFoundError:
|
||||
logging.info("KeePass database does not exist, creating a new one.")
|
||||
kp = create_database(database_path, password=password, keyfile=keyfile)
|
||||
except CredentialsError as e:
|
||||
logging.error(f"Wrong password for KeePass database: {e}")
|
||||
return None
|
||||
return kp
|
||||
|
||||
|
||||
def fetch_bitwarden_data(bw_path: str, bw_session: str) -> tuple[list, list]:
|
||||
"""Fetch folders and items from Bitwarden vault.
|
||||
|
||||
Args:
|
||||
bw_path: Path to Bitwarden CLI executable
|
||||
bw_session: Bitwarden session token
|
||||
|
||||
Returns:
|
||||
Tuple containing (folders, items) lists from Bitwarden
|
||||
"""
|
||||
folders = subprocess.check_output([bw_path, "list", "folders", "--session", bw_session], encoding="utf8")
|
||||
folders = json.loads(folders)
|
||||
|
||||
items = subprocess.check_output([bw_path, "list", "items", "--session", bw_session], encoding="utf8")
|
||||
items = json.loads(items)
|
||||
|
||||
return folders, items
|
||||
|
||||
|
||||
def process_entry_title(kp: PyKeePass, group: KPGroup, title: str, item_id: str) -> str:
|
||||
"""Generate a unique title for a KeePass entry.
|
||||
|
||||
Args:
|
||||
kp: KeePass database instance
|
||||
group: KeePass group to check for duplicates
|
||||
title: Desired entry title
|
||||
item_id: Bitwarden item ID for fallback
|
||||
|
||||
Returns:
|
||||
Unique entry title
|
||||
"""
|
||||
if not kp.find_entries(title=title, group=group, first=True):
|
||||
return title
|
||||
return f"{title} - ({item_id})"
|
||||
|
||||
|
||||
def create_keepass_entry(kp: PyKeePass, bw_item: Item, group: KPGroup) -> Optional[KPEntry]:
|
||||
"""Create a new KeePass entry from a Bitwarden item.
|
||||
|
||||
Args:
|
||||
kp: KeePass database instance
|
||||
bw_item: Bitwarden item wrapper
|
||||
group: KeePass group to add entry to
|
||||
|
||||
Returns:
|
||||
Created KeePass entry or None if creation failed
|
||||
"""
|
||||
try:
|
||||
entry_title = process_entry_title(kp, group, bw_item.get_name(), bw_item.get_id())
|
||||
return kp.add_entry(
|
||||
destination_group=group,
|
||||
title=entry_title,
|
||||
username=bw_item.get_username(),
|
||||
password=bw_item.get_password(),
|
||||
notes=bw_item.get_notes(),
|
||||
)
|
||||
except Exception as e:
|
||||
logging.warning(f'Failed to create entry "{bw_item.get_name()}": {repr(e)}')
|
||||
return None
|
||||
|
||||
|
||||
def add_totp_to_entry(entry: KPEntry, bw_item: Item) -> None:
|
||||
"""Add TOTP configuration to KeePass entry.
|
||||
|
||||
Args:
|
||||
entry: KeePass entry to modify
|
||||
bw_item: Bitwarden item containing TOTP data
|
||||
"""
|
||||
totp_secret, totp_settings = bw_item.get_totp()
|
||||
if totp_secret and totp_settings:
|
||||
entry.set_custom_property("TOTP Seed", totp_secret, protect=True)
|
||||
entry.set_custom_property("TOTP Settings", totp_settings)
|
||||
|
||||
|
||||
def add_custom_fields_to_entry(entry: KPEntry, bw_item: Item) -> None:
|
||||
"""Add custom fields from Bitwarden item to KeePass entry.
|
||||
|
||||
Args:
|
||||
entry: KeePass entry to modify
|
||||
bw_item: Bitwarden item containing custom fields
|
||||
"""
|
||||
for field in bw_item.get_custom_fields():
|
||||
entry.set_custom_property(
|
||||
field["name"],
|
||||
field["value"],
|
||||
protect=field["type"] == CustomFieldType.HIDDEN,
|
||||
)
|
||||
|
||||
|
||||
def add_attachments_to_entry(entry: KPEntry, bw_item: Item, bw_path: str, bw_session: str) -> None:
|
||||
"""Add attachments from Bitwarden item to KeePass entry.
|
||||
|
||||
Args:
|
||||
entry: KeePass entry to modify
|
||||
bw_item: Bitwarden item containing attachments
|
||||
bw_path: Path to Bitwarden CLI executable
|
||||
bw_session: Bitwarden session token
|
||||
"""
|
||||
for attachment in bw_item.get_attachments():
|
||||
attachment_raw = subprocess.check_output(
|
||||
[
|
||||
bw_path,
|
||||
"get",
|
||||
"attachment",
|
||||
attachment["id"],
|
||||
"--raw",
|
||||
"--itemid",
|
||||
bw_item.get_id(),
|
||||
"--session",
|
||||
bw_session,
|
||||
],
|
||||
)
|
||||
attachment_id = entry._kp.add_binary(attachment_raw)
|
||||
entry.add_attachment(attachment_id, attachment["fileName"])
|
||||
|
||||
|
||||
def bitwarden_to_keepass(args):
|
||||
"""Convert Bitwarden vault items to KeePass database entries.
|
||||
|
||||
@ -43,22 +181,13 @@ def bitwarden_to_keepass(args):
|
||||
Returns:
|
||||
PyKeePass: The KeePass database instance with imported items
|
||||
"""
|
||||
try:
|
||||
kp = PyKeePass(args.database_path, password=args.database_password, keyfile=args.database_keyfile)
|
||||
except FileNotFoundError:
|
||||
logging.info("KeePass database does not exist, creating a new one.")
|
||||
kp = create_database(args.database_path, password=args.database_password, keyfile=args.database_keyfile)
|
||||
except CredentialsError as e:
|
||||
logging.error(f"Wrong password for KeePass database: {e}")
|
||||
kp = initialize_keepass_db(args.database_path, args.database_password, args.database_keyfile)
|
||||
if not kp:
|
||||
return None
|
||||
|
||||
folders = subprocess.check_output([args.bw_path, "list", "folders", "--session", args.bw_session], encoding="utf8")
|
||||
folders = json.loads(folders)
|
||||
folders, items = fetch_bitwarden_data(args.bw_path, args.bw_session)
|
||||
groups_by_id = load_folders(kp, folders)
|
||||
logging.info(f"Folders done ({len(groups_by_id)}).")
|
||||
|
||||
items = subprocess.check_output([args.bw_path, "list", "items", "--session", args.bw_session], encoding="utf8")
|
||||
items = json.loads(items)
|
||||
logging.info(f"Starting to process {len(items)} items.")
|
||||
|
||||
for item in items:
|
||||
@ -67,65 +196,15 @@ def bitwarden_to_keepass(args):
|
||||
continue
|
||||
|
||||
bw_item = Item(item)
|
||||
entry = create_keepass_entry(kp, bw_item, groups_by_id[bw_item.get_folder_id()])
|
||||
|
||||
try:
|
||||
is_duplicate_title = False
|
||||
while True:
|
||||
entry_title = (
|
||||
bw_item.get_name()
|
||||
if not is_duplicate_title
|
||||
else "{name} - ({item_id}".format(name=bw_item.get_name(), item_id=bw_item.get_id())
|
||||
)
|
||||
try:
|
||||
entry = kp.add_entry(
|
||||
destination_group=groups_by_id[bw_item.get_folder_id()],
|
||||
title=entry_title,
|
||||
username=bw_item.get_username(),
|
||||
password=bw_item.get_password(),
|
||||
notes=bw_item.get_notes(),
|
||||
)
|
||||
break
|
||||
except Exception as e:
|
||||
if "already exists" in str(e):
|
||||
is_duplicate_title = True
|
||||
if not entry:
|
||||
continue
|
||||
raise
|
||||
|
||||
totp_secret, totp_settings = bw_item.get_totp()
|
||||
if totp_secret and totp_settings:
|
||||
entry.set_custom_property("TOTP Seed", totp_secret, protect=True)
|
||||
entry.set_custom_property("TOTP Settings", totp_settings)
|
||||
|
||||
uris = [uri["uri"] for uri in bw_item.get_uris()]
|
||||
set_kp_entry_urls(entry, uris)
|
||||
|
||||
for field in bw_item.get_custom_fields():
|
||||
entry.set_custom_property(
|
||||
field["name"],
|
||||
field["value"],
|
||||
protect=field["type"] == CustomFieldType.HIDDEN,
|
||||
)
|
||||
|
||||
for attachment in bw_item.get_attachments():
|
||||
attachment_raw = subprocess.check_output(
|
||||
[
|
||||
args.bw_path,
|
||||
"get",
|
||||
"attachment",
|
||||
attachment["id"],
|
||||
"--raw",
|
||||
"--itemid",
|
||||
bw_item.get_id(),
|
||||
"--session",
|
||||
args.bw_session,
|
||||
],
|
||||
)
|
||||
attachment_id = kp.add_binary(attachment_raw)
|
||||
entry.add_attachment(attachment_id, attachment["fileName"])
|
||||
|
||||
except Exception as e:
|
||||
logging.warning(f'Skipping item named "{item["name"]}" because of this error: {repr(e)}')
|
||||
continue
|
||||
add_totp_to_entry(entry, bw_item)
|
||||
set_kp_entry_urls(entry, [uri["uri"] for uri in bw_item.get_uris()])
|
||||
add_custom_fields_to_entry(entry, bw_item)
|
||||
add_attachments_to_entry(entry, bw_item, args.bw_path, args.bw_session)
|
||||
|
||||
logging.info("Saving changes to KeePass database.")
|
||||
kp.save()
|
||||
|
Loading…
x
Reference in New Issue
Block a user