#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Store geotagged tweets from the streaming API in PostgreSQL.

Each tweet that carries coordinates is run through the `cld` language
detector and written to the `tweets` table (schema at the end of this
file).  Tweets without coordinates are only counted as skipped.
"""

from datetime import datetime
from email.utils import parsedate_tz, mktime_tz
from re import sub
from time import sleep

import cld
import psycopg2
import tweetstream

conn = psycopg2.connect(database='tweets', user='hunter2', host='localhost')
cur = conn.cursor()

# Bounding box passed to the filter stream: south-west and north-east
# corners as "lon,lat" strings.
locations = ["-180,-90", "180,90"]
uname = 'hunter2'
passwd = 'hunter2'

count = 0          # rows inserted so far
skipped = 0        # tweets ignored because they had no coordinates
backoff_count = 0  # exponent of the current reconnect delay (2**n seconds)

# Values are bound by the driver instead of being spliced into the SQL
# text, so quoting is always correct and None is sent as NULL.
_INSERT_SQL = '''insert into tweets
    (tid, at, screen_name, lon, lat, ulang, tlang, tlangp)
    values (%s, to_timestamp(%s), %s, %s, %s, %s, %s, %s);'''


def _strip_mentions_and_links(text):
    """Return `text` without @usernames and http(s) links.

    Hashtags stay because they're often in the surrounding language.
    """
    # Usernames may contain upper-case letters, so match both cases.
    text = sub(r'(@[A-Za-z0-9_]* ?)', r'', text)
    # The leading space is optional so a link at the very start of the
    # text is removed as well.
    return sub(r'( ?https?://[^ ]*)', r'', text)


def insert_tweet(tweet):
    """Insert one tweet into the `tweets` table.

    `tweet` is a decoded stream message (a dict).  Messages without a
    non-null 'coordinates' entry increment `skipped` and are otherwise
    ignored.  A tweet whose `tid` already exists is dropped without
    touching the other rows of the pending batch.  Every 100 successful
    inserts the transaction is committed and a progress line is printed.
    """
    global skipped, count

    if 'coordinates' not in tweet or tweet['coordinates'] is None:
        skipped += 1
        return

    tid = tweet['id']
    # mktime_tz() already yields seconds since the epoch (UTC); no need to
    # round-trip through a local datetime and the non-portable '%s' format.
    at = int(mktime_tz(parsedate_tz(tweet['created_at'])))
    screen_name = tweet['user']['screen_name']
    lon, lat = tweet['coordinates']['coordinates']
    # The user's interface language (limited to one of like two dozen):
    ulang = tweet['user']['lang']

    text = _strip_mentions_and_links(tweet['text']).encode('utf-8')

    if ulang in cld.LANGUAGES:
        detected = cld.detect(text, hintLanguageCode=ulang)
    else:
        detected = cld.detect(text)
    tlang = detected[1]
    if tlang == 'un':
        # No language detected: store NULL for the probability.
        tlangp = None
    else:
        # Estimated probability of the matched language, % => n/1.
        tlangp = detected[4][0][2] / 100.0

    # A savepoint confines the rollback for a duplicate tid to this single
    # statement; a plain conn.rollback() would also throw away every
    # not-yet-committed insert of the current batch.
    cur.execute('savepoint tweet_insert')
    try:
        cur.execute(
            _INSERT_SQL,
            (tid, at, screen_name, lon, lat, ulang, tlang, tlangp))
    except psycopg2.IntegrityError:
        cur.execute('rollback to savepoint tweet_insert')
        inserted = False
    else:
        inserted = True
    cur.execute('release savepoint tweet_insert')

    if not inserted:
        return
    count += 1

    # Realistically, commit() is not expensive at this scale, but as long as
    # we're doing a bit of logging we might as well commit in relative bulk.
    if count % 100 == 0:
        conn.commit()
        print('%s %s %s %s' % (count, skipped, tid, at))


if __name__ == '__main__':
    while True:
        try:
            with tweetstream.FilterStream(uname, passwd,
                                          locations=locations) as stream:
                for tweet in stream:
                    insert_tweet(tweet)
                    # The connection is delivering data again, so the next
                    # failure starts from the shortest delay.
                    backoff_count = 0
        except tweetstream.ConnectionError:
            delay = 2 ** backoff_count
            print("ConnectionError! Backing off for %s seconds." % delay)
            sleep(delay)
            # Cap the delay at 2**10 seconds.
            if backoff_count < 10:
                backoff_count += 1

'''
create table tweets (tid bigint primary key, at timestamp with time zone, screen_name text, lon float, lat float, ulang text, tlang text, tlangp float);
'''