API Reference¶
Package Layout¶
mitogen Package¶
On the Mitogen master, this is imported from mitogen/__init__.py as would
be expected. On the slave, it is built dynamically during startup.
-
mitogen.__version__= (0, 2, 2)¶ Library version as a tuple.
-
mitogen.is_master= True¶ This is
Falsein slave contexts. Previously it was used to prevent re-execution of__main__in single file programs, however that now happens automatically.
-
mitogen.context_id= 0¶ This is 0 in a master, otherwise it is the master-assigned ID unique to the slave context used for message routing.
-
mitogen.parent_id= None¶ This is
Nonein a master, otherwise it is the master-assigned ID unique to the slave’s parent context.
-
mitogen.parent_ids= []¶ This is an empty list in a master, otherwise it is a list of parent context IDs ordered from most direct to least direct.
-
mitogen.main(log_level='INFO', profiling=False)¶ Convenience decorator primarily useful for writing discardable test scripts.
In the master process, when func is defined in the
__main__module, arranges for func(router) to be invoked immediately, withmitogen.master.Routerconstruction and destruction handled just as inmitogen.utils.run_with_router(). In slaves, this function does nothing.Parameters: - log_level (str) – Logging package level to configure via
mitogen.utils.log_to_file(). - profiling (bool) – If
True, equivalent to settingmitogen.master.Router.profilingprior to router construction. This causes/tmpfiles to be created everywhere at the end of a successful run withcProfileoutput for every thread.
Example:
import mitogen import requests def get_url(url): return requests.get(url).text @mitogen.main() def main(router): z = router.ssh(hostname='k3') print(z.call(get_url, 'https://example.org/')))))
- log_level (str) – Logging package level to configure via
mitogen.core¶
This module implements most package functionality, but remains separate from non-essential code in order to reduce its size, since it is also serves as the bootstrap implementation sent to every new slave context.
-
@mitogen.core.takes_econtext¶ Decorator that marks a function or class method to automatically receive a kwarg named econtext, referencing the
mitogen.core.ExternalContextactive in the context in which the function is being invoked in. The decorator is only meaningful when the function is invoked viaCALL_FUNCTION.When the function is invoked directly, econtext must still be passed to it explicitly.
-
@mitogen.core.takes_router¶ Decorator that marks a function or class method to automatically receive a kwarg named router, referencing the
mitogen.core.Routeractive in the context in which the function is being invoked in. The decorator is only meaningful when the function is invoked viaCALL_FUNCTION.When the function is invoked directly, router must still be passed to it explicitly.
mitogen.master¶
This module implements functionality required by master processes, such as starting new contexts via SSH. Its size is also restricted, since it must be sent to any context that will be used to establish additional child contexts.
mitogen.parent¶
This module defines functionality common to master and parent processes. It is sent to any child context that is due to become a parent, due to recursive connection.
mitogen.fakessh¶
mitogen.fakessh is a stream implementation that starts a subprocess with
its environment modified such that PATH searches for ssh return a Mitogen
implementation of SSH. When invoked, this implementation arranges for the
command line supplied by the caller to be executed in a remote context, reusing
the parent context’s (possibly proxied) connection to that remote context.
This allows tools like rsync and scp to transparently reuse the connections
and tunnels already established by the host program to connect to a target
machine, without wasteful redundant SSH connection setup, 3-way handshakes, or
firewall hopping configurations, and enables these tools to be used in
impossible scenarios, such as over sudo with requiretty enabled.
The fake ssh command source is written to a temporary file on disk, and
consists of a copy of the mitogen.core source code (just like any
other child context), with a line appended to cause it to connect back to the
host process over an FD it inherits. As there is no reliance on an existing
filesystem file, it is possible for child contexts to use fakessh.
As a consequence of connecting back through an inherited FD, only one SSH invocation is possible, which is fine for tools like rsync, however in future this restriction will be lifted.
Sequence:
fakesshContext and Stream created by parent context. The stream’s buffer has a_fakessh_main()CALL_FUNCTIONenqueued.Target program (rsync/scp/sftp) invoked, which internally executes ssh from
PATH.
mitogen.corebootstrap begins, recovers the stream FD inherited via the target program, established itself as the fakessh context.
_fakessh_main()CALL_FUNCTIONis read by fakessh context,
sets up
IoPumpfor stdio, registers stdin_handle for local context.Enqueues
CALL_FUNCTIONfor_start_slave()invoked in target context,
- the program from the ssh command line is started
- sets up
IoPumpfor ssh command line process’s stdio pipes- returns (control_handle, stdin_handle) to
_fakessh_main()
_fakessh_main()receives control/stdin handles from from_start_slave(),
- registers remote’s stdin_handle with local
IoPump.- sends (“start”, local_stdin_handle) to remote’s control_handle
- registers local
IoPumpwithmitogen.core.Broker.- loops waiting for local stdout closed && remote stdout closed
_start_slave()control channel receives (“start”, stdin_handle),
- registers remote’s stdin_handle with local
IoPump- registers local
IoPumpwithmitogen.core.Broker.- loops waiting for local stdout closed && remote stdout closed
-
mitogen.fakessh.run(dest, router, args, daedline=None, econtext=None)¶ Run the command specified by args such that
PATHsearches for SSH by the command will cause its attempt to use SSH to execute a remote program to be redirected to use mitogen to execute that program using the context dest instead.Parameters: - args (list[str]) – Argument vector.
- dest (mitogen.core.Context) – The destination context to execute the SSH command line in.
- router (mitogen.core.Router) –
- args – Command line arguments for local program, e.g.
['rsync', '/tmp', 'remote:/tmp']
Returns: Exit status of the child process.
Message Class¶
-
class
mitogen.core.Message¶ Messages are the fundamental unit of communication, comprising fields from the Stream Protocol header, an optional reference to the receiving
mitogen.core.Routerfor ingress messages, and helper methods for deserialization and generating replies.-
router¶ The
mitogen.core.Routerresponsible for routing the message. This isNonefor locally originated messages.
-
receiver¶ The
mitogen.core.Receiverover which the message was last received. Part of themitogen.select.Selectinterface. Defaults toNone.
-
dst_id¶ Integer target context ID.
mitogen.core.Routerdelivers messages locally when theirdst_idmatchesmitogen.context_id, otherwise they are routed up or downstream.
-
src_id¶ Integer source context ID. Used as the target of replies if any are generated.
-
auth_id¶ The context ID under whose authority the message is acting. See Source Verification.
-
handle¶ Integer target handle in the destination context. This is one of the Standard Handles, or a dynamically generated handle used to receive a one-time reply, such as the return value of a function call.
-
reply_to¶ Integer target handle to direct any reply to this message. Used to receive a one-time reply, such as the return value of a function call.
IS_DEADhas a special meaning when it appears in this field.
-
data¶ Message data, which may be raw or pickled.
-
is_dead¶ Trueifreply_tois set to the magic valuemitogen.core.IS_DEAD, indicating the sender considers the channel dead.
-
__init__(**kwargs)¶ Construct a message from from the supplied kwargs.
src_idandauth_idare always set tomitogen.context_id.
-
classmethod
pickled(obj, **kwargs)¶ Construct a pickled message, setting
datato the serialization of obj, and setting remaining fields using kwargs.Returns: The new message.
-
unpickle(throw=True)¶ Unpickle
data, optionally raising any exceptions present.Parameters: throw (bool) – If
True, raise exceptions, otherwise it is the caller’s responsibility.Raises: - mitogen.core.CallError – The serialized data contained CallError exception.
- mitogen.core.ChannelError – The is_dead field was set.
-
Router Class¶
-
class
mitogen.core.Router¶ Route messages between parent and child contexts, and invoke handlers defined on our parent context.
Router.route()straddles theBrokerand user threads, it is safe to call anywhere.Note: This is the somewhat limited core version of the Router class used by child contexts. The master subclass is documented below this one.
-
unidirectional¶ When
True, permit children to only communicate with the current context or a parent of the current context. Routing between siblings or children of parents is prohibited, ensuring no communication is possible between intentionally partitioned networks, such as when a program simultaneously manipulates hosts spread across a corporate and a production network, or production networks that are otherwise air-gapped.Sending a prohibited message causes an error to be logged and a dead message to be sent in reply to the errant message, if that message has
reply_toset.The value of
unidirectionalbecomes the default for thelocal()unidirectional parameter.
-
stream_by_id(dst_id)¶ Return the
mitogen.core.Streamthat should be used to communicate with dst_id. If a specific route for dst_id is not known, a reference to the parent context’s stream is returned.
-
add_route(target_id, via_id)¶ Arrange for messages whose dst_id is target_id to be forwarded on the directly connected stream for via_id. This method is called automatically in response to
ADD_ROUTEmessages, but remains public for now while the design has not yet settled, and situations may arise where routing is not fully automatic.
-
register(context, stream)¶ Register a new context and its associated stream, and add the stream’s receive side to the I/O multiplexer. This This method remains public for now while hte design has not yet settled.
-
add_handler(fn, handle=None, persist=True, respondent=None, policy=None)¶ Invoke fn(msg) for each Message sent to handle from this context. Unregister after one invocation if persist is
False. If handle isNone, a new handle is allocated and returned.Parameters: - handle (int) – If not
None, an explicit handle to register, usually one of themitogen.core.*constants. If unspecified, a new unused handle will be allocated. - persist (bool) – If
False, the handler will be unregistered after a single message has been received. - respondent (mitogen.core.Context) –
Context that messages to this handle are expected to be sent from. If specified, arranges for a dead message to be delivered to fn when disconnection of the context is detected.
In future respondent will likely also be used to prevent other contexts from sending messages to the handle.
- policy (function) –
Function invoked as policy(msg, stream) where msg is a
mitogen.core.Messageabout to be delivered, and stream is themitogen.core.Streamon which it was received. The function must returnTrue, otherwise an error is logged and delivery is refused.Two built-in policy functions exist:
mitogen.core.has_parent_authority(): requires the message arrived from a parent context, or a context acting with a parent context’s authority (auth_id).mitogen.parent.is_immediate_child(): requires the message arrived from an immediately connected child, for use in messaging patterns where either something becomes buggy or insecure by permitting indirect upstream communication.
In case of refusal, and the message’s
reply_tofield is nonzero, amitogen.core.CallErroris delivered to the sender indicating refusal occurred.
Returns: handle, or if handle was
None, the newly allocated handle.- handle (int) – If not
-
del_handler(handle)¶ Remove the handle registered for handle
Raises: KeyError – The handle wasn’t registered.
-
_async_route(msg, stream=None)¶ Arrange for msg to be forwarded towards its destination. If its destination is the local context, then arrange for it to be dispatched using the local handlers.
This is a lower overhead version of
route()that may only be called from the I/O multiplexer thread.Parameters: stream (mitogen.core.Stream) – If not None, a reference to the stream the message arrived on. Used for performing source route verification, to ensure sensitive messages such asCALL_FUNCTIONarrive only from trusted contexts.
-
route(msg)¶ Arrange for the
Messagemsg to be delivered to its destination using any relevant downstream context, or if none is found, by forwarding the message upstream towards the master context. If msg is destined for the local context, it is dispatched using the handles registered withadd_handler().This may be called from any thread.
-
-
class
mitogen.master.Router(broker=None)¶ Extend
mitogen.core.Routerwith functionality useful to masters, and child contexts who later become masters. Currently when this class is required, the target context’s router is upgraded at runtime.Note
You may construct as many routers as desired, and use the same broker for multiple routers, however usually only one broker and router need exist. Multiple routers may be useful when dealing with separate trust domains, for example, manipulating infrastructure belonging to separate customers or projects.
Parameters: broker (mitogen.master.Broker) – Brokerinstance to use. If not specified, a privateBrokeris created.-
profiling¶ When
True, cause the broker thread and any subsequent broker and main threads existing in any child to write/tmp/mitogen.stats.<pid>.<thread_name>.logcontaining acProfiledump on graceful exit. Must be set prior to construction of anyBroker, e.g. via:mitogen.master.Router.profiling = True
-
enable_debug()¶ Cause this context and any descendant child contexts to write debug logs to /tmp/mitogen.<pid>.log.
-
allocate_id()¶ Arrange for a unique context ID to be allocated and associated with a route leading to the active context. In masters, the ID is generated directly, in children it is forwarded to the master via an
ALLOCATE_IDmessage that causes the master to emit matchingADD_ROUTEmessages prior to replying.
-
context_by_id(context_id, via_id=None)¶ Messy factory/lookup function to find a context by its ID, or construct it. In future this will be replaced by a much more sensible interface.
Context Factories
-
fork(on_fork=None, on_start=None, debug=False, profiling=False, via=None)¶ Construct a context on the local machine by forking the current process. The forked child receives a new identity, sets up a new broker and router, and responds to function calls identically to children created using other methods.
For long-lived processes,
local()is always better as it guarantees a pristine interpreter state that inherited little from the parent. Forking should only be used in performance-sensitive scenarios where short-lived children must be spawned to isolate potentially buggy code, and only after accounting for all the bad things possible as a result of, at a minimum:- Files open in the parent remaining open in the child,
causing the lifetime of the underlying object to be extended
indefinitely.
- From the perspective of external components, this is observable in the form of pipes and sockets that are never closed, which may break anything relying on closure to signal protocol termination.
- Descriptors that reference temporary files will not have their disk space reclaimed until the child exits.
- Third party package state, such as urllib3’s HTTP connection pool, attempting to write to file descriptors shared with the parent, causing random failures in both parent and child.
- UNIX signal handlers installed in the parent process remaining active in the child, despite associated resources, such as service threads, child processes, resource usage counters or process timers becoming absent or reset in the child.
- Library code that makes assumptions about the process ID remaining unchanged, for example to implement inter-process locking, or to generate file names.
- Anonymous
MAP_PRIVATEmemory mappings whose storage requirement doubles as either parent or child dirties their pages. - File-backed memory mappings that cannot have their space freed on disk due to the mapping living on in the child.
- Difficult to diagnose memory usage and latency spikes due to object graphs becoming unreferenced in either parent or child, causing immediate copy-on-write to large portions of the process heap.
- Locks held in the parent causing random deadlocks in the child, such
as when another thread emits a log entry via the
loggingpackage concurrent to another thread callingfork(). - Objects existing in Thread-Local Storage of every non-
fork()thread becoming permanently inaccessible, and never having their object destructors called, including TLS usage by native extension code, triggering many new variants of all the issues above. - Pseudo-Random Number Generator state that is easily observable by network peers to be duplicate, violating requirements of cryptographic protocols through one-time state reuse. In the worst case, children continually reuse the same state due to repeatedly forking from a static parent.
fork()cleans up Mitogen-internal objects, in addition to locks held by theloggingpackage, reseedsrandom.random(), and the OpenSSL PRNG viassl.RAND_add(), but only if thesslmodule is already loaded. You must arrange for your program’s state, including any third party packages in use, to be cleaned up by specifying an on_fork function.The associated stream implementation is
mitogen.fork.Stream.Parameters: - on_fork (function) – Function invoked as on_fork() from within the child process. This permits supplying a program-specific cleanup function to break locks and close file descriptors belonging to the parent from within the child.
- on_start (function) – Invoked as on_start(econtext) from within the child process after it has been set up, but before the function dispatch loop starts. This permits supplying a custom child main function that inherits rich data structures that cannot normally be passed via a serialization.
- via (Context) – Same as the via parameter for
local(). - debug (bool) – Same as the debug parameter for
local(). - profiling (bool) – Same as the profiling parameter for
local().
- Files open in the parent remaining open in the child,
causing the lifetime of the underlying object to be extended
indefinitely.
-
local(remote_name=None, python_path=None, debug=False, connect_timeout=None, profiling=False, via=None)¶ Construct a context on the local machine as a subprocess of the current process. The associated stream implementation is
mitogen.master.Stream.Parameters: - remote_name (str) –
The
argv[0]suffix for the new process. If remote_name istest, the new processargv[0]will bemitogen:test.If unspecified, defaults to
<username>@<hostname>:<pid>.This variable cannot contain slash characters, as the resulting
argv[0]must be presented in such a way as to allow Python to determine its installation prefix. This is required to support virtualenv. - python_path (str|list) –
String or list path to the Python interpreter to use for bootstrap. Defaults to
sys.executablefor local connections, andpythonfor remote connections.It is possible to pass a list to invoke Python wrapped using another tool, such as
["/usr/bin/env", "python"]. - debug (bool) – If
True, arrange for debug logging (enable_debug()) to be enabled in the new context. AutomaticallyTruewhenenable_debug()has been called, but may be used selectively otherwise. - unidirectional (bool) – If
True, arrange for the child’s router to be constructed withunidirectional routingenabled. AutomaticallyTruewhen it was enabled for this router, but may still be explicitly set toFalse. - connect_timeout (float) – Fractional seconds to wait for the subprocess to indicate it is healthy. Defaults to 30 seconds.
- profiling (bool) – If
True, arrange for profiling (profiling) to be enabled in the new context. AutomaticallyTruewhenprofilingisTrue, but may be used selectively otherwise. - via (mitogen.core.Context) –
If not
None, arrange for construction to occur via RPCs made to the context via, and forADD_ROUTEmessages to be generated as appropriate.# SSH to the remote machine. remote_machine = router.ssh(hostname='mybox.com') # Use the SSH connection to create a sudo connection. remote_root = router.sudo(username='root', via=remote_machine)
- remote_name (str) –
-
dos(username=None, password=None, su_path=None, password_prompt=None, incorrect_prompts=None, **kwargs)¶ Construct a context on the local machine over a
suinvocation. Thesuprocess is started in a newly allocated pseudo-terminal, and supports typing interactive passwords.Accepts all parameters accepted by
local(), in addition to:Parameters: - username (str) – Username to use, defaults to
root. - password (str) – The account password to use if requested.
- su_path (str) – Filename or complete path to the
subinary.PATHwill be searched if given as a filename. Defaults tosu. - password_prompt (bytes) – A string that indicates
doasis requesting a password. Defaults toPassword:. - incorrect_prompts (list) – List of bytestrings indicating the password is incorrect. Defaults to (b”doas: authentication failed”).
Raises: mitogen.su.PasswordError – A password was requested but none was provided, the supplied password was incorrect, or the target account did not exist.
- username (str) – Username to use, defaults to
-
docker(container=None, image=None, docker_path=None, **kwargs)¶ Construct a context on the local machine within an existing or temporary new Docker container using the
dockerprogram. One of container or image must be specified.Accepts all parameters accepted by
local(), in addition to:Parameters: - container (str) – Existing container to connect to. Defaults to
None. - username (str) – Username within the container to
setuid()to. Defaults toNone, which Docker interprets asroot. - image (str) – Image tag to use to construct a temporary container. Defaults to
None. - docker_path (str) – Filename or complete path to the Docker binary.
PATHwill be searched if given as a filename. Defaults todocker.
- container (str) – Existing container to connect to. Defaults to
-
jail(container, jexec_path=None, **kwargs)¶ Construct a context on the local machine within a FreeBSD jail using the
jexecprogram.Accepts all parameters accepted by
local(), in addition to:Parameters: - container (str) – Existing container to connect to. Defaults to
None. - username (str) – Username within the container to
setuid()to. Defaults toNone, whichjexecinterprets asroot. - jexec_path (str) – Filename or complete path to the
jexecbinary.PATHwill be searched if given as a filename. Defaults to/usr/sbin/jexec.
- container (str) – Existing container to connect to. Defaults to
-
lxc(container, lxc_attach_path=None, **kwargs)¶ Construct a context on the local machine within an LXC classic container using the
lxc-attachprogram.Accepts all parameters accepted by
local(), in addition to:Parameters:
-
lxc(container, lxc_attach_path=None, **kwargs) Construct a context on the local machine within a LXD container using the
lxcprogram.Accepts all parameters accepted by
local(), in addition to:Parameters:
-
setns(container, kind, docker_path=None, lxc_info_path=None, machinectl_path=None, **kwargs)¶ Construct a context in the style of
local(), but change the active Linux process namespaces via calls to setns(1) before executing Python.The namespaces to use, and the active root file system are taken from the root PID of a running Docker, LXC, LXD, or systemd-nspawn container.
A program is required only to find the root PID, after which management of the child Python interpreter is handled directly.
Parameters: - container (str) – Container to connect to.
- kind (str) – One of
docker,lxc,lxdormachinectl. - docker_path (str) – Filename or complete path to the Docker binary.
PATHwill be searched if given as a filename. Defaults todocker. - lxc_path (str) – Filename or complete path to the LXD
lxcbinary.PATHwill be searched if given as a filename. Defaults tolxc. - lxc_info_path (str) – Filename or complete path to the LXC
lxc-infobinary.PATHwill be searched if given as a filename. Defaults tolxc-info. - machinectl_path (str) – Filename or complete path to the
machinectlbinary.PATHwill be searched if given as a filename. Defaults tomachinectl.
-
su(username=None, password=None, su_path=None, password_prompt=None, incorrect_prompts=None, **kwargs)¶ Construct a context on the local machine over a
suinvocation. Thesuprocess is started in a newly allocated pseudo-terminal, and supports typing interactive passwords.Accepts all parameters accepted by
local(), in addition to:Parameters: - username (str) – Username to pass to
su, defaults toroot. - password (str) – The account password to use if requested.
- su_path (str) – Filename or complete path to the
subinary.PATHwill be searched if given as a filename. Defaults tosu. - password_prompt (bytes) – The string that indicates
suis requesting a password. Defaults toPassword:. - incorrect_prompts (str) – Strings that signal the password is incorrect. Defaults to (“su: sorry”, “su: authentication failure”).
Raises: mitogen.su.PasswordError – A password was requested but none was provided, the supplied password was incorrect, or (on BSD) the target account did not exist.
- username (str) – Username to pass to
-
sudo(username=None, sudo_path=None, password=None, **kwargs)¶ Construct a context on the local machine over a
sudoinvocation. Thesudoprocess is started in a newly allocated pseudo-terminal, and supports typing interactive passwords.Accepts all parameters accepted by
local(), in addition to:Parameters: - username (str) – Username to pass to sudo as the
-uparameter, defaults toroot. - sudo_path (str) – Filename or complete path to the sudo binary.
PATHwill be searched if given as a filename. Defaults tosudo. - password (str) – The password to use if/when sudo requests it. Depending on the sudo
configuration, this is either the current account password or the
target account password.
mitogen.sudo.PasswordErrorwill be raised if sudo requests a password but none is provided. - set_home (bool) – If
True, requestsudoset theHOMEenvironment variable to match the target UNIX account. - preserve_env (bool) – If
True, requestsudoto preserve the environment of the parent process. - sudo_args (list) – Arguments in the style of
sys.argvthat would normally be passed tosudo. The arguments are parsed in-process to set equivalent parameters. Re-parsing ensures unsupported options causemitogen.core.StreamErrorto be raised, and that attributes of the stream match the actual behaviour ofsudo.
- username (str) – Username to pass to sudo as the
-
ssh(hostname, username=None, ssh_path=None, port=None, check_host_keys='enforce', password=None, identity_file=None, identities_only=True, compression=True, **kwargs)¶ Construct a remote context over a
sshinvocation. Thesshprocess is started in a newly allocated pseudo-terminal, and supports typing interactive passwords.Accepts all parameters accepted by
local(), in addition to:Parameters: - username (str) – The SSH username; default is unspecified, which causes SSH to pick the username to use.
- ssh_path (str) – Absolute or relative path to
ssh. Defaults tossh. - port (int) – Port number to connect to; default is unspecified, which causes SSH to pick the port number.
- check_host_keys (str) –
Specifies the SSH host key checking mode. Defaults to
enforce.ignore: no host key checking is performed. Connections never fail due to an unknown or changed host key.accept: known hosts keys are checked to ensure they match, new host keys are automatically accepted and verified in future connections.enforce: known host keys are checked to ensure they match, unknown hosts cause a connection failure.
- password (str) – Password to type if/when
sshrequests it. If not specified and a password is requested,mitogen.ssh.PasswordErroris raised. - identity_file (str) –
Path to an SSH private key file to use for authentication. Default is unspecified, which causes SSH to pick the identity file.
When this option is specified, only identity_file will be used by the SSH client to perform authenticaion; agent authentication is automatically disabled, as is reading the default private key from
~/.ssh/id_rsa, or~/.ssh/id_dsa. - identities_only (bool) – If
Trueand a password or explicit identity file is specified, instruct the SSH client to disable any authentication identities inherited from the surrounding environment, such as those loaded in any runningssh-agent, or default key files present in~/.ssh. This ensures authentication attempts only occur using the supplied password or SSH key. - compression (bool) – If
True, enablesshcompression support. Compression has a minimal effect on the size of modules transmitted, as they are already compressed, however it has a large effect on every remaining message in the otherwise uncompressed stream protocol, such as function call arguments and return values.
Parama int ssh_debug_level: Optional integer 0..3 indicating the SSH client debug level.
Raises: - mitogen.ssh.PasswordError – A password was requested but none was specified, or the specified password was incorrect.
- mitogen.ssh.HostKeyError – When check_host_keys is set to either
accept, indicates a previously recorded key no longer matches the remote machine. When set toenforce, as above, but additionally indicates no previously recorded key exists for the remote machine.
-
Context Class¶
-
class
mitogen.core.Context¶ Represent a remote context regardless of connection method.
Note: This is the somewhat limited core version of the Context class used by child contexts. The master subclass is documented below this one.
-
send(msg)¶ Arrange for msg to be delivered to this context. Updates the message’s dst_id prior to routing it via the associated router.
Parameters: msg (mitogen.core.Message) – The message.
-
send_async(msg, persist=False)¶ Arrange for msg to be delivered to this context, with replies delivered to a newly constructed Receiver. Updates the message’s dst_id prior to routing it via the associated router and registers a handle which is placed in the message’s reply_to.
Parameters: - persist (bool) – If
False, the handler will be unregistered after a single message has been received. - msg (mitogen.core.Message) – The message.
Returns: mitogen.core.Receiverconfigured to receive any replies sent to the message’s reply_to handle.- persist (bool) – If
-
send_await(msg, deadline=None)¶ As with
send_async(), but expect a single reply (persist=False) delivered within deadline seconds.Parameters: - msg (mitogen.core.Message) – The message.
- deadline (float) – If not
None, seconds before timing out waiting for a reply.
Raises: mitogen.core.TimeoutError – No message was received and deadline passed.
-
-
class
mitogen.parent.Context¶ Extend
mitogen.core.Routerwith functionality useful to masters, and child contexts who later become parents. Currently when this class is required, the target context’s router is upgraded at runtime.-
shutdown(wait=False)¶ Arrange for the context to receive a
SHUTDOWNmessage, triggering graceful shutdown.Due to a lack of support for timers, no attempt is made yet to force terminate a hung context using this method. This will be fixed shortly.
Parameters: wait (bool) – If True, block the calling thread until the context has completely terminated.Returns: If wait is False, returns amitogen.core.Latchwhoseget()method returnsNonewhen shutdown completes. The timeout parameter may be used to implement graceful timeouts.
-
call_async(fn, *args, **kwargs)¶ Arrange for the context’s
CALL_FUNCTIONhandle to receive a message that causes fn(*args, **kwargs) to be invoked on the context’s main thread.Parameters: - fn –
A free function in module scope or a class method of a class directly reachable from module scope:
# mymodule.py def my_func(): """A free function reachable as mymodule.my_func""" class MyClass: @classmethod def my_classmethod(cls): """Reachable as mymodule.MyClass.my_classmethod""" def my_instancemethod(self): """Unreachable: requires a class instance!""" class MyEmbeddedClass: @classmethod def my_classmethod(cls): """Not directly reachable from module scope!"""
- args (tuple) – Function arguments, if any. See RPC Serialization Rules for permitted types.
- kwargs (dict) – Function keyword arguments, if any. See RPC Serialization Rules for permitted types.
Returns: mitogen.core.Receiverconfigured to receive the result of the invocation:recv = context.call_async(os.check_output, 'ls /tmp/') try: # Prints output once it is received. msg = recv.get() print(msg.unpickle()) except mitogen.core.CallError, e: print('Call failed:', str(e))
Asynchronous calls may be dispatched in parallel to multiple contexts and consumed as they complete using
mitogen.select.Select.- fn –
-
call(fn, *args, **kwargs)¶ Equivalent to
call_async(fn, *args, **kwargs).get().unpickle().Returns: The function’s return value. Raises: mitogen.core.CallError – An exception was raised in the remote context during execution.
-
call_no_reply(fn, *args, **kwargs)¶ Send a function call, but expect no return value. If the call fails, the full exception will be logged to the target context’s logging framework.
Raises: mitogen.core.CallError – An exception was raised in the remote context during execution.
-
Receiver Class¶
-
class
mitogen.core.Receiver(router, handle=None, persist=True, respondent=None)¶ Receivers are used to wait for pickled responses from another context to be sent to a handle registered in this context. A receiver may be single-use (as in the case of
mitogen.parent.Context.call_async()) or multiple use.Parameters: - router (mitogen.core.Router) – Router to register the handler on.
- handle (int) – If not
None, an explicit handle to register, otherwise an unused handle is chosen. - persist (bool) – If
True, do not unregister the receiver’s handler after the first message. - respondent (mitogen.core.Context) – Reference to the context this receiver is receiving from. If not
None, arranges for the receiver to receive a dead message if messages can no longer be routed to the context, due to disconnection or exit.
-
notify = None If not
None, a reference to a function invoked as notify(receiver) when a new message is delivered to this receiver. Used bymitogen.select.Selectto implement waiting on multiple receivers.
-
to_sender()¶ Return a
mitogen.core.Senderconfigured to deliver messages to this receiver. Since a Sender can be serialized, this makes it convenient to pass (context_id, handle) pairs around:def deliver_monthly_report(sender): for line in open('monthly_report.txt'): sender.send(line) sender.close() remote = router.ssh(hostname='mainframe') recv = mitogen.core.Receiver(router) remote.call(deliver_monthly_report, recv.to_sender()) for msg in recv: print(msg)
-
empty()¶ Return
Trueif callingget()would block.As with
Queue.Queue,Truemay be returned even though a subsequent call toget()will succeed, since a message may be posted at any moment betweenempty()andget().empty()is only useful to avoid a race while installingnotify:recv.notify = _my_notify_function if not recv.empty(): _my_notify_function(recv) # It is guaranteed the receiver was empty after the notification # function was installed, or that it was non-empty and the # notification function was invoked at least once.
-
close()¶ Cause
mitogen.core.ChannelErrorto be raised in any thread waiting inget()on this receiver.
-
get(timeout=None)¶ Sleep waiting for a message to arrive on this receiver.
Parameters: timeout (float) – If not
None, specifies a timeout in seconds.Raises: - mitogen.core.ChannelError – The remote end indicated the channel should be closed, or communication with its parent context was lost.
- mitogen.core.TimeoutError – Timeout was reached.
Returns: (msg, data) tuple, where msg is the
mitogen.core.Messagethat was received, and data is its unpickled data part.
-
__iter__()¶ Block and yield (msg, data) pairs delivered to this receiver until
mitogen.core.ChannelErroris raised.
Sender Class¶
-
class
mitogen.core.Sender(context, dst_handle)¶ Senders are used to send pickled messages to a handle in another context, it is the inverse of
mitogen.core.Sender.Senders may be serialized, making them convenient to wire up data flows. See
mitogen.core.Receiver.to_sender()for more information.Parameters: - context (mitogen.core.Context) – Context to send messages to.
- dst_handle (int) – Destination handle to send messages to.
-
close()¶ Send a dead message to the remote end, causing
ChannelError()to be raised in any waiting thread.
-
send(data)¶ Send data to the remote end.
Select Class¶
-
class
mitogen.select.Select(receivers=(), oneshot=True)¶ Support scatter/gather asynchronous calls and waiting on multiple receivers, channels, and sub-Selects. Accepts a sequence of
mitogen.core.Receiverormitogen.select.Selectinstances and returns the first value posted to any receiver or select.If oneshot is
True, then remove each receiver as it yields a result; since__iter__()terminates once the final receiver is removed, this makes it convenient to respond to calls made in parallel:total = 0 recvs = [c.call_async(long_running_operation) for c in contexts] for msg in mitogen.select.Select(recvs): print('Got %s from %s' % (msg, msg.receiver)) total += msg.unpickle() # Iteration ends when last Receiver yields a result. print('Received total %s from %s receivers' % (total, len(recvs)))
Selectmay drive a long-running scheduler:with mitogen.select.Select(oneshot=False) as select: while running(): for msg in select: process_result(msg.receiver.context, msg.unpickle()) for context, workfunc in get_new_work(): select.add(context.call_async(workfunc))
Selectmay be nested:subselects = [ mitogen.select.Select(get_some_work()), mitogen.select.Select(get_some_work()), mitogen.select.Select([ mitogen.select.Select(get_some_work()), mitogen.select.Select(get_some_work()) ]) ] for msg in mitogen.select.Select(selects): print(msg.unpickle())
-
classmethod
all(it)¶ Take an iterable of receivers and retrieve a
Messagefrom each, returning the result of calling msg.unpickle() on each in turn. Results are returned in the order they arrived.This is sugar for handling batch
Context.call_asyncinvocations:print('Total disk usage: %.02fMiB' % (sum( mitogen.select.Select.all( context.call_async(get_disk_usage) for context in contexts ) / 1048576.0 ),))
However, unlike in a naive comprehension such as:
sum(context.call_async(get_disk_usage).get().unpickle() for context in contexts)
Result processing happens concurrently to new results arriving, so
all()should always be faster.
-
get(timeout=None, block=True)¶ Fetch the next available value from any receiver, or raise
mitogen.core.TimeoutErrorif no value is available within timeout seconds.On success, the message’s
receiverattribute is set to the receiver.Parameters: - timeout (float) – Timeout in seconds.
- block (bool) – If
False, immediately raisemitogen.core.TimeoutErrorif the select is empty.
Returns: Raises: - mitogen.core.TimeoutError – Timeout was reached.
- mitogen.core.LatchError –
close()has been called, and the underlying latch is no longer valid.
-
close()¶ Remove the select’s notifier function from each registered receiver, mark the associated latch as closed, and cause any thread currently sleeping in
get()to be woken withmitogen.core.LatchError.This is necessary to prevent memory leaks in long-running receivers. It is called automatically when the Python
withstatement is used.
-
empty()¶ Return
Trueif callingget()would block.As with
Queue.Queue,Truemay be returned even though a subsequent call toget()will succeed, since a message may be posted at any moment betweenempty()andget().empty()may returnFalseeven whenget()would block if another thread has drained a receiver added to this select. This can be avoided by only consuming each receiver from a single thread.
-
__iter__(self)¶ Yield the result of
get()until no receivers remain in the select, either because oneshot isTrue, or each receiver was explicitly removed viaremove().
-
add(recv)¶ Add the
mitogen.core.Receiverormitogen.core.Channelrecv to the select.
-
remove(recv)¶ Remove the
mitogen.core.Receiverormitogen.core.Channelrecv from the select. Note that if the receiver has notified prior toremove(), then it will still be returned by a subsequentget(). This may change in a future version.
-
classmethod
Channel Class¶
-
class
mitogen.core.Channel(router, context, dst_handle, handle=None)¶ A channel inherits from
mitogen.core.Senderand mitogen.core.Receiver to provide bidirectional functionality.Since all handles aren’t known until after both ends are constructed, for both ends to communicate through a channel, it is necessary for one end to retrieve the handle allocated to the other and reconfigure its own channel to match. Currently this is a manual task.
Broker Class¶
-
class
mitogen.core.Broker¶ Responsible for handling I/O multiplexing in a private thread.
Note: This is the somewhat limited core version of the Broker class used by child contexts. The master subclass is documented below this one.
-
shutdown_timeout = 3.0 Seconds grace to allow
streamsto shutdown gracefully before force-disconnecting them duringshutdown().
-
defer(func, *args, *kwargs)¶ Arrange for func(*args, **kwargs) to be executed on the broker thread, or immediately if the current thread is the broker thread. Safe to call from any thread.
-
start_receive(stream)¶ Mark the
receive_sideon stream as ready for reading. Safe to call from any thread. When the associated file descriptor becomes ready for reading,BasicStream.on_receive()will be called.
-
stop_receive(stream)¶ Mark the
receive_sideon stream as not ready for reading. Safe to call from any thread.
-
_start_transmit(stream)¶ Mark the
transmit_sideon stream as ready for writing. Must only be called from the Broker thread. When the associated file descriptor becomes ready for writing,BasicStream.on_transmit()will be called.
-
stop_receive(stream) Mark the
transmit_sideon stream as not ready for writing. Safe to call from any thread.
-
shutdown()¶ Request broker gracefully disconnect streams and stop.
-
join()¶ Wait for the broker to stop, expected to be called after
shutdown().
-
keep_alive()¶ Return
Trueif any reader’sSide.keep_aliveattribute isTrue, or anyContextis still registered that is not the master. Used to delay shutdown while some important work is in progress (e.g. log draining).
Internal Methods
-
_broker_main()¶ Handle events until
shutdown(). On shutdown, invokeStream.on_shutdown()for every active stream, then allow up toshutdown_timeoutseconds for the streams to unregister themselves before forcefully callingStream.on_disconnect().
-
-
class
mitogen.master.Broker(install_watcher=True)¶ Note
You may construct as many brokers as desired, and use the same broker for multiple routers, however usually only one broker need exist. Multiple brokers may be useful when dealing with sets of children with differing lifetimes. For example, a subscription service where non-payment results in termination for one customer.
Parameters: install_watcher (bool) – If
True, an additional thread is started to monitor the lifetime of the main thread, triggeringshutdown()automatically in case the user forgets to call it, or their code crashed.You should not rely on this functionality in your program, it is only intended as a fail-safe and to simplify the API for new users. In particular, alternative Python implementations may not be able to support watching the main thread.
-
shutdown_timeout = 5.0 Seconds grace to allow
streamsto shutdown gracefully before force-disconnecting them duringshutdown().
-
Utility Functions¶
A random assortment of utility functions useful on masters and children.
-
mitogen.utils.cast(obj)¶ Many tools love to subclass built-in types in order to implement useful functionality, such as annotating the safety of a Unicode string, or adding additional methods to a dict. However, cPickle loves to preserve those subtypes during serialization, resulting in CallError during
callin the target when it tries to deserialize the data.This function walks the object graph obj, producing a copy with any custom sub-types removed. The functionality is not default since the resulting walk may be computationally expensive given a large enough graph.
See RPC Serialization Rules for a list of supported types.
Parameters: obj – Object to undecorate. Returns: Undecorated object.
-
mitogen.utils.disable_site_packages()¶ Remove all entries mentioning
site-packagesorExtrasfrom the system path. Used primarily for testing on OS X within a virtualenv, where OS X bundles some ancient version of thesixmodule.
-
mitogen.utils.log_to_file(path=None, io=False, level='INFO')¶ Install a new
logging.Handlerwriting applications logs to the filesystem. Useful when debugging slave IO problems.Parameters to this function may be overridden at runtime using environment variables. See Logging Environment Variables.
Parameters: - path (str) – If not
None, a filesystem path to write logs to. Otherwise, logs are written tosys.stderr. - io (bool) – If
True, include extremely verbose IO logs in the output. Useful for debugging hangs, less useful for debugging application code. - level (str) – Name of the
loggingpackage constant that is the minimum level to log at. Useful levels areDEBUG,INFO,WARNING, andERROR.
- path (str) – If not
-
mitogen.utils.run_with_router(func, *args, **kwargs)¶ Arrange for func(router, *args, **kwargs) to run with a temporary
mitogen.master.Router, ensuring the Router and Broker are correctly shut down during normal or exceptional return.Returns: func’s return value.
-
@mitogen.utils.with_router¶ Decorator version of
run_with_router(). Example:@with_router def do_stuff(router, arg): pass do_stuff(blah, 123)
Exceptions¶
-
class
mitogen.core.Error(fmt=None, *args)¶ Base for all exceptions raised by Mitogen.
Parameters:
-
class
mitogen.core.CallError(fmt=None, *args)¶ Serializable
Errorsubclass raised whenContext.call()fails. A copy of the traceback from the external context is appended to the exception message.
-
class
mitogen.core.ChannelError(fmt=None, *args)¶ Raised when a channel dies or has been closed.
-
class
mitogen.core.LatchError(fmt=None, *args)¶ Raised when an attempt is made to use a
mitogen.core.Latchthat has been marked closed.
-
class
mitogen.core.StreamError(fmt=None, *args)¶ Raised when a stream cannot be established.
-
class
mitogen.core.TimeoutError(fmt=None, *args)¶ Raised when a timeout occurs on a stream.