nodebb_post_to_mastodon/main.py
2025-03-03 11:21:41 +00:00

420 lines
No EOL
13 KiB
Python

###############################################
# Imports
###############################################
import os
import redis
import requests
import re
from dotenv import load_dotenv
###############################################
# Settings
###############################################
# Read settings from file .env
load_dotenv()
# Fetching Redis configuration values from environment variables
REDIS_HOST = os.getenv("REDIS_HOST")
REDIS_PORT = os.getenv("REDIS_PORT")
REDIS_DB = os.getenv("REDIS_DB")
REDIS_USERNAME = os.getenv("REDIS_USERNAME")
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
# Load Mastodon
MASTODON_TOKEN = os.getenv("MASTODON_TOKEN")
MASTODON_URL = os.getenv("MASTODON_URL")
# Load NodeBB
ALLOWED_USER = os.getenv("ALLOWED_USER", "").split(',')
DISALLOWED_CATEGORIES = os.getenv("DISALLOWED_CATEGORIES", "").split(',')
URL = os.getenv("URL")
# Debug mode for more detailed output
DEBUG = False
###############################################
# Debug helper
###############################################
def debug_print(message):
if DEBUG:
print(f"DEBUG: {message}")
###############################################
# Redis connection
###############################################
def connect_redis():
# When I need another db as 0 the redis user must have permission SELECT in redis.
# My user don't have this permission, so I set db to 0
# When I use REDIS_DB I got an error for missing SELECT permission!
try:
# Connect to the Redis database with authentication
redis_connection = redis.Redis(host=REDIS_HOST,
port=REDIS_PORT,
db=0,
username=REDIS_USERNAME,
password=REDIS_PASSWORD)
return redis_connection
except redis.exceptions.RedisError as e:
print('Error connecting to Redis:', e)
return None
###############################################
# Find all numeric topic IDs from redis db
###############################################
def find_numeric_topic_ids():
r = connect_redis()
if not r:
return []
try:
# Get all topic IDs from the sorted set
all_topic_ids = r.zrange('topics:tid', 0, -1)
# Filter only numeric IDs
numeric_ids = []
for topic_id in all_topic_ids:
id_str = topic_id.decode()
if id_str.isdigit():
numeric_ids.append(id_str)
# Sort them numerically
numeric_ids.sort(key=int)
return numeric_ids
except Exception as e:
print(f"Error finding numeric topic IDs: {e}")
return []
###############################################
# Get last numeric topic ID
###############################################
def get_last_numeric_topic_id():
numeric_ids = find_numeric_topic_ids()
if not numeric_ids:
print("No numeric topic IDs found.")
return None
# Return the highest numeric ID
last_id = numeric_ids[-1]
print(f"Found {len(numeric_ids)} numeric topic IDs. Last ID: {last_id}")
return last_id
###############################################
# Get uid for a specific topic_id
###############################################
def get_topic_uid(topic_id):
r = connect_redis()
if not r:
return None
try:
# Get the topic data for the given topic_id
topic_data = r.hgetall(f'topic:{topic_id}')
debug_print(f"Topic {topic_id} data keys: {[k.decode() for k in topic_data.keys()]}")
# Extract the 'uid' from the topic_data dictionary
if b'uid' in topic_data:
uid = topic_data[b'uid'].decode()
return uid
else:
print('UID not found in the topic data.')
return None
except Exception as e:
print(f"Error getting topic UID: {e}")
return None
###############################################
# Get cid for a specific topic_id
###############################################
def get_topic_cid(topic_id):
r = connect_redis()
if not r:
return None
try:
# Get the topic data for the given topic_id
topic_data = r.hgetall(f'topic:{topic_id}')
debug_print(f"Topic {topic_id} data keys: {[k.decode() for k in topic_data.keys()]}")
# Extract the 'cid' from the topic_data dictionary
if b'cid' in topic_data:
cid = topic_data[b'cid'].decode()
return cid
else:
print('CID not found in the topic data.')
return None
except Exception as e:
print(f"Error getting topic CID: {e}")
return None
###############################################
# Get slug for a specific topic_id
###############################################
def get_topic_slug(topic_id):
r = connect_redis()
if not r:
return None
try:
# Get the topic data for the given topic_id
topic_data = r.hgetall(f'topic:{topic_id}')
debug_print(f"Topic {topic_id} data keys: {[k.decode() for k in topic_data.keys()]}")
# Extract the 'slug' from the topic_data dictionary
if b'slug' in topic_data:
slug = topic_data[b'slug'].decode()
return slug
# Try to get title if slug is not available
if b'title' in topic_data:
title = topic_data[b'title'].decode()
debug_print(f"Found title: {title}")
return title
# If neither slug nor title is found, use the topic_id
return topic_id
except Exception as e:
print(f"Error getting topic slug: {e}")
return topic_id
###############################################
# Get tags for a specific topic_id
###############################################
def get_topic_tags(topic_id):
r = connect_redis()
if not r:
return ""
try:
# Get the topic data for the given topic_id
topic_data = r.hgetall(f'topic:{topic_id}')
debug_print(f"Topic {topic_id} data keys: {[k.decode() for k in topic_data.keys()]}")
# Look for tags field
if b'tags' in topic_data and topic_data[b'tags']:
tags_raw = topic_data[b'tags'].decode()
debug_print(f"Raw tags: {tags_raw}")
# Handle empty tags
if not tags_raw:
return ""
# Process tags - assuming comma-separated format
tags_list = tags_raw.split(',')
tags = [f"#{tag.strip()} " for tag in tags_list if tag.strip()]
return ''.join(tags)
return ""
except Exception as e:
print(f"Error getting topic tags: {e}")
return ""
###############################################
# Check UID if allowed to post to mastodon
###############################################
def check_uid(topic_id):
# Get the user ID for the given topic
uid = get_topic_uid(topic_id)
if uid is None:
print("Could not determine user ID")
return False
print(f'User ID of the topic creator: {uid}')
# If ALLOWED_USER list is empty or contains empty string, allow all users
if not ALLOWED_USER or (len(ALLOWED_USER) == 1 and ALLOWED_USER[0] == ''):
print("No user restrictions, allowing all users")
return True
# Check if the user is in the allowed list
for allowed in ALLOWED_USER:
allowed = allowed.strip()
if allowed and str(uid) == allowed:
print(f"User {uid} is allowed to post")
return True
print(f"User {uid} is not in the allowed list")
return False
###############################################
# Check if category is disallowed
###############################################
def check_category_disallowed(topic_id):
# Get the category ID for the given topic
cid = get_topic_cid(topic_id)
if cid is None:
print("Could not determine category ID")
return False
print(f'Category ID of the topic: {cid}')
# If DISALLOWED_CATEGORIES list is empty or contains empty string, allow all categories
if not DISALLOWED_CATEGORIES or (len(DISALLOWED_CATEGORIES) == 1 and DISALLOWED_CATEGORIES[0] == ''):
print("No category restrictions, allowing all categories")
return False
# Check if the category is in the disallowed list
for disallowed in DISALLOWED_CATEGORIES:
disallowed = disallowed.strip()
if disallowed and str(cid) == disallowed:
print(f"Category {cid} is disallowed")
return True
print(f"Category {cid} is allowed")
return False
###############################################
# Display topic details without posting to Mastodon
###############################################
def display_topic_details(topic_id):
print("\n--- Topic Details ---")
print(f"Topic ID: {topic_id}")
# Get and display the topic slug
title = get_topic_slug(topic_id)
print(f"Title/Slug: {title}")
# Get and display tags
tags = get_topic_tags(topic_id)
print(f"Tags: {tags}")
# Get and display user ID
uid = get_topic_uid(topic_id)
print(f"User ID: {uid}")
# Get and display category ID
cid = get_topic_cid(topic_id)
print(f"Category ID: {cid}")
# Display what would be posted to Mastodon
# Check if the title already contains the topic_id to avoid duplication
if title.startswith(f"{topic_id}/"):
# If title already has the ID prefix (like "1672/test"), use just the title
status_parameter = f"Ein neuer Forumbeitrag: {URL}/{title}"
else:
status_parameter = f"Ein neuer Forumbeitrag: {URL}/{topic_id}/{title}"
if tags:
status_parameter += f" {tags}"
print("\nWould post to Mastodon:")
print(status_parameter)
print("--------------------\n")
###############################################
# Post to Mastodon
###############################################
def post_to_mastodon(topic_id, status_message):
if DEBUG:
print(f"Would post to Mastodon: {status_message}")
return True
try:
# Headers for the Mastodon API request
headers = {
'Authorization': f'Bearer {MASTODON_TOKEN}'
}
# Data for the POST request
data = {
'status': status_message
}
print(f"Posting to Mastodon: {status_message}")
# Send the request to Mastodon
response = requests.post(
MASTODON_URL,
headers=headers,
data=data
)
# Check if the request was successful
if response.ok:
print('Mastodon post successful!')
return True
else:
print(f'Mastodon post failed with status code: {response.status_code}')
print(f'Error response: {response.text}')
return False
except Exception as e:
print(f'Error posting to Mastodon: {e}')
return False
###############################################
# Main
###############################################
print("Starting NodeBB to Mastodon script" + (" (Debug Mode)" if DEBUG else ""))
# Get the latest numeric topic ID from the database
last_topic_id = get_last_numeric_topic_id()
print('Last Numeric Topic ID:', last_topic_id)
if not last_topic_id:
print("No numeric topic IDs found, exiting")
exit()
# Read the last processed topic ID from file
try:
with open("last_topic_id.txt", "r") as f:
last_processed_id = f.readline().strip()
print(f"Last processed topic ID: {last_processed_id}")
except (FileNotFoundError, IOError):
print("No previous topic ID file found, creating new one")
last_processed_id = "0" # Start from 0 if no file exists
# Check if this is a new topic
if int(last_topic_id) > int(last_processed_id):
print(f"New topic detected (Last: {last_processed_id}, Current: {last_topic_id})")
# Display details about the new topic
display_topic_details(last_topic_id)
# Check if the category is disallowed
if check_category_disallowed(last_topic_id):
print("Topic is in a disallowed category, not posting to Mastodon")
elif not check_uid(last_topic_id):
print("User is not allowed to post to Mastodon")
else:
# Get the topic details for posting
title = get_topic_slug(last_topic_id)
tags = get_topic_tags(last_topic_id)
# Prepare the status message
if title.startswith(f"{last_topic_id}/"):
# If title already has the ID prefix (like "1672/test"), use just the title
status_message = f"Ein neuer Forumbeitrag: {URL}/{title}"
else:
# Otherwise, include the ID in the URL
status_message = f"Ein neuer Forumbeitrag: {URL}/{last_topic_id}/{title}"
# Add tags if available
if tags:
status_message += f" {tags}"
# Post to Mastodon
success = post_to_mastodon(last_topic_id, status_message)
# Update the last_topic_id.txt file if successful or in debug mode
if success or DEBUG:
if DEBUG:
print(f"Debug mode: Would update last_topic_id.txt to {last_topic_id}")
else:
print(f"Updating last_topic_id.txt to {last_topic_id}")
with open("last_topic_id.txt", "w") as f:
f.write(last_topic_id)
else:
print(f"No new topics since last run (Last: {last_processed_id}, Current: {last_topic_id})")