Python client API

Synopsis

HarpCaller interaction

import json
import harp

rpc = harp.HarpCaller("localhost")

host = rpc.localhost(queue = {"foo": "bar"}, max_exec_time = 15)
# or: `host = rpc.host(host = "localhost", queue = ...)'
# or simply: `host = rpc.localhost', if no job configuration is needed
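job = host.stream_infinity()
# read the streamed result, starting from message 0, until the job terminates
for msg in job.follow(since = 0):
    print json.dumps(msg)

print job.id()
print job.result()

# submit a call and wait for its end result
result = rpc("localhost").return_immediate("bar", "baz").get()

# recall an earlier job by its identifier
job_id = "3f22e0d4-c8ed-11e5-8d66-001e8c140268"
result = rpc.job(job_id).result()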
if result is harp.CALL_NOT_FINISHED:
    print "still running"
elif result is harp.CALL_CANCELLED:
    print "cancelled"
elif ...

harpd(8) interaction

import json
import harp

server = harp.HarpServer(
    host = "localhost",
    user = "username",
    password = "secret",
)
print server.current_time()
for msg in server.logs_in_next_minutes(10):
    if isinstance(msg, harp.Result):
        # last value, nothing more will be returned
        print json.dumps(msg.value)
    else:
        print "log message:", json.dumps(msg)

Description

harp is a Python interface for talking to HarpCaller: sending a call request, checking call status, retrieving its results, and so on. This part of the API is asynchronous, so it can easily be used in a web application, even if the called remote procedures take a long time to finish.

harp also allows talking directly to harpd(8) instances, which covers scenarios where running harpcallerd(8) would be too expensive or troublesome. This part of the API is synchronous, so every call blocks until the called remote procedure finishes.

Python interface

HarpCaller interface

These classes are intended for interacting with HarpCaller, so the interface they provide is asynchronous. They also allow retrieving results of calls that have already finished.

class harp.HarpCaller(host, port=3502)

Dispatcher server representation.

Parameters:
  • host – address of dispatcher
  • port – port of dispatcher

__getattr__(host)
Returns: target server representation
Return type: RemoteServer

Convenience method to get a statically-known host. Returned context is not configured with any job options. To change that, use RemoteServer.__call__():

rpc = HarpCaller("...")
# get nginx' status on web01, no job configuration
rpc.web01.nginx_status().get()
# enqueue nginx restart request
rpc.web01(queue = {"service": "nginx", "host": "web01"}) \
   .restart_nginx()

See also: host().
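
The attribute-style access shown above rests on Python's __getattr__ hook. The following is a minimal, self-contained sketch of that pattern and not harp's actual implementation: SketchCaller, SketchServer and the dictionary-shaped request are hypothetical stand-ins that only mimic the documented behaviour (attribute access yields a host context without job options, and calling the context sets options and returns the same object).

```python
class SketchServer(object):
    """Hypothetical stand-in for a host context (not harp.RemoteServer)."""

    def __init__(self, hostname):
        self.hostname = hostname
        self.options = {}

    def __call__(self, **kwargs):
        # Adjust job options and return self, so calls can be chained.
        self.options.update(kwargs)
        return self

    def __getattr__(self, procedure):
        # Only reached for names that are not regular attributes.
        if procedure.startswith("_"):
            raise AttributeError(procedure)

        def call(*args):
            # A real client would send a request to the dispatcher here;
            # the sketch just returns the request it would have built.
            return {
                "host": self.hostname,
                "procedure": procedure,
                "arguments": list(args),
                "options": dict(self.options),
            }
        return call


class SketchCaller(object):
    """Hypothetical stand-in for a dispatcher handle (not harp.HarpCaller)."""

    def host(self, host, **kwargs):
        return SketchServer(host)(**kwargs)

    def __getattr__(self, host):
        if host.startswith("_"):
            raise AttributeError(host)
        # Statically-known host name, no job options configured.
        return self.host(host)


rpc = SketchCaller()
plain = rpc.web01.nginx_status()
queued = rpc.web01(queue={"service": "nginx"}).restart_nginx("now")
```

In this sketch every attribute access creates a fresh host context, so options set on one context do not leak into the next call; whether harp behaves the same way is not stated in this document.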

connect()
Returns: connection to dispatcher server
Return type: JSONConnection

Connect to dispatcher server, to send a request and read a reply.

host(host, **kwargs)
Parameters:
  • host (string) – target RPC server
  • queue (dict) – name of a queue to wait in
  • concurrency (positive integer) – number of simultaneously running jobs in the queue
  • timeout (positive integer (seconds)) – maximum time between consecutive reads from the job
  • max_exec_time (positive integer (seconds)) – maximum time the job is allowed to take
  • info (JSON-serializable data) – arbitrary data to associate with call
Returns: target server representation
Return type: RemoteServer

Prepare context (target and job options) for calling a remote procedure on a server.

job(job_id)
Parameters: job_id (string) – call job identifier
Returns: call job representation
Return type: RemoteCall

Get a job representation, to retrieve from dispatcher its result and other information.

request(req)
Parameters: req (dict) – request to send to the dispatcher server
Returns: reply from the dispatcher server
Return type: dict

Connect to dispatcher server, to send a request and read a reply.

class harp.RemoteServer(dispatcher, hostname)

Target RPC server representation. This representation also carries options to be used for calling a remote procedure (timeout, maximum execution time, queue name, etc.).

Parameters:
  • dispatcher (HarpCaller) – dispatcher instance
  • hostname – name of the target server

See also __call__().

__str__()

Stringify to target hostname.

__getattr__(procedure)
Returns: remote procedure representation
Return type: RemoteProcedure

Any attribute of this object is a callable representation of a remote procedure. To submit a call request, one can use:

rpc = HarpCaller(dispatcher_address)
host = rpc.host("remote-server") # this is `RemoteServer' instance
host.my_remote_function_name(arg1, arg2, ...)

__call__(**kwargs)
Parameters:
  • queue (dict) – name of a queue to wait in
  • concurrency (positive integer) – number of simultaneously running jobs in the queue
  • timeout (positive integer (seconds)) – maximum time between consecutive reads from the job
  • max_exec_time (positive integer (seconds)) – maximum time the job is allowed to take
  • info (JSON-serializable data) – arbitrary data to associate with call
Returns: self

Adjust job options for this host.

class harp.RemoteProcedure(dispatcher, server, procedure)

Callable representation of a remote procedure.

Parameters:
  • dispatcher (HarpCaller) – dispatcher instance
  • server (RemoteServer) – remote server instance
  • procedure (string) – name of the procedure to call

__call__(*args, **kwargs)
Returns: call job handle
Return type: RemoteCall

Ask the dispatcher to call this procedure on the target server.

host()
Returns: name of the server this procedure will be called on
Return type: string

Retrieve the host this procedure will be called on.

procedure()
Returns: name of procedure to call
Return type: string

Retrieve name of the procedure that will be called.

class harp.RemoteCall(dispatcher, job_id)

RPC call job representation. Allows checking the job’s status, retrieving its results (both streamed and end result), and cancelling the job if it is still running.

Parameters:
  • dispatcher (HarpCaller) – dispatcher instance
  • job_id (string) – call job identifier

class StreamIterator(remote_call, conn, numbered=False)

Iterator for call’s streamed result.

Parameters:
  • remote_call (RemoteCall) – object representing a call to iterate through
  • conn (JSONConnection) – connection to dispatcher server
  • numbered (bool) – whether iteration should yield tuples for easy packet ID tracking

all()
Returns: list of all packets this iterator would yield

Retrieve all packets at once in a single list.

RemoteCall.args()
Returns: list of JSON-serializable values

Return arguments passed to this job.

RemoteCall.cancel()
Returns: True if the job was still running, False otherwise

Cancel execution of the job.

RemoteCall.end_time()
Returns: unix timestamp (integer) or None

Return the time when the job finished, whether because the function returned, an error occurred, or the job was cancelled. None is returned if the job hasn’t finished yet.

RemoteCall.follow(since=None, recent=None, numbered=False)
Parameters:
  • since (non-negative integer) – message number to read stream result from
  • recent (non-negative integer) – number of messages before current to read stream result from
  • numbered (bool) – if True, iterator will produce pairs (pktid, msg), with pktid having an analogous meaning to since
Return type: RemoteCall.StreamIterator

Follow the job’s streamed result, both the part already collected and the part still to come, until the job terminates.

According to the protocol, if neither since nor recent is provided, the stream behaves as if recent=0 was specified.

Usage:

rpc = HarpCaller(dispatcher_address)
job = rpc.job("e1c7b937-2428-42c2-9f22-f8fcf2906e65")
for msg in job.follow():
    consume(msg)
# alternatively:
#for (i,msg) in job.follow(numbered = True):
#    consume(i, msg)

See also stream().
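
follow() and stream() take the same since and recent parameters and differ only in the documented default (recent=0 versus since=0). The sketch below models which already-collected packets each choice would select. It is an illustration of the parameter descriptions, not the dispatcher's algorithm: select_collected is a hypothetical helper, packet numbers are assumed to start at 0, and preferring recent when both parameters are given is an arbitrary choice of the sketch.

```python
def select_collected(packets, since=None, recent=None):
    """Pick already-collected packets as (packet number, message) pairs.

    Hypothetical helper: it models only the part of the stream that
    exists at the time of the call, numbering packets from 0 (assumed).
    """
    numbered = list(enumerate(packets))
    if recent is not None:
        # The last `recent` packets; recent=0 selects nothing collected
        # so far, so a follower would only see packets arriving later.
        start = max(0, len(numbered) - recent)
        return numbered[start:]
    # since=None is treated like since=0, the documented stream() default.
    return numbered[(since or 0):]


collected = ["a", "b", "c", "d"]
from_start = select_collected(collected, since=0)
last_one = select_collected(collected, recent=1)
only_new = select_collected(collected, recent=0)
```

The pairs produced here have the same shape as what numbered=True is described to yield, which makes it possible to resume later by passing the last seen packet number plus one as since.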

RemoteCall.get(wait=True)
Parameters: wait (bool) – wait for the job to terminate
Return type: JSON-serializable data or CALL_NOT_FINISHED
Throws: RemoteException, RemoteError, CancelledException, or CommunicationError

Get the job’s end result, by default waiting for the job to terminate.

See also result().

RemoteCall.host()
Returns: unicode

Return target host of this job.

RemoteCall.id()
Return type: string

Return this job’s identifier, so job’s data can be recalled at some later time.

RemoteCall.info()
Returns: info field previously passed in call options

Return info field that was specified in job options (RemoteServer.__call__()). The content of this field is completely up to user and may be used to convey additional context between call submitter and call results consumer.

RemoteCall.procedure()
Returns: unicode

Return name of the procedure called in this job.

RemoteCall.result(wait=False)
Parameters: wait (bool) – wait for the job to terminate
Return type: JSON-serializable data, CALL_NOT_FINISHED, CALL_CANCELLED, RemoteException, RemoteError, or CommunicationError

Get the job’s end result, regardless of how the job terminated. If wait=False was specified and the job is still running, CALL_NOT_FINISHED is returned.

Note that this function returns an exception instance (RemoteException, RemoteError, CommunicationError) instead of throwing it.

See also get().
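
Because result() returns sentinels and exception instances rather than raising, the caller has to classify the returned value. The sketch below shows one way to do that, using identity checks for the sentinels as in the synopsis. So that it runs without harp installed, it defines local stand-ins for the sentinels and exception classes; their internals (including the shared _RemoteFailure base) are assumptions, and describe is a hypothetical helper. With harp available, the harp.* names would be used in their place.

```python
class _Sentinel(object):
    """Stand-in for harp's sentinel values."""
    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return "<%s>" % self.name


CALL_NOT_FINISHED = _Sentinel("CallNotFinished")
CALL_CANCELLED = _Sentinel("CallCancelled")


class HarpException(Exception):
    """Stand-in for the documented base exception."""


class CommunicationError(HarpException):
    pass


class _RemoteFailure(HarpException):
    # Assumed layout: the three documented fields stored as attributes.
    def __init__(self, type, message, data=None):
        super(_RemoteFailure, self).__init__(message)
        self.type = type
        self.message = message
        self.data = data


class RemoteException(_RemoteFailure):
    pass


class RemoteError(_RemoteFailure):
    pass


def describe(result):
    """Turn a value returned by result() into a short status string."""
    if result is CALL_NOT_FINISHED:
        return "still running"
    if result is CALL_CANCELLED:
        return "cancelled"
    if isinstance(result, RemoteException):
        return "procedure raised %s: %s" % (result.type, result.message)
    if isinstance(result, RemoteError):
        return "target unreachable (%s): %s" % (result.type, result.message)
    if isinstance(result, CommunicationError):
        return "dispatcher communication problem"
    return "finished: %r" % (result,)
```

Sentinels are compared with "is" because they are single module-level objects, while the exception cases need isinstance() since each call returns a fresh instance.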

RemoteCall.start_time()
Returns: unix timestamp (integer) or None

Return time when the job started its execution. None is returned if the job was not started (yet?) due to queueing.

RemoteCall.stream(since=None, recent=None, numbered=False)
Parameters:
  • since (non-negative integer) – message number to read stream result from
  • recent (non-negative integer) – number of messages before current to read stream result from
  • numbered (bool) – if True, iterator will produce pairs (pktid, msg), with pktid having an analogous meaning to since
Return type: RemoteCall.StreamIterator

Retrieve the job’s streamed result collected up to the time of the call. The function does not wait for the job to terminate.

According to the protocol, if neither since nor recent is provided, the stream behaves as if since=0 was specified.

Usage:

rpc = HarpCaller(dispatcher_address)
job = rpc.job("ef581fb8-a0ae-49a3-9eb3-a2cc505b28c9")
for msg in job.stream():
    consume(msg)
# alternatively:
#for (i,msg) in job.stream(numbered = True):
#    consume(i, msg)

See also follow().

RemoteCall.submit_time()
Returns: unix timestamp (integer)

Return time when the job was submitted for execution.
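
The three timestamps (submit_time(), start_time(), end_time()) can be combined to see how long a job waited in its queue and how long it ran. The helper below is a small sketch of that arithmetic; job_timings is a hypothetical name, and it takes plain values so it can be used with whatever the three methods return, including None.

```python
def job_timings(submit_time, start_time, end_time):
    """Return (seconds queued, seconds running); None where unknown.

    submit_time is always an integer; start_time and end_time may be
    None, as documented for jobs not yet started or not yet finished.
    """
    queued = None
    running = None
    if start_time is not None:
        queued = start_time - submit_time
        if end_time is not None:
            running = end_time - start_time
    return (queued, running)


finished = job_timings(1000, 1004, 1034)
still_running = job_timings(1000, 1004, None)
still_queued = job_timings(1000, None, None)
```

A job with an end time but no start time (for example, one that ended before it ever started) yields (None, None) in this sketch, since neither interval can be computed.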

harp.CALL_NOT_FINISHED = <CallNotFinished>

Value returned instead of RPC call result if the call is still running.

harp.CALL_CANCELLED = <CallCancelled>

Value returned as a result when RPC call was cancelled.

harpd(8) interface

These classes allow direct communication with harpd(8). The interaction is synchronous and ephemeral, unlike that with HarpCaller.

class harp.HarpServer(host, user, password, port=4306, ca_file=None)
Parameters:
  • host – address of harpd(8)
  • port – port of harpd(8)
  • user – username to authenticate as
  • password – password for the username
  • ca_file – file with CA certificates to verify server’s certificate against (or None for no verification)

harpd(8) server representation.

Typical usage:

server = HarpServer(
    host = "example.net",
    user = "username",
    password = "some password",
    ca_file = "trusted_ca.cert.pem",
)
result = server.some_method("arg1", "arg2")

__getattr__(procedure)
Parameters: procedure – name of a procedure to call
Returns: procedure representation
Return type: HarpProcedure

Prepare a call to a procedure.

class harp.HarpProcedure(server, name)

Procedure on a harpd(8) server.

Instances of this class are callable, as a normal procedure would be.

Parameters:
  • server (HarpServer) – harpd(8) where the procedure is to be executed
  • name (string) – procedure name

__call__(*args, **kwargs)
Parameters:
  • args – positional arguments (can’t be used with non-empty kwargs)
  • kwargs – keyword arguments (can’t be used with non-empty args)
Returns: remote procedure’s result (single result) or an iterable object (streamed result)
Return type: JSON-serializable data or HarpStreamIterator
Throws: HarpException or its subclass

Execute the procedure and get its result.

If the remote procedure returns a single result, this method simply returns that value (dict, list, str/unicode, int/long, float, bool, or None; in short, anything other than HarpStreamIterator).

If the remote procedure returns a streamed result, an iterator (HarpStreamIterator) is returned. This iterator yields a sequence of values (JSON-serializable, as described for a single result), with the last value always being a Result instance.

If the RPC call returns an exception, HarpException is thrown. Any communication error (including unexpected EOF, which would otherwise result in a missing Result) is raised as HarpException or one of its subclasses.
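
Since a call may return either a plain value or a stream ending with a Result, callers that do not know the procedure's kind in advance can normalize both cases. The sketch below does this with local stand-ins for Result and HarpStreamIterator so that it runs without harp installed; the stand-ins' internals are assumptions, and collect is a hypothetical helper, not part of the module.

```python
class Result(object):
    """Stand-in for harp.Result: wraps the end result of a stream."""
    def __init__(self, value):
        self.value = value


class HarpStreamIterator(object):
    """Stand-in for harp.HarpStreamIterator: a thin iterator wrapper."""
    def __init__(self, orig_iterator):
        self._iterator = iter(orig_iterator)

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._iterator)

    next = __next__  # Python 2 spelling of the same method


def collect(reply):
    """Return (streamed messages, end result) for either kind of reply."""
    if not isinstance(reply, HarpStreamIterator):
        # Single result: nothing was streamed.
        return ([], reply)
    messages = []
    end_result = None
    for item in reply:
        if isinstance(item, Result):
            end_result = item.value
        else:
            messages.append(item)
    return (messages, end_result)


single = collect({"status": "ok"})
streamed = collect(HarpStreamIterator([{"line": 1}, {"line": 2}, Result("done")]))
```

Checking the reply's type rather than trying to iterate over it matters here, because single results such as lists, dicts and strings are iterable too.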

class harp.HarpStreamIterator(orig_iterator)

Iterator wrapper that makes it easy to tell apart single results and streamed results of a call to a procedure on a HarpServer.

class harp.Result(value)
Parameters: value – returned end result of a streamed result

Container for the end result returned from a streaming procedure.

Returned from HarpStreamIterator as the last value in iteration.

value

wrapped value

Networked JSON interface

Classes to simplify working with linewise JSON protocols.

class harp.JSONConnection(host, port)

TCP connection, reading and writing JSON lines.

Objects of this class are valid context managers, so they can be used this way:

with JSONConnection(host, port) as conn:
    conn.send({"key": "value"})
    reply = conn.receive()

Parameters:
  • host – address of dispatcher server
  • port – port of dispatcher server

close()

Close connection. To connect again, use connect().

connect()

Connect to the address specified in constructor. Newly created JSONConnection objects are already connected.

receive()
Returns: object serializable with json module

Receive a JSON line and deserialize it, typically to a dict.

send(obj)
Parameters: obj – object serializable with json module

Send an object as a JSON line.
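
To illustrate what reading and writing JSON lines means, the sketch below sends and receives newline-terminated JSON documents over a local socket pair using only the standard library. It is not JSONConnection's implementation: send_json_line and receive_json_line are hypothetical helpers, and UTF-8 encoding with a single "\n" terminator is an assumption of the sketch.

```python
import json
import socket


def send_json_line(sock, obj):
    # json.dumps() escapes newlines inside strings, so the serialized
    # document always fits on a single line.
    sock.sendall((json.dumps(obj) + "\n").encode("utf-8"))


def receive_json_line(reader):
    line = reader.readline()
    if not line:
        raise EOFError("connection closed before a full line arrived")
    return json.loads(line.decode("utf-8"))


# A connected pair of local sockets stands in for client and server.
left, right = socket.socketpair()
try:
    reader = right.makefile("rb")
    send_json_line(left, {"key": "value"})
    send_json_line(left, [1, 2, 3])
    first = receive_json_line(reader)
    second = receive_json_line(reader)
    reader.close()
finally:
    left.close()
    right.close()
```

Reading through a buffered file object keeps message boundaries intact even when several lines arrive in one TCP segment or one line arrives in pieces.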

class harp.JSONSSLConnection(host, port, ca_file=None)

SSL connection, reading and writing JSON lines.

Objects of this class are valid context managers, so they can be used this way:

with JSONSSLConnection(host, port, "/etc/ssl/certs/ca.pem") as conn:
    conn.send({"key": "value"})
    reply = conn.receive()

Parameters:
  • host – address of dispatcher server
  • port – port of dispatcher server
  • ca_file – file with CA certificates, or None if no verification should be performed

close()

Close connection. To connect again, use connect().

connect()

Connect to the address specified in constructor. Newly created JSONSSLConnection objects are already connected.

receive()
Returns: object serializable with json module

Receive a JSON line and deserialize it, typically to a dict.

send(obj)
Parameters: obj – object serializable with json module

Send an object as a JSON line.

Exceptions

exception harp.HarpException

Base class for all exceptions thrown in this module.

exception harp.CommunicationError

Error in communication with dispatcher.

exception harp.CancelledException

Exception thrown when job has no return value due to being cancelled.

exception harp.RemoteException(type, message, data=None)

Exception returned/thrown when the remote procedure threw an exception.

Defined fields:

type (unicode)

Identifier of error type.

message (unicode)

Complete error message.

data

Arbitrary JSON-serializable context information. None if nothing was provided by remote side.

exception harp.RemoteError(type, message, data=None)

Exception returned/thrown when dispatcher couldn’t reach the target RPC server for some reason.

Defined fields:

type (unicode)

Identifier of error type.

message (unicode)

Complete error message.

data

Arbitrary JSON-serializable context information. None if nothing was provided by remote side.

See Also

  • harpd(8)
  • harpcallerd(8)