Saturday, September 20, 2008

My python curl.py class template with stdout colors support

This is my curl.py class that I use as a template. It supports stdout colors. When a function is called, it displays the names of the caller and the called function. You can also display any string, list, dict, tuple, etc. Very nice, I like it. It's the same thing as using print, only you add color to the output. I especially like it for situations such as displaying large amounts of webpage data, logs, and CSV files: you can find patterns and display them in different colors. You can use this script to download and upload data to any website. Please correct indentation where necessary.

How to Use:
sess = curl.session("") #create new session
sess.login("http://domain/site/login_post","admin","super_secret_pass")

#!/usr/bin/env python
from ctypes import *
import os, sys, types, urllib, urllib2, urlparse, string, pycurl
import stdout_colours


class curl(object):
    """Encapsulate user operations on CGIs through curl.

    Wraps a single ``pycurl.Curl`` handle. The body of the most recent
    response is accumulated in ``self.response``.

    Every method reports its entry and exit through ``self.soc.me_him``
    (from ``stdout_colours``); presumably this prints a coloured trace
    line -- confirm against that module.
    """

    def __init__(self, base_url=""):
        """Create and configure the underlying pycurl handle.

        base_url -- prefix joined in front of every ``cgi`` argument;
                    leave empty when callers pass complete URLs.
        """
        self.func_me_color = "white_on_black"
        self.soc = stdout_colours.stdout_colors()
        self.soc.me_him(['ENTER:', __name__], self.func_me_color)
        # These members might be set.
        self.base_url = base_url
        self.verbosity = 0
        # Nothing past here should be modified by the caller.
        self.response = ""
        self.curlobj = pycurl.Curl()
        # Verify that we've got the right site...
        self.curlobj.setopt(pycurl.SSL_VERIFYHOST, 2)
        # Follow redirects in case it wants to take us to a CGI...
        self.curlobj.setopt(pycurl.FOLLOWLOCATION, 1)
        self.curlobj.setopt(pycurl.MAXREDIRS, 5)
        # Setting this option with even a nonexistent file makes libcurl
        # handle cookie capture and playback automatically.
        self.curlobj.setopt(pycurl.COOKIEFILE, "/dev/null")
        # Set timeouts to avoid hanging too long
        self.curlobj.setopt(pycurl.CONNECTTIMEOUT, 30)
        self.curlobj.setopt(pycurl.TIMEOUT, 300)

        # Set up a callback to capture the response body chunk by chunk.
        def response_callback(x):
            self.response += x

        self.curlobj.setopt(pycurl.WRITEFUNCTION, response_callback)
        self.soc.me_him(['EXIT:', __name__], self.func_me_color)

    def _full_url(self, cgi):
        """Return ``base_url`` joined with ``cgi`` using '/' separators."""
        # posixpath.join always joins with '/', whereas os.path.join would
        # insert '\\' on Windows and corrupt the URL.
        import posixpath
        return posixpath.join(self.base_url, cgi)

    def set_verbosity(self, level):
        """Set verbosity to 1 to see transactions."""
        self.soc.me_him(['ENTER:', __name__], self.func_me_color)
        # Keep the public attribute in sync with the handle's setting.
        self.verbosity = level
        self.curlobj.setopt(pycurl.VERBOSE, level)
        self.soc.me_him(['EXIT:', __name__], self.func_me_color)

    def get(self, cgi, params="", verbose=0):
        """Ship a GET request to a specified CGI, capture the response body.

        cgi     -- URL (or path relative to ``base_url``) to request.
        params  -- optional mapping or sequence of pairs, URL-encoded into
                   the query string.
        verbose -- when > 0, print the response body after the transfer.
        """
        self.soc.me_him(['ENTER:', __name__], self.func_me_color)
        if params:
            # Extend an existing query string instead of adding a second '?'.
            separator = "&" if "?" in cgi else "?"
            cgi += separator + urllib.urlencode(params)
        self.curlobj.setopt(pycurl.URL, self._full_url(cgi))
        self.curlobj.setopt(pycurl.HTTPGET, 1)
        self.response = ""
        self.curlobj.perform()
        if verbose > 0:
            print(self.response)
        self.soc.me_him(['EXIT:', __name__], self.func_me_color)

    def post(self, cgi, params, verbose=0):
        """Ship a POST request to a specified CGI, capture the response body.

        cgi     -- URL (or path relative to ``base_url``) to post to.
        params  -- mapping or sequence of pairs, URL-encoded as the body.
        verbose -- when > 0, print the response body after the transfer.
        """
        self.soc.me_him(['ENTER:', __name__], self.func_me_color)
        self.curlobj.setopt(pycurl.URL, self._full_url(cgi))
        self.curlobj.setopt(pycurl.POST, 1)
        self.curlobj.setopt(pycurl.POSTFIELDS, urllib.urlencode(params))
        self.response = ""
        self.curlobj.perform()
        if verbose > 0:
            print(self.response)
        self.soc.me_him(['EXIT:', __name__], self.func_me_color)

    def upload(self, cgi, file_name, file, verbose=0):
        """POST file from localhost to location/cgi.

        cgi       -- URL (or path relative to ``base_url``) to post to.
        file_name -- name of the multipart form field carrying the file.
        file      -- local path of the file to send.
        verbose   -- when > 0, print the response body after the transfer.
        """
        self.soc.me_him(['ENTER:', __name__], self.func_me_color)
        self.curlobj.setopt(pycurl.URL, self._full_url(cgi))
        self.curlobj.setopt(pycurl.HTTPPOST,
                            [(file_name, (pycurl.FORM_FILE, file))])
        self.response = ""
        self.curlobj.perform()
        if verbose > 0:
            print(self.response)
        self.soc.me_him(['EXIT:', __name__], self.func_me_color)

    def answered(self, check):
        """Does a given check string occur in the response?"""
        self.soc.me_him(['ENTER:', __name__], self.func_me_color)
        self.soc.me_him(['RETURN:', __name__], self.func_me_color)
        return check in self.response

    def close(self):
        """Close a session, freeing resources."""
        self.soc.me_him(['ENTER:', __name__], self.func_me_color)
        self.curlobj.close()
        self.soc.me_him(['EXIT:', __name__], self.func_me_color)

class session(curl):
    """A curl handle with login/logout convenience requests."""

    def login(self, cgisite, username, password):
        """POST the login form to ``cgisite`` (e.g. "account/login.php").

        The form carries the given username and password plus the fixed
        fields mode=login, usertype=P and redirect=admin.
        """
        self.soc.me_him(['ENTER:', __name__], self.func_me_color)
        form_fields = (
            ("username", username),
            ("password", password),
            ("mode", "login"),
            ("usertype", "P"),
            ("redirect", "admin"),
        )
        self.post(cgisite, form_fields)
        self.soc.me_him(['EXIT:', __name__], self.func_me_color)

    def logout(self, cgisite):
        """Issue a GET to ``cgisite`` (e.g. "account/logout.php")."""
        self.soc.me_him(['ENTER:', __name__], self.func_me_color)
        self.get(cgisite)
        self.soc.me_him(['EXIT:', __name__], self.func_me_color)

if __name__ == "__main__":
    # Command-line demo: log in to a site, report success, then clean up.
    # Site, username and password are all required, so argv needs at least
    # four entries; stop after printing the usage instead of falling through
    # to an IndexError.
    if len(sys.argv) < 4:
        print("Usage: %s \"schema://site/cgi\" \"username\" \"password\" [\"schema://site/logout_cgi\"]" % sys.argv[0])
        sys.exit(1)
    site, username, password = sys.argv[1:4]
    # logout() needs the URL of the logout CGI; it is taken from an optional
    # fourth argument and the logout request is skipped when it is absent.
    logout_site = sys.argv[4] if len(sys.argv) > 4 else None

    sess = session("")
    try:
        sess.set_verbosity(1)
        sess.login(site, username, password)
        # Never echo the real password; show one '*' per character.
        masked = "*" * len(password)
        print("YOU ARE LOGGED IN! %s %s %s" % (site, username, masked))
        if logout_site:
            sess.logout(logout_site)
    finally:
        # Release the curl handle even if a request raised.
        sess.close()

No comments: