System overview
Description
HarpCaller is a system for calling procedures on remote machines and collecting their results, which can be of various types. The procedures to be called are part of the system's configuration, so they can perform a wide range of administrative tasks. This is much more robust than a set of shell scripts called through SSH, especially when the procedures need to be parametrized and/or return structured data.
The main intended consumer of HarpCaller is a web application, so HarpCaller works asynchronously: a call request returns a job identifier, which can be used later to obtain the job's status and return value.
From the called machine's perspective, the HarpCaller system is synchronous: it keeps a single connection per request, through which the request is sent one way and all the results are sent back in the opposite direction.
Besides the usual RPC mechanisms, HarpCaller supports returning a stream of partial results, which can be read in real time or replayed later. This comes in handy for debugging procedures that track changes in the system's status.
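A toy model of this connection pattern, using a local socket pair and a thread in place of a real network and harpd(8): the caller sends a single request and then reads every result from the same connection until the other side closes it. The newline-delimited JSON framing and the field names (procedure, stream, result) are assumptions made for this sketch only; they are not the actual harpd(8) protocol.

```python
import json
import socket
import threading


def send_line(sock, obj):
    # hypothetical framing for this sketch: one JSON document per line
    sock.sendall((json.dumps(obj) + "\n").encode("utf-8"))


def toy_daemon(conn):
    # stand-in for the called machine: read one request, stream results back
    with conn, conn.makefile("r") as reader:
        request = json.loads(reader.readline())
        for step in range(2):
            send_line(conn, {"stream": "%s step %d" % (request["procedure"], step)})
        send_line(conn, {"result": "done"})
    # leaving the with-block closes the connection, which ends the caller's loop


def toy_call(procedure):
    # one connection per request: the request goes out, all results come back
    caller, daemon = socket.socketpair()
    worker = threading.Thread(target=toy_daemon, args=(daemon,))
    worker.start()
    with caller, caller.makefile("r") as reader:
        send_line(caller, {"procedure": procedure})
        messages = [json.loads(line) for line in reader]
    worker.join()
    return messages


print(toy_call("foo"))
```

The partial results and the end result travel over the same connection that carried the request, which is what makes the exchange synchronous from the called machine's side.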
Components of the system
The HarpCaller system is divided into three parts: the daemon, the dispatcher, and the client library.
Daemon
The harpd(8) daemon is a service running on every server that can be the target of an RPC call. It carries out the procedure that is called and sends the procedure's return value back as the response to the RPC call. The procedures available to the daemon are supplied in the daemon's configuration.
Dispatcher
HarpCaller (the request dispatcher) is a single central service that connects to daemons to pass them call requests and receive call results, and stores these results on disk for later access.
Because queues are not tied to any particular part of a call request (a single queue may, for instance, group jobs for several target hosts), queueing takes place in the dispatcher.
In typical use, the dispatcher is also the service that the client library talks to directly.
Client library
The Python harp module is a client implementation of the protocol for talking to HarpCaller and to harpd(8) services (note: harpd(8) uses a slightly different protocol). The primary use case for this interface was to allow issuing commands to servers from within a web application, but it should be equally convenient for other uses.
Module documentation: Python interface
Example usage: Synopsis
Using HarpCaller
HarpCaller works asynchronously, which means that the client receives an acknowledgement with a job ID immediately after sending a call request. This job ID can later be used to check the job's status, read its result (or follow streamed partial results), or wait for the job to finish. All of this can be done from the same process or from a different one, or even from multiple processes simultaneously.
HarpCaller maintains a list of known hosts that can be used as call targets. This list, called the hosts registry, contains the target address and port (the address is either a domain name or an IP address) and the credentials for each host. Hosts are identified by their names, which can be arbitrary strings. For calling code, host names are the only way to specify where to run a procedure.
```python
import harp

rpc = harp.HarpCaller("localhost")
job = rpc.host("web01.example.net").run_foo()
print(job.id())  # output: 8ec71d6b-28d1-4ce2-aa23-f3ea7887beda
# job ID, being a string, can be stored in a database for later use

# recall a job ID and check job's status
job = rpc.job("8ec71d6b-28d1-4ce2-aa23-f3ea7887beda")
print("job {host}:{procedure}(...)".format(
    host = job.host(),
    procedure = job.procedure(),
))
if job.result() is harp.CALL_NOT_FINISHED:
    print("job still running")
else:
    print("job terminated")
print(job.start_time())  # unix timestamp
print(job.end_time())  # unix timestamp or `None' if still running
print(job.result(wait = True))  # NOTE: errors are returned, not raised
```
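The hosts registry described above can be pictured as a mapping from host name to connection details. The sketch below is a hypothetical in-memory model, not HarpCaller's actual registry format, and the addresses, ports, and credentials in it are placeholders. It shows why calling code only ever needs the name: the address and credentials are looked up on the dispatcher's side.

```python
from collections import namedtuple

# Hypothetical registry entry layout (not HarpCaller's real storage format).
HostEntry = namedtuple("HostEntry", ["address", "port", "credentials"])

# Host names are arbitrary strings; addresses may be domain names or IPs.
# All values below are placeholders.
registry = {
    "web01.example.net": HostEntry(
        "10.8.16.22", 4443, {"user": "example_user", "password": "example_password"}),
    "database (primary)": HostEntry(
        "db1.example.net", 4443, {"user": "example_user", "password": "example_password"}),
}


def resolve(registry, name):
    """Turn the host name used by calling code into an (address, port) target."""
    try:
        entry = registry[name]
    except KeyError:
        raise LookupError("host %r is not in the registry" % (name,))
    return entry.address, entry.port


print(resolve(registry, "database (primary)"))
```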
End result
There are two methods for retrieving a job's return value:
- job.get(), which waits for the job to terminate (this can be changed with the wait=False parameter) and raises an exception on error;
- job.result(), which doesn't wait for the job to terminate (this can be changed with the wait=True parameter) and returns an exception object on error.
The recommended way to check whether a job is still running is to compare the value returned by job.result() with the harp.CALL_NOT_FINISHED constant.
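As a sketch of the comparison recommended above, the helper below classifies a job using only job.result(), which by default neither waits nor raises. To keep it runnable without a dispatcher, it uses a local stand-in for harp.CALL_NOT_FINISHED and a hypothetical FakeJob class; with the real library you would pass harp.CALL_NOT_FINISHED and a real job object. It also assumes that the exception object returned on error is an instance of Exception.

```python
# Local stand-in for harp.CALL_NOT_FINISHED so the sketch runs without harp.
CALL_NOT_FINISHED = object()


def job_state(job, not_finished=CALL_NOT_FINISHED):
    """Return (state, value) where state is "running", "failed" or "finished"."""
    value = job.result()  # default wait=False: does not block
    if value is not_finished:  # identity check against the sentinel constant
        return "running", None
    if isinstance(value, Exception):  # errors are returned, not raised
        return "failed", value
    return "finished", value


class FakeJob(object):
    """Hypothetical object mimicking the documented job.result() contract."""

    def __init__(self, value):
        self._value = value

    def result(self, wait=False):
        return self._value


print(job_state(FakeJob(CALL_NOT_FINISHED)))
print(job_state(FakeJob({"uptime": 1234})))
```

With the real library this would be called as job_state(rpc.job(job_id), harp.CALL_NOT_FINISHED).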
Stream result
A remote procedure can return a stream of partial results, an end result, or both. Returning a stream is useful for writing debugging procedures, which may run for a long time and report state changes immediately as they happen.
The stream of partial results is recorded alongside the end result and the job metadata, so it can be retrieved later.
```python
import harp

rpc = harp.HarpCaller("localhost")
job = rpc.job("8ec71d6b-28d1-4ce2-aa23-f3ea7887beda")

# print partial results collected up until now
for msg in job.stream(since = 0):
    print(msg)

# print partial results collected from now on until the job terminates
# NOTE: we may have missed some results that came between job.stream()
# and job.follow() calls
for msg in job.follow(recent = 0):
    print(msg)
```
The difference between job.follow() and job.stream() is similar to the difference between job.get() and job.result(): the former waits until the job terminates, while the latter doesn't.
- job.follow() returns an iterator (harp.RemoteCall.StreamIterator) that yields messages until the job terminates; by default only new messages are returned;
- job.stream() returns an iterator (harp.RemoteCall.StreamIterator) that yields the messages collected so far, without waiting for new ones; by default all messages since the job started are returned.
Note that reading the partial results doesn't raise any exceptions. To check for errors, job.get() or job.result() needs to be called.
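Because reading the stream never raises, a consumer that replays partial results still has to ask for the end result to learn whether the job failed. A minimal sketch of that pattern follows. FakeJob is a hypothetical stand-in for a harp job, and its stream() ignores the since argument, since the exact meaning of since is not covered here.

```python
class FakeJob(object):
    """Hypothetical finished job with recorded partial results."""

    def __init__(self, messages, value):
        self._messages = list(messages)
        self._value = value

    def stream(self, since=0):
        # this fake ignores `since` and returns every recorded message
        return iter(self._messages)

    def get(self, wait=True):
        # mimics job.get(): errors are raised, not returned
        if isinstance(self._value, Exception):
            raise self._value
        return self._value


def replay(job, handle):
    """Feed recorded partial results to `handle`, then return the end result."""
    for msg in job.stream(since=0):  # never raises, even for a failed job
        handle(msg)
    return job.get()  # waits for termination and raises on error


seen = []
print(replay(FakeJob(["state: up", "state: down"], "ok"), seen.append))
print(seen)
```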
Time limits
A procedure can be called with time limits:
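```python
import harp

rpc = harp.HarpCaller("localhost")
# cancel the job if it runs longer than 60 seconds in total
job = rpc.host("web01.example.net", max_exec_time = 60).run_foo()
# cancel a streaming job if 60 seconds pass without a new partial result
job = rpc.host("web01.example.net", timeout = 60).run_foo()
```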
max_exec_time specifies how long (in seconds) the job can run in total before it is cancelled. For jobs that return a stream result, timeout specifies the maximum time between consecutive partial results. For jobs that only return an end result, timeout works in the same way as max_exec_time.
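One way to read the two limits is as a check evaluated against the job's timestamps. The function below is a simplified model of the behaviour described above, not HarpCaller's implementation; in particular, whether a limit triggers exactly at the boundary (here: only when strictly exceeded) is an assumption. Times are in seconds, and start is when the job began running.

```python
def limit_exceeded(start, events, now, max_exec_time=None, timeout=None):
    """Return the name of the limit that would cancel the job, or None.

    start  -- time the job started running (queue waiting time excluded)
    events -- sorted times at which partial results arrived so far
    now    -- current time
    """
    if max_exec_time is not None and now - start > max_exec_time:
        return "max_exec_time"
    if timeout is not None:
        # measure from the latest partial result, or from the start when
        # none arrived yet; a job that only returns an end result never
        # produces events, so for it timeout acts like max_exec_time
        last = events[-1] if events else start
        if now - last > timeout:
            return "timeout"
    return None


# a streaming job that started at t=0 and reported at t=30 and t=90
print(limit_exceeded(0, [30, 90], 120, timeout=60))
print(limit_exceeded(0, [30, 90], 151, timeout=60))
```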
Call queues
Execution of remote calls can be organized in queues. A queue is identified by its name, which is a dictionary with arbitrary content supplied at call time. This way jobs can be grouped by target host, by procedure, or by any other data, possibly unrelated to the call itself. A queue is created by the first call that uses its name and is deleted when the last job in the queue terminates.
By default only one job from a queue can be running at the same time. This can be configured at the queue's creation time by setting the concurrency level to the desired number of simultaneously running jobs. A queue keeps the same concurrency level for its whole lifetime.
A job can belong to one queue or to no queue at all. The timeout and max_exec_time limits don't count the time the job spends waiting for its turn in the queue.
```python
import harp

rpc = harp.HarpCaller("localhost")
hostname = "web01.example.net"
queue = { "host": hostname, "command": "foo" }  # one queue per host and command
# run at most 3 jobs in this queue (only applies if this call creates the queue)
job = rpc.host(hostname, queue = queue, concurrency = 3).run_foo()
print(job.submit_time())
print(job.start_time())  # `None' if the job still waits in the queue
```
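The queue rules above (dictionary names, a concurrency level fixed by the first call, deletion after the last job terminates) can be modelled in a few lines. This is a toy single-process sketch, not HarpCaller's scheduler. It assumes that two names with equal content denote the same queue and canonicalizes them with a sorted-key JSON dump so they can serve as dictionary keys; the FIFO start order is also an assumption.

```python
import json
from collections import deque


def queue_key(name):
    # assumption: queue names with equal content denote the same queue;
    # a sorted-key JSON dump turns the dictionary into a hashable key
    return json.dumps(name, sort_keys=True)


class QueueModel(object):
    """Toy single-process model of call queues (not HarpCaller's code)."""

    def __init__(self):
        self.queues = {}  # key -> {"concurrency", "running", "waiting"}

    def submit(self, job_id, name, concurrency=1):
        key = queue_key(name)
        if key not in self.queues:
            # the first call creates the queue and fixes its concurrency
            self.queues[key] = {"concurrency": concurrency,
                                "running": set(), "waiting": deque()}
        queue = self.queues[key]
        queue["waiting"].append(job_id)
        self._schedule(queue)

    def finish(self, job_id, name):
        key = queue_key(name)
        queue = self.queues[key]
        queue["running"].discard(job_id)
        self._schedule(queue)
        if not queue["running"] and not queue["waiting"]:
            del self.queues[key]  # last job terminated: queue is deleted

    def _schedule(self, queue):
        # start waiting jobs in FIFO order (an assumption) up to the limit
        while queue["waiting"] and len(queue["running"]) < queue["concurrency"]:
            queue["running"].add(queue["waiting"].popleft())


model = QueueModel()
name = {"host": "web01.example.net", "command": "foo"}
for job_id in ("j1", "j2", "j3", "j4"):
    model.submit(job_id, name, concurrency=3)
print(sorted(model.queues[queue_key(name)]["running"]))
```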
Job information
A user can retrieve various information about a call job from HarpCaller. This includes the names of the target host and procedure (job.host(), job.procedure()), the arguments (job.args()), and the times of job submission (job.submit_time()), start (job.start_time()), and termination (job.end_time()).
A user can also attach additional information to a remote call. HarpCaller does not interpret this information in any way; its meaning is at the user's discretion.
```python
import json
import harp

rpc = harp.HarpCaller("localhost")
job_info = { ... }  # JSON-serializable data
job = rpc.host("web01.example.net", info = job_info).run_foo()

job = rpc.job("8ec71d6b-28d1-4ce2-aa23-f3ea7887beda")
print(json.dumps(job.info()))
```
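Since the info payload has to be JSON-serializable, it can be worth checking it before submitting the call. A small sketch: checked_info is a hypothetical helper, and returning the JSON round-tripped copy assumes that what later comes back from job.info() is plain decoded JSON (so tuples turn into lists, for example).

```python
import json


def checked_info(info):
    """Fail early if `info` is not JSON-serializable; return its JSON form."""
    encoded = json.dumps(info)  # raises TypeError for sets, datetimes, etc.
    # round trip: tuples become lists and non-string keys become strings
    return json.loads(encoded)


job_info = {"ticket": 1234, "requested_by": "alice", "hosts": ("web01", "web02")}
print(checked_info(job_info))
```

It would be used as rpc.host("web01.example.net", info = checked_info(job_info)).run_foo().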
Interpreting errors
TODO: Error reporting from HarpCaller needs to be cleaned up. There’s no unified way to identify the cause of an error for now.
Using harpd(8)
Sometimes running a request dispatcher is unnecessary overhead. For these cases, harp offers an API for communicating with harpd(8) directly.
```python
import harp

server = harp.HarpServer(
    host = "10.8.16.22",
    user = "example_user",
    password = "example_password",
    ca_file = "/path/to/ca_certificates.crt",
)
value = server.run_foo()
print(value)
```
Since there’s no proxy that would save call results for later use, all calls are synchronous.
If the called procedure only returns an end result, value from the example above will be a JSON-serializable object and no further work is needed to consume it. If the procedure returns partial results, value will be an iterable object (harp.HarpStreamIterator):
```python
import harp

server = harp.HarpServer(...)
for record in server.run_stream():
    if isinstance(record, harp.Result):
        end_result = record
        break
    else:
        print(record)
# do something with `end_result.value'
```
Unlike in communication with HarpCaller, the end result is included in the iterator (and, similarly, errors are raised from the iterator as well). To tell partial results and the end result apart, the end result is wrapped in a harp.Result container and is always the last value.
When an error is encountered, an exception (a subclass of harp.HarpException) is raised. In particular, an exception raised by the remote procedure is raised as harp.RemoteException.
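The loop above can be wrapped in a helper that separates partial results from the end result. In this sketch, Result is a local stand-in for harp.Result (so the code runs without harp installed); with the real library you would pass harp.Result as result_type. Exceptions raised by the iterator are not caught, so they propagate just as they would from server.run_stream().

```python
class Result(object):
    """Local stand-in for harp.Result: wraps the end result in .value."""

    def __init__(self, value):
        self.value = value


def split_stream(records, result_type=Result):
    """Return (partial_results, end_result_value) from a streamed call.

    Exceptions raised while iterating are deliberately not caught.
    """
    partial = []
    for record in records:
        if isinstance(record, result_type):
            # the end result is always the last value, so stop here
            return partial, record.value
        partial.append(record)
    # defensive: should not happen if the stream ends with an end result
    raise ValueError("stream ended without an end result")


print(split_stream(iter(["loading", {"progress": 50}, Result("done")])))
```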
NOTE: When calling a remote harpd(8) from a procedure running under another harpd(8) instance, remember not to confuse the container classes harp.Result and harpd.proc.Result. They cannot be used interchangeably, though they have similar structure and symmetrical purposes.
See Also
- harp(3)
- harpd(8)
- harpcallerd(8)