API Reference

Package Layout

mitogen Package

On the Mitogen master, this is imported from mitogen/__init__.py as would be expected. On the slave, it is built dynamically during startup.

mitogen.__version__ = (0, 2, 3)

Library version as a tuple.

mitogen.is_master = True

This is False in slave contexts. Previously it was used to prevent re-execution of __main__ in single-file programs; that now happens automatically.

mitogen.context_id = 0

This is 0 in a master, otherwise it is the master-assigned ID unique to the slave context used for message routing.

mitogen.parent_id = None

This is None in a master, otherwise it is the master-assigned ID unique to the slave’s parent context.

mitogen.parent_ids = []

This is an empty list in a master, otherwise it is a list of parent context IDs ordered from most direct to least direct.
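The relationship between these attributes can be sketched with a small helper. describe_context() is a hypothetical function written only for illustration; it takes the values as arguments instead of reading the real mitogen module, and it assumes that parent_id equals the first entry of parent_ids, which follows from the "most direct first" ordering described above.

```python
def describe_context(is_master, context_id, parent_id, parent_ids):
    """Summarise where a context sits in the tree (illustrative only)."""
    if is_master:
        # A master has ID 0, no parent, and an empty parent list.
        if context_id != 0 or parent_id is not None or parent_ids:
            raise ValueError('inconsistent master attributes')
        return 'master'
    # In a slave, the most direct parent comes first in parent_ids.
    if not parent_ids or parent_ids[0] != parent_id:
        raise ValueError('inconsistent slave attributes')
    return 'slave %d at depth %d, parent %d' % (
        context_id, len(parent_ids), parent_id)


print(describe_context(True, 0, None, []))
print(describe_context(False, 5, 2, [2, 0]))
```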

mitogen.main(log_level='INFO', profiling=False)

Convenience decorator primarily useful for writing discardable test scripts.

In the master process, when func is defined in the __main__ module, arranges for func(router) to be invoked immediately, with mitogen.master.Router construction and destruction handled just as in mitogen.utils.run_with_router(). In slaves, this function does nothing.

Parameters:

Example:

import mitogen
import requests

def get_url(url):
    return requests.get(url).text

@mitogen.main()
def main(router):
    z = router.ssh(hostname='k3')
    print(z.call(get_url, 'https://example.org/'))

mitogen.core

This module implements most package functionality, but remains separate from non-essential code in order to reduce its size, since it also serves as the bootstrap implementation sent to every new slave context.

@mitogen.core.takes_econtext

Decorator that marks a function or class method to automatically receive a kwarg named econtext, referencing the mitogen.core.ExternalContext active in the context in which the function is being invoked. The decorator is only meaningful when the function is invoked via CALL_FUNCTION.

When the function is invoked directly, econtext must still be passed to it explicitly.

@mitogen.core.takes_router

Decorator that marks a function or class method to automatically receive a kwarg named router, referencing the mitogen.core.Router active in the context in which the function is being invoked. The decorator is only meaningful when the function is invoked via CALL_FUNCTION.

When the function is invoked directly, router must still be passed to it explicitly.
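The difference between a dispatched call and a direct call can be illustrated with a toy model. This is a minimal sketch, not Mitogen's implementation: takes_router_sketch(), the wants_router marker attribute and dispatch() are hypothetical names standing in for the real decorator and the CALL_FUNCTION dispatcher.

```python
def takes_router_sketch(func):
    """Mark func as wanting a `router` kwarg (hypothetical marker attribute)."""
    func.wants_router = True
    return func


def dispatch(func, router, *args, **kwargs):
    """Stand-in for the CALL_FUNCTION dispatcher: inject router if requested."""
    if getattr(func, 'wants_router', False):
        kwargs['router'] = router
    return func(*args, **kwargs)


@takes_router_sketch
def whoami(prefix, router):
    return '%s via %s' % (prefix, router)


# Dispatched call: the router kwarg is supplied automatically.
print(dispatch(whoami, 'ROUTER', 'hello'))
# Direct call: the caller must still pass router explicitly.
print(whoami('hello', router='ROUTER'))
```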

mitogen.master

This module implements functionality required by master processes, such as starting new contexts via SSH. Its size is also restricted, since it must be sent to any context that will be used to establish additional child contexts.

mitogen.parent

This module defines functionality common to master and parent processes. It is sent to any child context that is due to become a parent, due to recursive connection.

mitogen.fakessh

mitogen.fakessh is a stream implementation that starts a subprocess with its environment modified such that PATH searches for ssh return a Mitogen implementation of SSH. When invoked, this implementation arranges for the command line supplied by the caller to be executed in a remote context, reusing the parent context’s (possibly proxied) connection to that remote context.

This allows tools like rsync and scp to transparently reuse the connections and tunnels already established by the host program to connect to a target machine, without wasteful redundant SSH connection setup, 3-way handshakes, or firewall hopping configurations, and enables these tools to be used in impossible scenarios, such as over sudo with requiretty enabled.

The fake ssh command source is written to a temporary file on disk, and consists of a copy of the mitogen.core source code (just like any other child context), with a line appended to cause it to connect back to the host process over an FD it inherits. As there is no reliance on an existing filesystem file, it is possible for child contexts to use fakessh.

As a consequence of connecting back through an inherited FD, only one SSH invocation is possible, which is fine for tools like rsync, however in future this restriction will be lifted.
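The PATH substitution at the heart of this design can be sketched with the standard library alone. This is an illustration under stated assumptions, not the fakessh implementation: run_with_fake_ssh() and FAKE_SSH are hypothetical, the fake command merely echoes its arguments instead of bootstrapping mitogen.core, and a POSIX shell is assumed to be available.

```python
import os
import shutil
import subprocess
import tempfile

# Hypothetical stand-in for the generated fake ssh. The real one bootstraps
# mitogen.core and connects back to the host process instead of echoing.
FAKE_SSH = '#!/bin/sh\necho "fake ssh called with: $*"\n'


def run_with_fake_ssh(args):
    """Run args with a directory containing a fake `ssh` placed first on PATH."""
    tmpdir = tempfile.mkdtemp(prefix='fakessh-sketch-')
    try:
        fake_path = os.path.join(tmpdir, 'ssh')
        with open(fake_path, 'w') as fp:
            fp.write(FAKE_SSH)
        os.chmod(fake_path, 0o755)

        # Any PATH search for `ssh` made by the child finds the fake first.
        env = dict(os.environ)
        env['PATH'] = tmpdir + os.pathsep + env.get('PATH', os.defpath)
        proc = subprocess.run(args, env=env, stdout=subprocess.PIPE,
                              universal_newlines=True)
        return proc.returncode, proc.stdout
    finally:
        shutil.rmtree(tmpdir)


# The shell stands in for a tool such as rsync that runs `ssh` internally.
status, output = run_with_fake_ssh(['sh', '-c', 'ssh somehost uname -a'])
print(status, output.strip())
```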

Sequence:

  1. fakessh Context and Stream created by parent context. The stream’s buffer has a _fakessh_main() CALL_FUNCTION enqueued.

  2. Target program (rsync/scp/sftp) invoked, which internally executes ssh from PATH.

  3. mitogen.core bootstrap begins, recovers the stream FD inherited via the target program, establishes itself as the fakessh context.

  4. _fakessh_main() CALL_FUNCTION is read by fakessh context,

    1. sets up IoPump for stdio, registers stdin_handle for local context.

    2. Enqueues CALL_FUNCTION for _start_slave() invoked in target context,

      1. the program from the ssh command line is started
      2. sets up IoPump for ssh command line process’s stdio pipes
      3. returns (control_handle, stdin_handle) to _fakessh_main()
  5. _fakessh_main() receives control/stdin handles from _start_slave(),

    1. registers remote’s stdin_handle with local IoPump.
    2. sends (“start”, local_stdin_handle) to remote’s control_handle
    3. registers local IoPump with mitogen.core.Broker.
    4. loops waiting for local stdout closed && remote stdout closed
  6. _start_slave() control channel receives (“start”, stdin_handle),

    1. registers remote’s stdin_handle with local IoPump
    2. registers local IoPump with mitogen.core.Broker.
    3. loops waiting for local stdout closed && remote stdout closed
mitogen.fakessh.run(dest, router, args, deadline=None, econtext=None)

Run the command specified by args with PATH arranged so that the command's search for ssh finds a Mitogen implementation. Any attempt by the command to execute a remote program over SSH is redirected to execute that program in the context dest instead.

Parameters:
  • dest (mitogen.core.Context) – The destination context to execute the SSH command line in.
  • router (mitogen.core.Router) –
  • args (list[str]) – Argument vector: command line arguments for the local program, e.g. ['rsync', '/tmp', 'remote:/tmp']
Returns:

Exit status of the child process.

Message Class

class mitogen.core.Message

Messages are the fundamental unit of communication, comprising fields from the Stream Protocol header, an optional reference to the receiving mitogen.core.Router for ingress messages, and helper methods for deserialization and generating replies.

router

The mitogen.core.Router responsible for routing the message. This is None for locally originated messages.

receiver

The mitogen.core.Receiver over which the message was last received. Part of the mitogen.select.Select interface. Defaults to None.

dst_id

Integer target context ID. mitogen.core.Router delivers messages locally when their dst_id matches mitogen.context_id, otherwise they are routed up or downstream.

src_id

Integer source context ID. Used as the target of replies if any are generated.

auth_id

The context ID under whose authority the message is acting. See Source Verification.

handle

Integer target handle in the destination context. This is one of the Standard Handles, or a dynamically generated handle used to receive a one-time reply, such as the return value of a function call.

reply_to

Integer target handle to direct any reply to this message. Used to receive a one-time reply, such as the return value of a function call. IS_DEAD has a special meaning when it appears in this field.

data

Message data, which may be raw or pickled.

is_dead

True if reply_to is set to the magic value mitogen.core.IS_DEAD, indicating the sender considers the channel dead.

__init__(**kwargs)

Construct a message from the supplied kwargs. src_id and auth_id are always set to mitogen.context_id.

classmethod pickled(obj, **kwargs)

Construct a pickled message, setting data to the serialization of obj, and setting remaining fields using kwargs.

Returns: The new message.

unpickle(throw=True)

Unpickle data, optionally raising any exceptions present.

Parameters:

throw (bool) – If True, raise exceptions, otherwise it is the caller’s responsibility.

Raises:
reply(obj, router=None, **kwargs)

Compose a reply to this message and send it using router, or using the message’s own router attribute if router is None.

Parameters:
  • obj – Either a Message, or an object to be serialized in order to construct a new message.
  • router – Optional router to use if the message’s own router attribute is None.
  • kwargs – Optional keyword parameters overriding message fields in the reply.
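How the fields above interact when composing a reply can be sketched with a toy class. ToyMessage and make_reply() are hypothetical and model only the field handling described in this section; the IS_DEAD value is a placeholder, plain pickle is used for brevity (it is unsafe for untrusted data), and nothing is actually sent.

```python
import pickle

IS_DEAD = 999           # placeholder; the real constant is mitogen.core.IS_DEAD
LOCAL_CONTEXT_ID = 0    # stands in for mitogen.context_id


class ToyMessage:
    def __init__(self, **kwargs):
        self.dst_id = None
        self.src_id = LOCAL_CONTEXT_ID
        self.auth_id = LOCAL_CONTEXT_ID
        self.handle = None
        self.reply_to = None
        self.data = b''
        # Remaining fields are taken from kwargs.
        vars(self).update(kwargs)

    @property
    def is_dead(self):
        return self.reply_to == IS_DEAD

    @classmethod
    def pickled(cls, obj, **kwargs):
        return cls(data=pickle.dumps(obj), **kwargs)

    def unpickle(self, throw=True):
        obj = pickle.loads(self.data)
        if throw and isinstance(obj, Exception):
            raise obj
        return obj

    def make_reply(self, obj, **kwargs):
        """Compose (but do not send) a reply addressed to the sender."""
        reply = obj if isinstance(obj, ToyMessage) else ToyMessage.pickled(obj)
        reply.dst_id = self.src_id      # replies target the source context
        reply.handle = self.reply_to    # and the handle the sender listens on
        vars(reply).update(kwargs)
        return reply


request = ToyMessage.pickled({'x': 1}, src_id=3, dst_id=0,
                             handle=100, reply_to=1001)
reply = request.make_reply('ok')
print(reply.dst_id, reply.handle, reply.unpickle())
```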

Router Class

class mitogen.core.Router

Route messages between parent and child contexts, and invoke handlers defined on our parent context. Router.route() straddles the Broker and user threads; it is safe to call from any thread.

Note: This is the somewhat limited core version of the Router class used by child contexts. The master subclass is documented below this one.

unidirectional

When True, permit children to only communicate with the current context or a parent of the current context. Routing between siblings or children of parents is prohibited, ensuring no communication is possible between intentionally partitioned networks, such as when a program simultaneously manipulates hosts spread across a corporate and a production network, or production networks that are otherwise air-gapped.

Sending a prohibited message causes an error to be logged and a dead message to be sent in reply to the errant message, if that message has reply_to set.

The value of unidirectional becomes the default for the local() unidirectional parameter.
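The rule can be expressed as a small predicate. This is a sketch of the policy as described above, not the router's actual check: unidirectional_allows() and its arguments are hypothetical, and it assumes that messages arriving from a parent are not restricted by this setting.

```python
def unidirectional_allows(dst_id, local_id, parent_ids, arrived_from_child):
    """Return True if a message may be routed under the unidirectional rule."""
    if not arrived_from_child:
        # Assumption: only traffic originating from children is restricted.
        return True
    # Children may reach only the current context or one of its parents.
    return dst_id == local_id or dst_id in parent_ids


# Context 3 has parents [1, 0]. A child of 3 may talk to 3, 1 or 0,
# but not to a sibling such as 4.
print(unidirectional_allows(0, 3, [1, 0], True))   # to the master
print(unidirectional_allows(4, 3, [1, 0], True))   # to a sibling
```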

stream_by_id(dst_id)

Return the mitogen.core.Stream that should be used to communicate with dst_id. If a specific route for dst_id is not known, a reference to the parent context’s stream is returned.

add_route(target_id, via_id)

Arrange for messages whose dst_id is target_id to be forwarded on the directly connected stream for via_id. This method is called automatically in response to ADD_ROUTE messages, but remains public for now while the design has not yet settled, and situations may arise where routing is not fully automatic.
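The lookup performed by stream_by_id() and the effect of add_route() can be modelled with two dictionaries. ToyRoutes is a hypothetical class for illustration only; strings stand in for stream objects, and the real router's data structures may differ.

```python
class ToyRoutes:
    def __init__(self, parent_stream):
        self.parent_stream = parent_stream
        self._stream_by_id = {}   # directly connected context ID -> stream
        self._via_by_id = {}      # target context ID -> directly connected ID

    def register(self, context_id, stream):
        self._stream_by_id[context_id] = stream

    def add_route(self, target_id, via_id):
        self._via_by_id[target_id] = via_id

    def stream_by_id(self, dst_id):
        # Follow an explicit route if one exists, otherwise treat dst_id as
        # directly connected; fall back to the parent when nothing is known.
        via_id = self._via_by_id.get(dst_id, dst_id)
        return self._stream_by_id.get(via_id, self.parent_stream)


routes = ToyRoutes(parent_stream='parent-stream')
routes.register(2, 'stream-to-2')
routes.add_route(7, via_id=2)       # context 7 is reachable through 2
print(routes.stream_by_id(7))       # forwarded on the stream for 2
print(routes.stream_by_id(42))      # unknown: sent towards the parent
```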

register(context, stream)

Register a new context and its associated stream, and add the stream’s receive side to the I/O multiplexer. This method remains public for now while the design has not yet settled.

add_handler(fn, handle=None, persist=True, respondent=None, policy=None)

Invoke fn(msg) for each Message sent to handle from this context. Unregister after one invocation if persist is False. If handle is None, a new handle is allocated and returned.

Parameters:
  • handle (int) – If not None, an explicit handle to register, usually one of the mitogen.core.* constants. If unspecified, a new unused handle will be allocated.
  • persist (bool) – If False, the handler will be unregistered after a single message has been received.
  • respondent (mitogen.core.Context) –

    Context that messages to this handle are expected to be sent from. If specified, arranges for a dead message to be delivered to fn when disconnection of the context is detected.

    In future respondent will likely also be used to prevent other contexts from sending messages to the handle.

  • policy (function) –

    Function invoked as policy(msg, stream) where msg is a mitogen.core.Message about to be delivered, and stream is the mitogen.core.Stream on which it was received. The function must return True, otherwise an error is logged and delivery is refused.

    Two built-in policy functions exist:

    • mitogen.core.has_parent_authority(): requires the message arrived from a parent context, or a context acting with a parent context’s authority (auth_id).
    • mitogen.parent.is_immediate_child(): requires the message arrived from an immediately connected child, for use in messaging patterns where either something becomes buggy or insecure by permitting indirect upstream communication.

    If delivery is refused and the message’s reply_to field is nonzero, a mitogen.core.CallError is delivered to the sender indicating refusal occurred.

Returns:

handle, or if handle was None, the newly allocated handle.
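The handle allocation, persist and policy behaviour described above can be sketched as a small registry. ToyHandlers is hypothetical and omits respondent tracking, logging and error replies; the starting handle number of 1000 is an arbitrary assumption.

```python
import itertools


class ToyHandlers:
    def __init__(self, first_handle=1000):
        self._next_handle = itertools.count(first_handle)
        self._handlers = {}

    def add_handler(self, fn, handle=None, persist=True, policy=None):
        if handle is None:
            handle = next(self._next_handle)   # allocate a new unused handle
        self._handlers[handle] = (fn, persist, policy)
        return handle

    def del_handler(self, handle):
        del self._handlers[handle]             # KeyError if not registered

    def dispatch(self, handle, msg, stream=None):
        """Deliver msg; return False if no handler exists or policy refuses."""
        entry = self._handlers.get(handle)
        if entry is None:
            return False
        fn, persist, policy = entry
        if policy is not None and not policy(msg, stream):
            return False                       # delivery refused
        if not persist:
            del self._handlers[handle]         # one-shot handler
        fn(msg)
        return True


received = []
handlers = ToyHandlers()
one_shot = handlers.add_handler(received.append, persist=False)
handlers.dispatch(one_shot, 'first')
handlers.dispatch(one_shot, 'second')          # ignored: already unregistered
print(received)
```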

del_handler(handle)

Remove the handler registered for handle.

Raises: KeyError – The handle wasn’t registered.

_async_route(msg, stream=None)

Arrange for msg to be forwarded towards its destination. If its destination is the local context, then arrange for it to be dispatched using the local handlers.

This is a lower overhead version of route() that may only be called from the I/O multiplexer thread.

Parameters: stream (mitogen.core.Stream) – If not None, a reference to the stream the message arrived on. Used for performing source route verification, to ensure sensitive messages such as CALL_FUNCTION arrive only from trusted contexts.

route(msg)

Arrange for the Message msg to be delivered to its destination using any relevant downstream context, or if none is found, by forwarding the message upstream towards the master context. If msg is destined for the local context, it is dispatched using the handlers registered with add_handler().

This may be called from any thread.

class mitogen.master.Router(broker=None)

Extend mitogen.core.Router with functionality useful to masters, and child contexts who later become masters. Currently when this class is required, the target context’s router is upgraded at runtime.

Note

You may construct as many routers as desired, and use the same broker for multiple routers, however usually only one broker and router need exist. Multiple routers may be useful when dealing with separate trust domains, for example, manipulating infrastructure belonging to separate customers or projects.

Parameters: broker (mitogen.master.Broker) – Broker instance to use. If not specified, a private Broker is created.

profiling

When True, cause the broker thread and any subsequent broker and main threads existing in any child to write /tmp/mitogen.stats.<pid>.<thread_name>.log containing a cProfile dump on graceful exit. Must be set prior to construction of any Broker, e.g. via:

mitogen.master.Router.profiling = True

enable_debug()

Cause this context and any descendant child contexts to write debug logs to /tmp/mitogen.<pid>.log.

allocate_id()

Arrange for a unique context ID to be allocated and associated with a route leading to the active context. In masters, the ID is generated directly, in children it is forwarded to the master via an ALLOCATE_ID message that causes the master to emit matching ADD_ROUTE messages prior to replying.

context_by_id(context_id, via_id=None)

Messy factory/lookup function to find a context by its ID, or construct it. In future this will be replaced by a much more sensible interface.

Context Factories

fork(on_fork=None, on_start=None, debug=False, profiling=False, via=None)

Construct a context on the local machine by forking the current process. The forked child receives a new identity, sets up a new broker and router, and responds to function calls identically to children created using other methods.

For long-lived processes, local() is always better as it guarantees a pristine interpreter state that inherits little from the parent. Forking should only be used in performance-sensitive scenarios where short-lived children must be spawned to isolate potentially buggy code, and only after accounting for all the bad things possible as a result of, at a minimum:

  • Files open in the parent remaining open in the child, causing the lifetime of the underlying object to be extended indefinitely.
    • From the perspective of external components, this is observable in the form of pipes and sockets that are never closed, which may break anything relying on closure to signal protocol termination.
    • Descriptors that reference temporary files will not have their disk space reclaimed until the child exits.
  • Third party package state, such as urllib3’s HTTP connection pool, attempting to write to file descriptors shared with the parent, causing random failures in both parent and child.
  • UNIX signal handlers installed in the parent process remaining active in the child, despite associated resources, such as service threads, child processes, resource usage counters or process timers becoming absent or reset in the child.
  • Library code that makes assumptions about the process ID remaining unchanged, for example to implement inter-process locking, or to generate file names.
  • Anonymous MAP_PRIVATE memory mappings whose storage requirement doubles as either parent or child dirties their pages.
  • File-backed memory mappings that cannot have their space freed on disk due to the mapping living on in the child.
  • Difficult to diagnose memory usage and latency spikes due to object graphs becoming unreferenced in either parent or child, causing immediate copy-on-write to large portions of the process heap.
  • Locks held in the parent causing random deadlocks in the child, such as when another thread emits a log entry via the logging package concurrent to another thread calling fork().
  • Objects existing in Thread-Local Storage of every non-fork() thread becoming permanently inaccessible, and never having their object destructors called, including TLS usage by native extension code, triggering many new variants of all the issues above.
  • Pseudo-Random Number Generator state that is easily observable by network peers to be duplicate, violating requirements of cryptographic protocols through one-time state reuse. In the worst case, children continually reuse the same state due to repeatedly forking from a static parent.

fork() cleans up Mitogen-internal objects, in addition to locks held by the logging package, reseeds random.random(), and the OpenSSL PRNG via ssl.RAND_add(), but only if the ssl module is already loaded. You must arrange for your program’s state, including any third party packages in use, to be cleaned up by specifying an on_fork function.

The associated stream implementation is mitogen.fork.Stream.

Parameters:
  • on_fork (function) – Function invoked as on_fork() from within the child process. This permits supplying a program-specific cleanup function to break locks and close file descriptors belonging to the parent from within the child.
  • on_start (function) – Invoked as on_start(econtext) from within the child process after it has been set up, but before the function dispatch loop starts. This permits supplying a custom child main function that inherits rich data structures that cannot normally be passed via a serialization.
  • via (mitogen.core.Context) – Same as the via parameter for local().
  • debug (bool) – Same as the debug parameter for local().
  • profiling (bool) – Same as the profiling parameter for local().
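An on_fork function typically replaces locks and closes descriptors that the child must not share with the parent. The following is a minimal sketch of such a cleanup function; ProgramState is a hypothetical container for application state, and io.StringIO stands in for a real file or socket.

```python
import io
import threading


class ProgramState:
    """Hypothetical application state that must not leak into a forked child."""

    def __init__(self):
        self.lock = threading.Lock()
        self.open_files = []

    def on_fork(self):
        # A lock held by another parent thread at fork time would never be
        # released in the child, so start over with a fresh one.
        self.lock = threading.Lock()
        # Close objects inherited from the parent that the child must not use.
        while self.open_files:
            self.open_files.pop().close()


state = ProgramState()
state.open_files.append(io.StringIO())   # stand-in for a file or socket
state.lock.acquire()                     # simulate a lock held during fork

# In a real program the method would be supplied as:
#     router.fork(on_fork=state.on_fork)
state.on_fork()
print(state.open_files, state.lock.locked())
```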
local(remote_name=None, python_path=None, debug=False, connect_timeout=None, profiling=False, via=None)

Construct a context on the local machine as a subprocess of the current process. The associated stream implementation is mitogen.master.Stream.

Parameters:
  • remote_name (str) –

    The argv[0] suffix for the new process. If remote_name is test, the new process argv[0] will be mitogen:test.

    If unspecified, defaults to <username>@<hostname>:<pid>.

    This variable cannot contain slash characters, as the resulting argv[0] must be presented in such a way as to allow Python to determine its installation prefix. This is required to support virtualenv.

  • python_path (str|list) –

    String or list path to the Python interpreter to use for bootstrap. Defaults to sys.executable for local connections, and python for remote connections.

    It is possible to pass a list to invoke Python wrapped using another tool, such as ["/usr/bin/env", "python"].

  • debug (bool) – If True, arrange for debug logging (enable_debug()) to be enabled in the new context. Automatically True when enable_debug() has been called, but may be used selectively otherwise.
  • unidirectional (bool) – If True, arrange for the child’s router to be constructed with unidirectional routing enabled. Automatically True when it was enabled for this router, but may still be explicitly set to False.
  • connect_timeout (float) – Fractional seconds to wait for the subprocess to indicate it is healthy. Defaults to 30 seconds.
  • profiling (bool) – If True, arrange for profiling (profiling) to be enabled in the new context. Automatically True when profiling is True, but may be used selectively otherwise.
  • via (mitogen.core.Context) –

    If not None, arrange for construction to occur via RPCs made to the context via, and for ADD_ROUTE messages to be generated as appropriate.

    # SSH to the remote machine.
    remote_machine = router.ssh(hostname='mybox.com')
    
    # Use the SSH connection to create a sudo connection.
    remote_root = router.sudo(username='root', via=remote_machine)
    
doas(username=None, password=None, doas_path=None, password_prompt=None, incorrect_prompts=None, **kwargs)

Construct a context on the local machine over a doas invocation. The doas process is started in a newly allocated pseudo-terminal, and supports typing interactive passwords.

Accepts all parameters accepted by local(), in addition to:

Parameters:
  • username (str) – Username to use, defaults to root.
  • password (str) – The account password to use if requested.
  • doas_path (str) – Filename or complete path to the doas binary. PATH will be searched if given as a filename. Defaults to doas.
  • password_prompt (bytes) – A string that indicates doas is requesting a password. Defaults to Password:.
  • incorrect_prompts (list) – List of bytestrings indicating the password is incorrect. Defaults to (b"doas: authentication failed").
Raises:

mitogen.doas.PasswordError – A password was requested but none was provided, the supplied password was incorrect, or the target account did not exist.

docker(container=None, image=None, docker_path=None, **kwargs)

Construct a context on the local machine within an existing or temporary new Docker container using the docker program. One of container or image must be specified.

Accepts all parameters accepted by local(), in addition to:

Parameters:
  • container (str) – Existing container to connect to. Defaults to None.
  • username (str) – Username within the container to setuid() to. Defaults to None, which Docker interprets as root.
  • image (str) – Image tag to use to construct a temporary container. Defaults to None.
  • docker_path (str) – Filename or complete path to the Docker binary. PATH will be searched if given as a filename. Defaults to docker.
jail(container, jexec_path=None, **kwargs)

Construct a context on the local machine within a FreeBSD jail using the jexec program.

Accepts all parameters accepted by local(), in addition to:

Parameters:
  • container (str) – Existing container to connect to. Defaults to None.
  • username (str) – Username within the container to setuid() to. Defaults to None, which jexec interprets as root.
  • jexec_path (str) – Filename or complete path to the jexec binary. PATH will be searched if given as a filename. Defaults to /usr/sbin/jexec.
kubectl(pod, kubectl_path=None, kubectl_args=None, **kwargs)

Construct a context in a container via the Kubernetes kubectl program.

Accepts all parameters accepted by local(), in addition to:

Parameters:
  • pod (str) – Kubernetes pod to connect to.
  • kubectl_path (str) – Filename or complete path to the kubectl binary. PATH will be searched if given as a filename. Defaults to kubectl.
  • kubectl_args (list) – Additional arguments to pass to the kubectl command.
lxc(container, lxc_attach_path=None, **kwargs)

Construct a context on the local machine within an LXC classic container using the lxc-attach program.

Accepts all parameters accepted by local(), in addition to:

Parameters:
  • container (str) – Existing container to connect to. Defaults to None.
  • lxc_attach_path (str) – Filename or complete path to the lxc-attach binary. PATH will be searched if given as a filename. Defaults to lxc-attach.
lxd(container, lxc_path=None, **kwargs)

Construct a context on the local machine within a LXD container using the lxc program.

Accepts all parameters accepted by local(), in addition to:

Parameters:
  • container (str) – Existing container to connect to. Defaults to None.
  • lxc_path (str) – Filename or complete path to the lxc binary. PATH will be searched if given as a filename. Defaults to lxc.
setns(container, kind, username=None, docker_path=None, lxc_info_path=None, machinectl_path=None, **kwargs)

Construct a context in the style of local(), but change the active Linux process namespaces via calls to setns(2) before executing Python.

The namespaces to use, and the active root file system are taken from the root PID of a running Docker, LXC, LXD, or systemd-nspawn container.

A program is required only to find the root PID, after which management of the child Python interpreter is handled directly.

Parameters:
  • container (str) – Container to connect to.
  • kind (str) – One of docker, lxc, lxd or machinectl.
  • username (str) – Username within the container to setuid() to. Defaults to root.
  • docker_path (str) – Filename or complete path to the Docker binary. PATH will be searched if given as a filename. Defaults to docker.
  • lxc_path (str) – Filename or complete path to the LXD lxc binary. PATH will be searched if given as a filename. Defaults to lxc.
  • lxc_info_path (str) – Filename or complete path to the LXC lxc-info binary. PATH will be searched if given as a filename. Defaults to lxc-info.
  • machinectl_path (str) – Filename or complete path to the machinectl binary. PATH will be searched if given as a filename. Defaults to machinectl.
su(username=None, password=None, su_path=None, password_prompt=None, incorrect_prompts=None, **kwargs)

Construct a context on the local machine over a su invocation. The su process is started in a newly allocated pseudo-terminal, and supports typing interactive passwords.

Accepts all parameters accepted by local(), in addition to:

Parameters:
  • username (str) – Username to pass to su, defaults to root.
  • password (str) – The account password to use if requested.
  • su_path (str) – Filename or complete path to the su binary. PATH will be searched if given as a filename. Defaults to su.
  • password_prompt (bytes) – The string that indicates su is requesting a password. Defaults to Password:.
  • incorrect_prompts (str) – Strings that signal the password is incorrect. Defaults to ("su: sorry", "su: authentication failure").
Raises:

mitogen.su.PasswordError – A password was requested but none was provided, the supplied password was incorrect, or (on BSD) the target account did not exist.

sudo(username=None, sudo_path=None, password=None, **kwargs)

Construct a context on the local machine over a sudo invocation. The sudo process is started in a newly allocated pseudo-terminal, and supports typing interactive passwords.

Accepts all parameters accepted by local(), in addition to:

Parameters:
  • username (str) – Username to pass to sudo as the -u parameter, defaults to root.
  • sudo_path (str) – Filename or complete path to the sudo binary. PATH will be searched if given as a filename. Defaults to sudo.
  • password (str) – The password to use if/when sudo requests it. Depending on the sudo configuration, this is either the current account password or the target account password. mitogen.sudo.PasswordError will be raised if sudo requests a password but none is provided.
  • set_home (bool) – If True, request sudo set the HOME environment variable to match the target UNIX account.
  • preserve_env (bool) – If True, request sudo to preserve the environment of the parent process.
  • sudo_args (list) – Arguments in the style of sys.argv that would normally be passed to sudo. The arguments are parsed in-process to set equivalent parameters. Re-parsing ensures unsupported options cause mitogen.core.StreamError to be raised, and that attributes of the stream match the actual behaviour of sudo.
ssh(hostname, username=None, ssh_path=None, ssh_args=None, port=None, check_host_keys='enforce', password=None, identity_file=None, identities_only=True, compression=True, **kwargs)

Construct a remote context over an OpenSSH ssh invocation.

The ssh process is started in a newly allocated pseudo-terminal to support typing interactive passwords and responding to prompts, if a password is specified, or check_host_keys=accept. In other scenarios, BatchMode is enabled and no PTY is allocated. For many-target configurations, both options should be avoided as most systems have a conservative limit on the number of pseudo-terminals that may exist.

Accepts all parameters accepted by local(), in addition to:

Parameters:
  • username (str) – The SSH username; default is unspecified, which causes SSH to pick the username to use.
  • ssh_path (str) – Absolute or relative path to ssh. Defaults to ssh.
  • ssh_args (list) – Additional arguments to pass to the SSH command.
  • port (int) – Port number to connect to; default is unspecified, which causes SSH to pick the port number.
  • check_host_keys (str) –

    Specifies the SSH host key checking mode. Defaults to enforce.

    • ignore: no host key checking is performed. Connections never fail due to an unknown or changed host key.
    • accept: known host keys are checked to ensure they match, new host keys are automatically accepted and verified in future connections.
    • enforce: known host keys are checked to ensure they match, unknown hosts cause a connection failure.
  • password (str) – Password to type if/when ssh requests it. If not specified and a password is requested, mitogen.ssh.PasswordError is raised.
  • identity_file (str) –

    Path to an SSH private key file to use for authentication. Default is unspecified, which causes SSH to pick the identity file.

    When this option is specified, only identity_file will be used by the SSH client to perform authentication; agent authentication is automatically disabled, as is reading the default private key from ~/.ssh/id_rsa, or ~/.ssh/id_dsa.

  • identities_only (bool) – If True and a password or explicit identity file is specified, instruct the SSH client to disable any authentication identities inherited from the surrounding environment, such as those loaded in any running ssh-agent, or default key files present in ~/.ssh. This ensures authentication attempts only occur using the supplied password or SSH key.
  • compression (bool) – If True, enable ssh compression support. Compression has a minimal effect on the size of modules transmitted, as they are already compressed, however it has a large effect on every remaining message in the otherwise uncompressed stream protocol, such as function call arguments and return values.
  • ssh_debug_level (int) – Optional integer 0..3 indicating the SSH client debug level.
Raises:
  • mitogen.ssh.PasswordError – A password was requested but none was specified, or the specified password was incorrect.
  • mitogen.ssh.HostKeyError – When check_host_keys is set to accept, indicates a previously recorded key no longer matches the remote machine. When set to enforce, as above, but additionally indicates no previously recorded key exists for the remote machine.
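To make the interaction between these parameters concrete, the sketch below builds an OpenSSH command line from them. It is one plausible mapping and not Mitogen's actual argument construction: ssh_argv_sketch() is hypothetical, and the translation of check_host_keys onto StrictHostKeyChecking values is an assumption. The BatchMode rule follows the description above: it is enabled unless a password is supplied or check_host_keys is accept.

```python
def ssh_argv_sketch(hostname, username=None, ssh_path='ssh', port=None,
                    check_host_keys='enforce', password=None,
                    identity_file=None, identities_only=True,
                    compression=True):
    """Build an illustrative ssh argv; the option mapping is an assumption."""
    # Assumed mapping of modes onto OpenSSH's StrictHostKeyChecking option.
    modes = {'enforce': 'yes', 'accept': 'ask', 'ignore': 'no'}
    if check_host_keys not in modes:
        raise ValueError('invalid check_host_keys: %r' % (check_host_keys,))

    argv = [ssh_path]
    if username:
        argv += ['-l', username]
    if port is not None:
        argv += ['-p', str(port)]
    if identity_file:
        argv += ['-i', identity_file]
    if identities_only and (identity_file or password):
        argv += ['-o', 'IdentitiesOnly yes']
    if compression:
        argv += ['-o', 'Compression yes']
    argv += ['-o', 'StrictHostKeyChecking ' + modes[check_host_keys]]
    if check_host_keys == 'ignore':
        argv += ['-o', 'UserKnownHostsFile /dev/null']
    # No interactive prompts are expected, so no PTY is needed.
    if password is None and check_host_keys != 'accept':
        argv += ['-o', 'BatchMode yes']
    argv.append(hostname)
    return argv


print(ssh_argv_sketch('k3'))
print(ssh_argv_sketch('k3', username='admin', password='secret', port=2222))
```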

Context Class

class mitogen.core.Context

Represent a remote context regardless of connection method.

Note: This is the somewhat limited core version of the Context class used by child contexts. The master subclass is documented below this one.

send(msg)

Arrange for msg to be delivered to this context. dst_id is set to the target context ID.

Parameters: msg (mitogen.core.Message) – The message.

send_async(msg, persist=False)

Arrange for msg to be delivered to this context, with replies directed to a newly constructed receiver. dst_id is set to the target context ID, and reply_to is set to the newly constructed receiver’s handle.

Parameters:
  • persist (bool) – If False, the handler will be unregistered after a single message has been received.
  • msg (mitogen.core.Message) – The message.
Returns:

mitogen.core.Receiver configured to receive any replies sent to the message’s reply_to handle.

send_await(msg, deadline=None)

Like send_async(), but expect a single reply (persist=False) delivered within deadline seconds.

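Parameters:
  • msg (mitogen.core.Message) – The message.
  • deadline (float) – If not None, the number of seconds to wait for a reply.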
Returns:

The deserialized reply.

Raises:

mitogen.core.TimeoutError – No message was received and deadline passed.

class mitogen.parent.CallChain(context, pipelined=False)

Deliver mitogen.core.CALL_FUNCTION messages to a target context, optionally threading related calls so an exception in an earlier call cancels subsequent calls.

Parameters:

call(), call_no_reply() and call_async() normally issue calls and produce responses with no memory of prior exceptions. If a call made with call_no_reply() fails, the exception is logged to the target context’s logging framework.

Pipelining

When pipelining is enabled, if an exception occurs during a call, subsequent calls made by the same CallChain fail with the same exception, including those already in-flight on the network, and no further calls execute until reset() is invoked.

No exception is logged for calls made with call_no_reply(), instead the exception is saved and reported as the result of subsequent call() or call_async() calls.

Sequences of asynchronous calls can be made without wasting network round-trips to discover if prior calls succeed, and chains originating from multiple unrelated source contexts may overlap concurrently at a target context without interference.

In this example, 4 calls complete in one round-trip:

chain = mitogen.parent.CallChain(context, pipelined=True)
chain.call_no_reply(os.mkdir, '/tmp/foo')

# If previous mkdir() failed, this never runs:
chain.call_no_reply(os.mkdir, '/tmp/foo/bar')

# If either mkdir() failed, this never runs, and the exception is
# asynchronously delivered to the receiver.
recv = chain.call_async(subprocess.check_output, '/tmp/foo')

# If anything so far failed, this never runs, and raises the exception.
chain.call(do_something)

# If this code was executed, the exception would also be raised.
if recv.get().unpickle() == 'baz':
    pass

When pipelining is enabled, reset() must be invoked to ensure any exception is discarded, otherwise unbounded memory usage is possible in long-running programs. The context manager protocol is supported to ensure reset() is always invoked:

with mitogen.parent.CallChain(context, pipelined=True) as chain:
    chain.call_no_reply(...)
    chain.call_no_reply(...)
    chain.call_no_reply(...)
    chain.call(...)

# chain.reset() automatically invoked.
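
The failure semantics described above can be imitated with a small in-process model. This is only a sketch: ToyChain, fail and ran are hypothetical names, nothing here touches the network, and it is not how Mitogen implements CallChain. It only shows the remembered exception short-circuiting later calls until reset() is invoked.

```python
# Toy, in-process model of pipelined failure handling.
# ToyChain is a hypothetical stand-in, not Mitogen's CallChain.

class ToyChain:
    def __init__(self, pipelined=False):
        self.pipelined = pipelined
        self._saved_exc = None  # first failure remembered while pipelined

    def call(self, fn, *args, **kwargs):
        # A remembered failure short-circuits every later call.
        if self._saved_exc is not None:
            raise self._saved_exc
        try:
            return fn(*args, **kwargs)
        except Exception as exc:
            if self.pipelined:
                self._saved_exc = exc
            raise

    def call_no_reply(self, fn, *args, **kwargs):
        # The caller never sees the failure here; when pipelined it
        # resurfaces on the next call().
        try:
            self.call(fn, *args, **kwargs)
        except Exception:
            pass

    def reset(self):
        self._saved_exc = None

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.reset()
        return False


def fail():
    raise ValueError('boom')


ran = []
chain = ToyChain(pipelined=True)
chain.call_no_reply(fail)           # fails; the exception is saved
chain.call_no_reply(ran.append, 1)  # never runs
try:
    chain.call(ran.append, 2)       # never runs; re-raises the saved exception
    outcome = 'ok'
except ValueError:
    outcome = 'failed'

chain.reset()
chain.call(ran.append, 3)           # runs normally after reset()
```

After the script runs, outcome is 'failed' and ran contains only 3, since both calls issued between the failure and reset() were skipped.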
call(fn, *args, **kwargs)

Like call_async(), but block until the return value is available. Equivalent to:

call_async(fn, *args, **kwargs).get().unpickle()
Returns:The function’s return value.
Raises:mitogen.core.CallError – An exception was raised in the remote context during execution.
call_async(fn, *args, **kwargs)

Arrange for fn(*args, **kwargs) to be invoked on the context’s main thread.

Parameters:
  • fn

    A free function in module scope or a class method of a class directly reachable from module scope:

    # mymodule.py
    
    def my_func():
        '''A free function reachable as mymodule.my_func'''
    
    class MyClass:
        @classmethod
        def my_classmethod(cls):
            '''Reachable as mymodule.MyClass.my_classmethod'''
    
        def my_instancemethod(self):
            '''Unreachable: requires a class instance!'''
    
        class MyEmbeddedClass:
            @classmethod
            def my_classmethod(cls):
                '''Not directly reachable from module scope!'''
    
  • args (tuple) – Function arguments, if any. See RPC Serialization Rules for permitted types.
  • kwargs (dict) – Function keyword arguments, if any. See RPC Serialization Rules for permitted types.
Returns:

mitogen.core.Receiver configured to receive the result of the invocation:

recv = context.call_async(subprocess.check_output, ['ls', '/tmp/'])
try:
    # Prints output once it is received.
    msg = recv.get()
    print(msg.unpickle())
except mitogen.core.CallError as e:
    print('Call failed:', str(e))

Asynchronous calls may be dispatched in parallel to multiple contexts and consumed as they complete using mitogen.select.Select.

call_no_reply(fn, *args, **kwargs)

Like call_async(), but do not wait for a return value, and inform the target context no reply is expected. If the call fails and pipelining is disabled, the exception will be logged to the target context’s logging framework.

reset()

Instruct the target to forget any related exception.

class mitogen.parent.Context

Extend mitogen.core.Context with functionality useful to masters, and to child contexts that later become parents. Currently when this class is required, the target context’s router is upgraded at runtime.

default_call_chain

A CallChain instance constructed by default, with pipelining disabled. call(), call_async() and call_no_reply() use this instance.

shutdown(wait=False)

Arrange for the context to receive a SHUTDOWN message, triggering graceful shutdown.

Due to a lack of support for timers, no attempt is made yet to force terminate a hung context using this method. This will be fixed shortly.

Parameters:wait (bool) – If True, block the calling thread until the context has completely terminated.
Returns:If wait is False, returns a mitogen.core.Latch whose get() method returns None when shutdown completes. The timeout parameter may be used to implement graceful timeouts.
call_async(fn, *args, **kwargs)

See CallChain.call_async().

call(fn, *args, **kwargs)

See CallChain.call().

call_no_reply(fn, *args, **kwargs)

See CallChain.call_no_reply().

Receiver Class

class mitogen.core.Receiver(router, handle=None, persist=True, respondent=None)

Receivers are used to wait for pickled responses from another context to be sent to a handle registered in this context. A receiver may be single-use (as in the case of mitogen.parent.Context.call_async()) or multiple use.

Parameters:
  • router (mitogen.core.Router) – Router to register the handler on.
  • handle (int) – If not None, an explicit handle to register, otherwise an unused handle is chosen.
  • persist (bool) – If True, do not unregister the receiver’s handler after the first message.
  • respondent (mitogen.core.Context) – Reference to the context this receiver is receiving from. If not None, arranges for the receiver to receive a dead message if messages can no longer be routed to the context, due to disconnection or exit.
notify = None

If not None, a reference to a function invoked as notify(receiver) when a new message is delivered to this receiver. Used by mitogen.select.Select to implement waiting on multiple receivers.

to_sender()

Return a mitogen.core.Sender configured to deliver messages to this receiver. Since a Sender can be serialized, this makes it convenient to pass (context_id, handle) pairs around:

def deliver_monthly_report(sender):
    for line in open('monthly_report.txt'):
        sender.send(line)
    sender.close()

remote = router.ssh(hostname='mainframe')
recv = mitogen.core.Receiver(router)
remote.call(deliver_monthly_report, recv.to_sender())
for msg in recv:
    print(msg)
empty()

Return True if calling get() would block.

As with Queue.Queue, True may be returned even though a subsequent call to get() will succeed, since a message may be posted at any moment between empty() and get().

empty() is only useful to avoid a race while installing notify:

recv.notify = _my_notify_function
if not recv.empty():
    _my_notify_function(recv)

# It is guaranteed the receiver was empty after the notification
# function was installed, or that it was non-empty and the
# notification function was invoked at least once.
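
The install-then-check ordering can be tried out with a minimal stand-in built on the standard library. ToyReceiver, deliver and on_message are hypothetical names for illustration and do not reflect Mitogen's internals; the sketch only shows why the hook is installed before empty() is consulted.

```python
import queue


class ToyReceiver:
    """Hypothetical stand-in for a receiver: a queue plus a notify hook."""

    def __init__(self):
        self._queue = queue.Queue()
        self.notify = None

    def deliver(self, item):
        self._queue.put(item)
        if self.notify is not None:
            self.notify(self)

    def empty(self):
        return self._queue.empty()

    def get(self):
        return self._queue.get()


woken = []


def on_message(receiver):
    woken.append(receiver)


recv = ToyReceiver()
recv.deliver('early')      # arrives before notify is installed: no callback

recv.notify = on_message   # 1. install the hook first
if not recv.empty():       # 2. then catch messages that arrived earlier
    on_message(recv)

recv.deliver('late')       # arrives after installation: the hook fires
```

Checking empty() before installing the hook would leave a window in which a message could arrive unnoticed; the reverse order can at worst produce a redundant notification.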
close()

Cause mitogen.core.ChannelError to be raised in any thread waiting in get() on this receiver.

get(timeout=None)

Sleep waiting for a message to arrive on this receiver.

Parameters:

timeout (float) – If not None, specifies a timeout in seconds.

Raises:
Returns:

(msg, data) tuple, where msg is the mitogen.core.Message that was received, and data is its unpickled data part.

get_data(timeout=None)

Like get(), except only return the data part.

__iter__()

Block and yield (msg, data) pairs delivered to this receiver until mitogen.core.ChannelError is raised.

Sender Class

class mitogen.core.Sender(context, dst_handle)

Senders are used to send pickled messages to a handle in another context; a Sender is the inverse of mitogen.core.Receiver.

Senders may be serialized, making them convenient to wire up data flows. See mitogen.core.Receiver.to_sender() for more information.

Parameters:
  • context (mitogen.core.Context) – Context to send messages to.
  • dst_handle (int) – Destination handle to send messages to.
close()

Send a dead message to the remote end, causing ChannelError() to be raised in any waiting thread.

send(data)

Send data to the remote end.

Select Class

class mitogen.select.Select(receivers=(), oneshot=True)

Support scatter/gather asynchronous calls and waiting on multiple receivers, channels, and sub-Selects. Accepts a sequence of mitogen.core.Receiver or mitogen.select.Select instances and returns the first value posted to any receiver or select.

If oneshot is True, then remove each receiver as it yields a result; since __iter__() terminates once the final receiver is removed, this makes it convenient to respond to calls made in parallel:

total = 0
recvs = [c.call_async(long_running_operation) for c in contexts]

for msg in mitogen.select.Select(recvs):
    print('Got %s from %s' % (msg, msg.receiver))
    total += msg.unpickle()

# Iteration ends when last Receiver yields a result.
print('Received total %s from %s receivers' % (total, len(recvs)))

Select may drive a long-running scheduler:

with mitogen.select.Select(oneshot=False) as select:
    while running():
        for msg in select:
            process_result(msg.receiver.context, msg.unpickle())
        for context, workfunc in get_new_work():
            select.add(context.call_async(workfunc))

Select may be nested:

subselects = [
    mitogen.select.Select(get_some_work()),
    mitogen.select.Select(get_some_work()),
    mitogen.select.Select([
        mitogen.select.Select(get_some_work()),
        mitogen.select.Select(get_some_work())
    ])
]

for msg in mitogen.select.Select(subselects):
    print(msg.unpickle())
classmethod all(it)

Take an iterable of receivers and retrieve a Message from each, returning the result of calling msg.unpickle() on each in turn. Results are returned in the order they arrived.

This is sugar for handling batch Context.call_async invocations:

print('Total disk usage: %.02fMiB' % (sum(
    mitogen.select.Select.all(
        context.call_async(get_disk_usage)
        for context in contexts
    )
) / 1048576.0,))

However, unlike in a naive comprehension such as:

recvs = [c.call_async(get_disk_usage) for c in contexts]
sum(recv.get().unpickle() for recv in recvs)

Result processing happens in the order results arrive, rather than the order requests were issued, so all() should always be faster.
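
The difference between arrival order and issue order can be seen with a standard-library analogy. The sketch below uses concurrent.futures rather than Mitogen, and slow_square and jobs are made-up names; it only illustrates the general idea of consuming results as they complete.

```python
import concurrent.futures
import time


def slow_square(n, delay):
    time.sleep(delay)
    return n * n


# (argument, delay): the first job issued is the slowest to finish.
jobs = [(1, 0.3), (2, 0.1), (3, 0.2)]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(slow_square, n, d) for n, d in jobs]

    # Completion order: each result is handled as soon as it is ready.
    arrival_order = [
        f.result() for f in concurrent.futures.as_completed(futures)
    ]

    # Issue order: result() blocks on each future in the order submitted,
    # so a slow first job delays the handling of every later result.
    issue_order = [f.result() for f in futures]
```

Both lists hold the same values; only the order in which they became available for processing differs.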

get(timeout=None, block=True)

Fetch the next available value from any receiver, or raise mitogen.core.TimeoutError if no value is available within timeout seconds.

On success, the message’s receiver attribute is set to the receiver.

Parameters:
Returns:

mitogen.core.Message

Raises:
__bool__()

Return True if any receivers are registered with this select.

close()

Remove the select’s notifier function from each registered receiver, mark the associated latch as closed, and cause any thread currently sleeping in get() to be woken with mitogen.core.LatchError.

This is necessary to prevent memory leaks in long-running receivers. It is called automatically when the Python with statement is used.

empty()

Return True if calling get() would block.

As with Queue.Queue, True may be returned even though a subsequent call to get() will succeed, since a message may be posted at any moment between empty() and get().

empty() may return False even when get() would block if another thread has drained a receiver added to this select. This can be avoided by only consuming each receiver from a single thread.

__iter__()

Yield the result of get() until no receivers remain in the select, either because oneshot is True, or each receiver was explicitly removed via remove().

add(recv)

Add the mitogen.core.Receiver or mitogen.core.Channel recv to the select.

remove(recv)

Remove the mitogen.core.Receiver or mitogen.core.Channel recv from the select. Note that if the receiver has notified prior to remove(), then it will still be returned by a subsequent get(). This may change in a future version.

Channel Class

class mitogen.core.Channel(router, context, dst_handle, handle=None)

A channel inherits from mitogen.core.Sender and mitogen.core.Receiver to provide bidirectional functionality.

Since all handles aren’t known until after both ends are constructed, for both ends to communicate through a channel, it is necessary for one end to retrieve the handle allocated to the other and reconfigure its own channel to match. Currently this is a manual task.

Broker Class

class mitogen.core.Broker

Responsible for handling I/O multiplexing in a private thread.

Note: This is the somewhat limited core version of the Broker class used by child contexts. The master subclass is documented below.

shutdown_timeout = 3.0

Seconds of grace to allow streams to shut down gracefully before force-disconnecting them during shutdown().

defer(func, *args, **kwargs)

Arrange for func(*args, **kwargs) to be executed on the broker thread, or immediately if the current thread is the broker thread. Safe to call from any thread.
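
The "run now if already on the loop thread, otherwise queue" behaviour can be sketched with a toy event loop. ToyBroker is a hypothetical class built on queue and threading; it performs no I/O multiplexing and is not Mitogen's Broker.

```python
import queue
import threading


class ToyBroker:
    """Hypothetical single-thread loop; not Mitogen's Broker."""

    def __init__(self):
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def _run(self):
        while True:
            item = self._queue.get()
            if item is None:  # shutdown sentinel
                return
            func, args, kwargs = item
            func(*args, **kwargs)

    def defer(self, func, *args, **kwargs):
        if threading.current_thread() is self._thread:
            func(*args, **kwargs)  # already on the loop thread
        else:
            self._queue.put((func, args, kwargs))

    def shutdown(self):
        self._queue.put(None)

    def join(self):
        self._thread.join()


seen = []
broker = ToyBroker()


def record(tag):
    # Remember the tag and whether we ran on the loop thread.
    seen.append((tag, threading.current_thread() is broker._thread))


def outer():
    record('outer')
    broker.defer(record, 'nested')  # runs immediately: on the loop thread
    record('after-nested')


broker.defer(outer)  # queued: called from the main thread
broker.shutdown()
broker.join()
```

Every entry in seen records True for the thread check, and 'nested' lands between 'outer' and 'after-nested' because the nested defer() ran inline rather than being queued.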

start_receive(stream)

Mark the receive_side on stream as ready for reading. Safe to call from any thread. When the associated file descriptor becomes ready for reading, BasicStream.on_receive() will be called.

stop_receive(stream)

Mark the receive_side on stream as not ready for reading. Safe to call from any thread.

_start_transmit(stream)

Mark the transmit_side on stream as ready for writing. Must only be called from the Broker thread. When the associated file descriptor becomes ready for writing, BasicStream.on_transmit() will be called.

_stop_transmit(stream)

Mark the transmit_side on stream as not ready for writing. Safe to call from any thread.

shutdown()

Request broker gracefully disconnect streams and stop.

join()

Wait for the broker to stop, expected to be called after shutdown().

keep_alive()

Return True if any reader’s Side.keep_alive attribute is True, or any Context is still registered that is not the master. Used to delay shutdown while some important work is in progress (e.g. log draining).

Internal Methods

_broker_main()

Handle events until shutdown(). On shutdown, invoke Stream.on_shutdown() for every active stream, then allow up to shutdown_timeout seconds for the streams to unregister themselves before forcefully calling Stream.on_disconnect().

class mitogen.master.Broker(install_watcher=True)

Note

You may construct as many brokers as desired, and use the same broker for multiple routers, however usually only one broker need exist. Multiple brokers may be useful when dealing with sets of children with differing lifetimes. For example, a subscription service where non-payment results in termination for one customer.

Parameters:install_watcher (bool) –

If True, an additional thread is started to monitor the lifetime of the main thread, triggering shutdown() automatically in case the user forgets to call it, or their code crashed.

You should not rely on this functionality in your program, it is only intended as a fail-safe and to simplify the API for new users. In particular, alternative Python implementations may not be able to support watching the main thread.

shutdown_timeout = 5.0

Seconds of grace to allow streams to shut down gracefully before force-disconnecting them during shutdown().

Utility Functions

A random assortment of utility functions useful on masters and children.

mitogen.utils.cast(obj)

Many tools love to subclass built-in types in order to implement useful functionality, such as annotating the safety of a Unicode string, or adding additional methods to a dict. However, cPickle loves to preserve those subtypes during serialization, resulting in CallError during call in the target when it tries to deserialize the data.

This function walks the object graph obj, producing a copy with any custom sub-types removed. The functionality is not default since the resulting walk may be computationally expensive given a large enough graph.

See RPC Serialization Rules for a list of supported types.

Parameters:obj – Object to undecorate.
Returns:Undecorated object.
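
As an illustration of the kind of walk described, the sketch below copies a structure while replacing subclasses of a few built-in types with the plain built-in. strip_subtypes, SafeText and AttrDict are hypothetical names, the set of handled types is an assumption chosen for the example, and this is not the implementation of cast().

```python
def strip_subtypes(obj):
    """Copy obj, replacing subclasses of built-ins with plain built-ins.

    Illustrative only; handles dict, list, tuple, bool, None, int,
    float, str and bytes.
    """
    if isinstance(obj, dict):
        return dict(
            (strip_subtypes(k), strip_subtypes(v)) for k, v in obj.items()
        )
    if isinstance(obj, list):
        return [strip_subtypes(v) for v in obj]
    if isinstance(obj, tuple):
        return tuple(strip_subtypes(v) for v in obj)
    if isinstance(obj, bool) or obj is None:
        return obj  # bool must be checked before int
    for plain in (int, float, str, bytes):
        if isinstance(obj, plain):
            return plain(obj)
    raise TypeError('cannot strip %r' % (type(obj),))


class SafeText(str):
    """Example of a str subclass a templating tool might define."""


class AttrDict(dict):
    """Example of a dict subclass with extra behaviour."""


decorated = AttrDict(
    name=SafeText('web1'),
    ports=[80, 443],
    tags=(SafeText('a'),),
)
plain = strip_subtypes(decorated)
```

The copy compares equal to the original, but type(plain) is dict and type(plain['name']) is str, so no custom classes remain to be pickled.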
mitogen.utils.disable_site_packages()

Remove all entries mentioning site-packages or Extras from the system path. Used primarily for testing on OS X within a virtualenv, where OS X bundles some ancient version of the six module.

mitogen.utils.log_to_file(path=None, io=False, level='INFO')

Install a new logging.Handler writing application logs to the filesystem. Useful when debugging slave IO problems.

Parameters to this function may be overridden at runtime using environment variables. See Logging Environment Variables.

Parameters:
  • path (str) – If not None, a filesystem path to write logs to. Otherwise, logs are written to sys.stderr.
  • io (bool) – If True, include extremely verbose IO logs in the output. Useful for debugging hangs, less useful for debugging application code.
  • level (str) – Name of the logging package constant that is the minimum level to log at. Useful levels are DEBUG, INFO, WARNING, and ERROR.
mitogen.utils.run_with_router(func, *args, **kwargs)

Arrange for func(router, *args, **kwargs) to run with a temporary mitogen.master.Router, ensuring the Router and Broker are correctly shut down during normal or exceptional return.

Returns:func’s return value.
@mitogen.utils.with_router

Decorator version of run_with_router(). Example:

@with_router
def do_stuff(router, arg):
    pass

# The router argument is supplied by the decorator.
do_stuff(123)
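
The relationship between a "run with a temporary resource" helper and its decorator form can be sketched without Mitogen. ToyRouter, toy_run_with_router and toy_with_router are hypothetical stand-ins; the real functions construct and shut down a mitogen.master.Router and its Broker, which this sketch does not attempt.

```python
import functools


class ToyRouter:
    """Hypothetical stand-in for a router with a shutdown step."""

    def __init__(self):
        self.closed = False

    def shutdown(self):
        self.closed = True


routers = []  # kept only so the example can inspect the router afterwards


def toy_run_with_router(func, *args, **kwargs):
    router = ToyRouter()
    routers.append(router)
    try:
        return func(router, *args, **kwargs)
    finally:
        router.shutdown()  # runs on normal and exceptional return


def toy_with_router(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return toy_run_with_router(func, *args, **kwargs)
    return wrapper


@toy_with_router
def do_stuff(router, arg):
    return (router.closed, arg)


result = do_stuff(123)  # the router is passed in by the wrapper
```

Here result is (False, 123) because the router is still open while do_stuff runs, and routers[0].closed is True once the call has returned.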

Exceptions

class mitogen.core.Error(fmt=None, *args)

Base for all exceptions raised by Mitogen.

Parameters:
  • fmt (str) – Exception text, or format string if args is non-empty.
  • args (tuple) – Format string arguments.
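
The fmt/args convention described above can be illustrated with a small stand-alone exception class. ToyError is a hypothetical name and the sketch is not Mitogen's source; it only shows one way to format the message when, and only when, arguments are supplied.

```python
class ToyError(Exception):
    """Hypothetical exception following the fmt/args convention."""

    def __init__(self, fmt=None, *args):
        if args:
            fmt = fmt % args  # treat fmt as a format string
        super().__init__(fmt)


# No args: the text is used verbatim, so a literal '%' is safe.
plain = ToyError('disk is 100% full')

# With args: fmt is interpolated.
formatted = ToyError('host %s port %d', 'k3', 22)
```

str(plain) is 'disk is 100% full' and str(formatted) is 'host k3 port 22'.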
class mitogen.core.CallError(fmt=None, *args)

Serializable Error subclass raised when Context.call() fails. A copy of the traceback from the external context is appended to the exception message.

class mitogen.core.ChannelError(fmt=None, *args)

Raised when a channel dies or has been closed.

class mitogen.core.LatchError(fmt=None, *args)

Raised when an attempt is made to use a mitogen.core.Latch that has been marked closed.

class mitogen.core.StreamError(fmt=None, *args)

Raised when a stream cannot be established.

class mitogen.core.TimeoutError(fmt=None, *args)

Raised when a timeout occurs on a stream.