This is still a work in progress, but it works well enough for me that I put it in my cron.
What I do is set up four POP3 mailboxes.
mbox
mbox_keep
mbox_spam
mbox_unsure
All my mail goes to the 'mbox' mailbox. The 'mbox_keep' mailbox is the one that I actually read; that is, it is the account I point my mail client at. The spam filter takes all mail from the 'mbox' mailbox and sorts it into the three other mailboxes.
spam_filter.py mbox
produces the following files in mbox format: mbox_keep, mbox_unsure, mbox_spam
mbox_keep
everything here matched the WHITELIST or WHITEWORDS.
mbox_spam
everything here matched the BLACKLIST or the BLACKWORDS, or was bigger than the size limit.
mbox_unsure
everything here is not on either list and it is less than the size limit. In general this is mostly spam. You may want to check from time to time to see if anything interesting is in here.
WHITELIST
This is a list of email address patterns. They can be full email addresses or regular expression patterns. One pattern per line.
tom@example.net
.*example.org
BLACKLIST
This is a list of email address patterns.
sue@example.net
jim@example.net
.*example.com
WHITEWORDS
My Project Name
bypass_password
BLACKWORDS
The spam filter is smart enough to catch obfuscations such as "V. I. A. G. R. A" and it converts fake unicode such as vïágrã to regular ASCII before checking the BLACKWORDS list.
viagra
vicodin
phentermine
#!/home/noah1/python/bin/python
#!/usr/bin/env python
"""This is a spam filter.

Usage: spam_filter.py MBOX_FILE

It requires four external files, each holding one regular expression per
line (blank lines are ignored, matching is case-insensitive):

    WHITELIST   sender addresses whose mail is always kept
    BLACKLIST   sender addresses whose mail is always spam
    WHITEWORDS  patterns that rescue a message when found in its subject
                or anywhere in the message text
    BLACKWORDS  patterns that condemn a message when found in its subject
                or anywhere in the message text (after obfuscation has
                been stripped; see uber_strip)

Every message in MBOX_FILE is appended to exactly one of three mbox files:
MBOX_FILE_keep, MBOX_FILE_unsure or MBOX_FILE_spam. The rules are applied
in this order and the first one that fires wins:

# Rule: If the address is in the WHITELIST then it is a keeper, unless the
#       message carries a .pif or .scr attachment, in which case it is spam.
# Rule: If the address is in the BLACKLIST then it is spam.
# Rule: If the subject or the message matches the WHITEWORDS then it is a
#       keeper.
# Rule: If the subject matches the BLACKWORDS then it is spam.
# Rule: If the message is bigger than the size limit then it is spam.
# Rule: If the subject has 8-bit characters then it is spam.
# Rule: If the message matches the BLACKWORDS then it is spam.
# Rule: Everything else is unsure.

The processed inbox is appended to the file WORKING_BACKUP before it is
removed, so nothing is lost even when a message has to be skipped.

NOTE: main() iterates the inbox with mailbox.PortableUnixMailbox, which
exists only in Python 2. Everything else is written in syntax that is valid
on both Python 2.6+ and Python 3.

To do:
    Add DNSBL http://www.spamhaus.org/sbl/howtouse.html

Noah Spurrier 20040228
Public Domain
free of licenses or restrictions
"""

import sys
import os
import string
import re
import time
import mailbox
import email
import email.errors
import email.utils
import traceback

VERBOSE = 1

# The inbox comes from the command line. The fallback only exists so that
# the module can be imported (for example by tests); running the script
# without an argument prints a usage message instead of using it.
if len(sys.argv) > 1:
    MAILBOX_IN = sys.argv[1]
else:
    MAILBOX_IN = 'mbox'
MAILBOX_WORKING = MAILBOX_IN + '_working'
MAILBOX_UNSURE = MAILBOX_IN + '_unsure'
MAILBOX_SPAM = MAILBOX_IN + '_spam'
MAILBOX_KEEP = MAILBOX_IN + '_keep'
MAILBOX_BACKUP = 'WORKING_BACKUP'
WHITELIST = 'WHITELIST'
BLACKLIST = 'BLACKLIST'
WHITEWORDS = 'WHITEWORDS'
BLACKWORDS = 'BLACKWORDS'
SIZELIMIT_UNSURE = 22000 # Bytes
POP_LOCK = 'noah.pop.'

# Verdicts returned by run_rules().
SPAM = 0
UNSURE = 1
KEEP = 2


def _say(text):
    """Print text only when VERBOSE is set."""
    if VERBOSE:
        print(text)


def _load_pattern_file(filename):
    """Read one regex per line from filename and return the compiled list.

    Surrounding whitespace is stripped and blank lines are skipped.
    """
    with open(filename) as fin:
        stripped = [line.strip() for line in fin]
    return compile_pattern_list([line for line in stripped if line != ''])


def _sender_address(msg, message_count):
    """Return the sender address of msg, or None when none can be found.

    The From: header is preferred; the unix "From " envelope line is the
    fallback when the header cannot be parsed.
    """
    try:
        newaddr = email.utils.parseaddr(msg['From'])[1]
    except Exception:
        _say('Message #%d crashes Python email module. Using unix address. Upgrade after version 2.2' % (message_count))
        newaddr = msg.get_unixfrom()
    if newaddr is None:
        # Usually means forged mail.
        _say('Message #%d is a forgery. Using unix address.' % (message_count))
        newaddr = msg.get_unixfrom()
    return newaddr


def main():
    """Sort every message of MAILBOX_IN into the keep/unsure/spam mboxes.

    Returns False when nothing was done (no inbox, a stale work file, or a
    busy POP lock) and None after a completed run. Any other failure is
    raised to the caller.
    """
    if not os.path.isfile(MAILBOX_IN):
        _say('There is no inbox mbox file. Quitting...')
        return False
    if os.path.isfile(MAILBOX_WORKING):
        _say('There is already an inbox work file. See %s. Quitting...' % (MAILBOX_WORKING))
        return False

    # Read the pattern files BEFORE touching the inbox, so that a missing
    # file or a bad regular expression leaves the mail where it is.
    whitelist_pattern_list = _load_pattern_file(WHITELIST)
    blacklist_pattern_list = _load_pattern_file(BLACKLIST)
    whiteword_pattern_list = _load_pattern_file(WHITEWORDS)
    blackword_pattern_list = _load_pattern_file(BLACKWORDS)

    # Move the mbox so that we can work on it. If the POP3 server activates
    # while we are working then it will just create a new file. The rename
    # is atomic, so it's safe.
    if not lock(POP_LOCK):
        _say('The mbox is busy. See %s. Quitting...' % (POP_LOCK))
        return False
    try:
        os.rename(MAILBOX_IN, MAILBOX_WORKING)
    finally:
        # Never leave the lock behind, even if the rename failed.
        unlock(POP_LOCK)

    message_count = 0
    counts = {KEEP: 0, UNSURE: 0, SPAM: 0}
    labels = {SPAM: ' SPAM:', UNSURE: 'UNSURE:', KEEP: ' KEEP:'}
    destinations = ((KEEP, MAILBOX_KEEP),
                    (UNSURE, MAILBOX_UNSURE),
                    (SPAM, MAILBOX_SPAM))
    opened = []
    try:
        fp = open(MAILBOX_WORKING, 'rb')
        opened.append(fp)
        outputs = {}
        for verdict, filename in destinations:
            outputs[verdict] = open(filename, 'a')
            opened.append(outputs[verdict])

        mbox = mailbox.PortableUnixMailbox(fp, msgfactory)
        while True:
            msg = mbox.next()
            if msg is None:
                break
            message_count += 1

            # msgfactory returns the error text when parsing failed.
            if isinstance(msg, str):
                _say('Message #%d header is malformed. Skipping it.' % (message_count))
                continue
            try:
                document = msg.as_string(True)
            except Exception:
                _say('Message #%d crashes Python email module. Skipping it. Upgrade after version 2.2.' % (message_count))
                continue

            newaddr = _sender_address(msg, message_count)
            if newaddr is None:
                _say('Message #%d has an unparsable email address. Skipping it.' % (message_count))
                continue

            subject = msg['Subject']
            if subject is None:
                subject = ''

            (result, reason) = run_rules(msg, document, newaddr, subject,
                                         whitelist_pattern_list,
                                         blacklist_pattern_list,
                                         whiteword_pattern_list,
                                         blackword_pattern_list)
            if result in outputs:
                _say('%s %s' % (labels[result], reason))
                outputs[result].write(document)
                counts[result] += 1
    finally:
        for handle in opened:
            handle.close()

    # Skipped messages are not in any output file; the backup keeps them.
    merge_mboxes(MAILBOX_WORKING, MAILBOX_BACKUP)
    os.remove(MAILBOX_WORKING)

    _say('Processed %d messages.' % (message_count))
    print(' keep: %d' % (counts[KEEP]))
    print('unsure: %d' % (counts[UNSURE]))
    print(' spam: %d' % (counts[SPAM]))
    _say('Done.')


def run_rules(msg, document, from_address, subject,
              whitelist_pattern_list, blacklist_pattern_list,
              whiteword_pattern_list, blackword_pattern_list):
    """Classify one message and return a (verdict, reason) tuple.

    msg is the parsed message, document is its full text, from_address and
    subject are plain strings, and the four lists hold compiled regexes.
    The verdict is SPAM, UNSURE or KEEP. If a rule raises, the message is
    kept (KEEP) so that a broken pattern can never throw mail away.
    """
    try:
        match = in_pattern_list(whitelist_pattern_list, from_address)
        if match:
            # Worms like to forge addresses of people you know.
            if has_application_evil_attachments(msg):
                return SPAM, 'Message has .pif or .scr attachment'
            else:
                return KEEP, 'Message matches %s in WHITELIST' % (match.re.pattern)

        match = in_pattern_list(blacklist_pattern_list, from_address)
        if match:
            return SPAM, 'Address matches %s in BLACKLIST.' % (match.re.pattern)

        match = in_pattern_list(whiteword_pattern_list, subject)
        if match:
            return KEEP, 'Subject matches %s in WHITEWORDS list' % (match.re.pattern)

        match = in_pattern_list(whiteword_pattern_list, document)
        if match:
            return KEEP, 'Body matches %s in WHITEWORDS list' % (match.re.pattern)

        match = in_pattern_list(blackword_pattern_list, uber_strip(subject))
        if match:
            return SPAM, 'Subject matches %s in BLACKWORDS list' % (match.re.pattern)

        if len(document) > SIZELIMIT_UNSURE:
            return SPAM, 'Message is too big to be accepted by an unknown sender'

        if has_8bit_chars(subject):
            return SPAM, 'Subject has illegal 8-bit characters'

        # Disabled rule, kept for reference:
        # if has_application_attachments (msg):
        #     return SPAM, 'Unknown sender with an attachment -- probably a virus'

        match = in_pattern_list(blackword_pattern_list, uber_strip(document))
        if match:
            return SPAM, 'Body matches %s in BLACKWORDS list' % (match.re.pattern)

        return UNSURE, 'Message yields uncertainty'
    except Exception as e:
        response_message = 'The rule processor raised an exception. Sometimes this is from a bad regular expression.\n'
        response_message += str(e) + '\n'
        if VERBOSE:
            traceback.print_exc()
        return KEEP, response_message


def msgfactory(fp):
    """This is used as a factory when creating a PortableUnixMailbox.

    Returns the parsed message, or the error text as a string when the
    message cannot be parsed.
    """
    try:
        return email.message_from_file(fp)
    except email.errors.MessageParseError as e:
        # Don't return None since that will stop the mailbox iterator
        return str(e)


def has_8bit_chars(s):
    """Return true if any character of s lies outside 7-bit ASCII."""
    for c in s:
        if ord(c) >= 0x80:
            return 1
    return 0


NONALPHAS = re.compile(r'[^a-zA-Z]', re.MULTILINE)


def uber_strip(str):
    """This removes all spaces and punctuation and converts unicode to
    ASCII equivalents, so "V. I. A. G. R. A" becomes "VIAGRA".

    (The parameter keeps its historical name even though it shadows the
    builtin.)
    """
    return NONALPHAS.sub('', latin1_to_ascii(str))


# Latin-1 code point -> 7-bit ASCII replacement, used by latin1_to_ascii().
_LATIN1_TO_ASCII = {
    0xc0:'A', 0xc1:'A', 0xc2:'A', 0xc3:'A', 0xc4:'A', 0xc5:'A',
    0xc6:'Ae', 0xc7:'C',
    0xc8:'E', 0xc9:'E', 0xca:'E', 0xcb:'E',
    0xcc:'I', 0xcd:'I', 0xce:'I', 0xcf:'I',
    0xd0:'Th', 0xd1:'N',
    0xd2:'O', 0xd3:'O', 0xd4:'O', 0xd5:'O', 0xd6:'O', 0xd8:'O',
    0xd9:'U', 0xda:'U', 0xdb:'U', 0xdc:'U',
    0xdd:'Y', 0xde:'th', 0xdf:'ss',
    0xe0:'a', 0xe1:'a', 0xe2:'a', 0xe3:'a', 0xe4:'a', 0xe5:'a',
    0xe6:'ae', 0xe7:'c',
    0xe8:'e', 0xe9:'e', 0xea:'e', 0xeb:'e',
    0xec:'i', 0xed:'i', 0xee:'i', 0xef:'i',
    0xf0:'th', 0xf1:'n',
    0xf2:'o', 0xf3:'o', 0xf4:'o', 0xf5:'o', 0xf6:'o', 0xf8:'o',
    0xf9:'u', 0xfa:'u', 0xfb:'u', 0xfc:'u',
    0xfd:'y', 0xfe:'th', 0xff:'y',
    0xa1:'!', 0xa2:'{cent}', 0xa3:'{pound}', 0xa4:'{currency}',
    0xa5:'{yen}', 0xa6:'|', 0xa7:'{section}', 0xa8:'{umlaut}',
    0xa9:'{C}', 0xaa:'{^a}', 0xab:'<<', 0xac:'{not}',
    0xad:'-', 0xae:'{R}', 0xaf:'_', 0xb0:'{degrees}',
    0xb1:'{+/-}', 0xb2:'{^2}', 0xb3:'{^3}', 0xb4:"'",
    0xb5:'{micro}', 0xb6:'{paragraph}', 0xb7:'*', 0xb8:'{cedilla}',
    0xb9:'{^1}', 0xba:'{^o}', 0xbb:'>>',
    0xbc:'{1/4}', 0xbd:'{1/2}', 0xbe:'{3/4}', 0xbf:'?',
    0xd7:'*', 0xf7:'/',
}


def latin1_to_ascii(unicrap):
    """This replaces UNICODE Latin-1 characters with something equivalent
    in 7-bit ASCII. All characters in the standard 7-bit ASCII range are
    preserved. In the 8th bit range all the Latin-1 accented letters are
    stripped of their accents. Most symbol characters are converted to
    something meaningful. Anything not converted is deleted.
    """
    pieces = []
    for ch in unicrap:
        code = ord(ch)
        if code in _LATIN1_TO_ASCII:
            pieces.append(_LATIN1_TO_ASCII[code])
        elif code < 0x80:
            pieces.append(ch)
        # Anything else at or above 0x80 has no translation and is dropped.
    return ''.join(pieces)


def lock(filename):
    """This creates a file lock.

    Returns 1 if the lock file was created, 0 if it could not be created
    (normally because it already exists).
    """
    try:
        # O_EXCL makes the create fail if the file is already there.
        fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        os.close(fd)
        return 1
    except OSError:
        return 0


def unlock(filename):
    """This removes a file lock."""
    os.unlink(filename)


def merge_mboxes(filename_src, filename_dst):
    """This copies one mbox onto the end of another.

    The copy is done on raw bytes so that messages in any encoding survive.
    """
    with open(filename_src, 'rb') as fin:
        with open(filename_dst, 'ab') as fout:
            blob = fin.read(10000)
            while blob:
                fout.write(blob)
                blob = fin.read(10000)


def has_application_evil_attachments(msg):
    """This returns True if the message has any application attachments
    with the extension .pif or .scr (compared case-insensitively).

    Parts that carry no file name at all are simply ignored.
    """
    ### Maybe a string search would be faster...
    for part in msg.walk():
        if part.get_content_maintype() != 'application':
            continue
        name = part.get_filename()
        if not name:
            name = part.get_param('name')
            if name:
                # An RFC 2231 encoded parameter comes back as a tuple.
                name = email.utils.collapse_rfc2231_value(name)
        if not name:
            continue
        if name[-4:].lower() in ('.pif', '.scr'):
            return True
    return False


def has_application_attachments(msg):
    """This returns True if the message has any attachments with the main
    type of 'application'. For example 'application/octet-stream' or
    'application/msword'.
    """
    ### Maybe a string search would be faster...
    for part in msg.walk():
        if part.get_content_maintype() == 'application':
            return True
    return False


def compile_pattern_list(string_list):
    """This takes a list of strings and returns a list of compiled regexs
    with the IGNORECASE flag set true.
    """
    return [re.compile(x, re.IGNORECASE) for x in string_list]


def in_pattern_list(pattern_list, s):
    """This returns a match object if the string matches any regex in the
    pattern_list otherwise it returns None. Patterns are tried in order
    with search(), so a pattern may match anywhere in the string.
    """
    for cre in pattern_list:
        match = cre.search(s)
        if match is not None:
            return match
    return None


def append_unique(list, item):
    """This appends the item only if the item is not already in the list."""
    if item not in list:
        list.append(item)


if __name__ == '__main__':
    if len(sys.argv) < 2:
        sys.stderr.write('Usage: %s MBOX_FILE\n' % (sys.argv[0]))
        sys.exit(2)
    try:
        main()
    except Exception as e:
        print('FATAL EXCEPTION')
        print(str(e))
        traceback.print_exc()
        # Only clean up a work file that actually exists; the failure may
        # have happened before the inbox was moved.
        if os.path.isfile(MAILBOX_WORKING):
            merge_mboxes(MAILBOX_WORKING, MAILBOX_BACKUP)
            os.remove(MAILBOX_WORKING)
        sys.exit(1)