| Server IP : 104.21.37.246 / Your IP : 104.23.243.33 [ Web Server : Apache System : Linux cpanel01wh.bkk1.cloud.z.com 2.6.32-954.3.5.lve1.4.59.el6.x86_64 #1 SMP Thu Dec 6 05:11:00 EST 2018 x86_64 User : cp648411 ( 1354) PHP Version : 7.2.34 Disable Function : NONE Domains : 0 Domains MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /etc/mail/spamassassin/CMAE-SA/example/python/ |
Upload File : |
#! /usr/bin/python
#
""" @package client_example
This package provides an example of a Python script that queries a
Cloudmark Authority Server using the pycurl module (which is available
from http://pycurl.sourceforge.net/ ).
"""
import sys
import getopt
import json
import pycurl
from StringIO import StringIO
import mbox
# When True, the query functions print the server's response body as
# received instead of a decoded summary (main() sets it for the -r option).
g_raw_output = False
# Usage text handed to sys.exit() when the command line is invalid.
g_usage_string = "usage: client_example.py [-r] [-I ip] [-a file] [-m file] [-C type] host"
def do_GET_request(c, url, req, arg):
    """ Send a GET request to the server.

    Build the request URL from url, req and (optionally) arg, perform
    it on the supplied curl handle and collect the response body.
    On success, return the response body as a string.
    On failure print the error returned by curl and return an empty string.

    Keyword Arguments:
    c   -- the curl handle
    url -- the url of the host (including port if necessary)
    req -- the request path, appended to url
    arg -- a single string appended to the request after a '/', or None
    """
    buf = StringIO()  # receives the response body

    # Build up the request.
    req = url + req
    if arg is not None:
        req += '/' + arg

    try:
        # The same handle is also passed to do_POST_request(); force the
        # method back to GET so a form set by an earlier POST is not resent.
        c.setopt(pycurl.HTTPGET, 1)
        c.setopt(c.URL, req)         # set the host url
        c.setopt(c.WRITEDATA, buf)   # set the receive buffer
        c.perform()                  # and let curl do its magic.
    except pycurl.error as reason:
        # pycurl.error carries (errno, message); only the message is shown.
        print("Curl error:  %s" % (reason.args[1],))
        return ''
    return buf.getvalue()
def do_POST_request(c, url, req, parts):
    """ Send a POST request to the server.

    Attach the given parts as a form to the curl handle, perform the
    request and collect the response body.
    On success, return a tuple (http response code, response body).
    On failure print the error returned by curl and return (-1, '').

    Keyword Arguments:
    c     -- the curl handle
    url   -- the url of the host (including port if necessary)
    req   -- the request path, appended to url
    parts -- the parts of the POST as a dictionary keyed by part name;
             when it is empty or None no form is attached to the handle
    """
    buf = StringIO()  # receives the response body

    # pycurl expects the form as a list of
    # (name, (pycurl.FORM_CONTENTS, value)) tuples.
    formpart = []
    if parts:
        formpart = [(k, (pycurl.FORM_CONTENTS, v)) for k, v in parts.items()]
    if formpart:
        c.setopt(pycurl.HTTPPOST, formpart)

    req = url + req
    try:
        c.setopt(c.URL, req)                   # set the host url
        c.setopt(c.WRITEDATA, buf)             # set the receive buffer
        c.perform()                            # and let curl do its magic.
        response = c.getinfo(c.RESPONSE_CODE)  # fetch our http response
    except pycurl.error as reason:
        # pycurl.error carries (errno, message); only the message is shown.
        print("Curl error:  %s" % (reason.args[1],))
        return (-1, '')
    return (response, buf.getvalue())
def fetch_element(key, dict):
    """ Look up a key in a dictionary, defaulting to an empty string.

    Keyword Arguments.
    key  -- the key to search for
    dict -- the dictionary

    Returns the value stored under key when the key is present,
    otherwise ''.
    """
    if key in dict:
        return dict[key]
    return ''
def fetch_element_or_none(key, dict):
    """ Look up a key in a dictionary, defaulting to None.

    Keyword Arguments.
    key  -- the key to search for
    dict -- the dictionary

    Returns the value stored under key when the key is present,
    otherwise None.
    """
    if key in dict:
        return dict[key]
    return None
def check_JSON_response(j):
    """ Check for an error in a JSON response

    Test for the presence of an "http_error" entry in a decoded JSON
    response returned by the server. If one is found (and is not empty),
    print an appropriate message and return False.
    Otherwise return True.

    Keyword Arguments
    j -- The decoded JSON response to test
    """
    h = j["http_error"] if "http_error" in j else None
    if not h:
        return True

    # Either field may be absent; report what is there rather than
    # failing with a KeyError while trying to report the error.
    e = h["error"] if "error" in h else ''
    d = h["details"] if "details" in h else ''
    # The doubled spaces reproduce the script's historical output.
    print("Authority Server returned HTTP error:  %s  details:  %s" % (e, d))
    return False
def do_ping(c, host):
    """ Send a ping to the server.

    Prints 'Ping OK' when the server answers without an HTTP error,
    the raw body when raw output is enabled, or a diagnostic otherwise.

    Keyword Arguments:
    c    -- the curl handle
    host -- the server URL
    """
    resp = do_GET_request(c, host, "/score/v2/ping", None)
    if not resp:
        print("Empty Response")
        return
    if g_raw_output:
        print(resp)
        return
    try:
        x = json.JSONDecoder().decode(resp)
    except ValueError as reason:
        # A body that is not JSON is reported like any other failed
        # query instead of aborting the script.
        print("Invalid JSON response: %s" % (reason,))
        return
    if check_JSON_response(x):
        print('Ping OK')
def do_max_message_size(c, host):
    """ Request the maximum message size information from the server.

    Keyword Arguments:
    c    -- the curl handle
    host -- the server URL
    """
    resp = do_GET_request(c, host, "/score/v2/max-message-size", None)
    if not resp:
        print("Empty Response")
        return
    if g_raw_output:
        print(resp)
        return
    try:
        x = json.JSONDecoder().decode(resp)
    except ValueError as reason:
        # A body that is not JSON is reported like any other failed
        # query instead of aborting the script.
        print("Invalid JSON response: %s" % (reason,))
        return
    if check_JSON_response(x):
        # fetch_element() yields '' when the field is missing, so an
        # unexpected reply does not raise KeyError.
        print("max message size:  %s" % (fetch_element("maxMessageSize", x),))
def do_licensing(c, host):
    """ Request the licensing information from the server.

    Prints every field of the reply except "query_status".

    Keyword Arguments:
    c    -- the curl handle
    host -- the server URL
    """
    resp = do_GET_request(c, host, "/score/v2/license", None)
    if not resp:
        print("Empty Response")
        return
    if g_raw_output:
        print(resp)
        return
    try:
        x = json.JSONDecoder().decode(resp)
    except ValueError as reason:
        # A body that is not JSON is reported like any other failed
        # query instead of aborting the script.
        print("Invalid JSON response: %s" % (reason,))
        return
    if check_JSON_response(x):
        print("licensing info:")
        for k, v in x.items():
            if k != "query_status":
                print("  %s :  %s" % (k, v))
def do_microupdate_versions(c, host):
    """ Request the current microupdate versions from the server.

    Prints every field of the reply except "query_status".

    Keyword Arguments:
    c    -- the curl handle
    host -- the server URL
    """
    resp = do_GET_request(c, host, "/score/v2/microupdate-versions", None)
    if not resp:
        print("Empty Response")
        return
    if g_raw_output:
        print(resp)
        return
    try:
        x = json.JSONDecoder().decode(resp)
    except ValueError as reason:
        # A body that is not JSON is reported like any other failed
        # query instead of aborting the script.
        print("Invalid JSON response: %s" % (reason,))
        return
    if check_JSON_response(x):
        print("microupdate versions:")
        for k, v in x.items():
            if k != "query_status":
                print("  %s :  %s" % (k, v))
def do_supported_attributes(c, host):
    """ Request the supported attribute information from the server.

    Prints the "attribute" field of every entry in the reply's
    "resultAttributes" list, when that list is present.

    Keyword Arguments:
    c    -- the curl handle
    host -- the server URL
    """
    resp = do_GET_request(c, host, "/score/v2/supported-attributes", None)
    if not resp:
        print("Empty Response")
        return
    if g_raw_output:
        print(resp)
        return
    try:
        x = json.JSONDecoder().decode(resp)
    except ValueError as reason:
        # A body that is not JSON is reported like any other failed
        # query instead of aborting the script.
        print("Invalid JSON response: %s" % (reason,))
        return
    if check_JSON_response(x):
        print("supported attributes:")
        att = fetch_element_or_none("resultAttributes", x)
        if att:
            for a in att:
                print("  %s" % (a["attribute"],))
def do_runtime(c, host):
    """ Request the runtime information from the server.

    Prints every field of the reply; list values are shown as a
    comma separated string.

    Keyword Arguments:
    c    -- the curl handle
    host -- the server URL
    """
    resp = do_GET_request(c, host, "/score/v2/runtime", None)
    if not resp:
        print("Empty Response")
        return
    if g_raw_output:
        print(resp)
        return
    try:
        x = json.JSONDecoder().decode(resp)
    except ValueError as reason:
        # A body that is not JSON is reported like any other failed
        # query instead of aborting the script.
        print("Invalid JSON response: %s" % (reason,))
        return
    if check_JSON_response(x):
        print("runtime information")
        for k, v in x.items():
            if isinstance(v, list):
                # Format each element so non-string entries do not raise,
                # and join so empty entries keep their separator.
                v = ", ".join(["%s" % (ve,) for ve in v])
            print("  %s :  %s" % (k, v))
def do_score_ip(c, host, ip):
    """ Score an IP address

    Keyword Arguments:
    c    -- the curl handle
    host -- the server URL
    ip   -- the ip addr to score
    """
    resp = do_GET_request(c, host, "/score/v2/ip", ip)
    if not resp:
        print("Empty Response")
        return
    if g_raw_output:
        print(resp)
        return
    try:
        x = json.JSONDecoder().decode(resp)
    except ValueError as reason:
        # A body that is not JSON is reported like any other failed
        # query instead of aborting the script.
        print("Invalid JSON response: %s" % (reason,))
        return
    if check_JSON_response(x):
        print("/score/v2/ip response - score:  %s" % (fetch_element("score", x),))
def do_score_analysis(c, host, analysis):
    """ Score an analysis string

    Keyword Arguments:
    c        -- the curl handle
    host     -- the server URL
    analysis -- the analysis string to score
    """
    pst = {"analysis": analysis}  # the only form part is the analysis string
    (code, resp) = do_POST_request(c, host, "/score/v2/analysis", pst)
    if not resp:
        print("Empty Response")
        return
    if g_raw_output:
        print(resp)
        return
    try:
        x = json.JSONDecoder().decode(resp)
    except ValueError as reason:
        # Error replies are not guaranteed to be JSON; report the status
        # and the decode problem instead of aborting the script.
        print("Invalid JSON response (HTTP %s): %s" % (code, reason))
        return
    if code >= 400:
        print("server returned  %s . code:  %s  message:  %s"
              % (code, fetch_element("code", x), fetch_element("message", x)))
    elif check_JSON_response(x):
        print("/score/v2/analysis response - score:  %s"
              % (fetch_element("score", x),))
def do_score_general(c, host, func,
                     rfc822, helo_domain, mail_from, receipt_to, conn_ip):
    """ Score a message using one of the message scoring calls.

    Keyword Arguments:
    c           -- the curl handle
    host        -- the server URL
    func        -- the request path of the scoring call, appended to host
                   (main() passes "/score/v2/" + message, content or sender)
    rfc822      -- the entire mail, as a string
    helo_domain -- the HELO domain from the envelope, or None.
    mail_from   -- the MAIL FROM field from the envelope, or None.
    receipt_to  -- the RCPT TO value from the envelope, or None; it is
                   passed through unchanged as the "rcpt_to" form part
    conn_ip     -- the connecting IP address, or None
    """
    pst = {"rfc822": rfc822}
    # Optional envelope data is only sent when a value was supplied.
    if helo_domain:
        pst["helo_domain"] = helo_domain
    if mail_from:
        pst["mail_from"] = mail_from
    if receipt_to:
        pst["rcpt_to"] = receipt_to
    if conn_ip:
        pst["conn_ip"] = conn_ip

    (code, resp) = do_POST_request(c, host, func, pst)
    if not resp:
        print("Empty Response")
        return
    if g_raw_output:
        print(resp)
        return
    try:
        x = json.JSONDecoder().decode(resp)
    except ValueError as reason:
        # Error replies are not guaranteed to be JSON; report the status
        # and the decode problem instead of aborting the script.
        print("Invalid JSON response (HTTP %s): %s" % (code, reason))
        return
    if code >= 400:
        print("server returned  %s . code:  %s  message:  %s"
              % (code, fetch_element("code", x), fetch_element("message", x)))
    elif check_JSON_response(x):
        # %s formatting accepts an "analysis" value of any type, where
        # string concatenation would raise for a non-string.
        print("%s response - score %s analysis: %s"
              % (func, fetch_element("score", x), fetch_element("analysis", x)))
def main(argv):
    """ The mainline

    Parse the command line and run the requested queries against host.

    Options:
    -r       print the raw response instead of a decoded summary
    -I ip    score an IP address
    -m file  score every message of an mbox (or single mail) file
    -C type  scoring call used with -m: message (default), content or sender
    -a file  score every line of an analysis file

    With none of -I, -m or -a, the configuration and status queries are
    run instead. Invalid usage exits through sys.exit() with the usage text.

    Keyword Arguments:
    argv -- the command line arguments, without the program name
    """
    global g_raw_output
    ipaddr = None
    score_method = "message"  # default value for the score method
    mbox_file = None
    analysis_file = None

    # Fetch the command line args.
    try:
        opts, args = getopt.gnu_getopt(argv, "rI:a:C:m:", "")
    except getopt.GetoptError as reason:
        print(reason)
        sys.exit(g_usage_string)

    # Must specify a host.
    if args:
        host = args[0]
    else:
        print("No host specified")
        sys.exit(g_usage_string)

    # Run through our command line args and set the appropriate flags
    for o, a in opts:
        if o == '-I':      # Score an IP address
            ipaddr = a
        elif o == '-r':    # Print the raw JSON response
            g_raw_output = True
        elif o == '-C':    # Set the score method to use with -m
            score_method = a
        elif o == '-m':    # An mbox or single mail to score
            mbox_file = a
        elif o == '-a':    # An analysis file to score.
            analysis_file = a

    if score_method not in ("message", "content", "sender"):
        print("Arg to -C must be message, content or sender")
        sys.exit(g_usage_string)

    # Create the handle only once the arguments are known to be usable,
    # and release it on every way out (including sys.exit below).
    c = pycurl.Curl()
    try:
        # If no actions specified on the command line, just fetch config
        # and status information
        if not (ipaddr or mbox_file or analysis_file):
            do_ping(c, host)
            print("")
            do_max_message_size(c, host)
            print("")
            do_licensing(c, host)
            print("")
            do_microupdate_versions(c, host)
            print("")
            do_supported_attributes(c, host)
            print("")
            do_runtime(c, host)

        # score an ip
        if ipaddr:
            do_score_ip(c, host, ipaddr)

        # score each message of the mbox
        if mbox_file:
            try:
                box = mbox.mbox(mbox_file)
            except IOError as reason:
                print(reason)
                sys.exit(g_usage_string)
            for m in box:
                subj = m.fetch_header("Subject: ")
                if subj:
                    print(subj)
                do_score_general(c, host, "/score/v2/" + score_method, m.text,
                                 None, None, None, None)
                print("")

        # score each line of the analysis file
        if analysis_file:
            try:
                f = open(analysis_file, "r")
            except IOError as reason:
                print(reason)
                sys.exit(g_usage_string)
            with f:  # make sure the file is closed again
                # NOTE(review): each line is sent as read, including its
                # trailing newline - confirm the server expects that.
                for line in f:
                    do_score_analysis(c, host, line)
    finally:
        c.close()  # close the curl connection
# Run the mainline only when executed as a script; the program name is
# stripped from the argument list before it is handed to main().
if __name__ == "__main__" :
    main(sys.argv[1:])