2990 lines
127 KiB
Python
2990 lines
127 KiB
Python
###############################################
|
|
# Imports
|
|
###############################################
|
|
|
|
import time
|
|
import datetime
|
|
import json
|
|
import os
|
|
import subprocess
|
|
from pathlib import Path
|
|
from functools import partial
|
|
from dataclasses import dataclass
|
|
|
|
from pywebio import start_server, config
|
|
from pywebio.pin import put_input
|
|
from pywebio.session import info as session_info
|
|
from pywebio.output import put_text, \
|
|
put_table, \
|
|
use_scope, \
|
|
scroll_to, \
|
|
put_markdown, \
|
|
put_buttons, \
|
|
clear, \
|
|
put_info, \
|
|
put_error, \
|
|
put_html, \
|
|
put_tabs, \
|
|
toast, \
|
|
put_image, \
|
|
put_row, \
|
|
put_grid, \
|
|
put_loading
|
|
from pywebio.input import input, \
|
|
input_group, \
|
|
NUMBER, \
|
|
PASSWORD, \
|
|
TEXT, \
|
|
URL, \
|
|
radio
|
|
from pywebio_battery import popup_input
|
|
|
|
|
|
###############################################
|
|
# Set paths and file names
|
|
###############################################
|
|
|
|
# Print the start time of the application to stdout.
print(datetime.datetime.now())

# Home directory of the current user; both data files live directly below it.
USERHOME = str(Path.home())

VERSION_NUMBER = '0.4.7'

# Location of the settings file and default location of the backup list.
settings_path = Path(f'{USERHOME}/settings_pywebio.json')
json_path = Path(f'{USERHOME}/backup_list.json')

# In-memory copy of the settings file, filled by SettingsLoad.load_json().
settings = {}

# Names of the PyWebIO scopes that are cleared together:
# only the control scopes, or the control scopes plus every page scope.
clear_list_control = {'control', 'control2', 'control3', 'control4'}
clear_list = clear_list_control | {
    'file', 'about', 'load_file', 'migrate', 'open_repo',
    'require', 'backup', 'tools',
}
|
|
|
|
|
|
###############################################
|
|
# Classes
|
|
###############################################
|
|
|
|
#####################
|
|
# Class for Settings
|
|
#####################
|
|
@dataclass
class SettingsLoad:
    """Typed record of the application settings.

    The static methods keep the module-level ``settings`` dict in sync with
    the JSON file at ``settings_path``.
    """
    project: str
    version: str
    settings_path: str
    json_file: str
    theme: str
    keep_last: str
    keep_monthly: str
    check: str

    #####################
    # Load JSON
    #####################
    @staticmethod
    def load_json():
        """Read the settings file into the module-level ``settings`` dict.

        If the file cannot be opened, it is created with default values and
        those defaults are used.

        Returns:
            SettingsLoad: the loaded settings as a dataclass instance.

        Raises:
            KeyError: if the file lacks one of the expected keys.
            OSError: if the file with the default values cannot be written.
        """
        try:
            with open(settings_path, "r", encoding="utf-8") as read_file:
                loaded = json.load(read_file)
        except OSError:
            # Settings file missing or unreadable: create it with defaults.
            # The defaults are used directly instead of calling load_json()
            # again, so a file that stays unreadable cannot cause endless
            # recursion.
            loaded = {'project': 'Restic UI',
                      'version': f'{VERSION_NUMBER}',
                      'settings_path': f'{settings_path}',
                      'json_file': f'{json_path}',
                      'theme': 'dark',
                      'keep_last': "3",
                      'keep_monthly': "3",
                      'check': '50'}

            with open(settings_path, "w", encoding="utf-8") as write_file:
                json.dump(loaded, write_file)

        settings.update(loaded)

        # Building the instance also verifies that every expected key exists.
        return SettingsLoad(settings['project'],
                            settings['version'],
                            settings['settings_path'],
                            settings['json_file'],
                            settings['theme'],
                            settings['keep_last'],
                            settings['keep_monthly'],
                            settings['check'])

    @staticmethod
    def version_number():
        """Store VERSION_NUMBER in the settings file if it differs, then reload."""
        with open(settings_path, "r", encoding="utf-8") as read_file:
            stored = json.load(read_file)

        if stored['version'] != VERSION_NUMBER:
            stored['version'] = f'{VERSION_NUMBER}'

            with open(settings_path, "w", encoding="utf-8") as write_file:
                json.dump(stored, write_file)

        # Refresh the in-memory settings from the file.
        SettingsLoad.load_json()
|
|
|
|
|
|
# Fill the module-level settings dict from the settings file.
SettingsLoad.load_json()

# If the stored version differs from this release, write the new one.
if settings['version'] != VERSION_NUMBER:
    SettingsLoad.version_number()

# Configure the PyWebIO page; the theme is only passed when one is set.
config(title="Restic UI",
       description="User Interface for backup tool restic...",
       **({'theme': settings['theme']} if settings['theme'] else {}))
|
|
|
|
|
|
#####################
|
|
# MountPath
|
|
#####################
|
|
@dataclass(kw_only=True)
|
|
class MountPath:
|
|
""" Class to store Mount Path for funktion mount & umount """
|
|
path: str
|
|
|
|
|
|
# We create the list, with an empty entry [0].
|
|
m_path = []
|
|
m_path.insert(0, MountPath(path=""))
|
|
# m_path.append(MountPath("/TEST/test"))
|
|
|
|
|
|
#####################
|
|
# Create Example File
|
|
######################
|
|
|
|
# Example entries written to the backup list file on first start.
# Entries "0" and "1" are local repositories that differ only in their number.
example_list = {
    str(index): {"name": f"Example {index + 1}",
                 "repository": f"{USERHOME}/backup {index + 1}",
                 "source": f"{USERHOME}/source {index + 1}",
                 "password": "123456",
                 "init": "0",
                 "exclude": "",
                 "rest": "0",
                 "rest_domain": "",
                 "rest_port": "",
                 "rest_user": "",
                 "rest_password": "",
                 "rest_folder": ""}
    for index in range(2)
}

# Entry "2" shows a backup that goes to a REST server.
example_list["2"] = {"name": "Example 3",
                     "repository": f"{USERHOME}/backup 3",
                     "source": f"{USERHOME}/source3",
                     "password": "12345678",
                     "init": "0",
                     "exclude": "",
                     "rest": "1",
                     "rest_domain": "https://rest.example.com",
                     "rest_port": "12345",
                     "rest_user": "rest_user",
                     "rest_password": "rest_password",
                     "rest_folder": "rest_folder"}

# Create the backup list file with the examples if it does not exist yet.
if not Path(settings['json_file']).exists():
    try:
        Path(settings['json_file']).write_text(json.dumps(example_list), encoding='utf-8')
    except OSError:
        print("An error has occurred")
|
|
|
|
|
|
#####################
|
|
# BackupList()
|
|
#####################
|
|
@dataclass()
class BackupList:
    """One entry of the backup list, plus static helpers that manage the list.

    All entries live in the module-level ``backups`` dict (integer key ->
    BackupList) and are persisted in the JSON file named by
    ``settings['json_file']``.
    """
    name: str
    repository: str
    source: str
    password: str
    init: str
    exclude: str
    rest: str
    rest_domain: str
    rest_port: str
    rest_user: str
    rest_password: str
    rest_folder: str

    #####################
    # Save JSON
    #####################
    @staticmethod
    def save_json():
        """Write every entry of ``backups`` to the JSON file.

        An OSError while writing is shown with ``put_error`` instead of being
        raised.
        """
        from dataclasses import asdict

        try:
            backups_json = {key: asdict(entry) for key, entry in backups.items()}

            # save backups to JSON
            with open(settings['json_file'], 'w', encoding="utf-8") as write_file:
                json.dump(backups_json, write_file)

        except OSError as error:
            put_error(error)

    #####################
    # Load JSON
    #####################
    @staticmethod
    def load_json():
        """Replace the content of ``backups`` with the entries of the JSON file.

        Entries are renumbered 0..n-1 in file order. The file is read and
        parsed completely before ``backups`` is touched, so a failing read
        leaves the current list unchanged.

        Raises:
            OSError: if the file cannot be opened.
            KeyError: if an entry lacks one of the expected fields.
        """
        with open(settings['json_file'], 'r', encoding="utf-8") as read_file:
            backups_reload = json.load(read_file)

        reloaded = {}
        for count, value in enumerate(backups_reload.values()):
            reloaded[count] = BackupList(value['name'],
                                         value['repository'],
                                         value['source'],
                                         value['password'],
                                         value['init'],
                                         value['exclude'],
                                         value['rest'],
                                         value['rest_domain'],
                                         value['rest_port'],
                                         value['rest_user'],
                                         value['rest_password'],
                                         value['rest_folder'])

        # Drop old keys first: otherwise an entry deleted before the reload
        # would survive under its old key as a stale duplicate.
        backups.clear()
        backups.update(reloaded)

    #####################
    # Delete Entry
    #####################
    @staticmethod
    def delete(number):
        """Remove the entry stored under key ``number``.

        Raises:
            KeyError: if there is no such entry.
        """
        backups.pop(number)

    #####################
    # ADD Entry
    #####################
    @staticmethod
    def add(name,
            repository,
            source,
            password,
            init,
            exclude,
            rest,
            rest_domain,
            rest_port,
            rest_user,
            rest_password,
            rest_folder):
        """Append a new entry under the next free key (highest key + 1)."""
        backups[BackupList.last_key() + 1] = BackupList(name,
                                                        repository,
                                                        source,
                                                        password,
                                                        init,
                                                        exclude,
                                                        rest,
                                                        rest_domain,
                                                        rest_port,
                                                        rest_user,
                                                        rest_password,
                                                        rest_folder)

    #####################
    # Last Key
    #####################
    @staticmethod
    def last_key():
        """Return the highest key in ``backups``, or -1 if the list is empty.

        The highest key (not the most recently inserted one) is used so that
        ``add`` never overwrites an existing entry; -1 makes the first entry
        of an empty list get key 0.
        """
        return max(backups, default=-1)

    #####################
    # Update
    #####################
    @staticmethod
    def update(key, **kwargs):
        """Set fields of the entry stored under ``key``.

        Each keyword argument names a field; its value is stored as a string.
        For an unknown field name "Error" is printed and the entry is left
        untouched.

        Raises:
            KeyError: if a known field is given but ``key`` does not exist.
        """
        from dataclasses import fields

        known_fields = {field.name for field in fields(BackupList)}

        for field_name, new_value in kwargs.items():
            if field_name in known_fields:
                setattr(backups[key], field_name, str(new_value))
            else:
                print("Error")
|
|
|
|
|
|
# Registry of all configured backups; BackupList.load_json() fills it with
# integer keys in file order.
backups: dict = {}

# Read the stored backup list from the file system.
BackupList.load_json()
|
|
|
|
|
|
#####################
|
|
# SnapshotID()
|
|
######################
|
|
@dataclass
class SnapshotID:
    """
    Snapshot id entered by the user, plus helpers for the module-level list
    ``snapshot_id``.

    Structure of snapshot_id
    [4, 'f00c870b', '5de67304']
    4 = actual row (row)
    f00c870b = ID Repository
    5de67304 = ID Repository
    ID Repositories are variable!
    """
    id1: str

    @staticmethod
    def id_remove():
        """Empty ``snapshot_id`` unless it is still in its initial state [''].

        The list is cleared in place. Removing elements one by one while
        iterating over the same list skips every second element and leaves
        old ids behind.
        """
        if snapshot_id != ['']:
            snapshot_id.clear()

    @staticmethod
    def id_append(result):
        """Append the first 8 characters of every line of ``result``.

        ``result`` is split at newlines; each line contributes ``line[0:8]``
        to ``snapshot_id``, so the stored ids can later be looked up by the
        List Snapshot function. Lines are not filtered: lines that do not
        start with a snapshot id (including empty ones) are stored as well.
        """
        snapshot_id.extend(line[0:8] for line in result.split('\n'))


# The list starts with a single empty entry at index 0.
snapshot_id = [""]

snap_id = SnapshotID("0")
|
|
|
|
|
|
###############################################
|
|
# Main loop
|
|
###############################################
|
|
|
|
def main(): # PyWebIO application function
|
|
###############################################
|
|
# Functions
|
|
###############################################
|
|
|
|
def clear_scope():
    """Clear every PyWebIO scope named in clear_list."""
    for scope_name in clear_list:
        clear(scope_name)
|
|
|
|
def clear_scope_control():
    """Clear every PyWebIO scope named in clear_list_control."""
    for scope_name in clear_list_control:
        clear(scope_name)
|
|
|
|
#####################
|
|
# About
|
|
#####################
|
|
@use_scope('about')
def about():
    """Render the About page.

    Shows the buttons 'Restic Version' and 'Settings', the version stored in
    the settings and the project links.
    """

    #####################
    # Version
    #####################
    @use_scope('about', clear=False)
    def version():
        """Show the output of ``restic version`` below the About page.

        Returns:
            bool: always False.
        """
        with use_scope('about'):

            about()

            try:
                result = subprocess.run(['restic', 'version'],
                                        check=True,
                                        capture_output=True,
                                        text=True)

            except subprocess.CalledProcessError as error:
                # Without a result there is nothing to put into the table.
                put_error(error.stderr, position=-1, scope='about')
                return False

            except OSError as error:
                # Raised when the restic executable cannot be started at all.
                put_error(str(error), position=-1, scope='about')
                return False

            # we build header and tdata for table
            tab_version = [
                [put_markdown(r''' # Restic Version
                ''')],
                [put_text(result.stdout)],
            ]

            put_table(tab_version, position=-1, scope='about')

            return False

    #####################
    # About / Settings
    #####################
    @use_scope("about", clear=False)
    def settings_tab():
        """Show the settings form and write the submitted values to the settings file.

        Returns:
            None if the form was cancelled, otherwise False.
        """
        with use_scope('about'):

            about()

            SettingsLoad.load_json()
            scroll_to("about", position='top')

            #####################
            # Input checks
            #####################

            def check_range(value):
                """Return None when the check passes, otherwise the error message."""
                # An empty number field must be rejected before it is
                # compared with the limits.
                if value is None or value > 100 or value < 1:
                    return 'Value must between 1 - 100!!'

            def check_theme(value):
                """Return None when the check passes, otherwise the error message."""
                if value not in ('dark', 'sketchy', 'minty', 'yeti'):
                    return "This theme is not valid"

            #####################
            # Input group settings
            #####################

            # edit settings data
            tab_settings = input_group("Edit Settings", [
                input('Project*', name='project', type=TEXT, required=True,
                      readonly=True, value=settings["project"]),
                input('Version*', name='version', type=TEXT, required=True,
                      readonly=True, value=settings["version"]),
                input('Settings Path*', name='settings_path', type=TEXT, required=True,
                      readonly=True, value=settings["settings_path"]),
                input('JSON File*', name='json_file', help_text="Please restart app if changed!", required=True,
                      value=settings["json_file"]),
                input('Theme', type=TEXT, name='theme',
                      help_text='dark sketchy minty yeti (Please restart app if changed!)',
                      value=settings["theme"],
                      validate=check_theme),
                input('Keep Last', type=NUMBER, name='keep_last', help_text='Value 1 - 100',
                      value=settings["keep_last"], validate=check_range),
                input('Keep Monthly', type=NUMBER, name='keep_monthly', help_text='Value 1 - 100',
                      value=settings["keep_monthly"], validate=check_range),
                input('Check', type=NUMBER, name='check', help_text='Value 1 - 100', value=settings["check"],
                      validate=check_range)
            ], cancelable=True)

            # Cancel edit settings
            if not tab_settings:
                about()
                scroll_to("about", position='bottom')
                return

            try:
                with open(settings["settings_path"], "w", encoding="utf-8") as write_file:
                    json.dump(tab_settings, write_file)

                put_info('File saved successful')

            except OSError:
                put_error("File Error")

            scroll_to("about", position='bottom')

            return False

    clear_scope()

    put_text('')
    put_text('')

    put_buttons(['Restic Version', 'Settings'], scope='about', onclick=[version, settings_tab])

    put_markdown(f"""
    version: {settings['version']}
    """)

    put_markdown(r"""

    Gitlab -> https://gitlab.com/Bullet64/restic_ui_pywebio

    Documentation ->

    Forum: -> https://linux-nerds.org/category/62/pywebio

    ----------------

    This project is not related to https://restic.net


    **Thanks for this nice tool!**

    Logo is © https://restic.net
    """)

    put_html('<center><a href="https://linux-nerds.org">© linux-nerds.org</a></center>')
|
|
|
|
#####################
|
|
# Backup
|
|
#####################
|
|
@use_scope('backup')
def backup():
    """Render the Backup page with the tabs Backup, Init, Mount and Edit.

    Every tab holds a table with one row of action buttons per backup; a
    click calls ``actions`` with the 1-based row number and the button value.
    """
    clear_scope()
    BackupList.load_json()

    def build_table(button_specs, disabled_for_init):
        """Return the table rows (header first) for one tab.

        button_specs: (label, color) pairs; the label is also the button value.
        disabled_for_init: value of ``init`` for which the buttons of a row
            are disabled; None means the buttons are never disabled.
        """
        rows = []
        for position, key in enumerate(backups):
            # The header is only added when there is at least one backup.
            if position == 0:
                rows.append(['No.', 'Backup name of the restic data backup', 'Actions'])

            disabled = backups[key].init == disabled_for_init
            buttons = [{"label": label, "value": label, "color": color, "disabled": disabled}
                       for label, color in button_specs]
            rows.append([position + 1,
                         backups[position].name,
                         put_buttons(buttons, onclick=partial(actions, position + 1))])
        return rows

    # 'Init' is only offered for repositories that are not initialised yet;
    # the other repository actions require an initialised repository.
    tab_init = build_table([('Init', 'primary')], "1")
    tab_backup = build_table([('Backup', 'primary'), ('Prune', 'primary')], "0")
    tab_mount = build_table([('Mount', 'primary'), ('UMount', 'primary'), ('Restore', 'primary')], "0")
    tab_edit = build_table([('Edit', 'primary'), ('Delete', 'danger')], None)

    put_tabs([
        {'title': 'Backup', 'content': [put_table(tab_backup)]},
        {'title': 'Init', 'content': [put_table(tab_init)]},
        {'title': 'Mount', 'content': [put_table(tab_mount)]},
        {'title': 'Edit', 'content': [put_table(tab_edit)]},
    ])

    put_html('<center><a href="https://linux-nerds.org">© linux-nerds.org</a></center>', position=-1)
|
|
|
|
#####################
|
|
# Migrate
|
|
#####################
|
|
@use_scope('migrate', position=-1)
def migrate():
    """Render the Migrate page: one 'Upgrade -> v2' button per backup.

    A click calls ``actions3`` with the 1-based row number and the button
    value 'Update'. The button is disabled while ``init`` is "0".
    """
    clear_scope()

    SettingsLoad.load_json()

    # we build header and tdata for table
    tab_migrate = []

    for position, key in enumerate(backups):
        # The header is only added when there is at least one backup.
        if position == 0:
            tab_migrate.append(['No.', 'Backup name of the restic data backup', 'Actions'])

        upgrade_button = {"label": 'Upgrade -> v2',
                          "value": 'Update',
                          "color": 'primary',
                          "disabled": backups[key].init == "0"}
        tab_migrate.append([position + 1,
                            backups[position].name,
                            put_buttons([upgrade_button], onclick=partial(actions3, position + 1))])

    put_tabs([
        {'title': 'Migrate from repo version V1 to V2', 'content': [put_table(tab_migrate)]},
    ])

    put_html('<center><a href="https://linux-nerds.org">© linux-nerds.org</a></center>', position=-1)
|
|
|
|
#####################
|
|
# Tools
|
|
#####################
|
|
@use_scope('tools')
def tools():
    """Render the Tools page with the tabs 'Snapshots' and 'More Tools'.

    Every tab holds a table with one row of action buttons per backup; a
    click calls ``actions2`` with the 1-based row number and the button
    value. The buttons of a row are disabled while ``init`` is "0".
    """
    clear_scope()

    SettingsLoad.load_json()

    def build_table(labels):
        """Return the table rows (header first) with one button per label."""
        rows = []
        for position, key in enumerate(backups):
            # The header is only added when there is at least one backup.
            if position == 0:
                rows.append(['No.', 'Backup name of the restic data backup', 'Actions'])

            disabled = backups[key].init == "0"
            buttons = [{"label": label, "value": label, "color": 'primary', "disabled": disabled}
                       for label in labels]
            rows.append([position + 1,
                         backups[position].name,
                         put_buttons(buttons, onclick=partial(actions2, position + 1))])
        return rows

    tab_snapshot = build_table(['Snapshots', 'List Snapshot'])
    tab_check = build_table(['Check', 'Stats', 'Cache', 'Unlock'])

    put_tabs([
        {'title': 'Snapshots', 'content': [put_table(tab_snapshot)]},
        {'title': 'More Tools', 'content': [put_table(tab_check)]},
    ])

    put_html('<center><a href="https://linux-nerds.org">© linux-nerds.org</a></center>', position=-1)
|
|
|
|
#####################
|
|
# Add backup
|
|
#####################
|
|
@use_scope('file')
def add_backup():
    """Ask for the data of a new backup and append it to the backup list.

    The user first chooses whether the backup goes to a REST server. A local
    backup is stored with ``rest`` "0" and empty REST fields, a REST backup
    with ``rest`` "1" and an empty repository. New entries get ``init`` "0".

    Returns:
        False after the form was cancelled or processed; None if the radio
        selection is neither 'Yes' nor 'No'.
    """
    clear_scope()

    rest_backup = radio(label='REST-Server', options=['Yes', 'No'], inline=True, value='No')

    #####################
    # checks for input
    #####################

    def check_repository(repo):
        """Return None when the repository path is usable, otherwise the error message.

        An existing directory must be empty; a missing directory is created.
        """
        path = Path(repo)
        try:
            if path.exists():
                if any(path.iterdir()):
                    return 'The directory is not empty, please choose another directory!'
            else:
                path.mkdir()
                toast("Repo path created", duration=2, position='center', color='info')
        except OSError as error:
            # e.g. the path is a file, the parent directory is missing or
            # access is denied: report it in the form instead of crashing.
            return f'Repository path cannot be used: {error}'

    def check_source(source):
        """Return None when the source path exists, otherwise the error message."""
        if not Path(source).exists():
            return 'Source path does not exist'

    with use_scope('main_menu'):

        if rest_backup not in ('No', 'Yes'):
            file()
            return

        is_rest = rest_backup == 'Yes'

        with use_scope('require'):
            put_text('')
            put_text('')
            put_text('* Required field', scope='require')

        # input data
        if is_rest:
            #####################
            # REST-Backup
            #####################
            form_title = "Add REST-Backup"
            form_fields = [
                input('Backup Name*', name='name', type=TEXT, required=True),
                input('Source*', name='source', type=TEXT, required=True, value=f'{USERHOME}/source_1',
                      validate=check_source),
                input('Password', name='password', type=PASSWORD),
                input('Exclude List', name='exclude', type=TEXT),
                input('REST Domain*', name='rest_domain', type=URL, required=True),
                input('REST Port*', name='rest_port', type=NUMBER, required=True),
                input('REST User*', name='rest_user', type=TEXT, required=True),
                input('REST Password*', name='rest_password', type=PASSWORD, required=True),
                input('REST Folder*', name='rest_folder', type=TEXT, required=True)
            ]
        else:
            #####################
            # No REST backup
            #####################
            form_title = "Add Backup"
            form_fields = [
                input('Backup Name*', name='name', type=TEXT, required=True),
                input('Repository*', name='repository', type=TEXT, required=True, value=f'{USERHOME}/repo_1',
                      validate=check_repository),
                input('Source*', name='source', type=TEXT, required=True, value=f'{USERHOME}/source_1',
                      validate=check_source),
                input('Password', name='password', type=PASSWORD),
                input('Exclude List', type=TEXT, name='exclude')
            ]

        form = input_group(form_title, form_fields, cancelable=True)

        if not form:
            clear('require')
            file()
            return False

        # control output
        clear('require')

        # The port arrives as a number from the form; it is stored as a
        # string like every other field of a backup entry.
        rest_port = str(form['rest_port']) if is_rest else ""

        BackupList.add(form['name'],
                       form.get('repository', ""),
                       form['source'],
                       form['password'],
                       "0",
                       form['exclude'],
                       "1" if is_rest else "0",
                       form.get('rest_domain', ""),
                       rest_port,
                       form.get('rest_user', ""),
                       form.get('rest_password', ""),
                       form.get('rest_folder', ""))

        BackupList.save_json()

        with use_scope('file'):
            try:
                BackupList.load_json()
            except OSError:
                put_error("Error")
            else:
                file()
                put_info("File saved", position=-1, scope='file')
            return False
|
|
|
|
#####################
|
|
# Open Repo
|
|
#####################
|
|
def open_repo():
    """Add an existing restic repository to the backup list.

    Asks for the repository path and its password, verifies both by running
    ``restic -r <path> snapshots`` and, on success, stores the repository as
    entry "Open_Repo" with ``init`` "1" and an empty source.

    Returns:
        False if the password dialog was cancelled or restic failed,
        otherwise None.
    """
    with use_scope('file'):
        with use_scope('open_repo'):
            clear('load_file')
            clear('open_repo')

            put_info("Open existing repo.You must provide a valid password for that repo!")

            # get repo path
            repo_input = popup_input([
                put_input("path", label="Repository Path?"),
            ], 'Open Repo')

            if repo_input is None:
                clear('open_repo')
                return

            if not repo_input['path']:
                put_error("Empty string. Open Repo function aborted")
                return

            # Test if passed value is also an existing repo:
            # it must contain a 'config' entry.
            repo_path = Path(repo_input['path'])
            if not Path(repo_input['path'] + '/config').exists():
                put_error('Source path does not contain an restic repo')
                return

            # got passwd for repo
            secret = popup_input([
                put_input("pass_word", label="Please enter PW", type=PASSWORD),
            ], 'Password')

            if not secret:
                clear('open_repo')
                return False

            if not secret['pass_word']:
                put_error("Empty string. Open Repo function aborted")
                return

            with put_loading():
                # check if repo in path is valid
                try:
                    result = subprocess.run(['restic', '-r', repo_path, 'snapshots'],
                                            input=secret['pass_word'],
                                            check=True,
                                            capture_output=True,
                                            text=True)
                except subprocess.CalledProcessError as error:
                    # restic reported an error: show it and stop
                    put_error(error.stderr)
                    return False

                # restic succeeded: show its output
                put_info('Please add the source path manually')
                put_info(result.stdout)

            BackupList.add("Open_Repo",
                           str(repo_path),
                           "",
                           secret['pass_word'],
                           "1",
                           "",
                           "0",
                           "",
                           "",
                           "",
                           "",
                           "")

            # We save the entry to the JSON and load it again
            BackupList.save_json()
            BackupList.load_json()

            # Drop the reference to the entered password.
            secret = {'password': ''}
|
|
|
|
#####################
|
|
# File
|
|
#####################
|
|
@use_scope('file')
def file():
    """Render the File page with the buttons 'Open Repo' and 'Add Backup'."""
    clear_scope()

    button_labels = ['Open Repo', 'Add Backup']
    button_handlers = [open_repo, add_backup]
    put_buttons(button_labels, position=0, onclick=button_handlers)

    put_html('<center><a href="https://linux-nerds.org">© linux-nerds.org</a></center>', position=-1)
|
|
|
|
#####################
|
|
# Actions **control**
|
|
#####################
|
|
@use_scope('control')
|
|
def actions(line, action):
|
|
|
|
match action:
|
|
|
|
#####################
|
|
# Backup
|
|
#####################
|
|
case 'Backup':
|
|
|
|
clear_scope_control()
|
|
|
|
put_info(f"Backup from {backups[line - 1].name}")
|
|
|
|
pass_word = ""
|
|
|
|
if not backups[line - 1].password:
|
|
pass_word = popup_input([
|
|
put_input("password", label="Password?", type=PASSWORD),
|
|
])
|
|
# print(pass_word['password'])
|
|
|
|
if pass_word is None:
|
|
return
|
|
|
|
with put_loading():
|
|
try:
|
|
# REST == 0
|
|
if backups[line - 1].rest == "0":
|
|
|
|
if not backups[line - 1].password:
|
|
# Restic function
|
|
if backups[line - 1].exclude == "":
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'backup',
|
|
backups[line - 1].source]
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
else:
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'backup',
|
|
backups[line - 1].source,
|
|
f'--exclude-file={backups[line - 1].exclude}']
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
else:
|
|
if backups[line - 1].exclude == "":
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'backup',
|
|
backups[line - 1].source]
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
else:
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'backup',
|
|
backups[line - 1].source,
|
|
f'--exclude-file={backups[line - 1].exclude}']
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
else:
|
|
# BACKUP for REST == 1
|
|
if not backups[line - 1].password:
|
|
|
|
if backups[line - 1].exclude == "":
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'backup',
|
|
backups[line - 1].source]
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
else:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'backup',
|
|
backups[line - 1].source,
|
|
f'--exclude-file={backups[line - 1].exclude}']
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
else:
|
|
if backups[line - 1].exclude == "":
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'backup',
|
|
backups[line - 1].source]
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
else:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'backup',
|
|
backups[line - 1].source,
|
|
f'--exclude-file={backups[line - 1].exclude}']
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
except subprocess.CalledProcessError as error:
|
|
put_error(error.stderr, scope='control')
|
|
|
|
else:
|
|
put_info(result.stdout, position=-1, scope='control')
|
|
toast("Backup successful created", duration=4, position='center', color='success')
|
|
|
|
#####################
|
|
# Prune
|
|
#####################
|
|
case 'Prune':
|
|
clear_scope_control()
|
|
|
|
put_info(f"Prune from {backups[line - 1].name}")
|
|
|
|
pass_word = ""
|
|
|
|
if not backups[line - 1].password:
|
|
pass_word = popup_input([
|
|
put_input("password", label="Password?", type=PASSWORD),
|
|
])
|
|
# print(pass_word['password'])
|
|
|
|
if pass_word is None:
|
|
return
|
|
|
|
try:
|
|
with put_loading():
|
|
# restic function prune
|
|
if backups[line - 1].rest == '0':
|
|
|
|
if not backups[line - 1].password:
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'forget',
|
|
'--keep-last',
|
|
str(settings['keep_last']),
|
|
'--keep-monthly',
|
|
str(settings['keep_monthly']),
|
|
'--prune']
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
else:
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'forget',
|
|
'--keep-last',
|
|
str(settings['keep_last']),
|
|
'--keep-monthly',
|
|
str(settings['keep_monthly']),
|
|
'--prune']
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
else:
|
|
if not backups[line - 1].password:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'forget',
|
|
'--keep-last',
|
|
str(settings['keep_last']),
|
|
'--keep-monthly',
|
|
str(settings['keep_monthly']),
|
|
'--prune']
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
else:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'forget',
|
|
'--keep-last',
|
|
str(settings['keep_last']),
|
|
'--keep-monthly',
|
|
str(settings['keep_monthly']),
|
|
'--prune']
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
except subprocess.CalledProcessError as error:
|
|
put_error(error.stderr, scope='control')
|
|
|
|
else:
|
|
put_info(result.stdout, scope='control')
|
|
toast("Process Prune successfully completed", duration=4, position='center', color='success')
|
|
|
|
#####################
|
|
# Edit
|
|
#####################
|
|
|
|
case 'Edit':
|
|
|
|
clear_scope_control()
|
|
|
|
#####################
|
|
# checks for input
|
|
#####################
|
|
|
|
def check_source(source):
|
|
|
|
path = Path(source)
|
|
if path.exists():
|
|
pass
|
|
else:
|
|
return 'Source path does not exist'
|
|
|
|
with use_scope('edit'):
|
|
clear('backup')
|
|
clear('control')
|
|
|
|
match backups[line - 1].rest:
|
|
|
|
case '0':
|
|
# input data
|
|
edit_backup = input_group("Edit Backup", [
|
|
input('Backup Name*', name='backup_name', type=TEXT, required=True,
|
|
value=backups[line - 1].name),
|
|
input('Repository*', name='repository', type=TEXT, required=True,
|
|
value=backups[line - 1].repository),
|
|
input('Source*', name='source', type=TEXT, required=True,
|
|
value=backups[line - 1].source, validate=check_source),
|
|
input('Password', name='password', value=backups[line - 1].password),
|
|
input('Exclude List', type=TEXT, name='exclude_list', value=backups[line - 1].exclude)
|
|
], cancelable=True)
|
|
|
|
# Cancel Input Group
|
|
if not edit_backup:
|
|
backup()
|
|
return
|
|
|
|
BackupList.update((line - 1), name=edit_backup['backup_name'],
|
|
repository=edit_backup['repository'],
|
|
source=edit_backup['source'],
|
|
password=edit_backup['password'],
|
|
exclude=edit_backup['exclude_list'])
|
|
|
|
BackupList.save_json()
|
|
|
|
backup()
|
|
|
|
case '1':
|
|
|
|
clear('backup')
|
|
|
|
# input data
|
|
edit_backup = input_group("Edit REST-Backup", [
|
|
input('Backup Name*', name='backup_name', type=TEXT, required=True,
|
|
value=backups[line - 1].name),
|
|
input('Source*', name='source', type=TEXT, required=True,
|
|
value=backups[line - 1].source, validate=check_source),
|
|
input('Password', name='password', value=backups[line - 1].password),
|
|
input('Exclude List', name='exclude_list', type=TEXT, value=backups[line - 1].exclude),
|
|
input('REST Domain*', name='rest_domain', type=TEXT, required=True,
|
|
value=backups[line - 1].rest_domain),
|
|
input('REST Port*', name='rest_port', type=NUMBER, required=True,
|
|
value=backups[line - 1].rest_port),
|
|
input('REST User*', name='rest_user', type=TEXT, required=True,
|
|
value=backups[line - 1].rest_user),
|
|
input('REST Password*', name='rest_password', required=True,
|
|
value=backups[line - 1].rest_password),
|
|
input('REST Folder*', name='rest_folder', type=TEXT, required=True,
|
|
value=backups[line - 1].rest_folder)
|
|
], cancelable=True)
|
|
|
|
# Cancel Input Group
|
|
if not edit_backup:
|
|
backup()
|
|
return
|
|
|
|
BackupList.update((line - 1), name=edit_backup['backup_name'],
|
|
source=edit_backup['source'],
|
|
password=edit_backup['password'],
|
|
exclude=edit_backup['exclude_list'],
|
|
rest_domain=edit_backup['rest_domain'],
|
|
rest_port=edit_backup['rest_port'],
|
|
rest_user=edit_backup['rest_user'],
|
|
rest_password=edit_backup['rest_password'],
|
|
rest_folder=edit_backup['rest_folder'])
|
|
|
|
BackupList.save_json()
|
|
|
|
backup()
|
|
|
|
case _:
|
|
put_row(put_buttons(['Backup', 'File', 'Tools', 'Migrate', 'About'], group=True,
|
|
onclick=[backup, file, tools, migrate, about]))
|
|
backup()
|
|
|
|
#####################
|
|
# Delete
|
|
#####################
|
|
|
|
case 'Delete':
|
|
# Delete entry?
|
|
|
|
if not backups[line - 1].source:
|
|
backups[line - 1].source = 'EMPTY'
|
|
|
|
form_delete = popup_input([
|
|
put_info(f"We don't delete your repository data. Please delete the data yourself."
|
|
f" Repo-Path is {backups[line - 1].source}"),
|
|
], "Delete Entry?")
|
|
|
|
match form_delete:
|
|
case 'None':
|
|
backup()
|
|
|
|
case {}:
|
|
# delete entries
|
|
BackupList.delete((line - 1))
|
|
|
|
# save json
|
|
BackupList.save_json()
|
|
|
|
# clear dict
|
|
backups.clear()
|
|
|
|
# Reload Tab Backup
|
|
backup()
|
|
|
|
case _:
|
|
pass
|
|
|
|
#####################
|
|
# Mount
|
|
#####################
|
|
|
|
case 'Mount':
|
|
clear_scope_control()
|
|
|
|
put_info(f"Mount from {backups[line - 1].name}")
|
|
|
|
pass_word = {'password': ""}
|
|
|
|
mount_path = popup_input([
|
|
put_input("mount_path", label="Where to mount the repo?"),
|
|
])
|
|
|
|
if mount_path is None:
|
|
clear_scope_control()
|
|
return
|
|
|
|
if not mount_path['mount_path']:
|
|
put_error("You have not specified a mount folder. Mount function aborted")
|
|
return
|
|
|
|
m_path[0] = mount_path['mount_path']
|
|
|
|
path = Path(m_path[0])
|
|
if path.exists():
|
|
put_error("You have specified a existing mount path. Mount function aborted")
|
|
return
|
|
|
|
path.mkdir()
|
|
put_info(f"Successful created mount folder {mount_path['mount_path']} !")
|
|
|
|
# if no password is in backups
|
|
if not backups[line - 1].password:
|
|
|
|
pass_word = popup_input([
|
|
put_input("password", label="Password?", type=PASSWORD),
|
|
])
|
|
# print(pass_word['password'])
|
|
|
|
if pass_word is None:
|
|
return
|
|
|
|
with put_loading():
|
|
|
|
try:
|
|
if backups[line - 1].rest == "0":
|
|
|
|
if not backups[line - 1].password:
|
|
# Create cmd
|
|
cmd = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'mount',
|
|
m_path[0]]
|
|
|
|
# We create the object, here with stdin and open the asynchronous subprocess
|
|
process = subprocess.Popen(
|
|
cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
process.stdin.write(pass_word['password'].encode())
|
|
process.stdin.close()
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
# if password is in backups
|
|
else:
|
|
# Create cmd
|
|
cmd = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'mount',
|
|
m_path[0]]
|
|
|
|
pwd = backups[line - 1].password.encode()
|
|
|
|
# We create the object, here with stdin and open the asynchronous subprocess
|
|
process = subprocess.Popen(
|
|
cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
process.stdin.write(pwd)
|
|
process.stdin.close()
|
|
|
|
else:
|
|
# if no password is in backups
|
|
if not backups[line - 1].password:
|
|
|
|
# Create cmd
|
|
cmd = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'mount',
|
|
m_path[0]]
|
|
|
|
# We create the object, here with stdin and open the asynchronous subprocess
|
|
process = subprocess.Popen(
|
|
cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
process.stdin.write(pass_word['password'].encode())
|
|
process.stdin.close()
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
# if password is in backups
|
|
else:
|
|
# Create cmd
|
|
cmd = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'mount',
|
|
m_path[0]]
|
|
|
|
pwd = backups[line - 1].password.encode()
|
|
|
|
# We create the object, here with stdin and open the asynchronous subprocess
|
|
process = subprocess.Popen(
|
|
cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
process.stdin.write(pwd)
|
|
process.stdin.close()
|
|
|
|
except subprocess.CalledProcessError as error:
|
|
print(error)
|
|
|
|
finally:
|
|
# we give some time for our subprocess.Popen
|
|
time.sleep(1.0)
|
|
|
|
try:
|
|
if process.poll() == 1:
|
|
raise Exception(
|
|
"The Restic mount command was not successful.\n"
|
|
"Probable cause: wrong password or no key found")
|
|
|
|
try:
|
|
process = subprocess.run(['ls',
|
|
'-lha',
|
|
m_path[0]],
|
|
check=True,
|
|
stdout=subprocess.PIPE,
|
|
universal_newlines=True)
|
|
|
|
output = process.stdout
|
|
|
|
except subprocess.CalledProcessError as error:
|
|
put_error("Error while fetching the directory contents")
|
|
print(error)
|
|
|
|
else:
|
|
put_info(output)
|
|
toast("Mount point successful created", duration=4, position='center',
|
|
color='success')
|
|
|
|
except Exception as error:
|
|
put_error(error)
|
|
|
|
#####################
|
|
# UMount
|
|
#####################
|
|
case 'UMount':
|
|
clear_scope_control()
|
|
|
|
put_info(f"UMount from {backups[line - 1].name}")
|
|
|
|
# Mount Path comes from m_path[0] MountPath-Class
|
|
|
|
if not type(m_path[0]) == str or m_path[0] == "":
|
|
put_error("No mount path available, you must first mount an repository!")
|
|
return
|
|
|
|
if backups[line - 1].init == "0":
|
|
put_error("Status INIT = 0, please initialize first!")
|
|
|
|
else:
|
|
try:
|
|
# Create cmd
|
|
cmd = ['umount', m_path[0]]
|
|
subprocess.run(cmd,
|
|
check=True,
|
|
stdin=subprocess.PIPE)
|
|
|
|
except subprocess.CalledProcessError as error:
|
|
put_error(error)
|
|
|
|
else:
|
|
put_info("UMount successful!")
|
|
toast("UMount successful", duration=4, position='center', color='success')
|
|
|
|
# After umount, we will delete mount folder!
|
|
try:
|
|
# Delete mount folder
|
|
result = subprocess.run(['rm',
|
|
'-rf',
|
|
m_path[0]],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
except subprocess.CalledProcessError as error:
|
|
put_error(error)
|
|
|
|
else:
|
|
put_info(f"Mount Folder {m_path[0]} deleted")
|
|
|
|
finally:
|
|
m_path[0] = ""
|
|
|
|
#####################
|
|
# Restore
|
|
#####################
|
|
case 'Restore':
|
|
clear_scope_control()
|
|
|
|
put_info(f"Restore from {backups[line - 1].name}")
|
|
|
|
# Ask for restore path
|
|
restore_path = popup_input([
|
|
put_input("restore_path", label="Where to restore the repo?"),
|
|
])
|
|
|
|
# check return from restore_path
|
|
if restore_path is None:
|
|
clear_scope_control()
|
|
return
|
|
|
|
if not restore_path['restore_path']:
|
|
put_error("You have not specified a restore folder. Restore function aborted")
|
|
return
|
|
|
|
# Check path and create
|
|
path = Path(restore_path['restore_path'])
|
|
|
|
if path.exists():
|
|
put_error("You have specified a existing restore path. Restore function aborted")
|
|
return
|
|
|
|
path.mkdir()
|
|
put_info(f"Successful created mount folder in {restore_path['restore_path']} !")
|
|
|
|
result = False
|
|
# Ask for snap_id
|
|
restore_id = popup_input([
|
|
put_input("id", label="Snapshot ID"),
|
|
put_info(f"If you do not enter one, the last snapshot from {backups[line - 1].name} will be used."),
|
|
], "Please enter the snapshot ID")
|
|
# print(restore_id)
|
|
|
|
if restore_id is None:
|
|
# print("ID cancel")
|
|
clear_scope_control()
|
|
return
|
|
|
|
if restore_id == {'id': ''}:
|
|
# print("Empty ID")
|
|
snap_id.id1 = 'latest'
|
|
result = True
|
|
|
|
else:
|
|
# We loop through the list and output the ID's
|
|
for count, value in enumerate(snapshot_id):
|
|
if restore_id['id'] == snapshot_id[count]:
|
|
result = True
|
|
snap_id.id1 = restore_id['id']
|
|
|
|
if result is True:
|
|
|
|
try:
|
|
pass_word = {'password': ""}
|
|
|
|
# restic function restore
|
|
if backups[line - 1].rest == "0":
|
|
if not backups[line - 1].password:
|
|
pass_word = popup_input([
|
|
put_input("password", label="Password?", type=PASSWORD),
|
|
])
|
|
# print(pass_word['password'])
|
|
|
|
if pass_word is None:
|
|
return
|
|
|
|
args = ['restic', '-r',
|
|
backups[line - 1].repository,
|
|
'restore',
|
|
snap_id.id1,
|
|
'--target',
|
|
restore_path['restore_path']]
|
|
|
|
subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
else:
|
|
args = ['restic', '-r',
|
|
backups[line - 1].repository,
|
|
'restore',
|
|
snap_id.id1,
|
|
'--target',
|
|
restore_path['restore_path']]
|
|
|
|
subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
else:
|
|
if not backups[line - 1].password:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'restore',
|
|
snap_id.id1,
|
|
'--target',
|
|
restore_path['restore_path']]
|
|
|
|
subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
else:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'restore',
|
|
snap_id.id1,
|
|
'--target',
|
|
restore_path['restore_path']]
|
|
|
|
subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
except subprocess.CalledProcessError as error:
|
|
put_error(error.stderr)
|
|
|
|
else:
|
|
try:
|
|
process = subprocess.run(['ls',
|
|
'-lha',
|
|
restore_path['restore_path']],
|
|
check=True,
|
|
stdout=subprocess.PIPE,
|
|
universal_newlines=True)
|
|
|
|
output = process.stdout
|
|
except subprocess.CalledProcessError as error:
|
|
put_error(error)
|
|
|
|
else:
|
|
put_info(output)
|
|
toast("Restore successful created", duration=4, position='center', color='success')
|
|
|
|
#####################
|
|
# Init
|
|
#####################
|
|
|
|
case 'Init':
|
|
|
|
clear_scope_control()
|
|
|
|
# pass_word = ""
|
|
|
|
if not backups[line - 1].password:
|
|
pass_word = popup_input([
|
|
put_input("password", label="Password?", type=PASSWORD),
|
|
])
|
|
# print(pass_word['password'])
|
|
|
|
if pass_word is None:
|
|
clear_scope_control()
|
|
return
|
|
|
|
with put_loading():
|
|
|
|
result = None
|
|
pass_word = {'password': ""}
|
|
|
|
try:
|
|
if backups[line - 1].rest == "0":
|
|
# INIT for normal backup
|
|
if not backups[line - 1].password:
|
|
# Restic function
|
|
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'init']
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
else:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'init']
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
else:
|
|
# INIT for REST backup
|
|
if not backups[line - 1].password:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'init']
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
else:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'init']
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
except subprocess.CalledProcessError as error:
|
|
toast(error.stderr, duration=8, color='warn')
|
|
|
|
else:
|
|
|
|
BackupList.update((line - 1), init="1")
|
|
|
|
put_info(result.stdout, scope='control')
|
|
|
|
toast("Init successful created", duration=8, color='success')
|
|
|
|
scroll_to("control", position='top')
|
|
|
|
BackupList.save_json()
|
|
|
|
BackupList.load_json()
|
|
|
|
backup()
|
|
|
|
scroll_to("control", position='bottom')
|
|
|
|
###############################################
|
|
# Function actions **control2**
|
|
###############################################
|
|
@use_scope('control2', clear=True)
|
|
def actions2(line, action):
|
|
|
|
match action:
|
|
|
|
#####################
|
|
# Snapshots
|
|
#####################
|
|
case 'Snapshots':
|
|
|
|
clear_scope_control()
|
|
|
|
put_info(f"Snapshots from {backups[line - 1].name}")
|
|
|
|
pass_word = ""
|
|
|
|
if not backups[line - 1].password:
|
|
pass_word = popup_input([
|
|
put_input("password", label="Password?", type=PASSWORD),
|
|
])
|
|
|
|
if pass_word is None:
|
|
clear_scope_control()
|
|
return
|
|
|
|
with put_loading():
|
|
try:
|
|
# REST == 0
|
|
if backups[line - 1].rest == "0":
|
|
|
|
if not backups[line - 1].password:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'snapshots']
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
else:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'snapshots']
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
else:
|
|
|
|
if not backups[line - 1].password:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'snapshots']
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
else:
|
|
# Restic function
|
|
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'snapshots']
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
except subprocess.CalledProcessError as error:
|
|
put_error(error.stderr, scope='control2')
|
|
|
|
else:
|
|
SnapshotID.id_append(result.stdout)
|
|
put_info(result.stdout, scope='control2')
|
|
toast("Snapshots successful listed", duration=4, position='center', color='success')
|
|
|
|
#####################
|
|
# List Snapshot
|
|
#####################
|
|
case 'List Snapshot':
|
|
|
|
clear_scope_control()
|
|
|
|
put_info(f"List Snapshot from {backups[line - 1].name}")
|
|
|
|
pass_word = ""
|
|
|
|
result = False
|
|
|
|
# print("Snapshot", snapshot_id)
|
|
|
|
snaps_id = popup_input([
|
|
put_input("id", label="Snapshot ID"),
|
|
put_info(f"If you do not enter one, the last snapshot from {backups[line - 1].name} will be used."),
|
|
], "Please enter the snapshot ID")
|
|
# print(ID)
|
|
|
|
if snaps_id is None:
|
|
# print("ID cancel")
|
|
clear_scope_control()
|
|
return
|
|
|
|
if snaps_id == {'id': ''}:
|
|
# print("Empty ID")
|
|
snap_id.id1 = 'latest'
|
|
result = True
|
|
|
|
else:
|
|
# We loop through the list and output the ID's
|
|
for count, value in enumerate(snapshot_id):
|
|
if snaps_id['id'] == snapshot_id[count]:
|
|
result = True
|
|
snap_id.id1 = snaps_id['id']
|
|
|
|
if result is True:
|
|
|
|
if not backups[line - 1].password:
|
|
pass_word = popup_input([
|
|
put_input("password", label="Password?", type=PASSWORD),
|
|
])
|
|
|
|
if pass_word is None:
|
|
return
|
|
|
|
with put_loading():
|
|
try:
|
|
if backups[line - 1].rest == "0":
|
|
if not backups[line - 1].password:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'ls',
|
|
snap_id.id1]
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
else:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'ls',
|
|
snap_id.id1]
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
else:
|
|
if not backups[line - 1].password:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'ls',
|
|
snap_id.id1]
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
else:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'ls',
|
|
snap_id.id1]
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
except subprocess.CalledProcessError as error:
|
|
put_error(error.stderr, scope='control2')
|
|
|
|
else:
|
|
put_info(result.stdout, scope='control2')
|
|
toast("Snapshot successful listed", duration=4, position='center', color='success')
|
|
SnapshotID.id_remove()
|
|
snap_id.id1 = ""
|
|
|
|
else:
|
|
toast("Wrong <ID>, please enter correct Snapshot <ID>", duration=4,
|
|
position='center', color='error')
|
|
|
|
#####################
|
|
# Check
|
|
#####################
|
|
case 'Check':
|
|
|
|
clear_scope_control()
|
|
|
|
put_info(f"Check from {backups[line - 1].name}")
|
|
|
|
check = settings['check']
|
|
check2 = f"{check}%"
|
|
|
|
pass_word = {'password': ''}
|
|
|
|
if not backups[line - 1].password:
|
|
pass_word = popup_input([
|
|
put_input("password", label="Password?", type=PASSWORD),
|
|
])
|
|
|
|
print(pass_word)
|
|
|
|
if pass_word is None:
|
|
clear_scope_control()
|
|
return
|
|
|
|
with put_loading():
|
|
try:
|
|
match_list = [backups[line - 1].rest, bool(backups[line - 1].password)]
|
|
|
|
match match_list:
|
|
|
|
# backups[row].rest, bool(backups[row].password)
|
|
case ["0", False]:
|
|
# restic function check
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'check',
|
|
'--read-data-subset',
|
|
check2]
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
case ["0", True]:
|
|
# restic function check
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'check',
|
|
'--read-data-subset',
|
|
check2]
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
case ["1", False]:
|
|
# restic function check
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'check',
|
|
'--read-data-subset',
|
|
check2]
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
case ["1", True]:
|
|
# restic function check
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'check',
|
|
'--read-data-subset',
|
|
check2]
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
case _:
|
|
put_error("Pattern match not found")
|
|
return False
|
|
|
|
except subprocess.CalledProcessError as error:
|
|
put_error(error.stderr, scope='control2')
|
|
|
|
else:
|
|
put_info(result.stdout, scope='control2')
|
|
toast("Check successful done", duration=4, position='center', color='success')
|
|
|
|
#####################
|
|
# Stats
|
|
#####################
|
|
case 'Stats':
|
|
|
|
clear_scope_control()
|
|
|
|
put_info(f"Stats from {backups[line - 1].name}")
|
|
|
|
pass_word = ""
|
|
|
|
if not backups[line - 1].password:
|
|
pass_word = popup_input([
|
|
put_input("password", label="Password?", type=PASSWORD),
|
|
])
|
|
|
|
if pass_word is None:
|
|
clear_scope_control()
|
|
return
|
|
|
|
with put_loading():
|
|
try:
|
|
match_list = [backups[line - 1].rest, bool(backups[line - 1].password)]
|
|
|
|
match match_list:
|
|
|
|
# backups[row].rest, bool(backups[row].password)
|
|
case ["0", False]:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'stats']
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
# backups[row].rest, bool(backups[row].password)
|
|
case ["0", True]:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'stats']
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# backups[row].rest, bool(backups[row].password)
|
|
case ["1", False]:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f"rest:https://"
|
|
f"{backups[line - 1].rest_user}:"
|
|
f"{backups[line - 1].rest_password}@"
|
|
f"{backups[line - 1].rest_domain}:"
|
|
f"{backups[line - 1].rest_port}/"
|
|
f"{backups[line - 1].rest_folder}",
|
|
'stats']
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
# backups[row].rest, bool(backups[row].password)
|
|
case ["1", True]:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f"rest:https://"
|
|
f"{backups[line - 1].rest_user}:"
|
|
f"{backups[line - 1].rest_password}@"
|
|
f"{backups[line - 1].rest_domain}:"
|
|
f"{backups[line - 1].rest_port}/"
|
|
f"{backups[line - 1].rest_folder}",
|
|
'stats']
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
case _:
|
|
put_error("Pattern match not found")
|
|
return False
|
|
|
|
except subprocess.CalledProcessError as error:
|
|
put_error(error.stderr, scope='control2')
|
|
|
|
else:
|
|
put_info(result.stdout, scope='control2')
|
|
toast("Stats successful done", duration=4, position='center', color='success')
|
|
|
|
#####################
|
|
# Cache
|
|
#####################
|
|
case 'Cache':
|
|
|
|
clear_scope_control()
|
|
|
|
put_info(f"Clear Cache from {backups[line - 1].name}")
|
|
|
|
pass_word = ""
|
|
|
|
if not backups[line - 1].password:
|
|
pass_word = popup_input([
|
|
put_input("password", label="Password?", type=PASSWORD),
|
|
])
|
|
|
|
if pass_word is None:
|
|
clear_scope_control()
|
|
return
|
|
|
|
with put_loading():
|
|
try:
|
|
# restic function check
|
|
if backups[line - 1].rest == "0":
|
|
if not backups[line - 1].password:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'cache',
|
|
'--cleanup']
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
else:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'cache',
|
|
'--cleanup']
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
else:
|
|
if not backups[line - 1].password:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'cache',
|
|
'--cleanup']
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
else:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'cache',
|
|
'--cleanup']
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
except subprocess.CalledProcessError as error:
|
|
put_error(error.stderr, scope='control2')
|
|
|
|
else:
|
|
put_info(result.stdout, scope='control2')
|
|
toast("Stats successful listed", duration=4, position='center', color='success')
|
|
|
|
#####################
|
|
# Unlock
|
|
#####################
|
|
case 'Unlock':
|
|
|
|
clear_scope_control()
|
|
|
|
put_info(f"Unlock from {backups[line - 1].name}")
|
|
|
|
pass_word = ""
|
|
|
|
if not backups[line - 1].password:
|
|
pass_word = popup_input([
|
|
put_input("password", label="Password?", type=PASSWORD),
|
|
])
|
|
|
|
if pass_word is None:
|
|
clear_scope_control()
|
|
return
|
|
|
|
with put_loading():
|
|
try:
|
|
# restic function check
|
|
if backups[line - 1].rest == "0":
|
|
if not backups[line - 1].password:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'unlock']
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
else:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
backups[line - 1].repository,
|
|
'unlock']
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
else:
|
|
if not backups[line - 1].password:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'unlock']
|
|
|
|
result = subprocess.run(args,
|
|
input=pass_word['password'],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
# Delete the variable from the input
|
|
pass_word = {'password': ''}
|
|
|
|
else:
|
|
# Restic function
|
|
args = ['restic',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}',
|
|
'unlock']
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
except subprocess.CalledProcessError as error:
|
|
put_error(error.stderr, scope='control2')
|
|
|
|
else:
|
|
put_info(result.stdout, scope='control2')
|
|
toast("Unlock successful done", duration=4, position='center', color='success')
|
|
|
|
###############################################
|
|
# Function actions **control3**
|
|
###############################################
|
|
@use_scope('control3', clear=True)
|
|
def actions3(line, action):
|
|
|
|
match action:
|
|
|
|
#####################
|
|
# Migrate Update
|
|
#####################
|
|
case 'Update':
|
|
put_info(f"Migrate Update from {backups[line - 1].name}")
|
|
|
|
with put_loading():
|
|
try:
|
|
if backups[line - 1].rest == "0":
|
|
if not backups[line - 1].password:
|
|
# check if upgrade possible
|
|
args = ['restic',
|
|
'migrate',
|
|
'-r',
|
|
backups[line - 1].repository]
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
if result.stdout.find("no migrations found"):
|
|
put_error('Not possible, no migration path available', scope='control3')
|
|
return
|
|
|
|
# upgrade
|
|
args = ['restic',
|
|
'migrate',
|
|
'upgrade_repo_v2',
|
|
'-r',
|
|
backups[line - 1].repository]
|
|
|
|
result = subprocess.run(args,
|
|
env=dict(os.environ,
|
|
RESTIC_PASSWORD=backups[line - 1].password),
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
else:
|
|
|
|
# check if upgrade possible
|
|
args = ['restic',
|
|
'migrate',
|
|
'-r',
|
|
backups[line - 1].repository]
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
if result.stdout.find("no migrations found"):
|
|
put_error('Not possible, no migration path available', scope='control3')
|
|
toast("Not possible, no migration path available", duration=4, position='center',
|
|
color='error')
|
|
return
|
|
|
|
# upgrade
|
|
args = ['restic',
|
|
'migrate',
|
|
'upgrade_repo_v2',
|
|
'-r',
|
|
backups[line - 1].repository]
|
|
|
|
result = subprocess.run(args,
|
|
env=dict(os.environ,
|
|
RESTIC_PASSWORD=backups[line - 1].password),
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
else:
|
|
if not backups[line - 1].password:
|
|
|
|
# check if upgrade possible
|
|
args = ['restic',
|
|
'migrate',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}']
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
if result.stdout.find("no migrations found"):
|
|
put_error('Not possible, no migration path available', scope='control3')
|
|
toast("Not possible, no migration path available", duration=4, position='center',
|
|
color='error')
|
|
return
|
|
|
|
# upgrade
|
|
args = ['restic',
|
|
'migrate',
|
|
'upgrade_repo_v2',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}']
|
|
|
|
result = subprocess.run(args,
|
|
env=dict(os.environ,
|
|
RESTIC_PASSWORD=backups[line - 1].password),
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
else:
|
|
|
|
# check if upgrade possible
|
|
args = ['restic',
|
|
'migrate',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}']
|
|
|
|
result = subprocess.run(args,
|
|
input=backups[line - 1].password,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True)
|
|
|
|
if result.stdout.find("no migrations found"):
|
|
put_error('Not possible, no migration path available', scope='control3')
|
|
toast("Not possible, no migration path available", duration=4, position='center',
|
|
color='error')
|
|
return
|
|
|
|
# upgrade
|
|
args = ['restic',
|
|
'migrate',
|
|
'upgrade_repo_v2',
|
|
'-r',
|
|
f'rest:https://'
|
|
f'{backups[line - 1].rest_user}:'
|
|
f'{backups[line - 1].rest_password}@'
|
|
f'{backups[line - 1].rest_domain}:'
|
|
f'{backups[line - 1].rest_port}/'
|
|
f'{backups[line - 1].rest_folder}']
|
|
|
|
except subprocess.CalledProcessError as error:
|
|
put_error(error.stderr, scope='control3')
|
|
|
|
else:
|
|
put_info(result.stdout, scope='control3')
|
|
toast("Migrate Update successful done", duration=4, position='center', color='success')
|
|
|
|
###############################################
|
|
# Scopes
|
|
###############################################
|
|
|
|
# Page header and top-level navigation, rendered into the 'main_menu' scope.
# NOTE(review): the indentation of this section is not visible here, so the
# exact nesting of the statements below must be confirmed before editing.
with use_scope('main_menu', position=-1): # open and enter a new output: 'main_menu'
|
|
|
|
# Read the logo bytes; they are only displayed in the `else` branch below.
with open('images/Restic_Logo.png', 'rb') as load_image:
|
|
restic_image = load_image.read()
|
|
|
|
# read session info
|
|
# put_code(json.dumps({
|
|
# k: str(getattr(session_info, k))
|
|
# for k in ['user_agent', 'user_language', 'server_host',
|
|
# 'origin', 'user_ip', 'backend', 'protocol', 'request']
|
|
# }, indent=4), 'json')
|
|
|
|
# String form of the session's user agent, compared verbatim below.
user_info = str(session_info.user_agent)
|
|
|
|
# Only this one exact user-agent string gets the title without the logo;
# every other client gets logo and title side by side.
if user_info == 'Generic Smartphone / Android 13 / Firefox Mobile 109.0':
|
|
put_grid([
|
|
[put_html('<h2>Restic UI - Webinterface for backup tool restic</h2>')]
|
|
], )
|
|
put_text("")
|
|
else:
|
|
put_grid([
|
|
[put_image(restic_image, width='auto', title="TEST"),
|
|
put_html('<h2>Restic UI - Webinterface for backup tool restic</h2>')]
|
|
], )
|
|
put_text("")
|
|
|
|
# Navigation bar: each button label is paired by position with its handler.
put_row(put_buttons(['Backup', 'File', 'Tools', 'Migrate', 'About'], group=True,
|
|
onclick=[backup, file, tools, migrate, about]))
|
|
# Call the 'Backup' handler once right after the menu is built.
backup()
|
|
|
|
|
|
###############################################
|
|
# Start server (Main loop)
|
|
###############################################
|
|
|
|
|
|
# Serve the `main` application on port 9090 with debug mode switched on.
start_server(main, port=9090, debug=True)
|