restic_ui_pywebio/main.py
2023-04-07 14:24:53 +02:00

2990 lines
127 KiB
Python

###############################################
# Imports
###############################################
import time
import datetime
import json
import os
import subprocess
from pathlib import Path
from functools import partial
from dataclasses import dataclass
from pywebio import start_server, config
from pywebio.pin import put_input
from pywebio.session import info as session_info
from pywebio.output import put_text, \
put_table, \
use_scope, \
scroll_to, \
put_markdown, \
put_buttons, \
clear, \
put_info, \
put_error, \
put_html, \
put_tabs, \
toast, \
put_image, \
put_row, \
put_grid, \
put_loading
from pywebio.input import input, \
input_group, \
NUMBER, \
PASSWORD, \
TEXT, \
URL, \
radio
from pywebio_battery import popup_input
###############################################
# Set paths and file names
###############################################
print(datetime.datetime.now())
# Get home directory from user
USERHOME = str(Path.home())
VERSION_NUMBER = '0.4.7'
settings_path = Path(f'{USERHOME}/settings_pywebio.json')
json_path = Path(f'{USERHOME}/backup_list.json')
settings = {}
clear_list = {'control', 'control2', 'control3', 'control4',
'file', 'about', 'load_file', 'migrate', 'open_repo',
'require', 'backup', 'tools'}
clear_list_control = {'control', 'control2', 'control3', 'control4'}
###############################################
# Classes
###############################################
#####################
# Class for Settings
#####################
@dataclass
class SettingsLoad():
""" Class to store settings """
project: str
version: str
settings_path: str
json_file: str
theme: str
keep_last: str
keep_monthly: str
check: str
#####################
# Load JSON
#####################
@staticmethod
def load_json():
""" load JSON """
try:
# try to open settings file
# load json file into settings_reload
with open(settings_path, "r", encoding="utf-8") as read_file:
settings_reload = json.load(read_file)
# Deserialization settings_reload
for key, value in settings_reload.items():
settings[key] = value
# Create dataclass object
SettingsLoad(settings['project'],
settings['version'],
settings['settings_path'],
settings['json_file'],
settings['theme'],
settings['keep_last'],
settings['keep_monthly'],
settings['check'])
except OSError:
# create settings file if not exists
init_settings = {'project': 'Restic UI',
'version': f'{VERSION_NUMBER}',
'settings_path': f'{settings_path}',
'json_file': f'{json_path}',
'theme': 'dark',
'keep_last': "3",
'keep_monthly': "3",
'check': '50'}
with open(settings_path, "w", encoding="utf-8") as write_file:
json.dump(init_settings, write_file)
SettingsLoad.load_json()
@staticmethod
def version_number():
""" add correct version number """
with open(settings_path, "r", encoding="utf-8") as read_file:
reload_settings = json.load(read_file)
# json_path = settings['json_file']
if not reload_settings['version'] == VERSION_NUMBER:
reload_settings.update({'version': f'{VERSION_NUMBER}'})
with open(settings_path, "w", encoding="utf-8") as write_file:
json.dump(reload_settings, write_file)
SettingsLoad.load_json()
# Load object
SettingsLoad.load_json()
# print("Control: ", settings['theme'])
# check for new version number and set
if VERSION_NUMBER != settings['version']:
SettingsLoad.version_number()
# set theme from settings
if settings['theme']:
config(title="Restic UI", description="User Interface for backup tool restic...", theme=settings['theme'])
else:
config(title="Restic UI", description="User Interface for backup tool restic...")
#####################
# MountPath
#####################
@dataclass(kw_only=True)
class MountPath:
""" Class to store Mount Path for funktion mount & umount """
path: str
# We create the list, with an empty entry [0].
m_path = []
m_path.insert(0, MountPath(path=""))
# m_path.append(MountPath("/TEST/test"))
#####################
# Create Example File
######################
example_list = {"0": {"name": "Example 1",
"repository": f"{USERHOME}/backup 1",
"source": f"{USERHOME}/source 1",
"password": "123456",
"init": "0",
"exclude": "",
"rest": "0",
"rest_domain": "",
"rest_port": "",
"rest_user": "",
"rest_password": "",
"rest_folder": ""},
"1": {"name": "Example 2",
"repository": f"{USERHOME}/backup 2",
"source": f"{USERHOME}/source 2",
"password": "123456",
"init": "0",
"exclude": "",
"rest": "0",
"rest_domain": "",
"rest_port": "",
"rest_user": "",
"rest_password": "",
"rest_folder": ""},
"2": {"name": "Example 3",
"repository": f"{USERHOME}/backup 3",
"source": f"{USERHOME}/source3",
"password": "12345678",
"init": "0",
"exclude": "",
"rest": "1",
"rest_domain": "https://rest.example.com",
"rest_port": "12345",
"rest_user": "rest_user",
"rest_password": "rest_password",
"rest_folder": "rest_folder"}}
# we save backup_list.json
if not Path(settings['json_file']).exists():
try:
with open(settings['json_file'], 'w', encoding='utf-8') as write_file:
json.dump(example_list, write_file)
except OSError:
print("An error has occurred")
#####################
# BackupList()
#####################
@dataclass()
class BackupList:
""" I build here a class to manage the backup list
Data is in backup_list.json """
name: str
repository: str
source: str
password: str
init: str
exclude: str
rest: str
rest_domain: str
rest_port: str
rest_user: str
rest_password: str
rest_folder: str
#####################
# Save JSON
#####################
@staticmethod
def save_json():
""" We store the JSON file for data backup. """
try:
backups_json = {}
for count, value in enumerate(backups):
backups_json[value] = {'name': backups[value].name,
'repository': backups[value].repository,
'source': backups[value].source,
'password': backups[value].password,
'init': backups[value].init,
'exclude': backups[value].exclude,
'rest': backups[value].rest,
'rest_domain': backups[value].rest_domain,
'rest_port': backups[value].rest_port,
'rest_user': backups[value].rest_user,
'rest_password': backups[value].rest_password,
'rest_folder': backups[value].rest_folder
}
# save backups to JSON
with open(settings['json_file'], 'w', encoding="utf-8") as write_file:
json.dump(backups_json, write_file)
except OSError as error:
put_error(error)
#####################
# Load JSON
#####################
@staticmethod
def load_json():
""" load JSON """
with (open(settings['json_file'], 'r', encoding="utf-8")) as read_file:
backups_reload = json.load(read_file)
for count, value in enumerate(backups_reload.values()):
backups[count] = BackupList(value['name'],
value['repository'],
value['source'],
value['password'],
value['init'],
value['exclude'],
value['rest'],
value['rest_domain'],
value['rest_port'],
value['rest_user'],
value['rest_password'],
value['rest_folder'])
#####################
# Delete Entry
#####################
@staticmethod
def delete(number):
""" Delete an object """
backups.pop(number)
#####################
# ADD Entry
# #####################
@staticmethod
def add(name,
repository,
source,
password,
init,
exclude,
rest,
rest_domain,
rest_port,
rest_user,
rest_password,
rest_folder):
""" Add an object"""
backups[(BackupList.last_key()) + 1] = BackupList(name,
repository,
source,
password,
init,
exclude,
rest,
rest_domain,
rest_port,
rest_user,
rest_password,
rest_folder)
#####################
# Last Key
#####################
@staticmethod
def last_key():
""" Create last key """
last_key = list(dict.keys(backups))[-1]
return last_key
#####################
# Update
#####################
@staticmethod
def update(key, **kwargs):
""" Update an object """
# print(kwargs)
for i, k in kwargs.items():
# print(i, '=', k)
# print("I", i)
match i:
case 'name':
backups[key].name = str(k)
backups.update()
case 'repository':
backups[key].repository = str(k)
backups.update()
case 'source':
backups[key].source = str(k)
backups.update()
case 'password':
backups[key].password = str(k)
backups.update()
case 'init':
backups[key].init = str(k)
backups.update()
case 'exclude':
backups[key].exclude = str(k)
backups.update()
case 'rest':
backups[key].rest = str(k)
backups.update()
case 'rest_domain':
backups[key].rest_domain = str(k)
backups.update()
case 'rest_port':
backups[key].rest_port = str(k)
backups.update()
case 'rest_user':
backups[key].rest_user = str(k)
backups.update()
case 'rest_password':
backups[key].rest_password = str(k)
backups.update()
case 'rest_folder':
backups[key].rest_folder = str(k)
backups.update()
case _:
print("Error")
# create an empty dict
backups = {}
# Load data from filesystem
BackupList.load_json()
#####################
# SnapshotID()
######################
@dataclass
class SnapshotID:
"""
Structure of snapshot_id
[4, 'f00c870b', '5de67304']
4 = actual row (row)
f00c870b = ID Repository
5de67304 = ID Repository
ID Repositories are variable!
"""
id1: str
# def __init__(self, id1):
# """ Snapshot ID from input in restic_ls2 """
# self.id1 = id1
@staticmethod
def id_remove():
""" If the list snapshot_id is not exactly 1, then delete! """
if snapshot_id != ['']:
for count, value in enumerate(snapshot_id):
snapshot_id.remove(snapshot_id[count])
@staticmethod
def id_append(result):
"""
What do we do here? We get the ID's of the repositories
and save them in a list.
Why?
I want to store the ID's in a list to be able to query them
with the List Snapshot function to see if they exist.
Save current row in snapshot_id[0].
snapshot_id[0] = mainWin.listWidget.currentRow() # removed for testing!!
We split the result into individual lines.
Result is passed and a list is generated
"""
result_list = result.split('\n')
# We loop through the list and output the ID's and store in snapshot_id
for count, value in enumerate(result_list):
# Save data only if it comes from a row with snapshot ID.
# But this causes problems if you have snapshots with different source entries!?!?
# if backups[mainWin.listWidget.currentRow()].source in value:
snapshot_id.append(value[0:8])
# We create the list, with an empty entry [0].
snapshot_id = []
snapshot_id.append("")
snap_id = SnapshotID("0")
###############################################
# Main loop
###############################################
def main(): # PyWebIO application function
###############################################
# Functions
###############################################
def clear_scope():
for count, value in enumerate(clear_list):
clear(value)
def clear_scope_control():
for count, value in enumerate(clear_list_control):
clear(value)
#####################
# About
#####################
@use_scope('about')
def about():
#####################
# Version
#####################
@use_scope('about', clear=False)
def version():
with use_scope('about'):
about()
result = None
try:
args = ['restic', 'version']
result = subprocess.run(args,
check=True,
capture_output=True,
text=True)
except subprocess.CalledProcessError as error:
put_error(error.stderr, position=-1, scope='about')
else:
pass
# we build header and tdata for table
tab_version = []
tab_version.append([put_markdown(r''' # Restic Version
''')])
tab_version.append([put_text(result.stdout)])
put_table(tab_version, position=-1, scope='about')
return False
#####################
# About / Settings
#####################
@use_scope("about", clear=False)
def settings_tab():
with use_scope('about'):
about()
SettingsLoad.load_json()
scroll_to("about", position='top')
#####################
# Input checks
#####################
def check(value): # return None when the check passes, otherwise return the error message
if value > 100 or value < 1:
return 'Value must between 1 - 100!!'
def check2(value): # return None when the check passes, otherwise return the error message
if value > 100 or value < 1:
return 'Value must between 1 - 100!!'
def check3(value): # return None when the check passes, otherwise return the error message
theme_list = ['dark', 'sketchy', 'minty', 'yeti']
if value not in theme_list:
return "This theme is not valid"
#####################
# Input group settings
#####################
# edit settings data
tab_settings = input_group("Edit Settings", [
input('Project*', name='project', type=TEXT, required=True,
readonly=True, value=settings["project"]),
input('Version*', name='version', type=TEXT, required=True,
readonly=True, value=settings["version"]),
input('Settings Path*', name='settings_path', type=TEXT, required=True,
readonly=True, value=settings["settings_path"]),
input('JSON File*', name='json_file', help_text="Please restart app if changed!", required=True,
value=settings["json_file"]),
input('Theme', type=TEXT, name='theme',
help_text='dark sketchy minty yeti (Please restart app if changed!)',
value=settings["theme"],
validate=check3),
input('Keep Last', type=NUMBER, name='keep_last', help_text='Value 1 - 100',
value=settings["keep_last"], validate=check2),
input('Keep Monthly', type=NUMBER, name='keep_monthly', help_text='Value 1 - 100',
value=settings["keep_monthly"], validate=check2),
input('Check', type=NUMBER, name='check', help_text='Value 1 - 100', value=settings["check"],
validate=check)
], cancelable=True)
# Cancel edit settings
if not tab_settings:
about()
scroll_to("about", position='bottom')
return
try:
with open(settings["settings_path"], "w", encoding="utf-8") as write_file:
json.dump(tab_settings, write_file)
put_info('File saved successful')
except OSError:
put_error("File Error")
scroll_to("about", position='bottom')
return False
clear_scope()
put_text('')
put_text('')
put_buttons(['Restic Version', 'Settings'], scope='about', onclick=[version, settings_tab])
put_markdown(f"""
version: {settings['version']}
""")
put_markdown(r"""
Gitlab -> https://gitlab.com/Bullet64/restic_ui_pywebio
Documentation ->
Forum: -> https://linux-nerds.org/category/62/pywebio
----------------
This project is not related to https://restic.net
**Thanks for this nice tool!**
Logo is © https://restic.net
""")
put_html('<center><a href="https://linux-nerds.org">© linux-nerds.org</a></center>')
#####################
# Backup
#####################
@use_scope('backup')
def backup():
clear_scope()
BackupList.load_json()
# we build header and tdata for table
tab_init = []
# Print data from backups
for count, value in enumerate(backups):
# print("Control", count, backups[value].name)
if count == 0:
tab_init.append(['No.', 'Backup name of the restic data backup', 'Actions'])
if backups[value].init == "1":
button_list = {"label": 'Init', "value": 'Init', "color": 'primary', "disabled": True}
tab_init.append([count + 1,
backups[count].name,
put_buttons([
button_list],
onclick=partial(actions, count + 1))
])
else:
button_list = {'label': 'Init', 'value': 'Init', 'color': 'primary', "disabled": False}
tab_init.append([count + 1,
backups[count].name,
put_buttons([
button_list],
onclick=partial(actions, count + 1))
])
# we build header and tdata for table
tab_backup = []
for count, value in enumerate(backups):
if count == 0:
tab_backup.append(['No.', 'Backup name of the restic data backup', 'Actions'])
if backups[value].init == "0":
button_list1 = {"label": 'Backup', "value": 'Backup', "color": 'primary', "disabled": True}
button_list2 = {"label": 'Prune', "value": 'Prune', "color": 'primary', "disabled": True}
tab_backup.append([count + 1,
backups[count].name,
put_buttons([
button_list1,
button_list2],
onclick=partial(actions, count + 1))
])
else:
button_list1 = {"label": 'Backup', "value": 'Backup', "color": 'primary', "disabled": False}
button_list2 = {"label": 'Prune', "value": 'Prune', "color": 'primary', "disabled": False}
tab_backup.append([count + 1,
backups[count].name,
put_buttons([
button_list1,
button_list2],
onclick=partial(actions, count + 1))
])
# we build header and tdata for table
tab_mount = []
for count, value in enumerate(backups):
if count == 0:
tab_mount.append(['No.', 'Backup name of the restic data backup', 'Actions'])
if backups[value].init == "0":
button_list1 = {"label": 'Mount', "value": 'Mount', "color": 'primary', "disabled": True}
button_list2 = {"label": 'UMount', "value": 'UMount', "color": 'primary', "disabled": True}
button_list3 = {"label": 'Restore', "value": 'Restore', "color": 'primary', "disabled": True}
tab_mount.append([count + 1,
backups[count].name,
put_buttons([
button_list1,
button_list2,
button_list3],
onclick=partial(actions, count + 1))
])
else:
button_list1 = {"label": 'Mount', "value": 'Mount', "color": 'primary', "disabled": False}
button_list2 = {"label": 'UMount', "value": 'UMount', "color": 'primary', "disabled": False}
button_list3 = {"label": 'Restore', "value": 'Restore', "color": 'primary', "disabled": False}
tab_mount.append([count + 1,
backups[count].name,
put_buttons([
button_list1,
button_list2,
button_list3],
onclick=partial(actions, count + 1))
])
# we build header and tdata for table
tab_edit = []
for count, value in enumerate(backups):
if count == 0:
tab_edit.append(['No.', 'Backup name of the restic data backup', 'Actions'])
button_list1 = {"label": 'Edit', "value": 'Edit', "color": 'primary', "disabled": False}
button_list2 = {"label": 'Delete', "value": 'Delete', "color": 'danger', "disabled": False}
tab_edit.append([count + 1,
backups[count].name,
put_buttons([
button_list1,
button_list2],
onclick=partial(actions, count + 1))
])
put_tabs([
{'title': 'Backup', 'content': [
put_table(tab_backup),
]},
{'title': 'Init', 'content': [
put_table(tab_init),
]},
{'title': 'Mount', 'content': [
put_table(tab_mount),
]},
{'title': 'Edit', 'content': [
put_table(tab_edit),
]},
])
put_html('<center><a href="https://linux-nerds.org">© linux-nerds.org</a></center>', position=-1)
#####################
# Migrate
#####################
@use_scope('migrate', position=-1)
def migrate():
clear_scope()
SettingsLoad.load_json()
# we build header and tdata for table
tab_migrate = []
for count, value in enumerate(backups):
if count == 0:
tab_migrate.append(['No.', 'Backup name of the restic data backup', 'Actions'])
if backups[value].init == "0":
button_list1 = {"label": 'Upgrade -> v2', "value": 'Update', "color": 'primary', "disabled": True}
tab_migrate.append([count + 1,
backups[count].name,
put_buttons([
button_list1],
onclick=partial(actions3, count + 1))
])
else:
button_list1 = {"label": 'Upgrade -> v2', "value": 'Update', "color": 'primary', "disabled": False}
tab_migrate.append([count + 1,
backups[count].name,
put_buttons([
button_list1],
onclick=partial(actions3, count + 1))
])
put_tabs([
{'title': 'Migrate from repo version V1 to V2', 'content': [
put_table(tab_migrate),
]},
])
# put_text("© Frank Mankel 2022", position=-1).style('text-align:center')
put_html('<center><a href="https://linux-nerds.org">© linux-nerds.org</a></center>', position=-1)
#####################
# Tools
#####################
@use_scope('tools')
def tools():
clear_scope()
SettingsLoad.load_json()
# we build header and tdata for table
tab_snapshot = []
for count, value in enumerate(backups):
if count == 0:
tab_snapshot.append(['No.', 'Backup name of the restic data backup', 'Actions'])
if backups[value].init == "0":
button_list1 = {"label": 'Snapshots', "value": 'Snapshots', "color": 'primary', "disabled": True}
button_list2 = {"label": 'List Snapshot', "value": 'List Snapshot', "color": 'primary',
"disabled": True}
tab_snapshot.append([count + 1,
backups[count].name,
put_buttons([
button_list1,
button_list2],
onclick=partial(actions2, count + 1))
])
else:
button_list1 = {"label": 'Snapshots', "value": 'Snapshots', "color": 'primary', "disabled": False}
button_list2 = {"label": 'List Snapshot', "value": 'List Snapshot', "color": 'primary',
"disabled": False}
tab_snapshot.append([count + 1,
backups[count].name,
put_buttons([
button_list1,
button_list2],
onclick=partial(actions2, count + 1))
])
# we build header and tdata for table
tab_check = []
for count, value in enumerate(backups):
if count == 0:
tab_check.append(['No.', 'Backup name of the restic data backup', 'Actions'])
if backups[value].init == "0":
button_list1 = {"label": 'Check', "value": 'Check', "color": 'primary', "disabled": True}
button_list2 = {"label": 'Stats', "value": 'Stats', "color": 'primary', "disabled": True}
button_list3 = {"label": 'Cache', "value": 'Cache', "color": 'primary', "disabled": True}
button_list4 = {"label": 'Unlock', "value": 'Unlock', "color": 'primary', "disabled": True}
tab_check.append([count + 1,
backups[count].name,
put_buttons([
button_list1,
button_list2,
button_list3,
button_list4],
onclick=partial(actions2, count + 1))
])
else:
button_list1 = {"label": 'Check', "value": 'Check', "color": 'primary', "disabled": False}
button_list2 = {"label": 'Stats', "value": 'Stats', "color": 'primary', "disabled": False}
button_list3 = {"label": 'Cache', "value": 'Cache', "color": 'primary', "disabled": False}
button_list4 = {"label": 'Unlock', "value": 'Unlock', "color": 'primary', "disabled": False}
tab_check.append([count + 1,
backups[count].name,
put_buttons([
button_list1,
button_list2,
button_list3,
button_list4],
onclick=partial(actions2, count + 1))
])
put_tabs([
{'title': 'Snapshots', 'content': [
put_table(tab_snapshot),
]},
{'title': 'More Tools', 'content': [
put_table(tab_check),
]},
])
put_html('<center><a href="https://linux-nerds.org">© linux-nerds.org</a></center>', position=-1)
#####################
# Add backup
#####################
@use_scope('file')
def add_backup():
clear_scope()
rest_backup = radio(label='REST-Server', options=['Yes', 'No'], inline=True, value='No')
#####################
# checks for input
#####################
def check_repository(repo):
path = Path(repo)
if path.exists():
is_empty = not any(Path(path).iterdir())
if not is_empty:
return 'The directory is not empty, please choose another directory!'
else:
path.mkdir()
toast("Repo path created", duration=2, position='center', color='info')
def check_source(source):
path = Path(source)
if path.exists():
pass
else:
return 'Source path does not exist'
with use_scope('main_menu'):
#####################
# No REST backup
#####################
match rest_backup:
case 'No':
with use_scope('require'):
put_text('')
put_text('')
put_text('* Required field', scope='require')
# input data
add_backup = input_group("Add Backup", [
input('Backup Name*', name='name', type=TEXT, required=True),
input('Repository*', name='repository', type=TEXT, required=True, value=f'{USERHOME}/repo_1',
validate=check_repository),
input('Source*', name='source', type=TEXT, required=True, value=f'{USERHOME}/source_1',
validate=check_source),
input('Password', name='password', type=PASSWORD),
input('Exclude List', type=TEXT, name='exclude')
], cancelable=True)
if not add_backup:
clear('require')
file()
return False
# control output
clear('require')
BackupList.add(add_backup['name'],
add_backup['repository'],
add_backup['source'],
add_backup['password'],
"0",
add_backup['exclude'],
"0",
"",
"",
"",
"",
"")
BackupList.save_json()
with use_scope('file'):
try:
BackupList.load_json()
except OSError:
put_error("Error")
else:
file()
put_info("File saved", position=-1, scope='file')
return False
#####################
# REST-Backup
#####################
case 'Yes':
with use_scope('require'):
put_text('')
put_text('')
put_text('* Required field', scope='require')
# input data
add_backup = input_group("Add REST-Backup", [
input('Backup Name*', name='name', type=TEXT, required=True),
input('Source*', name='source', type=TEXT, required=True, value=f'{USERHOME}/source_1',
validate=check_source),
input('Password', name='password', type=PASSWORD),
input('Exclude List', name='exclude', type=TEXT),
input('REST Domain*', name='rest_domain', type=URL, required=True),
input('REST Port*', name='rest_port', type=NUMBER, required=True),
input('REST User*', name='rest_user', type=TEXT, required=True),
input('REST Password*', name='rest_password', type=PASSWORD, required=True),
input('REST Folder*', name='rest_folder', type=TEXT, required=True)
], cancelable=True)
if not add_backup:
clear('require')
file()
return False
# control output
clear('require')
BackupList.add(add_backup['name'],
"",
add_backup['source'],
add_backup['password'],
"0",
add_backup['exclude'],
"1",
add_backup['rest_domain'],
add_backup['rest_port'],
add_backup['rest_user'],
add_backup['rest_password'],
add_backup['rest_folder'])
BackupList.save_json()
with use_scope('file'):
try:
BackupList.load_json()
except OSError:
put_error("Error")
else:
file()
put_info("File saved", position=-1, scope='file')
return False
case _:
file()
#####################
# Open Repo
#####################
def open_repo():
with use_scope('file'):
with use_scope('open_repo'):
clear('load_file')
clear('open_repo')
put_info("Open existing repo.You must provide a valid password for that repo!")
# get repo path
path = popup_input([
put_input("path", label="Repository Path?"),
], 'Open Repo')
if path is None:
clear('open_repo')
return
# Test if passed value is also an existing repo
if not path['path']:
put_error("Empty string. Open Repo function aborted")
return
if path:
config_path = Path(path['path']+'/config')
repo_path = Path(path['path'])
if config_path.exists():
pass
else:
put_error('Source path does not contain an restic repo')
return
# got passwd for repo
pass_word = popup_input([
put_input("pass_word", label="Please enter PW", type=PASSWORD),
], 'Password')
if not pass_word:
clear('open_repo')
return False
if not pass_word['pass_word']:
put_error("Empty string. Open Repo function aborted")
return
with put_loading():
# check if repo in path is valid
try:
args = ['restic',
'-r',
repo_path,
'snapshots']
result = subprocess.run(args,
input=pass_word['pass_word'],
check=True,
capture_output=True,
text=True)
except subprocess.CalledProcessError as error:
# Process don't successful, send signal
put_error(error.stderr)
return False
else:
# Process successful, send signal
put_info('Please add the source path manually')
put_info(result.stdout)
finally:
pass
BackupList.add("Open_Repo",
str(repo_path),
"",
pass_word['pass_word'],
"1",
"",
"0",
"",
"",
"",
"",
"")
# We save the entry to the JSON
BackupList.save_json()
# load JSON
BackupList.load_json()
pass_word = {'password': ''}
#####################
# File
#####################
@use_scope('file')
def file():
clear_scope()
put_buttons(['Open Repo', 'Add Backup'], position=0, onclick=[open_repo, add_backup])
put_html('<center><a href="https://linux-nerds.org">© linux-nerds.org</a></center>', position=-1)
#####################
# Actions **control**
#####################
@use_scope('control')
def actions(line, action):
match action:
#####################
# Backup
#####################
case 'Backup':
clear_scope_control()
put_info(f"Backup from {backups[line - 1].name}")
pass_word = ""
if not backups[line - 1].password:
pass_word = popup_input([
put_input("password", label="Password?", type=PASSWORD),
])
# print(pass_word['password'])
if pass_word is None:
return
with put_loading():
try:
# REST == 0
if backups[line - 1].rest == "0":
if not backups[line - 1].password:
# Restic function
if backups[line - 1].exclude == "":
args = ['restic',
'-r',
backups[line - 1].repository,
'backup',
backups[line - 1].source]
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
else:
args = ['restic',
'-r',
backups[line - 1].repository,
'backup',
backups[line - 1].source,
f'--exclude-file={backups[line - 1].exclude}']
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
else:
if backups[line - 1].exclude == "":
args = ['restic',
'-r',
backups[line - 1].repository,
'backup',
backups[line - 1].source]
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
else:
args = ['restic',
'-r',
backups[line - 1].repository,
'backup',
backups[line - 1].source,
f'--exclude-file={backups[line - 1].exclude}']
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
else:
# BACKUP for REST == 1
if not backups[line - 1].password:
if backups[line - 1].exclude == "":
# Restic function
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'backup',
backups[line - 1].source]
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
else:
# Restic function
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'backup',
backups[line - 1].source,
f'--exclude-file={backups[line - 1].exclude}']
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
else:
if backups[line - 1].exclude == "":
# Restic function
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'backup',
backups[line - 1].source]
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
else:
# Restic function
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'backup',
backups[line - 1].source,
f'--exclude-file={backups[line - 1].exclude}']
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
except subprocess.CalledProcessError as error:
put_error(error.stderr, scope='control')
else:
put_info(result.stdout, position=-1, scope='control')
toast("Backup successful created", duration=4, position='center', color='success')
#####################
# Prune
#####################
case 'Prune':
clear_scope_control()
put_info(f"Prune from {backups[line - 1].name}")
pass_word = ""
if not backups[line - 1].password:
pass_word = popup_input([
put_input("password", label="Password?", type=PASSWORD),
])
# print(pass_word['password'])
if pass_word is None:
return
try:
with put_loading():
# restic function prune
if backups[line - 1].rest == '0':
if not backups[line - 1].password:
args = ['restic',
'-r',
backups[line - 1].repository,
'forget',
'--keep-last',
str(settings['keep_last']),
'--keep-monthly',
str(settings['keep_monthly']),
'--prune']
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
else:
args = ['restic',
'-r',
backups[line - 1].repository,
'forget',
'--keep-last',
str(settings['keep_last']),
'--keep-monthly',
str(settings['keep_monthly']),
'--prune']
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
else:
if not backups[line - 1].password:
# Restic function
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'forget',
'--keep-last',
str(settings['keep_last']),
'--keep-monthly',
str(settings['keep_monthly']),
'--prune']
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
else:
# Restic function
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'forget',
'--keep-last',
str(settings['keep_last']),
'--keep-monthly',
str(settings['keep_monthly']),
'--prune']
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
except subprocess.CalledProcessError as error:
put_error(error.stderr, scope='control')
else:
put_info(result.stdout, scope='control')
toast("Process Prune successfully completed", duration=4, position='center', color='success')
#####################
# Edit
#####################
case 'Edit':
clear_scope_control()
#####################
# checks for input
#####################
def check_source(source):
path = Path(source)
if path.exists():
pass
else:
return 'Source path does not exist'
with use_scope('edit'):
clear('backup')
clear('control')
match backups[line - 1].rest:
case '0':
# input data
edit_backup = input_group("Edit Backup", [
input('Backup Name*', name='backup_name', type=TEXT, required=True,
value=backups[line - 1].name),
input('Repository*', name='repository', type=TEXT, required=True,
value=backups[line - 1].repository),
input('Source*', name='source', type=TEXT, required=True,
value=backups[line - 1].source, validate=check_source),
input('Password', name='password', value=backups[line - 1].password),
input('Exclude List', type=TEXT, name='exclude_list', value=backups[line - 1].exclude)
], cancelable=True)
# Cancel Input Group
if not edit_backup:
backup()
return
BackupList.update((line - 1), name=edit_backup['backup_name'],
repository=edit_backup['repository'],
source=edit_backup['source'],
password=edit_backup['password'],
exclude=edit_backup['exclude_list'])
BackupList.save_json()
backup()
case '1':
clear('backup')
# input data
edit_backup = input_group("Edit REST-Backup", [
input('Backup Name*', name='backup_name', type=TEXT, required=True,
value=backups[line - 1].name),
input('Source*', name='source', type=TEXT, required=True,
value=backups[line - 1].source, validate=check_source),
input('Password', name='password', value=backups[line - 1].password),
input('Exclude List', name='exclude_list', type=TEXT, value=backups[line - 1].exclude),
input('REST Domain*', name='rest_domain', type=TEXT, required=True,
value=backups[line - 1].rest_domain),
input('REST Port*', name='rest_port', type=NUMBER, required=True,
value=backups[line - 1].rest_port),
input('REST User*', name='rest_user', type=TEXT, required=True,
value=backups[line - 1].rest_user),
input('REST Password*', name='rest_password', required=True,
value=backups[line - 1].rest_password),
input('REST Folder*', name='rest_folder', type=TEXT, required=True,
value=backups[line - 1].rest_folder)
], cancelable=True)
# Cancel Input Group
if not edit_backup:
backup()
return
BackupList.update((line - 1), name=edit_backup['backup_name'],
source=edit_backup['source'],
password=edit_backup['password'],
exclude=edit_backup['exclude_list'],
rest_domain=edit_backup['rest_domain'],
rest_port=edit_backup['rest_port'],
rest_user=edit_backup['rest_user'],
rest_password=edit_backup['rest_password'],
rest_folder=edit_backup['rest_folder'])
BackupList.save_json()
backup()
case _:
put_row(put_buttons(['Backup', 'File', 'Tools', 'Migrate', 'About'], group=True,
onclick=[backup, file, tools, migrate, about]))
backup()
#####################
# Delete
#####################
case 'Delete':
# Delete entry?
if not backups[line - 1].source:
backups[line - 1].source = 'EMPTY'
form_delete = popup_input([
put_info(f"We don't delete your repository data. Please delete the data yourself."
f" Repo-Path is {backups[line - 1].source}"),
], "Delete Entry?")
match form_delete:
case 'None':
backup()
case {}:
# delete entries
BackupList.delete((line - 1))
# save json
BackupList.save_json()
# clear dict
backups.clear()
# Reload Tab Backup
backup()
case _:
pass
#####################
# Mount
#####################
case 'Mount':
clear_scope_control()
put_info(f"Mount from {backups[line - 1].name}")
pass_word = {'password': ""}
mount_path = popup_input([
put_input("mount_path", label="Where to mount the repo?"),
])
if mount_path is None:
clear_scope_control()
return
if not mount_path['mount_path']:
put_error("You have not specified a mount folder. Mount function aborted")
return
m_path[0] = mount_path['mount_path']
path = Path(m_path[0])
if path.exists():
put_error("You have specified a existing mount path. Mount function aborted")
return
path.mkdir()
put_info(f"Successful created mount folder {mount_path['mount_path']} !")
# if no password is in backups
if not backups[line - 1].password:
pass_word = popup_input([
put_input("password", label="Password?", type=PASSWORD),
])
# print(pass_word['password'])
if pass_word is None:
return
with put_loading():
try:
if backups[line - 1].rest == "0":
if not backups[line - 1].password:
# Create cmd
cmd = ['restic',
'-r',
backups[line - 1].repository,
'mount',
m_path[0]]
# We create the object, here with stdin and open the asynchronous subprocess
process = subprocess.Popen(
cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
process.stdin.write(pass_word['password'].encode())
process.stdin.close()
# Delete the variable from the input
pass_word = {'password': ''}
# if password is in backups
else:
# Create cmd
cmd = ['restic',
'-r',
backups[line - 1].repository,
'mount',
m_path[0]]
pwd = backups[line - 1].password.encode()
# We create the object, here with stdin and open the asynchronous subprocess
process = subprocess.Popen(
cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
process.stdin.write(pwd)
process.stdin.close()
else:
# if no password is in backups
if not backups[line - 1].password:
# Create cmd
cmd = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'mount',
m_path[0]]
# We create the object, here with stdin and open the asynchronous subprocess
process = subprocess.Popen(
cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
process.stdin.write(pass_word['password'].encode())
process.stdin.close()
# Delete the variable from the input
pass_word = {'password': ''}
# if password is in backups
else:
# Create cmd
cmd = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'mount',
m_path[0]]
pwd = backups[line - 1].password.encode()
# We create the object, here with stdin and open the asynchronous subprocess
process = subprocess.Popen(
cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
process.stdin.write(pwd)
process.stdin.close()
except subprocess.CalledProcessError as error:
print(error)
finally:
# we give some time for our subprocess.Popen
time.sleep(1.0)
try:
if process.poll() == 1:
raise Exception(
"The Restic mount command was not successful.\n"
"Probable cause: wrong password or no key found")
try:
process = subprocess.run(['ls',
'-lha',
m_path[0]],
check=True,
stdout=subprocess.PIPE,
universal_newlines=True)
output = process.stdout
except subprocess.CalledProcessError as error:
put_error("Error while fetching the directory contents")
print(error)
else:
put_info(output)
toast("Mount point successful created", duration=4, position='center',
color='success')
except Exception as error:
put_error(error)
#####################
# UMount
#####################
case 'UMount':
clear_scope_control()
put_info(f"UMount from {backups[line - 1].name}")
# Mount Path comes from m_path[0] MountPath-Class
if not type(m_path[0]) == str or m_path[0] == "":
put_error("No mount path available, you must first mount an repository!")
return
if backups[line - 1].init == "0":
put_error("Status INIT = 0, please initialize first!")
else:
try:
# Create cmd
cmd = ['umount', m_path[0]]
subprocess.run(cmd,
check=True,
stdin=subprocess.PIPE)
except subprocess.CalledProcessError as error:
put_error(error)
else:
put_info("UMount successful!")
toast("UMount successful", duration=4, position='center', color='success')
# After umount, we will delete mount folder!
try:
# Delete mount folder
result = subprocess.run(['rm',
'-rf',
m_path[0]],
check=True,
capture_output=True,
text=True)
except subprocess.CalledProcessError as error:
put_error(error)
else:
put_info(f"Mount Folder {m_path[0]} deleted")
finally:
m_path[0] = ""
#####################
# Restore
#####################
case 'Restore':
clear_scope_control()
put_info(f"Restore from {backups[line - 1].name}")
# Ask for restore path
restore_path = popup_input([
put_input("restore_path", label="Where to restore the repo?"),
])
# check return from restore_path
if restore_path is None:
clear_scope_control()
return
if not restore_path['restore_path']:
put_error("You have not specified a restore folder. Restore function aborted")
return
# Check path and create
path = Path(restore_path['restore_path'])
if path.exists():
put_error("You have specified a existing restore path. Restore function aborted")
return
path.mkdir()
put_info(f"Successful created mount folder in {restore_path['restore_path']} !")
result = False
# Ask for snap_id
restore_id = popup_input([
put_input("id", label="Snapshot ID"),
put_info(f"If you do not enter one, the last snapshot from {backups[line - 1].name} will be used."),
], "Please enter the snapshot ID")
# print(restore_id)
if restore_id is None:
# print("ID cancel")
clear_scope_control()
return
if restore_id == {'id': ''}:
# print("Empty ID")
snap_id.id1 = 'latest'
result = True
else:
# We loop through the list and output the ID's
for count, value in enumerate(snapshot_id):
if restore_id['id'] == snapshot_id[count]:
result = True
snap_id.id1 = restore_id['id']
if result is True:
try:
pass_word = {'password': ""}
# restic function restore
if backups[line - 1].rest == "0":
if not backups[line - 1].password:
pass_word = popup_input([
put_input("password", label="Password?", type=PASSWORD),
])
# print(pass_word['password'])
if pass_word is None:
return
args = ['restic', '-r',
backups[line - 1].repository,
'restore',
snap_id.id1,
'--target',
restore_path['restore_path']]
subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
else:
args = ['restic', '-r',
backups[line - 1].repository,
'restore',
snap_id.id1,
'--target',
restore_path['restore_path']]
subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
else:
if not backups[line - 1].password:
# Restic function
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'restore',
snap_id.id1,
'--target',
restore_path['restore_path']]
subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
else:
# Restic function
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'restore',
snap_id.id1,
'--target',
restore_path['restore_path']]
subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
except subprocess.CalledProcessError as error:
put_error(error.stderr)
else:
try:
process = subprocess.run(['ls',
'-lha',
restore_path['restore_path']],
check=True,
stdout=subprocess.PIPE,
universal_newlines=True)
output = process.stdout
except subprocess.CalledProcessError as error:
put_error(error)
else:
put_info(output)
toast("Restore successful created", duration=4, position='center', color='success')
#####################
# Init
#####################
case 'Init':
clear_scope_control()
# pass_word = ""
if not backups[line - 1].password:
pass_word = popup_input([
put_input("password", label="Password?", type=PASSWORD),
])
# print(pass_word['password'])
if pass_word is None:
clear_scope_control()
return
with put_loading():
result = None
pass_word = {'password': ""}
try:
if backups[line - 1].rest == "0":
# INIT for normal backup
if not backups[line - 1].password:
# Restic function
args = ['restic',
'-r',
backups[line - 1].repository,
'init']
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
else:
# Restic function
args = ['restic',
'-r',
backups[line - 1].repository,
'init']
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
else:
# INIT for REST backup
if not backups[line - 1].password:
# Restic function
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'init']
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
else:
# Restic function
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'init']
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
except subprocess.CalledProcessError as error:
toast(error.stderr, duration=8, color='warn')
else:
BackupList.update((line - 1), init="1")
put_info(result.stdout, scope='control')
toast("Init successful created", duration=8, color='success')
scroll_to("control", position='top')
BackupList.save_json()
BackupList.load_json()
backup()
scroll_to("control", position='bottom')
###############################################
# Function actions **control2**
###############################################
@use_scope('control2', clear=True)
def actions2(line, action):
match action:
#####################
# Snapshots
#####################
case 'Snapshots':
clear_scope_control()
put_info(f"Snapshots from {backups[line - 1].name}")
pass_word = ""
if not backups[line - 1].password:
pass_word = popup_input([
put_input("password", label="Password?", type=PASSWORD),
])
if pass_word is None:
clear_scope_control()
return
with put_loading():
try:
# REST == 0
if backups[line - 1].rest == "0":
if not backups[line - 1].password:
# Restic function
args = ['restic',
'-r',
backups[line - 1].repository,
'snapshots']
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
else:
# Restic function
args = ['restic',
'-r',
backups[line - 1].repository,
'snapshots']
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
else:
if not backups[line - 1].password:
# Restic function
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'snapshots']
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
else:
# Restic function
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'snapshots']
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
except subprocess.CalledProcessError as error:
put_error(error.stderr, scope='control2')
else:
SnapshotID.id_append(result.stdout)
put_info(result.stdout, scope='control2')
toast("Snapshots successful listed", duration=4, position='center', color='success')
#####################
# List Snapshot
#####################
case 'List Snapshot':
clear_scope_control()
put_info(f"List Snapshot from {backups[line - 1].name}")
pass_word = ""
result = False
# print("Snapshot", snapshot_id)
snaps_id = popup_input([
put_input("id", label="Snapshot ID"),
put_info(f"If you do not enter one, the last snapshot from {backups[line - 1].name} will be used."),
], "Please enter the snapshot ID")
# print(ID)
if snaps_id is None:
# print("ID cancel")
clear_scope_control()
return
if snaps_id == {'id': ''}:
# print("Empty ID")
snap_id.id1 = 'latest'
result = True
else:
# We loop through the list and output the ID's
for count, value in enumerate(snapshot_id):
if snaps_id['id'] == snapshot_id[count]:
result = True
snap_id.id1 = snaps_id['id']
if result is True:
if not backups[line - 1].password:
pass_word = popup_input([
put_input("password", label="Password?", type=PASSWORD),
])
if pass_word is None:
return
with put_loading():
try:
if backups[line - 1].rest == "0":
if not backups[line - 1].password:
# Restic function
args = ['restic',
'-r',
backups[line - 1].repository,
'ls',
snap_id.id1]
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
else:
# Restic function
args = ['restic',
'-r',
backups[line - 1].repository,
'ls',
snap_id.id1]
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
else:
if not backups[line - 1].password:
# Restic function
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'ls',
snap_id.id1]
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
else:
# Restic function
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'ls',
snap_id.id1]
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
except subprocess.CalledProcessError as error:
put_error(error.stderr, scope='control2')
else:
put_info(result.stdout, scope='control2')
toast("Snapshot successful listed", duration=4, position='center', color='success')
SnapshotID.id_remove()
snap_id.id1 = ""
else:
toast("Wrong <ID>, please enter correct Snapshot <ID>", duration=4,
position='center', color='error')
#####################
# Check
#####################
case 'Check':
clear_scope_control()
put_info(f"Check from {backups[line - 1].name}")
check = settings['check']
check2 = f"{check}%"
pass_word = {'password': ''}
if not backups[line - 1].password:
pass_word = popup_input([
put_input("password", label="Password?", type=PASSWORD),
])
print(pass_word)
if pass_word is None:
clear_scope_control()
return
with put_loading():
try:
match_list = [backups[line - 1].rest, bool(backups[line - 1].password)]
match match_list:
# backups[row].rest, bool(backups[row].password)
case ["0", False]:
# restic function check
args = ['restic',
'-r',
backups[line - 1].repository,
'check',
'--read-data-subset',
check2]
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
case ["0", True]:
# restic function check
args = ['restic',
'-r',
backups[line - 1].repository,
'check',
'--read-data-subset',
check2]
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
case ["1", False]:
# restic function check
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'check',
'--read-data-subset',
check2]
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
case ["1", True]:
# restic function check
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'check',
'--read-data-subset',
check2]
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
case _:
put_error("Pattern match not found")
return False
except subprocess.CalledProcessError as error:
put_error(error.stderr, scope='control2')
else:
put_info(result.stdout, scope='control2')
toast("Check successful done", duration=4, position='center', color='success')
#####################
# Stats
#####################
case 'Stats':
clear_scope_control()
put_info(f"Stats from {backups[line - 1].name}")
pass_word = ""
if not backups[line - 1].password:
pass_word = popup_input([
put_input("password", label="Password?", type=PASSWORD),
])
if pass_word is None:
clear_scope_control()
return
with put_loading():
try:
match_list = [backups[line - 1].rest, bool(backups[line - 1].password)]
match match_list:
# backups[row].rest, bool(backups[row].password)
case ["0", False]:
# Restic function
args = ['restic',
'-r',
backups[line - 1].repository,
'stats']
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
# backups[row].rest, bool(backups[row].password)
case ["0", True]:
# Restic function
args = ['restic',
'-r',
backups[line - 1].repository,
'stats']
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
# backups[row].rest, bool(backups[row].password)
case ["1", False]:
# Restic function
args = ['restic',
'-r',
f"rest:https://"
f"{backups[line - 1].rest_user}:"
f"{backups[line - 1].rest_password}@"
f"{backups[line - 1].rest_domain}:"
f"{backups[line - 1].rest_port}/"
f"{backups[line - 1].rest_folder}",
'stats']
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
# backups[row].rest, bool(backups[row].password)
case ["1", True]:
# Restic function
args = ['restic',
'-r',
f"rest:https://"
f"{backups[line - 1].rest_user}:"
f"{backups[line - 1].rest_password}@"
f"{backups[line - 1].rest_domain}:"
f"{backups[line - 1].rest_port}/"
f"{backups[line - 1].rest_folder}",
'stats']
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
case _:
put_error("Pattern match not found")
return False
except subprocess.CalledProcessError as error:
put_error(error.stderr, scope='control2')
else:
put_info(result.stdout, scope='control2')
toast("Stats successful done", duration=4, position='center', color='success')
#####################
# Cache
#####################
case 'Cache':
clear_scope_control()
put_info(f"Clear Cache from {backups[line - 1].name}")
pass_word = ""
if not backups[line - 1].password:
pass_word = popup_input([
put_input("password", label="Password?", type=PASSWORD),
])
if pass_word is None:
clear_scope_control()
return
with put_loading():
try:
# restic function check
if backups[line - 1].rest == "0":
if not backups[line - 1].password:
# Restic function
args = ['restic',
'-r',
backups[line - 1].repository,
'cache',
'--cleanup']
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
else:
# Restic function
args = ['restic',
'-r',
backups[line - 1].repository,
'cache',
'--cleanup']
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
else:
if not backups[line - 1].password:
# Restic function
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'cache',
'--cleanup']
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
else:
# Restic function
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'cache',
'--cleanup']
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
except subprocess.CalledProcessError as error:
put_error(error.stderr, scope='control2')
else:
put_info(result.stdout, scope='control2')
toast("Stats successful listed", duration=4, position='center', color='success')
#####################
# Unlock
#####################
case 'Unlock':
clear_scope_control()
put_info(f"Unlock from {backups[line - 1].name}")
pass_word = ""
if not backups[line - 1].password:
pass_word = popup_input([
put_input("password", label="Password?", type=PASSWORD),
])
if pass_word is None:
clear_scope_control()
return
with put_loading():
try:
# restic function check
if backups[line - 1].rest == "0":
if not backups[line - 1].password:
# Restic function
args = ['restic',
'-r',
backups[line - 1].repository,
'unlock']
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
else:
# Restic function
args = ['restic',
'-r',
backups[line - 1].repository,
'unlock']
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
else:
if not backups[line - 1].password:
# Restic function
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'unlock']
result = subprocess.run(args,
input=pass_word['password'],
check=True,
capture_output=True,
text=True)
# Delete the variable from the input
pass_word = {'password': ''}
else:
# Restic function
args = ['restic',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}',
'unlock']
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
except subprocess.CalledProcessError as error:
put_error(error.stderr, scope='control2')
else:
put_info(result.stdout, scope='control2')
toast("Unlock successful done", duration=4, position='center', color='success')
###############################################
# Function actions **control3**
###############################################
@use_scope('control3', clear=True)
def actions3(line, action):
match action:
#####################
# Migrate Update
#####################
case 'Update':
put_info(f"Migrate Update from {backups[line - 1].name}")
with put_loading():
try:
if backups[line - 1].rest == "0":
if not backups[line - 1].password:
# check if upgrade possible
args = ['restic',
'migrate',
'-r',
backups[line - 1].repository]
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
if result.stdout.find("no migrations found"):
put_error('Not possible, no migration path available', scope='control3')
return
# upgrade
args = ['restic',
'migrate',
'upgrade_repo_v2',
'-r',
backups[line - 1].repository]
result = subprocess.run(args,
env=dict(os.environ,
RESTIC_PASSWORD=backups[line - 1].password),
check=True,
capture_output=True,
text=True)
else:
# check if upgrade possible
args = ['restic',
'migrate',
'-r',
backups[line - 1].repository]
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
if result.stdout.find("no migrations found"):
put_error('Not possible, no migration path available', scope='control3')
toast("Not possible, no migration path available", duration=4, position='center',
color='error')
return
# upgrade
args = ['restic',
'migrate',
'upgrade_repo_v2',
'-r',
backups[line - 1].repository]
result = subprocess.run(args,
env=dict(os.environ,
RESTIC_PASSWORD=backups[line - 1].password),
check=True,
capture_output=True,
text=True)
else:
if not backups[line - 1].password:
# check if upgrade possible
args = ['restic',
'migrate',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}']
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
if result.stdout.find("no migrations found"):
put_error('Not possible, no migration path available', scope='control3')
toast("Not possible, no migration path available", duration=4, position='center',
color='error')
return
# upgrade
args = ['restic',
'migrate',
'upgrade_repo_v2',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}']
result = subprocess.run(args,
env=dict(os.environ,
RESTIC_PASSWORD=backups[line - 1].password),
check=True,
capture_output=True,
text=True)
else:
# check if upgrade possible
args = ['restic',
'migrate',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}']
result = subprocess.run(args,
input=backups[line - 1].password,
check=True,
capture_output=True,
text=True)
if result.stdout.find("no migrations found"):
put_error('Not possible, no migration path available', scope='control3')
toast("Not possible, no migration path available", duration=4, position='center',
color='error')
return
# upgrade
args = ['restic',
'migrate',
'upgrade_repo_v2',
'-r',
f'rest:https://'
f'{backups[line - 1].rest_user}:'
f'{backups[line - 1].rest_password}@'
f'{backups[line - 1].rest_domain}:'
f'{backups[line - 1].rest_port}/'
f'{backups[line - 1].rest_folder}']
except subprocess.CalledProcessError as error:
put_error(error.stderr, scope='control3')
else:
put_info(result.stdout, scope='control3')
toast("Migrate Update successful done", duration=4, position='center', color='success')
###############################################
# Scopes
###############################################
with use_scope('main_menu', position=-1): # open and enter a new output: 'main_menu'
with open('images/Restic_Logo.png', 'rb') as load_image:
restic_image = load_image.read()
# read session info
# put_code(json.dumps({
# k: str(getattr(session_info, k))
# for k in ['user_agent', 'user_language', 'server_host',
# 'origin', 'user_ip', 'backend', 'protocol', 'request']
# }, indent=4), 'json')
user_info = str(session_info.user_agent)
if user_info == 'Generic Smartphone / Android 13 / Firefox Mobile 109.0':
put_grid([
[put_html('<h2>Restic UI - Webinterface for backup tool restic</h2>')]
], )
put_text("")
else:
put_grid([
[put_image(restic_image, width='auto', title="TEST"),
put_html('<h2>Restic UI - Webinterface for backup tool restic</h2>')]
], )
put_text("")
put_row(put_buttons(['Backup', 'File', 'Tools', 'Migrate', 'About'], group=True,
onclick=[backup, file, tools, migrate, about]))
backup()
###############################################
# Start server (Main loop)
###############################################
start_server(main, debug=True, port=9090)