| 
import csv
 | 
 | 
import os
 | 
 | 
from operator import itemgetter
 | 
 | 
 | 
 | 
from mastodon import Mastodon
 | 
 | 
from bs4 import BeautifulSoup
 | 
 | 
 | 
 | 
DOMAIN = 'bgme.me'
 | 
 | 
# Scopes: read:accounts read:statuses write:reports admin:read:accounts admin:read:domain_allows admin:read:domain_blocks admin:read:reports admin:write:accounts admin:write:domain_allows admin:write:domain_blocks admin:write:reports
 | 
 | 
ACCESS_TOKEN = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
 | 
 | 
 | 
 | 
IGNORE_ACCOUNTS = []
 | 
 | 
 | 
 | 
 | 
 | 
def get_text_from_html(data: str):
 | 
 | 
    soup = BeautifulSoup(data, 'html.parser')
 | 
 | 
    return soup.text
 | 
 | 
 | 
 | 
 | 
 | 
class Main:
 | 
 | 
    def __init__(self, ACCOUNTS_CSV_FILE: str):
 | 
 | 
        self.ACCOUNTS_CSV_FILE = ACCOUNTS_CSV_FILE
 | 
 | 
        if not os.path.exists(self.ACCOUNTS_CSV_FILE):
 | 
 | 
            raise IOError("Not Found CSV File.")
 | 
 | 
 | 
 | 
        self.spam_accounts = set()
 | 
 | 
        self.infected_domains = set()
 | 
 | 
 | 
 | 
        mastodon = Mastodon(api_base_url='https://' + DOMAIN, access_token=ACCESS_TOKEN)
 | 
 | 
        mastodon.account_verify_credentials()
 | 
 | 
        self.mastodon = mastodon
 | 
 | 
 | 
 | 
    def get_accounts_status(self, account_id: str, limit=20):
 | 
 | 
        return self.mastodon.account_statuses(account_id, exclude_reblogs=True, limit=limit)
 | 
 | 
 | 
 | 
    def print_latest_account_status(self, account_id: str):
 | 
 | 
        status = self.get_accounts_status(account_id, limit=1)
 | 
 | 
        s = status[0]
 | 
 | 
        # print(s['content'])
 | 
 | 
        print(get_text_from_html(s['content']))
 | 
 | 
        return s
 | 
 | 
 | 
 | 
    def report(self, account_id: str, status_id: str):
 | 
 | 
        r = self.mastodon.report(account_id, status_ids=[status_id], comment="SPAM", forward=True, category="spam")
 | 
 | 
        print('Report Account {}'.format(account_id))
 | 
 | 
        return r
 | 
 | 
 | 
 | 
    def start_report_accounts(self):
 | 
 | 
        with open(self.ACCOUNTS_CSV_FILE, 'r') as csvfile:
 | 
 | 
            reader = csv.DictReader(csvfile)
 | 
 | 
            for row in reader:
 | 
 | 
                account_id, username, domain, display_name, avatar_file_name = itemgetter(
 | 
 | 
                    "id", "username", "domain", "display_name", "avatar_file_name")(row)
 | 
 | 
 | 
 | 
                if account_id in IGNORE_ACCOUNTS:
 | 
 | 
                    continue
 | 
 | 
 | 
 | 
                print(
 | 
 | 
                    '\nReport @{username}@{domain} ({account_id}):'.format(username=username, domain=domain,
 | 
 | 
                                                                           account_id=account_id)
 | 
 | 
                )
 | 
 | 
                try:
 | 
 | 
                    s = self.print_latest_account_status(account_id)
 | 
 | 
                    confirm = input(
 | 
 | 
                        "\nWill block account @{username}@{domain}? [Y/n] ".format(username=username, domain=domain))
 | 
 | 
                    if confirm == 'n':
 | 
 | 
                        continue
 | 
 | 
                    self.report(account_id, s['id'])
 | 
 | 
                    self.spam_accounts.add(account_id)
 | 
 | 
                    self.infected_domains.add(domain)
 | 
 | 
                except BaseException as e:
 | 
 | 
                    print(e)
 | 
 | 
 | 
 | 
    def start_block_accounts(self):
 | 
 | 
        for account_id in self.spam_accounts:
 | 
 | 
            try:
 | 
 | 
                print('Block account {}'.format(account_id))
 | 
 | 
                self.mastodon.admin_account_moderate(account_id, action='suspend')
 | 
 | 
            except BaseException as e:
 | 
 | 
                print(e)
 | 
 | 
 | 
 | 
    def start_limit_domains(self):
 | 
 | 
        limit_domains = []
 | 
 | 
        for domain in self.infected_domains:
 | 
 | 
            try:
 | 
 | 
                print('Limit Domain {}'.format(domain))
 | 
 | 
                self.mastodon.admin_create_domain_block(
 | 
 | 
                    domain, severity='silence', private_comment='SPAM', public_comment='SPAM')
 | 
 | 
                limit_domains.append(domain)
 | 
 | 
            except BaseException as e:
 | 
 | 
                print(e)
 | 
 | 
 | 
 | 
        print('\n\nLimit domains:')
 | 
 | 
        for domain in limit_domains:
 | 
 | 
            print(domain)
 | 
 | 
 | 
 | 
    def start(self):
 | 
 | 
        self.start_report_accounts()
 | 
 | 
        self.start_block_accounts()
 | 
 | 
        self.start_limit_domains()
 | 
 | 
 | 
 | 
 | 
 | 
if __name__ == '__main__':
 | 
 | 
    import argparse
 | 
 | 
 | 
 | 
    parser = argparse.ArgumentParser(description='Mastodon Anti-SPAM 管理辅助脚本')
 | 
 | 
    parser.add_argument('csv_file', help='包含疑似帐户的 CSV 文件')
 | 
 | 
    args = parser.parse_args()
 | 
 | 
 | 
 | 
    m = Main(args.csv_file)
 | 
 | 
    m.start()
 |