main.py aktualisiert

Refactoring
This commit is contained in:
frank 2025-03-03 11:21:41 +00:00
parent 2f764208bd
commit 733a7abc9d

501
main.py
View file

@ -4,6 +4,7 @@
import os
import redis
import requests
import re
from dotenv import load_dotenv
@ -26,10 +27,22 @@ MASTODON_URL = os.getenv("MASTODON_URL")
# Load NodeBB
ALLOWED_USER = os.getenv("ALLOWED_USER", "").split(',')
DISALLOWED_CATEGORIES = os.getenv("DISALLOWED_CATEGORIES")
DISALLOWED_CATEGORIES = os.getenv("DISALLOWED_CATEGORIES", "").split(',')
URL = os.getenv("URL")
# Debug mode for more detailed output
DEBUG = False
###############################################
# Debug helper
###############################################
def debug_print(message):
if DEBUG:
print(f"DEBUG: {message}")
###############################################
# Redis connection
###############################################
@ -41,281 +54,367 @@ def connect_redis():
# Connect to the Redis database with authentication
redis_connection = redis.Redis(host=REDIS_HOST,
port=REDIS_PORT,
db= 0,
db=0,
username=REDIS_USERNAME,
password=REDIS_PASSWORD)
return redis_connection
except redis.exceptions.RedisError as e:
print('Error connecting to Redis:', e)
return None
return redis_connection
###############################################
# Get last_topic_id from redis db
# Find all numeric topic IDs from redis db
###############################################
def get_last_topic_id():
def find_numeric_topic_ids():
r = connect_redis()
if not r:
return []
# Get the last Topic ID from the sorted set
last_topic_id = r.zrange('topics:tid', -1, -1, withscores=True)
# we get [(b'1468', 1690834233147.0)]
try:
# Get all topic IDs from the sorted set
all_topic_ids = r.zrange('topics:tid', 0, -1)
if last_topic_id:
last_topic_id = int(last_topic_id[0][0].decode())
return last_topic_id
else:
print('No Topics found in the sorted set.')
# Filter only numeric IDs
numeric_ids = []
for topic_id in all_topic_ids:
id_str = topic_id.decode()
if id_str.isdigit():
numeric_ids.append(id_str)
# Sort them numerically
numeric_ids.sort(key=int)
return numeric_ids
except Exception as e:
print(f"Error finding numeric topic IDs: {e}")
return []
###############################################
# Get last numeric topic ID
###############################################
def get_last_numeric_topic_id():
numeric_ids = find_numeric_topic_ids()
if not numeric_ids:
print("No numeric topic IDs found.")
return None
# Return the highest numeric ID
last_id = numeric_ids[-1]
print(f"Found {len(numeric_ids)} numeric topic IDs. Last ID: {last_id}")
return last_id
###############################################
# Get last_topic_uid from redis db
# Get uid for a specific topic_id
###############################################
def get_last_topic_uid():
def get_topic_uid(topic_id):
r = connect_redis()
if not r:
return None
# Get the last Topic ID from the sorted set
last_topic_id = r.zrange('topics:tid', -1, -1, withscores=True)
if last_topic_id:
last_topic_id = int(last_topic_id[0][0].decode())
# Get the topic data using the last_topic_id
topic_data = r.hgetall(f'topic:{last_topic_id}')
try:
# Get the topic data for the given topic_id
topic_data = r.hgetall(f'topic:{topic_id}')
debug_print(f"Topic {topic_id} data keys: {[k.decode() for k in topic_data.keys()]}")
# Extract the 'uid' from the topic_data dictionary
uid = int(topic_data.get(b'uid', -1).decode())
if uid != -1:
if b'uid' in topic_data:
uid = topic_data[b'uid'].decode()
return uid
else:
print('UID not found in the topic data.')
return None
else:
print('No Topics found in the sorted set.')
except Exception as e:
print(f"Error getting topic UID: {e}")
return None
###############################################
# Get last_topic_cid from redis db
# Get cid for a specific topic_id
###############################################
def get_last_topic_cid():
# Get the last Topic Category ID (cid) from the sorted set
def get_topic_cid(topic_id):
r = connect_redis()
if not r:
return None
last_topic_id = r.zrange('topics:tid', -1, -1, withscores=True)
if last_topic_id:
last_topic_id = int(last_topic_id[0][0].decode())
# Get the topic data using the last_topic_id
topic_data = r.hgetall(f'topic:{last_topic_id}')
try:
# Get the topic data for the given topic_id
topic_data = r.hgetall(f'topic:{topic_id}')
debug_print(f"Topic {topic_id} data keys: {[k.decode() for k in topic_data.keys()]}")
# Extract the 'cid' from the topic_data dictionary
cid = int(topic_data.get(b'cid', -1).decode())
if cid != -1:
if b'cid' in topic_data:
cid = topic_data[b'cid'].decode()
return cid
else:
print('CID not found in the topic data.')
return None
else:
print('No Topics found in the sorted set.')
return None
###############################################
# Get last_topic_slug from redis db
###############################################
def get_last_topic_slug():
r = connect_redis()
# Get the last Topic ID from the sorted set
last_topic_id = r.zrange('topics:tid', -1, -1, withscores=True)
if last_topic_id:
last_topic_id = int(last_topic_id[0][0].decode())
# Get the topic data using the last_topic_id
topic_data = r.hgetall(f'topic:{last_topic_id}')
# Extract the 'uid' from the topic_data dictionary
slug = topic_data.get(b'slug', b'').decode()
if slug:
return slug
else:
print('Slug not found in the topic data.')
return None
else:
print('No Topics found in the sorted set.')
return None
###############################################
# Get topic_tags from redis db
###############################################
def get_topic_tags():
# Get the last Topic ID from the sorted set
r = connect_redis()
last_topic_id = r.zrange('topics:tid', -1, -1, withscores=True)
if last_topic_id:
last_topic_id = int(last_topic_id[0][0].decode())
try:
# Get the topic data using the last_topic_id
topic_data = r.hgetall(f'topic:{last_topic_id}')
# print("Topic_Data", topic_data)
# Extract the 'tags' from the topic_data dictionary
tags = topic_data.get(b'tags', -1).decode()
tags_single = tags.split(",")
tags = []
for index, element in enumerate(tags_single):
# print(index, ":", element)
tags.append(f"#{element} ")
# Result ['#ansible', '#linux'] Build string
tags_list = ''
tags_string = tags_list.join(tags)
#print(tags_string)
# Extract the 'uid' from the topic_data dictionary
# uid = int(topic_data.get(b'uid', -1).decode())
return tags_string
except Exception as e:
print("NO Tags!: ", e)
else:
print('No Topics found in the sorted set.')
print(f"Error getting topic CID: {e}")
return None
###############################################
# Get slug for a specific topic_id
###############################################
def get_topic_slug(topic_id):
r = connect_redis()
if not r:
return None
try:
# Get the topic data for the given topic_id
topic_data = r.hgetall(f'topic:{topic_id}')
debug_print(f"Topic {topic_id} data keys: {[k.decode() for k in topic_data.keys()]}")
# Extract the 'slug' from the topic_data dictionary
if b'slug' in topic_data:
slug = topic_data[b'slug'].decode()
return slug
# Try to get title if slug is not available
if b'title' in topic_data:
title = topic_data[b'title'].decode()
debug_print(f"Found title: {title}")
return title
# If neither slug nor title is found, use the topic_id
return topic_id
except Exception as e:
print(f"Error getting topic slug: {e}")
return topic_id
###############################################
# Get tags for a specific topic_id
###############################################
def get_topic_tags(topic_id):
r = connect_redis()
if not r:
return ""
try:
# Get the topic data for the given topic_id
topic_data = r.hgetall(f'topic:{topic_id}')
debug_print(f"Topic {topic_id} data keys: {[k.decode() for k in topic_data.keys()]}")
# Look for tags field
if b'tags' in topic_data and topic_data[b'tags']:
tags_raw = topic_data[b'tags'].decode()
debug_print(f"Raw tags: {tags_raw}")
# Handle empty tags
if not tags_raw:
return ""
# Process tags - assuming comma-separated format
tags_list = tags_raw.split(',')
tags = [f"#{tag.strip()} " for tag in tags_list if tag.strip()]
return ''.join(tags)
return ""
except Exception as e:
print(f"Error getting topic tags: {e}")
return ""
###############################################
# Check UID if allowed to post to mastodon
###############################################
def check_uid(topic_id):
# Get the user ID for the given topic
uid = get_topic_uid(topic_id)
if uid is None:
print("Could not determine user ID")
return False
def check_uid():
# get_topic_uid
uid = get_last_topic_uid()
print(f'User ID of the topic creator: {uid}')
for item in ALLOWED_USER:
if str(uid) in item:
# print("Success",item)
# If ALLOWED_USER list is empty or contains empty string, allow all users
if not ALLOWED_USER or (len(ALLOWED_USER) == 1 and ALLOWED_USER[0] == ''):
print("No user restrictions, allowing all users")
return True
else:
print('Error')
# Check if the user is in the allowed list
for allowed in ALLOWED_USER:
allowed = allowed.strip()
if allowed and str(uid) == allowed:
print(f"User {uid} is allowed to post")
return True
print(f"User {uid} is not in the allowed list")
return False
###############################################
# Check CID if disallowed to post to mastodon
# Check if category is disallowed
###############################################
def check_category_disallowed(topic_id):
# Get the category ID for the given topic
cid = get_topic_cid(topic_id)
def check_cid(cid):
for item in DISALLOWED_CATEGORIES:
if str(cid) in item:
# print("Success",item)
return '1'
if cid is None:
print("Could not determine category ID")
return False
print(f'Category ID of the topic: {cid}')
# If DISALLOWED_CATEGORIES list is empty or contains empty string, allow all categories
if not DISALLOWED_CATEGORIES or (len(DISALLOWED_CATEGORIES) == 1 and DISALLOWED_CATEGORIES[0] == ''):
print("No category restrictions, allowing all categories")
return False
# Check if the category is in the disallowed list
for disallowed in DISALLOWED_CATEGORIES:
disallowed = disallowed.strip()
if disallowed and str(cid) == disallowed:
print(f"Category {cid} is disallowed")
return True
print(f"Category {cid} is allowed")
return False
###############################################
# Display topic details without posting to Mastodon
###############################################
def display_topic_details(topic_id):
print("\n--- Topic Details ---")
print(f"Topic ID: {topic_id}")
# Get and display the topic slug
title = get_topic_slug(topic_id)
print(f"Title/Slug: {title}")
# Get and display tags
tags = get_topic_tags(topic_id)
print(f"Tags: {tags}")
# Get and display user ID
uid = get_topic_uid(topic_id)
print(f"User ID: {uid}")
# Get and display category ID
cid = get_topic_cid(topic_id)
print(f"Category ID: {cid}")
# Display what would be posted to Mastodon
# Check if the title already contains the topic_id to avoid duplication
if title.startswith(f"{topic_id}/"):
# If title already has the ID prefix (like "1672/test"), use just the title
status_parameter = f"Ein neuer Forumbeitrag: {URL}/{title}"
else:
print('Error')
return None
status_parameter = f"Ein neuer Forumbeitrag: {URL}/{topic_id}/{title}"
if tags:
status_parameter += f" {tags}"
print("\nWould post to Mastodon:")
print(status_parameter)
print("--------------------\n")
###############################################
# Main
# Post to Mastodon
###############################################
# Example usage
# get_last_topic_id
last_topic_id = get_last_topic_id()
print('Last Topic ID:', last_topic_id)
# get_last_topic_slug
title = get_last_topic_slug()
print("Title:", title)
# get_topic_tags
tags = get_topic_tags()
print("Tags", tags)
# get_topic_cid - Categories ID
cid = get_last_topic_cid()
print("CID:", cid)
# CID 13 - disallowed
if get_last_topic_cid() == 13:
# I have private categories that I don't post to Mastodon.
print("Disallowed CID")
else:
if check_uid(): # True - allowed to post
# read second_last_topic_id from file
with open("last_topic_id.txt", "r") as f:
data = f.readline()
f.close()
# check for new topic
last_topic_stored = int(data.strip())
if last_topic_id > int(last_topic_stored):
def post_to_mastodon(topic_id, status_message):
if DEBUG:
print(f"Would post to Mastodon: {status_message}")
return True
try:
if tags is None:
# Construct the status parameter by concatenating the title and URL
status_parameter = f"Ein neuer Forumbeitrag: {URL}/{title} {title}"
else:
# Construct the status parameter by concatenating the title and URL
status_parameter = f"Ein neuer Forumbeitrag: {URL}/{title} {title} {tags}"
# Headers with the bearer token
# Headers for the Mastodon API request
headers = {
'Authorization': f'Bearer {MASTODON_TOKEN}'
}
# Data for the POST request
data = {
'status': status_parameter
'status': status_message
}
response = None
try:
# Send the POST request using requests
response = requests.post(MASTODON_URL,
print(f"Posting to Mastodon: {status_message}")
# Send the request to Mastodon
response = requests.post(
MASTODON_URL,
headers=headers,
data=data)
# Check if the request was successful (status code 200)
if not response.ok:
print(f'Request failed with status code: {response.status_code}')
print(f'Error response: {response.text}')
except requests.exceptions.RequestException as e:
print(f'Request failed: {e}')
data=data
)
# Check if the request was successful
if response.ok:
print('Mastodon post successful!')
return True
else:
print('Request successful!')
print(f'Mastodon post failed with status code: {response.status_code}')
print(f'Error response: {response.text}')
return False
except Exception as e:
print('An unexpected error occurred:', e)
print(f'Error posting to Mastodon: {e}')
return False
# write new last_topic_id to file
with open("last_topic_id.txt", "w") as f:
f.write(str(last_topic_id))
###############################################
# Main
###############################################
print("Starting NodeBB to Mastodon script" + (" (Debug Mode)" if DEBUG else ""))
# Get the latest numeric topic ID from the database
last_topic_id = get_last_numeric_topic_id()
print('Last Numeric Topic ID:', last_topic_id)
if not last_topic_id:
print("No numeric topic IDs found, exiting")
exit()
# Read the last processed topic ID from file
try:
with open("last_topic_id.txt", "r") as f:
last_processed_id = f.readline().strip()
print(f"Last processed topic ID: {last_processed_id}")
except (FileNotFoundError, IOError):
print("No previous topic ID file found, creating new one")
last_processed_id = "0" # Start from 0 if no file exists
# Check if this is a new topic
if int(last_topic_id) > int(last_processed_id):
print(f"New topic detected (Last: {last_processed_id}, Current: {last_topic_id})")
# Display details about the new topic
display_topic_details(last_topic_id)
# Check if the category is disallowed
if check_category_disallowed(last_topic_id):
print("Topic is in a disallowed category, not posting to Mastodon")
elif not check_uid(last_topic_id):
print("User is not allowed to post to Mastodon")
else:
print("Disallowed to post")
# Get the topic details for posting
title = get_topic_slug(last_topic_id)
tags = get_topic_tags(last_topic_id)
# Prepare the status message
if title.startswith(f"{last_topic_id}/"):
# If title already has the ID prefix (like "1672/test"), use just the title
status_message = f"Ein neuer Forumbeitrag: {URL}/{title}"
else:
# Otherwise, include the ID in the URL
status_message = f"Ein neuer Forumbeitrag: {URL}/{last_topic_id}/{title}"
# Add tags if available
if tags:
status_message += f" {tags}"
# Post to Mastodon
success = post_to_mastodon(last_topic_id, status_message)
# Update the last_topic_id.txt file if successful or in debug mode
if success or DEBUG:
if DEBUG:
print(f"Debug mode: Would update last_topic_id.txt to {last_topic_id}")
else:
print(f"Updating last_topic_id.txt to {last_topic_id}")
with open("last_topic_id.txt", "w") as f:
f.write(last_topic_id)
else:
print(f"No new topics since last run (Last: {last_processed_id}, Current: {last_topic_id})")