# Listing metadata from the repository viewer: 420 lines, no EOL at end of file, 13 KiB, Python
###############################################
|
|
# Imports
|
|
###############################################
|
|
import os
|
|
import redis
|
|
import requests
|
|
import re
|
|
from dotenv import load_dotenv
|
|
|
|
|
|
###############################################
|
|
# Settings
|
|
###############################################
|
|
# Pull the variables defined in the local .env file into the environment.
load_dotenv()

# Redis connection parameters; each one is None when its variable is unset.
REDIS_HOST = os.environ.get("REDIS_HOST")
REDIS_PORT = os.environ.get("REDIS_PORT")
REDIS_DB = os.environ.get("REDIS_DB")
REDIS_USERNAME = os.environ.get("REDIS_USERNAME")
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD")

# Mastodon access token and the URL that status posts are sent to.
MASTODON_TOKEN = os.environ.get("MASTODON_TOKEN")
MASTODON_URL = os.environ.get("MASTODON_URL")

# NodeBB filters, given as comma-separated lists. An unset variable yields
# [''] (a single empty entry), which the checks treat as "no restriction".
ALLOWED_USER = os.environ.get("ALLOWED_USER", "").split(",")
DISALLOWED_CATEGORIES = os.environ.get("DISALLOWED_CATEGORIES", "").split(",")

# Base URL that the topic path is appended to when building the status text.
URL = os.environ.get("URL")

# When True, extra output is printed and nothing is sent to Mastodon.
DEBUG = False
|
|
|
|
|
|
###############################################
|
|
# Debug helper
|
|
###############################################
|
|
def debug_print(message):
    """Print ``message`` prefixed with ``DEBUG: `` when DEBUG is enabled.

    Does nothing when the module-level DEBUG flag is falsy.
    """
    if not DEBUG:
        return
    print(f"DEBUG: {message}")
|
|
|
|
|
|
###############################################
|
|
# Redis connection
|
|
###############################################
|
|
def connect_redis():
    """Create a Redis client from the REDIS_* settings.

    Database 0 is always used and REDIS_DB is deliberately ignored: choosing
    any other database requires the SELECT permission, which the configured
    Redis user does not have, and using REDIS_DB produced a permission error.

    Returns:
        A ``redis.Redis`` client, or ``None`` when REDIS_PORT is not a valid
        integer or creating the client raised a Redis error.
    """
    # Environment values arrive as strings (or None when unset). Convert the
    # port explicitly so a bad value is reported here instead of failing on
    # the first command, and fall back to the standard Redis port.
    try:
        port = int(REDIS_PORT) if REDIS_PORT else 6379
    except ValueError:
        print('Error connecting to Redis:', f'invalid REDIS_PORT {REDIS_PORT!r}')
        return None

    try:
        # NOTE(review): redis-py appears to open the socket lazily on the
        # first command, so network/auth failures probably surface in the
        # callers rather than here - confirm against the client docs.
        return redis.Redis(
            host=REDIS_HOST,
            port=port,
            db=0,
            username=REDIS_USERNAME,
            password=REDIS_PASSWORD,
        )
    except redis.exceptions.RedisError as e:
        print('Error connecting to Redis:', e)
        return None
|
|
|
|
###############################################
|
|
# Find all numeric topic IDs from redis db
|
|
###############################################
|
|
def find_numeric_topic_ids():
    """Return every all-digit member of the ``topics:tid`` sorted set.

    Returns:
        The matching IDs as strings, ordered by their integer value. An empty
        list is returned when no Redis client is available or any error
        occurs while reading or sorting.
    """
    client = connect_redis()
    if not client:
        return []

    try:
        # Members come back as bytes; decode them before testing for digits.
        decoded = (member.decode() for member in client.zrange('topics:tid', 0, -1))
        return sorted((tid for tid in decoded if tid.isdigit()), key=int)
    except Exception as e:
        print(f"Error finding numeric topic IDs: {e}")
        return []
|
|
|
|
|
|
###############################################
|
|
# Get last numeric topic ID
|
|
###############################################
|
|
def get_last_numeric_topic_id():
    """Return the highest numeric topic ID as a string, or None if there is none.

    Prints a short summary of how many numeric IDs were found.
    """
    ids = find_numeric_topic_ids()
    if ids:
        # The list is sorted ascending, so the last entry is the highest ID.
        newest = ids[-1]
        print(f"Found {len(ids)} numeric topic IDs. Last ID: {newest}")
        return newest

    print("No numeric topic IDs found.")
    return None
|
|
|
|
###############################################
|
|
# Get uid for a specific topic_id
|
|
###############################################
|
|
def get_topic_uid(topic_id):
    """Return the ``uid`` field of the ``topic:<topic_id>`` hash as a string.

    Returns None when no Redis client is available, the hash has no ``uid``
    field, or reading the hash fails.
    """
    client = connect_redis()
    if not client:
        return None

    try:
        fields = client.hgetall(f'topic:{topic_id}')
        debug_print(f"Topic {topic_id} data keys: {[k.decode() for k in fields]}")

        if b'uid' not in fields:
            print('UID not found in the topic data.')
            return None
        return fields[b'uid'].decode()
    except Exception as e:
        print(f"Error getting topic UID: {e}")
        return None
|
|
|
|
|
|
###############################################
|
|
# Get cid for a specific topic_id
|
|
###############################################
|
|
def get_topic_cid(topic_id):
    """Return the ``cid`` field of the ``topic:<topic_id>`` hash as a string.

    Returns None when no Redis client is available, the hash has no ``cid``
    field, or reading the hash fails.
    """
    client = connect_redis()
    if not client:
        return None

    try:
        fields = client.hgetall(f'topic:{topic_id}')
        debug_print(f"Topic {topic_id} data keys: {[k.decode() for k in fields]}")

        raw_cid = fields.get(b'cid')
        if raw_cid is None:
            print('CID not found in the topic data.')
            return None
        return raw_cid.decode()
    except Exception as e:
        print(f"Error getting topic CID: {e}")
        return None
|
|
|
|
###############################################
|
|
# Get slug for a specific topic_id
|
|
###############################################
|
|
def get_topic_slug(topic_id):
    """Return the slug of a topic, with fallbacks that never yield None.

    The ``topic:<topic_id>`` hash is read and the first available value of
    the following is returned:

    1. the ``slug`` field,
    2. the ``title`` field,
    3. ``topic_id`` itself.

    ``topic_id`` is also returned when no Redis client is available or the
    lookup raises. The callers in this file invoke ``.startswith`` on the
    result, so returning None on the no-connection path (as this function
    used to) would crash them.
    """
    r = connect_redis()
    if not r:
        # Same fallback as the error path below, so callers always get a
        # usable value.
        return topic_id

    try:
        topic_data = r.hgetall(f'topic:{topic_id}')
        debug_print(f"Topic {topic_id} data keys: {[k.decode() for k in topic_data.keys()]}")

        if b'slug' in topic_data:
            return topic_data[b'slug'].decode()

        # No slug stored: use the title instead if there is one.
        if b'title' in topic_data:
            title = topic_data[b'title'].decode()
            debug_print(f"Found title: {title}")
            return title

        # Neither slug nor title is present.
        return topic_id
    except Exception as e:
        print(f"Error getting topic slug: {e}")
        return topic_id
|
|
|
|
|
|
###############################################
|
|
# Get tags for a specific topic_id
|
|
###############################################
|
|
def get_topic_tags(topic_id):
    """Return the tags of a topic formatted as hashtags.

    The ``tags`` field of the ``topic:<topic_id>`` hash is treated as a
    comma-separated list. Each non-blank entry is stripped and emitted as
    ``#<tag> `` (note the trailing space), and the pieces are concatenated.

    Returns an empty string when no Redis client is available, the field is
    missing or empty, or the lookup fails.
    """
    client = connect_redis()
    if not client:
        return ""

    try:
        fields = client.hgetall(f'topic:{topic_id}')
        debug_print(f"Topic {topic_id} data keys: {[k.decode() for k in fields]}")

        raw = fields.get(b'tags')
        if not raw:
            return ""

        tags_raw = raw.decode()
        debug_print(f"Raw tags: {tags_raw}")

        # Blank entries (e.g. from a trailing comma) are skipped.
        names = (part.strip() for part in tags_raw.split(','))
        return ''.join(f"#{name} " for name in names if name)
    except Exception as e:
        print(f"Error getting topic tags: {e}")
        return ""
|
|
|
|
###############################################
|
|
# Check UID if allowed to post to mastodon
|
|
###############################################
|
|
def check_uid(topic_id):
    """Report whether the creator of a topic may be posted to Mastodon.

    Returns:
        False when the creator's user ID cannot be determined or is not
        listed in ALLOWED_USER; True when it is listed or when ALLOWED_USER
        imposes no restriction (empty, or a single empty entry).
    """
    uid = get_topic_uid(topic_id)
    if uid is None:
        print("Could not determine user ID")
        return False

    print(f'User ID of the topic creator: {uid}')

    # An unset ALLOWED_USER variable splits into [''], meaning "everyone".
    if not ALLOWED_USER or ALLOWED_USER == ['']:
        print("No user restrictions, allowing all users")
        return True

    # Entries are compared after stripping whitespace; blanks never match.
    permitted = {entry.strip() for entry in ALLOWED_USER} - {''}
    if str(uid) in permitted:
        print(f"User {uid} is allowed to post")
        return True

    print(f"User {uid} is not in the allowed list")
    return False
|
|
|
|
|
|
###############################################
|
|
# Check if category is disallowed
|
|
###############################################
|
|
def check_category_disallowed(topic_id):
    """Report whether a topic belongs to a disallowed category.

    Returns:
        True only when the topic's category ID matches an entry of
        DISALLOWED_CATEGORIES. False when the category cannot be determined,
        when no restriction is configured (empty list or a single empty
        entry), or when the category is not listed.
    """
    cid = get_topic_cid(topic_id)
    if cid is None:
        print("Could not determine category ID")
        return False

    print(f'Category ID of the topic: {cid}')

    unrestricted = not DISALLOWED_CATEGORIES or DISALLOWED_CATEGORIES == ['']
    if unrestricted:
        print("No category restrictions, allowing all categories")
        return False

    # Compare against whitespace-stripped entries; blank entries never match.
    wanted = str(cid)
    blocked = (entry.strip() for entry in DISALLOWED_CATEGORIES)
    if any(entry and entry == wanted for entry in blocked):
        print(f"Category {cid} is disallowed")
        return True

    print(f"Category {cid} is allowed")
    return False
|
|
|
|
###############################################
|
|
# Display topic details without posting to Mastodon
|
|
###############################################
|
|
def display_topic_details(topic_id):
    """Print a summary of a topic and the status text that would be posted.

    Looks up the slug/title, tags, user ID and category ID of the topic,
    prints each of them, and then prints the Mastodon status text built from
    them. Nothing is sent to Mastodon.
    """
    print("\n--- Topic Details ---")
    print(f"Topic ID: {topic_id}")

    title = get_topic_slug(topic_id)
    print(f"Title/Slug: {title}")

    tags = get_topic_tags(topic_id)
    print(f"Tags: {tags}")

    print(f"User ID: {get_topic_uid(topic_id)}")
    print(f"Category ID: {get_topic_cid(topic_id)}")

    # A slug such as "1672/test" already starts with the topic ID; only
    # prepend the ID when it is not there yet, to avoid duplicating it.
    status_parameter = (
        f"Ein neuer Forumbeitrag: {URL}/{title}"
        if title.startswith(f"{topic_id}/")
        else f"Ein neuer Forumbeitrag: {URL}/{topic_id}/{title}"
    )
    if tags:
        status_parameter += f" {tags}"

    print("\nWould post to Mastodon:")
    print(status_parameter)
    print("--------------------\n")
|
|
|
|
###############################################
|
|
# Post to Mastodon
|
|
###############################################
|
|
def post_to_mastodon(topic_id, status_message):
    """Send ``status_message`` to Mastodon as a new status.

    Args:
        topic_id: ID of the topic being announced. Not used by the request;
            kept so existing callers keep working.
        status_message: Text sent as the ``status`` form field.

    Returns:
        True when the server answered with a successful status code, or when
        DEBUG is enabled (in which case nothing is sent). False when the
        server rejected the request or any error, including a timeout,
        occurred.
    """
    if DEBUG:
        print(f"Would post to Mastodon: {status_message}")
        return True

    try:
        headers = {
            'Authorization': f'Bearer {MASTODON_TOKEN}'
        }
        data = {
            'status': status_message
        }

        print(f"Posting to Mastodon: {status_message}")

        # Without a timeout, requests waits indefinitely on an unresponsive
        # server and the script would never finish. A timeout raises an
        # exception that is reported by the handler below.
        response = requests.post(
            MASTODON_URL,
            headers=headers,
            data=data,
            timeout=30,
        )

        if response.ok:
            print('Mastodon post successful!')
            return True

        print(f'Mastodon post failed with status code: {response.status_code}')
        print(f'Error response: {response.text}')
        return False

    except Exception as e:
        # Best effort: any failure is reported and turned into False.
        print(f'Error posting to Mastodon: {e}')
        return False
|
|
|
|
###############################################
|
|
# Main
|
|
###############################################
|
|
|
|
# Script body: announce the newest NodeBB topic on Mastodon if it has not
# been processed yet. The ID of the last announced topic is kept in
# last_topic_id.txt in the current working directory.
print("Starting NodeBB to Mastodon script" + (" (Debug Mode)" if DEBUG else ""))

# Newest numeric topic ID currently stored in the database.
last_topic_id = get_last_numeric_topic_id()
print('Last Numeric Topic ID:', last_topic_id)

if not last_topic_id:
    print("No numeric topic IDs found, exiting")
    # exit() is a helper meant for the interactive interpreter and is not
    # guaranteed to exist; raising SystemExit ends the script the same way.
    raise SystemExit

# Read the last processed topic ID from the state file.
try:
    with open("last_topic_id.txt", "r") as f:
        last_processed_id = f.readline().strip()
    print(f"Last processed topic ID: {last_processed_id}")
except OSError:
    print("No previous topic ID file found, creating new one")
    last_processed_id = "0"  # Start from 0 if no file exists

# An empty or corrupted state file must not crash the comparison below.
try:
    int(last_processed_id)
except ValueError:
    print(f"Stored topic ID {last_processed_id!r} is not a number, starting from 0")
    last_processed_id = "0"

if int(last_topic_id) > int(last_processed_id):
    print(f"New topic detected (Last: {last_processed_id}, Current: {last_topic_id})")

    # Show what was found before deciding whether to post.
    display_topic_details(last_topic_id)

    if check_category_disallowed(last_topic_id):
        print("Topic is in a disallowed category, not posting to Mastodon")
    elif not check_uid(last_topic_id):
        print("User is not allowed to post to Mastodon")
    else:
        title = get_topic_slug(last_topic_id)
        tags = get_topic_tags(last_topic_id)

        # A slug such as "1672/test" already carries the topic ID; only add
        # the ID to the URL when the slug does not start with it.
        if title.startswith(f"{last_topic_id}/"):
            status_message = f"Ein neuer Forumbeitrag: {URL}/{title}"
        else:
            status_message = f"Ein neuer Forumbeitrag: {URL}/{last_topic_id}/{title}"

        if tags:
            status_message += f" {tags}"

        success = post_to_mastodon(last_topic_id, status_message)

        # Remember the topic only after a real, successful post; in debug
        # mode the state file is left untouched.
        if success or DEBUG:
            if DEBUG:
                print(f"Debug mode: Would update last_topic_id.txt to {last_topic_id}")
            else:
                print(f"Updating last_topic_id.txt to {last_topic_id}")
                with open("last_topic_id.txt", "w") as f:
                    f.write(last_topic_id)
else:
    print(f"No new topics since last run (Last: {last_processed_id}, Current: {last_topic_id})")