Python client API¶
Synopsis¶
HarpCaller interaction¶
import json
import harp
rpc = harp.HarpCaller("localhost")
host = rpc.localhost(queue = {"foo": "bar"}, max_exec_time = 15)
# or: `host = rpc.host(host = "localhost", queue = ...)'
# or simply: `host = rpc.localhost', if no job configuration is needed
job = host.stream_infinity()
for msg in job.follow(since = 0):
print json.dumps(msg)
print job.id()
print job.result()
result = rpc("localhost").return_immediate("bar", "baz").get()
job_id = "3f22e0d4-c8ed-11e5-8d66-001e8c140268"
result = rpc.job(job_id).result()
if result is harp.CALL_NOT_FINISHED:
print "still running"
elif result is harp.CALL_CANCELLED:
print "cancelled"
elif ...
harpd(8) interaction¶
import json
import harp
server = harp.HarpServer(
host = "localhost",
user = "username",
password = "secret",
)
print server.current_time()
for msg in server.logs_in_next_minutes(10):
if isinstance(msg, harp.Result):
# last value, nothing more will be returned
print json.dumps(msg.value)
else:
print "log message:", json.dumps(msg)
Description¶
harp is a Python interface to talk to HarpCaller: send a call request,
check call status, retrieve its results, and so on. This part of the API is
asynchronous, so it can easily be used in a web application, even if the
remote procedures that are called take long time to finish.
harp allows also to talk directly to harpd(8) instances, to
provide enough tools for scenarios when running harpcallerd(8)
would be too expensive or troublesome. This part of the API is synchronous, so
all calls block until the called remote procedure finishes.
Python interface¶
HarpCaller interface¶
These classes are intended for interacting with HarpCaller, which means that the interface they provide is asynchronous. They also allow to retrieve results from already finished calls.
-
class
harp.HarpCaller(host, port=3502)¶ Dispatcher server representation.
Parameters: - host – address of dispatcher
- port – port of dispatcher
-
__getattr__(host)¶ Returns: target server representation Return type: RemoteServerConvenience method to get a statically-known host. Returned context is not configured with any job options. To change that, use
RemoteServer.__call__():rpc = HarpCaller("...") # get nginx' status on web01, no job configuration rpc.web01.nginx_status().get() # enqueue nginx restart request rpc.web01(queue = {"service": "nginx", "host": "web01"}) \ .restart_nginx()
See also:
host().
-
connect()¶ Returns: connection to dispatcher server Return type: JSONConnectionConnect to dispatcher server, to send a request and read a reply.
-
host(host, **kwargs)¶ Parameters: - host (string) – target RPC server
- queue (dict) – name of a queue to wait in
- concurrency (positive integer) – number of simultaneously running jobs in the queue
- timeout (positive integer (seconds)) – maximum time between consequent reads from the job
- max_exec_time (positive integer (seconds)) – maximum time the job is allowed to take
- info (JSON-serializable data) – arbitrary data to associate with call
Returns: target server representation
Return type: Prepare context (target and job options) for calling a remote procedure on a server.
-
job(job_id)¶ Parameters: job_id (string) – call job identifier Returns: call job representation Return type: RemoteCallGet a job representation, to retrieve from dispatcher its result and other information.
-
request(req)¶ Parameters: req (dict) – request to send to the dispatcher server Returns: reply from the dispatcher server Return type: dict Connect to dispatcher server, to send a request and read a reply.
-
class
harp.RemoteServer(dispatcher, hostname)¶ Target RPC server representation. This representation also carries options to be used for calling a remote procedure (timeout, maximum execution time, queue name, etc.).
Parameters: - dispatcher (
HarpCaller) – dispatcher instance - hostname – name of the target server
See also
__call__().-
__str__()¶ Stringify to target hostname.
-
__getattr__(procedure)¶ Returns: remote procedure representation Return type: RemoteProcedureAny attribute of this object is a callable representation of a remote procedure. To submit a call request, one can use:
rpc = HarpCaller(dispatcher_address) host = rpc.host("remote-server") # this is `RemoteServer' instance host.my_remote_function_name(arg1, arg2, ...)
-
__call__(**kwargs)¶ Parameters: - queue (dict) – name of a queue to wait in
- concurrency (positive integer) – number of simultaneously running jobs in the queue
- timeout (positive integer (seconds)) – maximum time between consequent reads from the job
- max_exec_time (positive integer (seconds)) – maximum time the job is allowed to take
- info (JSON-serializable data) – arbitrary data to associate with call
Returns: selfAdjust job options for this host.
- dispatcher (
-
class
harp.RemoteProcedure(dispatcher, server, procedure)¶ Callable representation of a remote procedure.
Parameters: - dispatcher (
HarpCaller) – dispatcher instance - server (
RemoteServer) – remote server instance - procedure (string) – name of the procedure to call
-
__call__(*args, **kwargs)¶ Returns: call job handle Return type: RemoteCallRequest a call to this procedure on target server to dispatcher.
-
host()¶ Returns: name of server to call this procedure at Return type: string Retrieve host this procedure will be called on.
-
procedure()¶ Returns: name of procedure to call Return type: string Retrieve name of the procedure that will be called.
- dispatcher (
-
class
harp.RemoteCall(dispatcher, job_id)¶ RPC call job representation. Allows to check job’s status, retrieve its results (both stream and end result), and to cancel if it’s still running.
Parameters: - dispatcher (
HarpCaller) – dispatcher instance - job_id (string) – call job identifier
-
class
StreamIterator(remote_call, conn, numbered=False)¶ Iterator for call’s streamed result.
Parameters: - remote_call (
RemoteCall) – object representing a call to iterate through - conn (
JSONConnection) – connection to dispatcher server - numbered (bool) – whether iteration should yield tuples for easy packet ID tracking
-
all()¶ Returns: list of all packets this iterator would yield Retrieve all packets at once in a single list.
- remote_call (
-
RemoteCall.args()¶ Returns: list of JSON-serializable values Return arguments passed to this job.
-
RemoteCall.cancel()¶ Returns: Trueif the job was still running,FalseotherwiseCancel execution of the job.
-
RemoteCall.end_time()¶ Returns: unix timestamp (integer) or NoneReturn time when the job finished, either because function returned, an error occurred, or the job was cancelled.
Noneis returned if the job hasn’t finished yet.
-
RemoteCall.follow(since=None, recent=None, numbered=False)¶ Parameters: - since (non-negative integer) – message number to read stream result from
- recent (non-negative integer) – number of messages before current to read stream result from
- numbered (bool) – if
True, iterator will produce pairs(pktid, msg), withpktidhaving an analogous meaning tosince
Return type: Follow job’s streamed result, already collected and collected in the future, until the job terminates.
According to protocol, if neither
sincenorrecentwere provided, stream behaves asrecent=0was specified.Usage:
rpc = HarpCaller(dispatcher_address) job = rpc.job("e1c7b937-2428-42c2-9f22-f8fcf2906e65") for msg in job.follow(): consume(msg) # alternatively: #for (i,msg) in job.follow(numbered = True): # consume(i, msg)
See also
stream().
-
RemoteCall.get(wait=True)¶ Parameters: wait (bool) – wait for the job to terminate Return type: JSON-serializable data or CALL_NOT_FINISHEDThrows: RemoteException,RemoteError,CancelledException, orCommunicationErrorGet job’s end result, waiting for job’s termination.
See also
result().
-
RemoteCall.host()¶ Returns: unicode Return target host of this job.
-
RemoteCall.id()¶ Return type: string Return this job’s identifier, so job’s data can be recalled at some later time.
-
RemoteCall.info()¶ Returns: infofield previously passed in call optionsReturn
infofield that was specified in job options (RemoteServer.__call__()). The content of this field is completely up to user and may be used to convey additional context between call submitter and call results consumer.
-
RemoteCall.procedure()¶ Returns: unicode Return name of the procedure called in this job.
-
RemoteCall.result(wait=False)¶ Parameters: wait (bool) – wait for the job to terminate Return type: JSON-serializable data, CALL_NOT_FINISHED,CALL_CANCELLED,RemoteException,RemoteError, orCommunicationErrorGet job’s end result, regardless of how the job terminated. If
wait=Falsewas specified and the job is still running,CALL_NOT_FINISHEDis returned.Note that this function returns exception’s instance (
RemoteException,RemoteError,CommunicationError) instead of throwing it.See also
get().
-
RemoteCall.start_time()¶ Returns: unix timestamp (integer) or NoneReturn time when the job started its execution.
Noneis returned if the job was not started (yet?) due to queueing.
-
RemoteCall.stream(since=None, recent=None, numbered=False)¶ Parameters: - since (non-negative integer) – message number to read stream result from
- recent (non-negative integer) – number of messages before current to read stream result from
- numbered (bool) – if
True, iterator will produce pairs(pktid, msg), withpktidhaving an analogous meaning tosince
Return type: Retrieve job’s streamed result collected up until call. Function does not wait for the job to terminate.
According to protocol, if neither
sincenorrecentwere provided, stream behaves assince=0was specified.Usage:
rpc = HarpCaller(dispatcher_address) job = rpc.job("ef581fb8-a0ae-49a3-9eb3-a2cc505b28c9") for msg in job.stream(): consume(msg) # alternatively: #for (i,msg) in job.stream(numbered = True): # consume(i, msg)
See also
follow().
-
RemoteCall.submit_time()¶ Returns: unix timestamp (integer) Return time when the job was submitted for execution.
- dispatcher (
-
harp.CALL_NOT_FINISHED= <CallNotFinished>¶ Value returned instead of RPC call result if the call is still running.
-
harp.CALL_CANCELLED= <CallCancelled>¶ Value returned as a result when RPC call was cancelled.
harpd(8) interface¶
These classes allow direct communication with harpd(8). The interaction is synchronous and ephemeral, unlike that with HarpCaller.
-
class
harp.HarpServer(host, user, password, port=4306, ca_file=None)¶ Parameters: - host – address of harpd(8)
- port – port of harpd(8)
- user – username to authenticate as
- password – password for the username
- ca_file – file with CA certificates to verify server’s
certificate against (or
Nonefor no verification)
harpd(8) server representation.
Typical usage:
server = HarpServer( host = "example.net", user = "username", password = "some password", ca_file = "trusted_ca.cert.pem", ) result = server.some_method("arg1", "arg2")
-
__getattr__(procedure)¶ Parameters: procedure – name of a procedure to call Returns: procedure representation Return type: HarpProcedurePrepare a call to a procedure.
-
class
harp.HarpProcedure(server, name)¶ Procedure on a harpd(8) server.
Instances of this class are callable, as a normal procedure would be.
Parameters: - server (
HarpServer) – harpd(8) where the procedure is to be executed - name (string) – procedure name
-
__call__(*args, **kwargs)¶ Parameters: - args – list of positional arguments (can’t be used with
non-empty
kwargs) - kwargs – list of keyword arguments (can’t be used with non-empty
args)
Returns: remote procedure’s result (single result) or an iterable object (streamed result)
Return type: json-serializable data orHarpStreamIteratorThrows: HarpExceptionor its subclassExecute the procedure and get its result.
If the remote procedure returns single result, this method simply returns this value (dict, list, str/unicode, int/long, float, bool, or
None; or simply: anything other thanHarpStreamIterator).If the remote procedure returns a streamed result, an iterator (
HarpStreamIterator) is returned. This iterator returns a sequence of values (JSON-serializable, as described for single result), with the last value always being aResultinstance.If the RPC call returns an exception,
HarpExceptionis thrown. Any communication error (including unexpected EOF, which would otherwise result in missingResult) is raised as (possibly subclass of)HarpException.- args – list of positional arguments (can’t be used with
non-empty
- server (
-
class
harp.HarpStreamIterator(orig_iterator)¶ Iterator wrapper for iterators to easily tell apart single results and streamed results of call to a procedure on a
HarpServer.
-
class
harp.Result(value)¶ Parameters: value – returned end result of a streamed result Container for end result returned from streaming procedure.
Returned from
HarpStreamIteratoras a last value in iteration.-
value¶ wrapped value
-
Networked JSON interface¶
Classes to simplify working with linewise JSON protocols.
-
class
harp.JSONConnection(host, port)¶ TCP connection, reading and writing JSON lines.
Object of this class is a valid context manager, so it can be used this way:
with JSONConnection(host, port) as conn: conn.send({"key": "value"}) reply = conn.receive()
Parameters: - host – address of dispatcher server
- port – port of dispatcher server
-
connect()¶ Connect to the address specified in constructor. Newly created
JSONConnectionobjects are already connected.
-
receive()¶ Returns: object serializable with jsonmoduleReceive JSON line and deserialize it, typically to a
dict.
-
send(obj)¶ Parameters: obj – object serializable with jsonmoduleSend an object as a JSON line.
-
class
harp.JSONSSLConnection(host, port, ca_file=None)¶ SSL connection, reading and writing JSON lines.
Object of this class is a valid context manager, so it can be used this way:
with JSONSSLConnection(host, port, "/etc/ssl/certs/ca.pem") as conn: conn.send({"key": "value"}) reply = conn.receive()
Parameters: - host – address of dispatcher server
- port – port of dispatcher server
- ca_file – file with CA certificates, or
Noneif no verification should be performed
-
connect()¶ Connect to the address specified in constructor. Newly created
JSONSSLConnectionobjects are already connected.
-
receive()¶ Returns: object serializable with jsonmoduleReceive JSON line and deserialize it, typically to a
dict.
-
send(obj)¶ Parameters: obj – object serializable with jsonmoduleSend an object as a JSON line.
Exceptions¶
-
exception
harp.HarpException¶ Base class for all exceptions thrown in this module.
-
exception
harp.CommunicationError¶ Error in communication with dispatcher.
-
exception
harp.CancelledException¶ Exception thrown when job has no return value due to being cancelled.
-
exception
harp.RemoteException(type, message, data=None)¶ Exception returned/thrown when the remote procedure threw an exception.
Defined fields:
-
type(unicode)¶ Identifier of error type.
-
message(unicode)¶ Complete error message.
-
data¶ Arbitrary JSON-serializable context information.
Noneif nothing was provided by remote side.
-
-
exception
harp.RemoteError(type, message, data=None)¶ Exception returned/thrown when dispatcher couldn’t reach the target RPC server for some reason.
Defined fields:
-
type(unicode)¶ Identifier of error type.
-
message(unicode)¶ Complete error message.
-
data¶ Arbitrary JSON-serializable context information.
Noneif nothing was provided by remote side.
-
See Also¶
- harpd(8)
- harpcallerd(8)