From 733a7abc9d439b8a4d74fdcb6d63f1033ae896ac Mon Sep 17 00:00:00 2001 From: frank Date: Mon, 3 Mar 2025 11:21:41 +0000 Subject: [PATCH] main.py aktualisiert Refactoring --- main.py | 517 +++++++++++++++++++++++++++++++++----------------------- 1 file changed, 308 insertions(+), 209 deletions(-) diff --git a/main.py b/main.py index 38eb2b9..5fb9c75 100644 --- a/main.py +++ b/main.py @@ -4,6 +4,7 @@ import os import redis import requests +import re from dotenv import load_dotenv @@ -22,14 +23,26 @@ REDIS_PASSWORD = os.getenv("REDIS_PASSWORD") # Load Mastodon MASTODON_TOKEN = os.getenv("MASTODON_TOKEN") -MASTODON_URL = os.getenv("MASTODON_URL") +MASTODON_URL = os.getenv("MASTODON_URL") # Load NodeBB ALLOWED_USER = os.getenv("ALLOWED_USER", "").split(',') -DISALLOWED_CATEGORIES = os.getenv("DISALLOWED_CATEGORIES") +DISALLOWED_CATEGORIES = os.getenv("DISALLOWED_CATEGORIES", "").split(',') URL = os.getenv("URL") +# Debug mode for more detailed output +DEBUG = False + + +############################################### +# Debug helper +############################################### +def debug_print(message): + if DEBUG: + print(f"DEBUG: {message}") + + ############################################### # Redis connection ############################################### @@ -41,281 +54,367 @@ def connect_redis(): # Connect to the Redis database with authentication redis_connection = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, - db= 0, + db=0, username=REDIS_USERNAME, password=REDIS_PASSWORD) + return redis_connection except redis.exceptions.RedisError as e: print('Error connecting to Redis:', e) return None - return redis_connection - ############################################### -# Get last_topic_id from redis db +# Find all numeric topic IDs from redis db ############################################### - -def get_last_topic_id(): +def find_numeric_topic_ids(): r = connect_redis() + if not r: + return [] - # Get the last Topic ID from the sorted set - last_topic_id = r.zrange('topics:tid', -1, -1, withscores=True) - # we get [(b'1468', 1690834233147.0)] + try: + # Get all topic IDs from the sorted set + all_topic_ids = r.zrange('topics:tid', 0, -1) - if last_topic_id: - last_topic_id = int(last_topic_id[0][0].decode()) - return last_topic_id - else: - print('No Topics found in the sorted set.') + # Filter only numeric IDs + numeric_ids = [] + for topic_id in all_topic_ids: + id_str = topic_id.decode() + if id_str.isdigit(): + numeric_ids.append(id_str) + + # Sort them numerically + numeric_ids.sort(key=int) + + return numeric_ids + except Exception as e: + print(f"Error finding numeric topic IDs: {e}") + return [] + + +############################################### +# Get last numeric topic ID +############################################### +def get_last_numeric_topic_id(): + numeric_ids = find_numeric_topic_ids() + if not numeric_ids: + print("No numeric topic IDs found.") + return None + + # Return the highest numeric ID + last_id = numeric_ids[-1] + print(f"Found {len(numeric_ids)} numeric topic IDs. Last ID: {last_id}") + return last_id + +############################################### +# Get uid for a specific topic_id +############################################### +def get_topic_uid(topic_id): + r = connect_redis() + if not r: return None - -############################################### -# Get last_topic_uid from redis db -############################################### - -def get_last_topic_uid(): - r = connect_redis() - - # Get the last Topic ID from the sorted set - last_topic_id = r.zrange('topics:tid', -1, -1, withscores=True) - - if last_topic_id: - last_topic_id = int(last_topic_id[0][0].decode()) - - # Get the topic data using the last_topic_id - topic_data = r.hgetall(f'topic:{last_topic_id}') + try: + # Get the topic data for the given topic_id + topic_data = r.hgetall(f'topic:{topic_id}') + debug_print(f"Topic {topic_id} data keys: {[k.decode() for k in topic_data.keys()]}") # Extract the 'uid' from the topic_data dictionary - uid = int(topic_data.get(b'uid', -1).decode()) - - if uid != -1: + if b'uid' in topic_data: + uid = topic_data[b'uid'].decode() return uid else: print('UID not found in the topic data.') return None - else: - print('No Topics found in the sorted set.') + except Exception as e: + print(f"Error getting topic UID: {e}") return None ############################################### -# Get last_topic_cid from redis db +# Get cid for a specific topic_id ############################################### - -def get_last_topic_cid(): - # Get the last Topic Category ID (cid) from the sorted set +def get_topic_cid(topic_id): r = connect_redis() + if not r: + return None - last_topic_id = r.zrange('topics:tid', -1, -1, withscores=True) - - if last_topic_id: - last_topic_id = int(last_topic_id[0][0].decode()) - - # Get the topic data using the last_topic_id - topic_data = r.hgetall(f'topic:{last_topic_id}') + try: + # Get the topic data for the given topic_id + topic_data = r.hgetall(f'topic:{topic_id}') + debug_print(f"Topic {topic_id} data keys: {[k.decode() for k in topic_data.keys()]}") # Extract the 'cid' from the topic_data dictionary - cid = int(topic_data.get(b'cid', -1).decode()) - - if cid != -1: + if b'cid' in topic_data: + cid = topic_data[b'cid'].decode() return cid else: print('CID not found in the topic data.') return None - else: - print('No Topics found in the sorted set.') + except Exception as e: + print(f"Error getting topic CID: {e}") return None - ############################################### -# Get last_topic_slug from redis db +# Get slug for a specific topic_id ############################################### - -def get_last_topic_slug(): +def get_topic_slug(topic_id): r = connect_redis() + if not r: + return None - # Get the last Topic ID from the sorted set - last_topic_id = r.zrange('topics:tid', -1, -1, withscores=True) + try: + # Get the topic data for the given topic_id + topic_data = r.hgetall(f'topic:{topic_id}') + debug_print(f"Topic {topic_id} data keys: {[k.decode() for k in topic_data.keys()]}") - if last_topic_id: - last_topic_id = int(last_topic_id[0][0].decode()) - - # Get the topic data using the last_topic_id - topic_data = r.hgetall(f'topic:{last_topic_id}') - - # Extract the 'uid' from the topic_data dictionary - slug = topic_data.get(b'slug', b'').decode() - - if slug: + # Extract the 'slug' from the topic_data dictionary + if b'slug' in topic_data: + slug = topic_data[b'slug'].decode() return slug - else: - print('Slug not found in the topic data.') - return None - else: - print('No Topics found in the sorted set.') - return None + + # Try to get title if slug is not available + if b'title' in topic_data: + title = topic_data[b'title'].decode() + debug_print(f"Found title: {title}") + return title + + # If neither slug nor title is found, use the topic_id + return topic_id + except Exception as e: + print(f"Error getting topic slug: {e}") + return topic_id ############################################### -# Get topic_tags from redis db +# Get tags for a specific topic_id ############################################### - -def get_topic_tags(): - # Get the last Topic ID from the sorted set +def get_topic_tags(topic_id): r = connect_redis() + if not r: + return "" - last_topic_id = r.zrange('topics:tid', -1, -1, withscores=True) + try: + # Get the topic data for the given topic_id + topic_data = r.hgetall(f'topic:{topic_id}') + debug_print(f"Topic {topic_id} data keys: {[k.decode() for k in topic_data.keys()]}") - if last_topic_id: - last_topic_id = int(last_topic_id[0][0].decode()) - try: - # Get the topic data using the last_topic_id - topic_data = r.hgetall(f'topic:{last_topic_id}') - # print("Topic_Data", topic_data) + # Look for tags field + if b'tags' in topic_data and topic_data[b'tags']: + tags_raw = topic_data[b'tags'].decode() + debug_print(f"Raw tags: {tags_raw}") - # Extract the 'tags' from the topic_data dictionary - tags = topic_data.get(b'tags', -1).decode() + # Handle empty tags + if not tags_raw: + return "" - tags_single = tags.split(",") + # Process tags - assuming comma-separated format + tags_list = tags_raw.split(',') + tags = [f"#{tag.strip()} " for tag in tags_list if tag.strip()] - tags = [] - - for index, element in enumerate(tags_single): - # print(index, ":", element) - tags.append(f"#{element} ") - - # Result ['#ansible', '#linux'] Build string - tags_list = '' - tags_string = tags_list.join(tags) - #print(tags_string) - - # Extract the 'uid' from the topic_data dictionary - # uid = int(topic_data.get(b'uid', -1).decode()) - - return tags_string - - except Exception as e: - print("NO Tags!: ", e) - else: - print('No Topics found in the sorted set.') - return None + return ''.join(tags) + return "" + except Exception as e: + print(f"Error getting topic tags: {e}") + return "" ############################################### # Check UID if allowed to post to mastodon ############################################### - -def check_uid(): - # get_topic_uid - uid = get_last_topic_uid() +def check_uid(topic_id): + # Get the user ID for the given topic + uid = get_topic_uid(topic_id) + + if uid is None: + print("Could not determine user ID") + return False + print(f'User ID of the topic creator: {uid}') + + # If ALLOWED_USER list is empty or contains empty string, allow all users + if not ALLOWED_USER or (len(ALLOWED_USER) == 1 and ALLOWED_USER[0] == ''): + print("No user restrictions, allowing all users") + return True + + # Check if the user is in the allowed list + for allowed in ALLOWED_USER: + allowed = allowed.strip() + if allowed and str(uid) == allowed: + print(f"User {uid} is allowed to post") + return True + + print(f"User {uid} is not in the allowed list") + return False - for item in ALLOWED_USER: - if str(uid) in item: - # print("Success",item) + +############################################### +# Check if category is disallowed +############################################### +def check_category_disallowed(topic_id): + # Get the category ID for the given topic + cid = get_topic_cid(topic_id) + + if cid is None: + print("Could not determine category ID") + return False + + print(f'Category ID of the topic: {cid}') + + # If DISALLOWED_CATEGORIES list is empty or contains empty string, allow all categories + if not DISALLOWED_CATEGORIES or (len(DISALLOWED_CATEGORIES) == 1 and DISALLOWED_CATEGORIES[0] == ''): + print("No category restrictions, allowing all categories") + return False + + # Check if the category is in the disallowed list + for disallowed in DISALLOWED_CATEGORIES: + disallowed = disallowed.strip() + if disallowed and str(cid) == disallowed: + print(f"Category {cid} is disallowed") + return True + + print(f"Category {cid} is allowed") + return False + +############################################### +# Display topic details without posting to Mastodon +############################################### +def display_topic_details(topic_id): + print("\n--- Topic Details ---") + print(f"Topic ID: {topic_id}") + + # Get and display the topic slug + title = get_topic_slug(topic_id) + print(f"Title/Slug: {title}") + + # Get and display tags + tags = get_topic_tags(topic_id) + print(f"Tags: {tags}") + + # Get and display user ID + uid = get_topic_uid(topic_id) + print(f"User ID: {uid}") + + # Get and display category ID + cid = get_topic_cid(topic_id) + print(f"Category ID: {cid}") + + # Display what would be posted to Mastodon + # Check if the title already contains the topic_id to avoid duplication + if title.startswith(f"{topic_id}/"): + # If title already has the ID prefix (like "1672/test"), use just the title + status_parameter = f"Ein neuer Forumbeitrag: {URL}/{title}" + else: + status_parameter = f"Ein neuer Forumbeitrag: {URL}/{topic_id}/{title}" + + if tags: + status_parameter += f" {tags}" + + print("\nWould post to Mastodon:") + print(status_parameter) + print("--------------------\n") + +############################################### +# Post to Mastodon +############################################### +def post_to_mastodon(topic_id, status_message): + if DEBUG: + print(f"Would post to Mastodon: {status_message}") + return True + + try: + # Headers for the Mastodon API request + headers = { + 'Authorization': f'Bearer {MASTODON_TOKEN}' + } + + # Data for the POST request + data = { + 'status': status_message + } + + print(f"Posting to Mastodon: {status_message}") + + # Send the request to Mastodon + response = requests.post( + MASTODON_URL, + headers=headers, + data=data + ) + + # Check if the request was successful + if response.ok: + print('Mastodon post successful!') return True else: - print('Error') + print(f'Mastodon post failed with status code: {response.status_code}') + print(f'Error response: {response.text}') return False - - -############################################### -# Check CID if disallowed to post to mastodon -############################################### - -def check_cid(cid): - for item in DISALLOWED_CATEGORIES: - if str(cid) in item: - # print("Success",item) - return '1' - else: - print('Error') - return None - + + except Exception as e: + print(f'Error posting to Mastodon: {e}') + return False ############################################### # Main ############################################### -# Example usage +print("Starting NodeBB to Mastodon script" + (" (Debug Mode)" if DEBUG else "")) -# get_last_topic_id -last_topic_id = get_last_topic_id() -print('Last Topic ID:', last_topic_id) +# Get the latest numeric topic ID from the database +last_topic_id = get_last_numeric_topic_id() +print('Last Numeric Topic ID:', last_topic_id) -# get_last_topic_slug -title = get_last_topic_slug() -print("Title:", title) +if not last_topic_id: + print("No numeric topic IDs found, exiting") + exit() -# get_topic_tags -tags = get_topic_tags() -print("Tags", tags) - -# get_topic_cid - Categories ID -cid = get_last_topic_cid() -print("CID:", cid) - -# CID 13 - disallowed -if get_last_topic_cid() == 13: - # I have private categories that I don't post to Mastodon. - print("Disallowed CID") -else: - - if check_uid(): # True - allowed to post - - # read second_last_topic_id from file - with open("last_topic_id.txt", "r") as f: - data = f.readline() - f.close() - - # check for new topic - last_topic_stored = int(data.strip()) - - if last_topic_id > int(last_topic_stored): - - try: - if tags is None: - # Construct the status parameter by concatenating the title and URL - status_parameter = f"Ein neuer Forumbeitrag: {URL}/{title} {title}" - else: - # Construct the status parameter by concatenating the title and URL - status_parameter = f"Ein neuer Forumbeitrag: {URL}/{title} {title} {tags}" - - # Headers with the bearer token - headers = { - 'Authorization': f'Bearer {MASTODON_TOKEN}' - } - - # Data for the POST request - data = { - 'status': status_parameter - } - - response = None - try: - # Send the POST request using requests - response = requests.post(MASTODON_URL, - headers=headers, - data=data) - - # Check if the request was successful (status code 200) - if not response.ok: - print(f'Request failed with status code: {response.status_code}') - print(f'Error response: {response.text}') - - except requests.exceptions.RequestException as e: - print(f'Request failed: {e}') - - else: - print('Request successful!') - - except Exception as e: - print('An unexpected error occurred:', e) - - # write new last_topic_id to file - with open("last_topic_id.txt", "w") as f: - f.write(str(last_topic_id)) +# Read the last processed topic ID from file +try: + with open("last_topic_id.txt", "r") as f: + last_processed_id = f.readline().strip() + print(f"Last processed topic ID: {last_processed_id}") +except (FileNotFoundError, IOError): + print("No previous topic ID file found, creating new one") + last_processed_id = "0" # Start from 0 if no file exists +# Check if this is a new topic +if int(last_topic_id) > int(last_processed_id): + print(f"New topic detected (Last: {last_processed_id}, Current: {last_topic_id})") + + # Display details about the new topic + display_topic_details(last_topic_id) + + # Check if the category is disallowed + if check_category_disallowed(last_topic_id): + print("Topic is in a disallowed category, not posting to Mastodon") + elif not check_uid(last_topic_id): + print("User is not allowed to post to Mastodon") else: - print("Disallowed to post") + # Get the topic details for posting + title = get_topic_slug(last_topic_id) + tags = get_topic_tags(last_topic_id) + # Prepare the status message + if title.startswith(f"{last_topic_id}/"): + # If title already has the ID prefix (like "1672/test"), use just the title + status_message = f"Ein neuer Forumbeitrag: {URL}/{title}" + else: + # Otherwise, include the ID in the URL + status_message = f"Ein neuer Forumbeitrag: {URL}/{last_topic_id}/{title}" + + # Add tags if available + if tags: + status_message += f" {tags}" + + # Post to Mastodon + success = post_to_mastodon(last_topic_id, status_message) + + # Update the last_topic_id.txt file if successful or in debug mode + if success or DEBUG: + if DEBUG: + print(f"Debug mode: Would update last_topic_id.txt to {last_topic_id}") + else: + print(f"Updating last_topic_id.txt to {last_topic_id}") + with open("last_topic_id.txt", "w") as f: + f.write(last_topic_id) +else: + print(f"No new topics since last run (Last: {last_processed_id}, Current: {last_topic_id})") \ No newline at end of file