Spam filter for mbox style spools
(a work in progress)

This is still a work in progress, but it works well enough for me that I put it in my cron.

What I do is set up four POP3 mailboxes.

mbox
mbox_keep
mbox_spam
mbox_unsure

All my mail goes to the 'mbox' mailbox. The 'mbox_keep' mailbox is the one that I actually read. That is, this is the account that I point my mail client at. The spam filter takes all mail from the 'mbox' mailbox and sorts it into the three other mailboxes.

spam_filter.py mbox

produces the following files in mbox format: mbox_keep, mbox_unsure, mbox_spam

mbox_keep

everything here matched the WHITELIST or WHITEWORDS.

mbox_spam

everything here matched the BLACKLIST or was bigger than the size limit.

mbox_unsure

everything here is not on either list and it is less than the size limit. In general this is mostly spam. You may want to check from time to time to see if anything interesting is in here.


WHITELIST

This is a list of email address patterns. They can be full email addresses or regular expression patterns. One pattern per line.

tom@example.net
.*example.org

BLACKLIST

This is a list of email address patterns.

sue@example.net
jim@example.net
.*example.com

WHITEWORDS

My Project Name
bypass_password

BLACKWORDS

The spam filter is smart enough to catch obfuscations such as "V. I. A. G. R. A" and it converts fake unicode such as vïágrã to regular ASCII before checking the BLACKWORDS list.

viagra
vicodin
phentermine


#!/home/noah1/python/bin/python 
#!/usr/bin/env python
"""This is a spam filter.
It requires four external files: a WHITELIST of email addresses that 
specify mail that is always saved; a BLACKLIST of email addresses that
specify email addresses that are always ignored; a BLACKWORDS list of
email subject lines that are always ignored; and a WHITEWORDS list
that contains patterns that are matched in the body of a message. If a
message body matches the WHITEWORDS list then the message is saved
even if the message was not in the WHITELIST.

There are three ways a message is dispatched. It can be sent to the
*_keep mbox file; it can be sent to the *_spam mbox file; or it can be
sent to the *_unsure mbox file. Generally messages in the BLACKLIST or
matching the BLACKWORDS are sent to the *_spam mbox. Messages that match
no rule are sent to the *_unsure mbox where you can browse them to make
sure. The only exception is if an unknown message is greater than the
size limit; then it is treated as spam.
# Rule: If an address is in the WHITELIST then it is a keeper.
# Rule: If an address is in the BLACKLIST then it is spam.
# Rule: If a subject matches the BLACKWORDS then it is spam.
# Rule: If a message or subject matches the WHITEWORDS then it is a keeper.
# Rule: If an unsure message is bigger than the size limit then it is spam.
# Rule: Everything else is unsure.

To do:
    Add DNSBL http://www.spamhaus.org/sbl/howtouse.html

Noah Spurrier
20040228
Public Domain
free of licenses or restrictions
"""
import sys, os, string, re, time
import mailbox
import email, email.Errors
import traceback


# Non-zero enables progress and diagnostic messages on stdout.
VERBOSE = 1
# The inbox mbox path is taken from the first command-line argument at
# import time; the other mailbox paths are derived from it by suffix.
MAILBOX_IN = sys.argv[1]
# Private copy of the inbox that is processed after the inbox is renamed.
MAILBOX_WORKING = MAILBOX_IN + '_working'
# Output mboxes, one per classification result.
MAILBOX_UNSURE = MAILBOX_IN + '_unsure'
MAILBOX_SPAM = MAILBOX_IN + '_spam'
MAILBOX_KEEP = MAILBOX_IN + '_keep'
# File names of the pattern lists (one regular expression per line),
# opened relative to the current working directory.
WHITELIST = 'WHITELIST'
BLACKLIST = 'BLACKLIST' 
WHITEWORDS = 'WHITEWORDS'
BLACKWORDS = 'BLACKWORDS'
# Messages from unknown senders longer than this are classified as spam.
SIZELIMIT_UNSURE = 22000 # Bytes
# Name of the lock file created while the inbox is being renamed.
# NOTE(review): presumably this matches the lock name used by the local
# POP3 server -- confirm against the server configuration.
POP_LOCK = 'noah.pop.'

# Classification results returned by run_rules().
SPAM = 0
UNSURE = 1
KEEP = 2

def main ():
    if not os.path.isfile (MAILBOX_IN):
        if VERBOSE: print 'There is no inbox mbox file. Quitting...'
        return False
    if os.path.isfile (MAILBOX_WORKING):
        if VERBOSE: print 'There is already an inbox work file. See %s. Quitting...' % (MAILBOX_WORKING)
        return False
    # Move the mbox so that we can work on it. If the POP3 server activates while we
    # are working then it will just create a new file. This is atomic, so it's safe.
    if not lock (POP_LOCK):
        if VERBOSE: print 'The mbox is busy. See %s. Quitting...' % (pop_lock_file)
        return False
    os.rename (MAILBOX_IN, MAILBOX_WORKING)
    unlock (POP_LOCK)

    # Read in the patterns of whitelist email addresses.
    # Read in the patterns of blacklist email addresses.
    # Read in the patterns of white words.
    # Read in the patterns of black words.
    whitelist_list = file(WHITELIST).readlines()
    whitelist_list = map (string.strip, whitelist_list)
    whitelist_pattern_list = compile_pattern_list ([i for i in whitelist_list if i != ''])
    blacklist_list = file(BLACKLIST).readlines()
    blacklist_list = map (string.strip, blacklist_list)
    blacklist_pattern_list = compile_pattern_list ([i for i in blacklist_list if i != ''])
    whiteword_list = file(WHITEWORDS).readlines()
    whiteword_list = map (string.strip, whiteword_list)
    whiteword_pattern_list = compile_pattern_list ([i for i in whiteword_list if i != ''])
    blackword_list = file(BLACKWORDS).readlines()
    blackword_list = map (string.strip, blackword_list)
    blackword_pattern_list = compile_pattern_list ([i for i in blackword_list if i != ''])

    # Open the files.
    fp = file(MAILBOX_WORKING,'rb')
    mbox = mailbox.PortableUnixMailbox (fp, msgfactory)
    fout_keep = file(MAILBOX_KEEP, 'a')
    fout_unsure = file(MAILBOX_UNSURE, 'a')
    fout_spam = file(MAILBOX_SPAM, 'a')

# new_subject = 'body matches %s in BLACKWORDS list. | ' % (rule_blackword_document.result.re.pattern) + msg['Subject']
# del msg['Subject']
# msg['Subject'] = new_subject
# document = msg.as_string(True)

    message_count = 0
    message_keep_count = 0
    message_unsure_count = 0
    message_spam_count = 0
    msg = mbox.next()
    while msg is not None:
        message_count += 1
        if type(msg) is type(''):
            if VERBOSE: print 'Message #%d header is malformed. Skipping it.' % (message_count)
            msg = mbox.next()
            continue
        try:
            document = msg.as_string(True)
        except:
            if VERBOSE: print 'Message #%d crashes Python email module. Skipping it. Upgrade after version 2.2.' % (message_count)
            msg = mbox.next()
            continue
        try:
            newaddr = email.Utils.parseaddr(msg['From'])[1]
        except:
            if VERBOSE: print 'Message #%d crashes Python email module. Using unix address. Upgrade after version 2.2' % (message_count)
            newaddr = msg.get_unixfrom()
        if newaddr is None: # Usually means forged mail.
            if VERBOSE: print 'Message #%d is a forgery. Using unix address.' % (message_count)
            newaddr = msg.get_unixfrom()
            if newaddr is None:
                if VERBOSE: print 'Message #%d has an unparsable email address. Skipping it.' % (message_count)
                msg = mbox.next()
                continue
        subject = msg['Subject']
        if subject is None:
            subject = ''
        message_id = msg['Message-ID']

        (result, reason) = run_rules (msg, document, newaddr, subject, whitelist_pattern_list, blacklist_pattern_list, whiteword_pattern_list, blackword_pattern_list)
        if result == SPAM:
            if VERBOSE: print '  SPAM:', reason
            fout_spam.write (document)
            message_spam_count += 1
        elif result == UNSURE:
            if VERBOSE: print 'UNSURE:', reason
            fout_unsure.write (document)
            message_unsure_count += 1
        elif result == KEEP:
            if VERBOSE: print '  KEEP:', reason
            fout_keep.write (document)
            message_keep_count += 1

        msg = mbox.next()

    fout_keep.close()
    fout_unsure.close()
    fout_spam.close()

    merge_mboxes (MAILBOX_WORKING,'WORKING_BACKUP')
    os.remove (MAILBOX_WORKING)

    if VERBOSE:
        print 'Processed %d messages.' % (message_count)
        print '  keep: %d' % (message_keep_count)
        print 'unsure: %d' % (message_unsure_count)
        print '  spam: %d' % (message_spam_count)
    if VERBOSE: print 'Done.'

def run_rules (msg, document, from_address, subject, whitelist_pattern_list, blacklist_pattern_list, whiteword_pattern_list, blackword_pattern_list):
    try:
        match = in_pattern_list (whitelist_pattern_list, from_address)
        if match:
            if has_application_evil_attachments (msg):
                return SPAM, 'Message has .pif or .scr attachment'
            else:
                return KEEP, 'Message matches %s in WHITELIST' % (match.re.pattern)
        match = in_pattern_list (blacklist_pattern_list, from_address)
        if match:
            return SPAM, 'Address matches %s in BLACKLIST.' % (match.re.pattern)
        match = in_pattern_list (whiteword_pattern_list, subject)
        if match:
            return KEEP, 'Subject matches %s in WHITEWORDS list' % (match.re.pattern)
        match = in_pattern_list (whiteword_pattern_list, document)
        if match:
            return KEEP, 'Body matches %s in WHITEWORDS list' % (match.re.pattern)
        match = in_pattern_list (blackword_pattern_list, uber_strip(subject))
        if match:
            return SPAM, 'Subject matches %s in BLACKWORDS list' % (match.re.pattern)
        if len(document) > SIZELIMIT_UNSURE:
            return SPAM, 'Message is too big to be accepted by an unknown sender'
        if has_8bit_chars (subject):
            return SPAM, 'Subject has illegal 8-bit characters'
#        if has_application_attachments (msg): 
#            return SPAM, 'Unknown sender with an attachment -- probably a virus'
        match = in_pattern_list (blackword_pattern_list, uber_strip(document))
        if match:
            return SPAM, 'Body matches %s in BLACKWORDS list' % (match.re.pattern)
        return UNSURE, 'Message yields uncertainty'
    except Exception,e:
        response_message = 'The rule processor raised an exception. Sometimes this is from a bad regular expression.\n'
        response_message += str(e) + '\n'
        if VERBOSE: traceback.print_exc()
        return KEEP, response_message

def msgfactory(fp):
    """This is used as a factory when creating a PortableUnixMailbox.
    """
    try:
        return email.message_from_file(fp)
    except email.Errors.MessageParseError, e:
        # Don't return None since that will stop the mailbox iterator
        return str(e)

def has_8bit_chars (s):
    """This returns 1 if any character of s has an ordinal of 0x80 or
    above, otherwise it returns 0.
    """
    for ch in s:
        if ord(ch) < 0x80:
            continue
        return 1
    return 0

NONALPHAS=re.compile(r'[^a-zA-Z]',re.MULTILINE)
def uber_strip (str):
    """This converts Latin-1 characters to ASCII equivalents with
    latin1_to_ascii() and then deletes every character that is not an
    ASCII letter.
    """
    ascii_text = latin1_to_ascii(str)
    letters_only = NONALPHAS.sub ('', ascii_text)
    return letters_only

# Maps Latin-1 ordinals (0xa1-0xff) to 7-bit ASCII replacement text.
# Built once at import time instead of on every call.
_LATIN1_TO_ASCII = {
    0xc0:'A', 0xc1:'A', 0xc2:'A', 0xc3:'A', 0xc4:'A', 0xc5:'A',
    0xc6:'Ae', 0xc7:'C',
    0xc8:'E', 0xc9:'E', 0xca:'E', 0xcb:'E',
    0xcc:'I', 0xcd:'I', 0xce:'I', 0xcf:'I',
    0xd0:'Th', 0xd1:'N',
    0xd2:'O', 0xd3:'O', 0xd4:'O', 0xd5:'O', 0xd6:'O', 0xd8:'O',
    0xd9:'U', 0xda:'U', 0xdb:'U', 0xdc:'U',
    0xdd:'Y', 0xde:'th', 0xdf:'ss',
    0xe0:'a', 0xe1:'a', 0xe2:'a', 0xe3:'a', 0xe4:'a', 0xe5:'a',
    0xe6:'ae', 0xe7:'c',
    0xe8:'e', 0xe9:'e', 0xea:'e', 0xeb:'e',
    0xec:'i', 0xed:'i', 0xee:'i', 0xef:'i',
    0xf0:'th', 0xf1:'n',
    0xf2:'o', 0xf3:'o', 0xf4:'o', 0xf5:'o', 0xf6:'o', 0xf8:'o',
    0xf9:'u', 0xfa:'u', 0xfb:'u', 0xfc:'u',
    0xfd:'y', 0xfe:'th', 0xff:'y',
    0xa1:'!', 0xa2:'{cent}', 0xa3:'{pound}', 0xa4:'{currency}',
    0xa5:'{yen}', 0xa6:'|', 0xa7:'{section}', 0xa8:'{umlaut}',
    0xa9:'{C}', 0xaa:'{^a}', 0xab:'<<', 0xac:'{not}',
    0xad:'-', 0xae:'{R}', 0xaf:'_', 0xb0:'{degrees}',
    0xb1:'{+/-}', 0xb2:'{^2}', 0xb3:'{^3}', 0xb4:"'",
    0xb5:'{micro}', 0xb6:'{paragraph}', 0xb7:'*', 0xb8:'{cedilla}',
    0xb9:'{^1}', 0xba:'{^o}', 0xbb:'>>',
    0xbc:'{1/4}', 0xbd:'{1/2}', 0xbe:'{3/4}', 0xbf:'?',
    0xd7:'*', 0xf7:'/',
    }

def latin1_to_ascii (unicrap):
    """This replaces UNICODE Latin-1 characters with
    something equivalent in 7-bit ASCII. All characters in the standard
    7-bit ASCII range are preserved. In the 8th bit range all the Latin-1
    accented letters are stripped of their accents. Most symbol characters
    are converted to something meaningful. Anything not converted is deleted.

    unicrap is the string to convert; the converted string is returned.
    """
    r = []
    for i in unicrap:
        code = ord(i)
        if code < 0x80:
            # Plain ASCII passes through untouched.
            r.append (i)
            continue
        replacement = _LATIN1_TO_ASCII.get (code)
        if replacement is not None:
            r.append (replacement)
        # Characters with no table entry are dropped.
    return ''.join(r)

def lock (filename):
    """This tries to create filename as a lock file. It returns 1 if the
    file was created and 0 if an OSError was raised (for example because
    the file already exists).
    """
    # O_EXCL makes the create fail when the file is already there.
    flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
    try:
        handle = os.open (filename, flags)
        os.close (handle)
    except OSError:
        return 0
    return 1

def unlock (filename):
    """This releases a file lock by deleting the lock file.
    """
    os.remove (filename)

def merge_mboxes (filename_src, filename_dst):
    """This copies one mbox onto the end of another.

    filename_src is read in chunks and appended to filename_dst, which is
    created if it does not exist. Both files are opened in binary mode so
    the bytes are copied unchanged, and both are closed even if the copy
    fails part way through. Errors from opening, reading, or writing are
    propagated to the caller.
    """
    fin = open (filename_src, 'rb')
    try:
        fout = open (filename_dst, 'ab')
        try:
            blob = fin.read(10000)
            while blob:
                fout.write (blob)
                blob = fin.read(10000)
        finally:
            fout.close ()
    finally:
        fin.close ()

def has_application_evil_attachments (msg):
    """This returns True if the message has any application attachments
        whose 'name' Content-Type parameter ends with the extension
        .pif or .scr (compared case-insensitively). Parts without a
        'name' parameter are ignored. Otherwise this returns False.
    """
    ### Maybe a string search would be faster...
    for part in msg.walk():
        # get_content_maintype() replaces the deprecated get_main_type().
        if part.get_content_maintype() != 'application':
            continue
        name = part.get_param('name')
        if isinstance (name, tuple):
            # RFC 2231 encoded values come back as (charset, language, value).
            name = name[2]
        if not name:
            continue
        if name[-4:].lower() in ('.pif', '.scr'):
            return True
    return False

def has_application_attachments (msg):
    """This returns True if the message has any attachments with the main type
        of 'application'. For example 'application/octet-stream' or
        'application/msword'. Otherwise this returns False.
    """
    ### Maybe a string search would be faster...
    for part in msg.walk():
        # get_content_maintype() replaces the deprecated get_main_type().
        if part.get_content_maintype() == 'application':
            return True
    return False

def compile_pattern_list (string_list):
    """This compiles every string in string_list as a case-insensitive
    regular expression and returns the compiled regexs as a new list,
    in the same order.
    """
    return [re.compile(text, re.IGNORECASE) for text in string_list]

def in_pattern_list (pattern_list, s):
    """This searches s with each compiled regex in pattern_list, in order,
       and returns the match object of the first regex that matches.
       It returns None if no regex matches.
    """
    for regex in pattern_list:
        found = regex.search (s)
        if found is None:
            continue
        return found
    return None

def append_unique (list, item):
    """This adds item to the end of list unless list already contains it.
    The list is modified in place.
    """
    if item in list:
        return
    list.append (item)

if __name__ == '__main__':
    try:
        main ()
    except Exception, e:
        print 'FATAL EXCEPTION'
        print str(e)
        traceback.print_exc()
        merge_mboxes (MAILBOX_WORKING,'WORKING_BACKUP')
        os.remove (MAILBOX_WORKING)