Internal API Reference

Constants

mitogen.core.CHUNK_SIZE = 131072

Default size for calls to Side.read() or Side.write(), and the size of buffers configured by mitogen.parent.create_socketpair(). This value has many performance implications; 128KiB seems to be a sweet spot.

  • When set low, large messages cause many Broker IO loop iterations, burning CPU and reducing throughput.
  • When set high, excessive RAM is reserved by the OS for socket buffers (2x per child), and an identically sized temporary userspace buffer is allocated on each read. That buffer requires zeroing, and over a particular size may require two system calls to allocate and deallocate.

Care must be taken to ensure the underlying kernel object and receiving program support the desired size. For example,

  • Most UNIXes have TTYs with fixed 2KiB-4KiB buffers, making them unsuitable for efficient IO.
  • Different UNIXes have varying presets for pipes, which may not be configurable. On recent Linux the default pipe buffer size is 64KiB, but under memory pressure may be as low as 4KiB for unprivileged processes.
  • When communication is via an intermediary process, its internal buffers affect the speed at which OS buffers will drain. For example, OpenSSH uses 64KiB reads.

An ideal Message has a size that is a multiple of CHUNK_SIZE inclusive of headers, to avoid wasting IO loop iterations writing small trailer chunks.
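
The chunk arithmetic above can be sketched as follows. This is only an illustration: the helper names are hypothetical, and the 16-byte header size in the usage note is an assumed figure, not a value taken from Mitogen.

```python
CHUNK_SIZE = 131072  # value quoted above for mitogen.core.CHUNK_SIZE


def chunk_writes(total_size, chunk_size=CHUNK_SIZE):
    """Return (full_chunks, trailer_bytes) for a message of total_size
    bytes, headers included, written chunk_size bytes at a time."""
    return divmod(total_size, chunk_size)


def ideal_payload_size(header_size, chunks=1, chunk_size=CHUNK_SIZE):
    """Return the payload size that makes header + payload an exact
    multiple of chunk_size, so no small trailer chunk is written."""
    return chunks * chunk_size - header_size
```

For example, chunk_writes(131073) reports one full chunk plus a 1-byte trailer that costs an extra IO loop iteration, while a payload of ideal_payload_size(16) bytes with an assumed 16-byte header fills exactly one chunk.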

Latch Class

class mitogen.core.Latch

A latch is a Queue.Queue-like object that supports mutation and waiting from multiple threads. Unlike Queue.Queue, waiting threads always remain interruptible, so CTRL+C always succeeds, and waits where a timeout is set experience no wake-up latency. These properties are not possible in combination using the built-in threading primitives available in Python 2.x.

Latches implement queues using the UNIX self-pipe trick, and a per-thread socket.socketpair() that is lazily created the first time any latch attempts to sleep on a thread, and dynamically associated with the waiting Latch only for the duration of the wait.

See Waking Sleeping Threads for further discussion.
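
A minimal sketch of the self-pipe idea behind a latch, using only the standard library. It is not Mitogen's implementation: SketchLatch and SketchTimeout are hypothetical names, and for brevity it keeps one socket pair per latch instead of the lazily created per-thread pair described above.

```python
import collections
import select
import socket
import threading
import time


class SketchTimeout(Exception):
    """Hypothetical exception used only by this sketch."""


class SketchLatch(object):
    def __init__(self):
        self._lock = threading.Lock()
        self._queue = collections.deque()
        # Simplification: one pair per latch, not one pair per thread.
        self._rsock, self._wsock = socket.socketpair()

    def put(self, obj):
        with self._lock:
            self._queue.append(obj)
            # One wake byte per queued item keeps select() readable
            # for exactly as long as items remain.
            self._wsock.send(b'\x00')

    def get(self, timeout=None):
        deadline = None if timeout is None else time.time() + timeout
        while True:
            remaining = None
            if deadline is not None:
                remaining = max(0.0, deadline - time.time())
            # Sleeping in select() rather than on a lock keeps the
            # wait interruptible and makes timeouts exact.
            readable, _, _ = select.select([self._rsock], [], [], remaining)
            if not readable:
                raise SketchTimeout()
            with self._lock:
                if self._queue:
                    self._rsock.recv(1)
                    return self._queue.popleft()
            # Another consumer took the item first; wait again.
```

Because the waiter sleeps in select() on a file descriptor, a put() from any thread wakes it immediately, without the polling that timed waits on Python 2.x locks rely on.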

close()

Mark the latch as closed, and cause every sleeping thread to be woken, with mitogen.core.LatchError raised in each thread.

empty()

Return True if calling get() would block.

As with Queue.Queue, True may be returned even though a subsequent call to get() will succeed, since a message may be posted at any moment between empty() and get().

As with Queue.Queue, False may be returned even though a subsequent call to get() will block, since another waiting thread may be woken at any moment between empty() and get().

get(timeout=None, block=True)

Return the next enqueued object, or sleep waiting for one.

Parameters:
Raises:
Returns:

The de-queued object.

put(obj)

Enqueue an object, waking the first thread waiting for a result, if one exists.

Raises:mitogen.core.LatchError – close() has been called, and the object is no longer valid.

PidfulStreamHandler Class

class mitogen.core.PidfulStreamHandler(stream=None)

A logging.StreamHandler subclass used when Router.enable_debug() has been called, or the debug parameter was specified during context construction. Verifies the process ID has not changed on each call to emit(), reopening the associated log file when a change is detected.

This ensures logging to the per-process output files happens correctly even when uncooperative third party components call os.fork().

emit(record)

Emit a record.

If a formatter is specified, it is used to format the record. The record is then written to the stream with a trailing newline. If exception information is present, it is formatted using traceback.print_exception and appended to the stream. If the stream has an ‘encoding’ attribute, it is used to determine how to do the output to the stream.

open_pid = None

PID that last opened the log file.

template = '/tmp/mitogen.%s.%s.log'

Output path template.
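
The PID check described for PidfulStreamHandler can be sketched as below. SketchPidfulHandler is a hypothetical stand-in, and filling the two template placeholders with the PID and a timestamp is an assumption made for illustration.

```python
import logging
import os
import time


class SketchPidfulHandler(logging.StreamHandler):
    #: PID that last opened the log file.
    open_pid = None
    #: Output path template; placeholder meaning is assumed here.
    template = '/tmp/sketch.%s.%s.log'

    def _reopen(self):
        self.acquire()
        try:
            if self.open_pid == os.getpid():
                return  # another thread already reopened the file
            stamp = time.strftime('%Y%m%d_%H%M%S')
            path = self.template % (os.getpid(), stamp)
            # Line-buffered so records reach disk promptly.
            self.stream = open(path, 'a', 1)
            self.open_pid = os.getpid()
        finally:
            self.release()

    def emit(self, record):
        # After os.fork() the PID differs, so the child switches to
        # its own file instead of sharing the parent's.
        if self.open_pid != os.getpid():
            self._reopen()
        logging.StreamHandler.emit(self, record)
```

Attaching such a handler with logging.getLogger().addHandler(SketchPidfulHandler()) would give each process its own log file, even when the fork happens inside third-party code.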

Side Class

class mitogen.core.Side(stream, fd, cloexec=True, keep_alive=True, blocking=False)

Represent a single side of a BasicStream. This exists to allow streams implemented using unidirectional (e.g. UNIX pipe) and bidirectional (e.g. UNIX socket) file descriptors to operate identically.

Parameters:

During construction, the file descriptor has its os.O_NONBLOCK flag enabled using fcntl.fcntl().

close()

Call os.close() on fd if it is not None, then set it to None.

fd = None

Integer file descriptor to perform IO on, or None if close() has been called.

keep_alive = None

If True, causes presence of this side in Broker’s active reader set to defer shutdown until the side is disconnected.

read(n=131072)

Read up to n bytes from the file descriptor, wrapping the underlying os.read() call with io_op() to trap common disconnection conditions.

read() always behaves as if it is reading from a regular UNIX file; socket, pipe, and TTY disconnection errors are masked and result in a 0-sized read like a regular file.

Returns:Bytes read, or the empty string to indicate disconnection was detected.
stream = None

The Stream for which this is a read or write side.

write(s)

Write as many of the bytes from s as possible to the file descriptor, wrapping the underlying os.write() call with io_op() to trap common disconnection conditions.

Returns:Number of bytes written, or None if disconnection was detected.

Stream Classes

class mitogen.core.BasicStream
on_disconnect(broker)

Called by Broker to force disconnect the stream. The base implementation simply closes receive_side and transmit_side and unregisters the stream from the broker.

on_receive(broker)

Called by Broker when the stream’s receive_side has been marked readable using Broker.start_receive() and the broker has detected the associated file descriptor is ready for reading.

Subclasses must implement this if Broker.start_receive() is ever called on them, and the method must call on_disconnect() if reading produces an empty string.

on_shutdown(broker)

Called by Broker.shutdown() to allow the stream time to gracefully shutdown. The base implementation simply calls on_disconnect().

on_transmit(broker)

Called by Broker when the stream’s transmit_side has been marked writeable using Broker._start_transmit() and the broker has detected the associated file descriptor is ready for writing.

Subclasses must implement this if Broker._start_transmit() is ever called on them.

receive_side = None

A Side representing the stream’s receive file descriptor.

transmit_side = None

A Side representing the stream’s transmit file descriptor.

class mitogen.core.Stream(router, remote_id, **kwargs)

BasicStream subclass implementing mitogen’s stream protocol.

auth_id = None

If not None, Router stamps this into Message.auth_id of every message received on this stream.

egress_ids = None

Routing records the dst_id of every message arriving from this stream. Any arriving DEL_ROUTE is rebroadcast for any such ID.

is_privileged = False

If not False, indicates the stream has auth_id set and its value is the same as mitogen.context_id or appears in mitogen.parent_ids.

on_receive(broker)

Handle the next complete message on the stream. Raise StreamError on failure.

on_shutdown(broker)

Override BasicStream behaviour of immediately disconnecting.

on_transmit(broker)

Transmit buffered messages.

pending_bytes()

Return the number of bytes queued for transmission on this stream. This can be used to limit the amount of data buffered in RAM by an otherwise unlimited consumer.

For an accurate result, this method should be called from the Broker thread, for example by using Broker.defer_sync().

send(msg)

Send data to handle, and tell the broker we have output. May be called from any thread.

class mitogen.fork.Stream(*args, **kwargs)
construct(old_router, max_message_size, on_fork=None, debug=False, profiling=False, unidirectional=False, on_start=None)

Get the named context running on the local machine, creating it if it does not exist.

importer = None

Reference to the importer, if any, recovered from the parent.

on_fork = None

User-supplied function for cleaning up child process state.

class mitogen.parent.Stream(*args, **kwargs)

Base for streams capable of starting new slaves.

EC0_MARKER = 'MITO000\n'

Sentinel value emitted by the first stage to indicate it is ready to receive the compressed bootstrap. For mitogen.ssh this must have a length of at least max(len('password'), len('debug1:')).

child_is_immediate_subprocess = True

If True, indicates the child should not be killed during graceful detachment, as it is the actual process implementing the child context. In all other cases, the subprocess is SSH, sudo, or a similar tool that should be reminded to quit during disconnection.

connect_deadline = None

Derived from connect_timeout; absolute floating point UNIX timestamp after which the connection attempt should be abandoned.

connect_timeout = 30.0

Maximum time to wait for a connection attempt.

construct(max_message_size, remote_name=None, python_path=None, debug=False, connect_timeout=None, profiling=False, unidirectional=False, old_router=None, **kwargs)

Get the named context running on the local machine, creating it if it does not exist.

static create_child(merge_stdio=False, stderr_pipe=False, preexec_fn=None)

Function with the semantics of create_child() used to create the child process.

create_child_args = {}

Dictionary of extra kwargs passed to create_child.

debug = False

True to cause context to write verbose /tmp/mitogen.<pid>.log.

detached = False

True if the remote has indicated that it intends to detach, and should not be killed on disconnect.

diag_stream = None

If create_child supplied a diag_fd, references the corresponding DiagLogStream, allowing it to be disconnected when this stream is disconnected. Set to None if no diag_fd was present.

get_python_argv()

Return the initial argument vector elements necessary to invoke Python, by returning a 1-element list containing python_path if it is a string, or simply returning it if it is already a list.

This allows emulation of existing tools where the Python invocation may be set to e.g. ['/usr/bin/env', 'python'].
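
The normalisation described for get_python_argv() can be sketched as a free function. The name sketch_get_python_argv is hypothetical; the real method reads python_path from the stream instance.

```python
def sketch_get_python_argv(python_path):
    """Return python_path as an argument vector prefix."""
    if isinstance(python_path, list):
        # Already a list such as ['/usr/bin/env', 'python'].
        return python_path
    # A plain string becomes a 1-element list.
    return [python_path]
```

Either form can then be extended with interpreter arguments to build the final command line, for example sketch_get_python_argv('python') + ['-c', 'pass'].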

max_message_size = None

Passed via Router wrapper methods, must eventually be passed to ExternalContext.main().

name_prefix = u'local'

Prefix given to default names generated by connect().

on_disconnect(broker)

Called by Broker to force disconnect the stream. The base implementation simply closes receive_side and transmit_side and unregisters the stream from the broker.

on_shutdown(broker)

Request the slave gracefully shut itself down.

pid = None

Set to the child’s PID by connect().

profiling = False

True to cause context to write /tmp/mitogen.stats.<pid>.<thread>.log.

python_path = sys.executable

The path to the remote Python interpreter.

class mitogen.ssh.Stream(*args, **kwargs)
construct(hostname, username=None, ssh_path=None, port=None, check_host_keys='enforce', password=None, identity_file=None, compression=True, ssh_args=None, keepalive_enabled=True, keepalive_count=3, keepalive_interval=15, identities_only=True, ssh_debug_level=None, **kwargs)

Get the named context running on the local machine, creating it if it does not exist.

python_path = 'python'

Default to whatever is available as ‘python’ on the remote machine, overriding sys.executable use.

ssh_debug_level = 0

Number of -v invocations to pass on command line.

ssh_path = 'ssh'

The path to the SSH binary.

class mitogen.sudo.Stream(*args, **kwargs)
construct(username=None, sudo_path=None, password=None, preserve_env=None, set_home=None, sudo_args=None, login=None, selinux_role=None, selinux_type=None, **kwargs)

Get the named context running on the local machine, creating it if it does not exist.

static create_child()

Like tty_create_child(), except attach stdin/stdout to a socketpair like create_child(), but leave stderr and the controlling TTY attached to a TTY.

Parameters:args (list) – os.execl() argument list.
Returns:(pid, socketpair_fd, tty_fd)

Other Stream Subclasses

class mitogen.core.IoLogger(broker, name, dest_fd)

BasicStream subclass that sets up redirection of a standard UNIX file descriptor back into the Python logging package.

on_receive(broker)

Called by Broker when the stream’s receive_side has been marked readable using Broker.start_receive() and the broker has detected the associated file descriptor is ready for reading.

Subclasses must implement this if Broker.start_receive() is ever called on them, and the method must call on_disconnect() if reading produces an empty string.

on_shutdown(broker)

Shut down the write end of the logging socket.

class mitogen.core.Waker(broker)

BasicStream subclass implementing the UNIX self-pipe trick. Used to wake the multiplexer when another thread needs to modify its state (via a cross-thread function call).

keep_alive

Prevent immediate Broker shutdown while deferred functions remain.

on_receive(broker)

Drain the pipe and fire callbacks. Since _deferred is synchronized, defer() and on_receive() can conspire to ensure only one byte needs to be pending regardless of queue length.
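
A minimal sketch of how defer() and on_receive() can cooperate so that at most one byte is pending. SketchWaker is a hypothetical class built on os.pipe(); it omits the Broker integration that the real Waker has.

```python
import os
import select
import threading


class SketchWaker(object):
    def __init__(self):
        self._lock = threading.Lock()
        self._deferred = []
        self.rfd, self.wfd = os.pipe()

    def defer(self, func, *args):
        """Queue func(*args) to run on the thread that owns rfd."""
        with self._lock:
            if not self._deferred:
                # Only the first queued call writes a wake byte; later
                # calls piggyback on the byte already pending.
                os.write(self.wfd, b'\x00')
            self._deferred.append((func, args))

    def on_receive(self):
        """Call once select() reports rfd readable."""
        with self._lock:
            os.read(self.rfd, 1)
            deferred, self._deferred = self._deferred, []
        # Run callbacks outside the lock so they may call defer().
        for func, args in deferred:
            func(*args)
```

The invariant is that a byte is pending exactly when the deferred list is non-empty, which holds because both sides update the list and the pipe under the same lock.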

Poller Class

class mitogen.core.Poller
class mitogen.parent.KqueuePoller
class mitogen.parent.EpollPoller

Importer Class

class mitogen.core.Importer(router, context, core_src, whitelist=(), blacklist=())

Import protocol implementation that fetches modules from the parent process.

Parameters:context – Context to communicate via.

Responder Class

class mitogen.master.ModuleResponder(router)
neutralize_main(path, src)

Given the source for the __main__ module, try to find where it begins conditional execution based on a "if __name__ == '__main__'" guard, and remove any code after that point.
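
One way to approximate this behaviour is a regular expression search for the guard. This is a sketch under assumptions: the pattern and the name sketch_neutralize_main are illustrative, it works on text rather than bytes, and it ignores the path argument.

```python
import re

# Matches a top-level "if __name__ == '__main__':" line with either
# quote style and flexible spacing.
MAIN_GUARD_RE = re.compile(r'^if\s+__name__\s*==\s*.__main__.\s*:', re.M)


def sketch_neutralize_main(src):
    """Return src truncated at the first __main__ guard, if any."""
    match = MAIN_GUARD_RE.search(src)
    if match:
        return src[:match.start()]
    return src
```

Truncating at the guard keeps the definitions importable in a child while dropping the script's entry point, so the program does not start again remotely.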

RouteMonitor Class

class mitogen.parent.RouteMonitor(router, parent=None)

Generate and respond to mitogen.core.ADD_ROUTE and mitogen.core.DEL_ROUTE messages sent to the local context by maintaining a table of available routes, and propagating messages towards parents and siblings as appropriate.

RouteMonitor is responsible for generating routing messages for directly attached children. It learns of new children via notice_stream() called by Router, and subscribes to their disconnect event to learn when they disappear.

In children, constructing this class overwrites the stub mitogen.core.DEL_ROUTE handler installed by mitogen.core.ExternalContext, which is expected behaviour when a child is being upgraded in preparation to become a parent of children of its own.

Parameters:
  • router (Router) – Router to install handlers on.
  • parent (Context) – None in the master process, or reference to the parent context we should propagate route updates towards.
get_routes(stream)

Return the set of context IDs reachable on a stream.

Parameters:stream (mitogen.core.Stream) –
Returns:set([int])
notice_stream(stream)

When this parent is responsible for a new directly connected child stream, we’re also responsible for broadcasting DEL_ROUTE upstream if/when that child disconnects.

Forwarder Class

class mitogen.parent.ModuleForwarder(router, parent_context, importer)

Respond to GET_MODULE requests in a slave by forwarding the request to our parent context, or satisfying the request from our local Importer cache.

ExternalContext Class

class mitogen.core.ExternalContext(config)

External context implementation.

broker

The mitogen.core.Broker instance.

context

The mitogen.core.Context instance.

channel

The mitogen.core.Channel over which CALL_FUNCTION requests are received.

importer

The mitogen.core.Importer instance.

stdout_log

The mitogen.core.IoLogger connected to stdout.

stderr_log

The mitogen.core.IoLogger connected to stderr.

_dispatch_calls()

Implementation for the main thread in every child context.

mitogen.master

class mitogen.parent.ProcessMonitor

Install a signal.SIGCHLD handler that generates callbacks when a specific child process has exited. This class is obsolete; do not use it.

add(pid, callback)

Add a callback function to be notified of the exit status of a process.

Parameters:
  • pid (int) – Process ID to be notified of.
  • callback – Function invoked as callback(status), where status is the raw exit status of the child process.

Blocking I/O Functions

These functions exist to support the blocking phase of setting up a new context. They will eventually be replaced with asynchronous equivalents.

mitogen.parent.discard_until(fd, s, deadline)

Read chunks from fd until one is encountered that ends with s. This is used to skip output produced by /etc/profile, /etc/motd and mandatory SSH banners while waiting for Stream.EC0_MARKER to appear, indicating the first stage is ready to receive the compressed mitogen.core source.

Parameters:
  • fd (int) – File descriptor to read from.
  • s (bytes) – Marker string to discard until encountered.
  • deadline (float) – Absolute UNIX timestamp after which timeout should occur.
Raises:
mitogen.parent.iter_read(fds, deadline=None)

Return a generator that arranges for up to 4096-byte chunks to be read at a time from the file descriptor fd until the generator is destroyed.

Parameters:
  • fd (int) – File descriptor to read from.
  • deadline (float) – If not None, an absolute UNIX timestamp after which timeout should occur.
Raises:
mitogen.parent.write_all(fd, s, deadline=None)

Arrange for all of bytestring s to be written to the file descriptor fd.

Parameters:
  • fd (int) – File descriptor to write to.
  • s (bytes) – Bytestring to write to file descriptor.
  • deadline (float) – If not None, absolute UNIX timestamp after which timeout should occur.
Raises:
  • mitogen.core.TimeoutError – Bytestring could not be written entirely before deadline was exceeded.
  • mitogen.parent.EofError – Stream indicated EOF, suggesting the child process has exited.
  • mitogen.core.StreamError – File descriptor was disconnected before write could complete.
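
The three blocking helpers above can be approximated with select() and an absolute deadline. This is a sketch, not the library code: all names are hypothetical, SketchTimeout and SketchEof merely stand in for the real exception types, and only a single file descriptor is handled.

```python
import os
import select
import time


class SketchTimeout(Exception):
    """Stand-in for a timeout error; hypothetical."""


class SketchEof(Exception):
    """Stand-in for an EOF error; hypothetical."""


def _remaining(deadline):
    # Convert an absolute deadline into a select() timeout.
    if deadline is None:
        return None
    return max(0.0, deadline - time.time())


def sketch_iter_read(fd, deadline=None):
    """Yield chunks of up to 4096 bytes until EOF or deadline."""
    while True:
        readable, _, _ = select.select([fd], [], [], _remaining(deadline))
        if not readable:
            raise SketchTimeout('read timed out')
        chunk = os.read(fd, 4096)
        if not chunk:
            raise SketchEof('EOF on stream')
        yield chunk


def sketch_discard_until(fd, marker, deadline):
    """Skip output until a chunk ends with marker."""
    for chunk in sketch_iter_read(fd, deadline):
        if chunk.endswith(marker):
            return


def sketch_write_all(fd, data, deadline=None):
    """Write every byte of data, waiting for writability as needed."""
    view = memoryview(data)
    written = 0
    while written < len(view):
        _, writable, _ = select.select([], [fd], [], _remaining(deadline))
        if not writable:
            raise SketchTimeout('write timed out')
        written += os.write(fd, view[written:])
```

Passing an absolute deadline instead of a relative timeout lets several calls share one budget, so a slow banner followed by a slow write cannot exceed the overall connection time limit.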

Subprocess Creation Functions

mitogen.parent.create_child(args, merge_stdio=False, stderr_pipe=False, preexec_fn=None)

Create a child process whose stdin/stdout is connected to a socket.

Parameters:
  • args – Argument vector for execv() call.
  • merge_stdio (bool) – If True, arrange for stderr to be connected to the stdout socketpair, rather than inherited from the parent process. This may be necessary to ensure that no TTY is connected to any stdio handle, for instance when using LXC.
  • stderr_pipe (bool) – If True and merge_stdio is False, arrange for stderr to be connected to a separate pipe, to allow any ongoing debug logs generated by e.g. SSH to be output as the session progresses, without interfering with stdout.
Returns:

(pid, socket_obj, None or pipe_fd)
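
A rough sketch of the basic case, where the child's stdin and stdout share one end of a socket pair. It assumes a UNIX host and uses subprocess.Popen; sketch_create_child is a hypothetical name, it ignores merge_stdio, stderr_pipe and preexec_fn, and it returns the Popen object rather than the tuple documented above.

```python
import socket
import subprocess
import sys


def sketch_create_child(args):
    """Start args with stdin/stdout attached to a socket pair.

    Returns (proc, parent_sock); stderr is inherited from the parent.
    """
    parent_sock, child_sock = socket.socketpair()
    try:
        proc = subprocess.Popen(
            args,
            stdin=child_sock,   # same descriptor for both directions
            stdout=child_sock,
            close_fds=True,
        )
    finally:
        # The parent keeps only its own end, so EOF is seen when the
        # child exits.
        child_sock.close()
    return proc, parent_sock


# Illustrative child program: echo one line back in upper case.
ECHO_UPPER = (
    'import sys; '
    'sys.stdout.write(sys.stdin.readline().upper()); '
    'sys.stdout.flush()'
)
```

For example, after proc, sock = sketch_create_child([sys.executable, '-c', ECHO_UPPER]), sending b'hello\n' with sock.sendall() lets the parent read the reply from the same socket.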

mitogen.parent.hybrid_tty_create_child(args)

Like tty_create_child(), except attach stdin/stdout to a socketpair like create_child(), but leave stderr and the controlling TTY attached to a TTY.

Parameters:args (list) – os.execl() argument list.
Returns:(pid, socketpair_fd, tty_fd)
mitogen.parent.tty_create_child(args)

Return a file descriptor connected to the master end of a pseudo-terminal, whose slave end is connected to stdin/stdout/stderr of a new child process. The child is created such that the pseudo-terminal becomes its controlling TTY, ensuring access to /dev/tty returns a new file descriptor open on the slave end.

Parameters:args (list) – os.execl() argument list.
Returns:(pid, tty_fd, None)

Helper Functions

mitogen.core.to_text(o)

Coerce o to Unicode by decoding it from UTF-8 if it is an instance of bytes, otherwise pass it to the str constructor. The returned object is always a plain str; any subclass is removed.
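
A Python 3 sketch of the coercion described above. The name sketch_to_text is hypothetical, and Python 2 compatibility, which the real helper may need, is ignored.

```python
def sketch_to_text(o):
    """Return o as a plain str."""
    if isinstance(o, bytes):
        return o.decode('utf-8')
    # str() on a str subclass instance yields an exact str, which is
    # how the subclass is dropped.
    return str(o)
```

Dropping subclasses matters when the value is later serialized, since a receiver may only accept the exact built-in type.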

mitogen.core.has_parent_authority(msg, _stream=None)

Policy function for use with Receiver and Router.add_handler() that requires incoming messages to originate from a parent context, or on a Stream whose auth_id has been set to that of a parent context or the current context.
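
The policy check can be sketched as a comparison against the current and parent context IDs. Everything here is a stand-in: SketchMessage is a hypothetical record type, and the IDs are passed explicitly instead of being read from module globals.

```python
import collections

# Hypothetical stand-in carrying only the fields the policy needs.
SketchMessage = collections.namedtuple('SketchMessage', 'src_id auth_id')


def sketch_has_parent_authority(msg, context_id, parent_ids):
    """Return True if msg is authorized by this context or a parent."""
    return msg.auth_id == context_id or msg.auth_id in parent_ids
```

With context_id=2 and parent_ids=[1, 0], a message whose auth_id is 0 is accepted while one whose auth_id is 7 is rejected, regardless of which context it claims as its source.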

mitogen.core.set_cloexec(fd)

Set the file descriptor fd to automatically close on os.execve(). This has no effect on file descriptors inherited across os.fork(); they must be explicitly closed through some other means, such as mitogen.fork.on_fork().
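
On UNIX the close-on-exec flag is typically set with fcntl. The function below is a sketch with a hypothetical name, not the library source.

```python
import fcntl
import os


def sketch_set_cloexec(fd):
    """Mark fd so the kernel closes it when the process calls exec."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
```

On Python 3.4 and later, descriptors created by the standard library are already non-inheritable by default, so a helper like this mostly matters for descriptors that were inherited or explicitly made inheritable.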

mitogen.core.set_nonblock(fd)

Set the file descriptor fd to non-blocking mode. For most underlying file types, this causes os.read() or os.write() to raise OSError with errno.EAGAIN rather than block the thread when the underlying kernel buffer is exhausted.

mitogen.core.set_block(fd)

Inverse of set_nonblock(), i.e. cause fd to block the thread when the underlying kernel buffer is exhausted.
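
Both helpers can be sketched by toggling os.O_NONBLOCK with fcntl. The function names are hypothetical.

```python
import errno
import fcntl
import os


def sketch_set_nonblock(fd):
    """Make reads and writes on fd fail with EAGAIN instead of blocking."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)


def sketch_set_block(fd):
    """Restore blocking behaviour on fd."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
```

After sketch_set_nonblock() on the read end of an empty pipe, os.read() raises OSError with errno.EAGAIN immediately rather than sleeping until data arrives.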

mitogen.core.io_op(func, *args)

Wrap func(*args) that may raise select.error, IOError, or OSError, trapping UNIX error codes relating to disconnection and retry events in various subsystems:

  • When a signal is delivered to the process on Python 2, system call retry is signalled through errno.EINTR. The invocation is automatically restarted.
  • When performing IO against a TTY, disconnection of the remote end is signalled by errno.EIO.
  • When performing IO against a socket, disconnection of the remote end is signalled by errno.ECONNRESET.
  • When performing IO against a pipe, disconnection of the remote end is signalled by errno.EPIPE.
Returns:Tuple of (return_value, disconnected), where return_value is the return value of func(*args), and disconnected is True if disconnection was detected, otherwise False.
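
The error handling listed above can be sketched as a retry loop. The name sketch_io_op is hypothetical, and reading the error code from e.args[0] is an assumption that holds for errors raised by os.read() and os.write().

```python
import errno
import os
import select


def sketch_io_op(func, *args):
    """Return (return_value, disconnected) for func(*args)."""
    while True:
        try:
            return func(*args), False
        except (select.error, IOError, OSError) as e:
            code = e.args[0]
            if code == errno.EINTR:
                continue  # interrupted by a signal: retry the call
            if code in (errno.EIO, errno.ECONNRESET, errno.EPIPE):
                return None, True  # TTY, socket or pipe disconnect
            raise  # anything else is a genuine error
```

Writing to a pipe whose read end is closed yields (None, True), because Python ignores SIGPIPE by default and os.write() raises an error carrying errno.EPIPE.
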
mitogen.parent.close_nonstandard_fds()
mitogen.parent.create_socketpair()

Create a socket.socketpair() for use as a child process's UNIX stdio channels. As socket pairs are bidirectional, they are economical on file descriptor usage as the same descriptor can be used for stdin and stdout. As they are sockets, their buffers are tunable, allowing large buffers to be configured in order to improve throughput for file transfers and reduce mitogen.core.Broker IO loop iterations.
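
The buffer tuning can be sketched with setsockopt(). The name sketch_create_socketpair is hypothetical, and setting both buffers on both ends is an assumption; the library may configure them differently.

```python
import socket

CHUNK_SIZE = 131072  # value quoted earlier for mitogen.core.CHUNK_SIZE


def sketch_create_socketpair(size=CHUNK_SIZE):
    """Return (parent_sock, child_sock) with enlarged buffers."""
    parent_sock, child_sock = socket.socketpair()
    for sock in (parent_sock, child_sock):
        # The kernel may clamp or round the requested size.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size)
    return parent_sock, child_sock
```

The size actually granted can be inspected with getsockopt(); it is not guaranteed to equal the request, since system-wide limits apply.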

mitogen.master.get_child_modules(path)

Return the suffixes of submodules directly nested beneath the package directory at path.

Parameters:path (str) – Path to the module’s source code on disk, or some PEP-302-recognized equivalent. Usually this is the module’s __file__ attribute, but is specified explicitly to avoid loading the module.
Returns:List of submodule name suffixes.
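
A sketch of the lookup using pkgutil, which enumerates modules in a directory without importing the package. The name sketch_get_child_modules is hypothetical.

```python
import os
import pkgutil


def sketch_get_child_modules(path):
    """List submodule name suffixes found next to the file at path."""
    package_dir = os.path.dirname(path)
    return [name for _, name, _ in pkgutil.iter_modules([package_dir])]
```

Given the __file__ of the standard library json package, the result includes suffixes such as 'decoder' and 'encoder'.
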
mitogen.minify.minimize_source(*args, **kwds)

Remove comments and docstrings from Python source, preserving line numbers and syntax of empty blocks.

Parameters:source (str) – The source to minimize.
Returns:The minimized source (str).
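
Part of this transformation, removing comments while keeping every line in place, can be sketched with the tokenize module. It is a partial illustration only: sketch_strip_comments is a hypothetical name, docstrings are left untouched, and it targets Python 3.

```python
import io
import tokenize


def sketch_strip_comments(source):
    """Remove comments from source without changing line numbers."""
    lines = source.split('\n')
    tokens = tokenize.generate_tokens(io.StringIO(source).readline)
    for tok in tokens:
        if tok.type == tokenize.COMMENT:
            row, col = tok.start
            # A comment always runs to the end of its line, so cutting
            # at its start column removes it entirely.
            lines[row - 1] = lines[row - 1][:col].rstrip()
    return '\n'.join(lines)
```

Using the tokenizer rather than a regular expression means a '#' inside a string literal is left alone, and keeping emptied lines preserves the line numbers shown in tracebacks.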