Internal API Reference

Constants

mitogen.core.CHUNK_SIZE = 131072

Default size for calls to Side.read() or Side.write(), and the size of buffers configured by mitogen.parent.create_socketpair(). This value has many performance implications; 128KiB seems to be a sweet spot.

  • When set low, large messages cause many Broker IO loop iterations, burning CPU and reducing throughput.
  • When set high, excessive RAM is reserved by the OS for socket buffers (2x per child). In addition, an identically sized temporary userspace buffer is allocated on each read; that buffer requires zeroing and, over a particular size, may require two system calls to allocate and deallocate.

Care must be taken to ensure the underlying kernel object and receiving program support the desired size. For example,

  • Most UNIXes have TTYs with fixed 2KiB-4KiB buffers, making them unsuitable for efficient IO.
  • Different UNIXes have varying presets for pipes, which may not be configurable. On recent Linux the default pipe buffer size is 64KiB, but under memory pressure may be as low as 4KiB for unprivileged processes.
  • When communication is via an intermediary process, its internal buffers affect the speed at which OS buffers drain. For example, OpenSSH uses 64KiB reads.

An ideal Message has a size that is a multiple of CHUNK_SIZE inclusive of headers, to avoid wasting IO loop iterations writing small trailer chunks.
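
The cost of a trailer chunk can be illustrated with a little arithmetic. The sketch below assumes every write transfers at most one chunk and that the kernel accepts each chunk in full; write_plan() is a hypothetical helper, not part of mitogen.

```python
CHUNK_SIZE = 131072  # value of mitogen.core.CHUNK_SIZE documented above


def write_plan(total_size, chunk_size=CHUNK_SIZE):
    """Return (full_chunks, trailer_bytes) for a message of total_size bytes,
    headers included, assuming each write moves at most chunk_size bytes."""
    full_chunks, trailer_bytes = divmod(total_size, chunk_size)
    return full_chunks, trailer_bytes


# An exact multiple of CHUNK_SIZE needs no trailer write.
print(write_plan(2 * CHUNK_SIZE))      # (2, 0)
# A single extra byte costs one more write for a 1-byte trailer.
print(write_plan(2 * CHUNK_SIZE + 1))  # (2, 1)
```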

Latch Class

class mitogen.core.Latch

A latch is a Queue.Queue-like object that supports mutation and waiting from multiple threads. Unlike Queue.Queue, waiting threads always remain interruptible, so CTRL+C always succeeds, and waits where a timeout is set experience no wake-up latency. These properties are not possible in combination using the built-in threading primitives available in Python 2.x.

Latches implement queues using the UNIX self-pipe trick, and a per-thread socket.socketpair() that is lazily created the first time any latch attempts to sleep on a thread, and dynamically associated with the waiting Latch only for the duration of the wait.

See Waking Sleeping Threads for further discussion.
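
The idea of sleeping on a socket rather than a lock can be sketched as follows. This is a simplified, single-consumer illustration and not mitogen's implementation: TinyLatch is a hypothetical name, it keeps one socketpair per latch instead of one per thread, and the built-in TimeoutError stands in for whatever exception the real class raises.

```python
import select
import socket
import threading


class TinyLatch(object):
    """Illustrative only: a queue whose get() sleeps in select()."""

    def __init__(self):
        self._lock = threading.Lock()
        self._queue = []
        # get() waits on _rsock; put() writes one byte to _wsock per item.
        self._rsock, self._wsock = socket.socketpair()

    def put(self, obj):
        with self._lock:
            self._queue.append(obj)
        # The byte is sent after the append, so a woken reader always
        # finds an item waiting.
        self._wsock.send(b'\x00')

    def get(self, timeout=None):
        # select() can be interrupted by signals and wakes as soon as a
        # byte arrives, so a timeout adds no polling latency.
        readable, _, _ = select.select([self._rsock], [], [], timeout)
        if not readable:
            raise TimeoutError('no object arrived before the timeout')
        self._rsock.recv(1)
        with self._lock:
            return self._queue.pop(0)
```

A producer thread calls put(obj) while a consumer blocks in get(timeout=...); with several consumers the real class needs the per-thread bookkeeping described above.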

close()

Mark the latch as closed, and cause every sleeping thread to be woken, with mitogen.core.LatchError raised in each thread.

empty()

Return True if calling get() would block.

As with Queue.Queue, True may be returned even though a subsequent call to get() will succeed, since a message may be posted at any moment between empty() and get().

As with Queue.Queue, False may be returned even though a subsequent call to get() will block, since another waiting thread may be woken at any moment between empty() and get().

get(timeout=None, block=True)

Return the next enqueued object, or sleep waiting for one.

Parameters:
Raises:
Returns:

The de-queued object.

put(obj)

Enqueue an object, waking the first thread waiting for a result, if one exists.

Raises: mitogen.core.LatchError – close() has been called, and the object is no longer valid.

PidfulStreamHandler Class

class mitogen.core.PidfulStreamHandler(stream=None)

A logging.StreamHandler subclass used when Router.enable_debug() has been called, or the debug parameter was specified during context construction. Verifies the process ID has not changed on each call to emit(), reopening the associated log file when a change is detected.

This ensures logging to the per-process output files happens correctly even when uncooperative third party components call os.fork().
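
A minimal sketch of the PID check, assuming a simplified one-argument path template under the system temporary directory. PidAwareHandler and its _reopen() helper are hypothetical names used for illustration; this is not mitogen's code.

```python
import logging
import os
import tempfile


class PidAwareHandler(logging.StreamHandler):
    """Hypothetical stand-in for PidfulStreamHandler."""

    # Assumed template; the documented one is '/tmp/mitogen.%s.%s.log'.
    template = os.path.join(tempfile.gettempdir(), 'pid-aware-example.%s.log')
    open_pid = None  # PID that last opened the log file.

    def _reopen(self):
        if self.open_pid is not None:
            # Only close a file this class opened, never sys.stderr.
            self.stream.close()
        self.stream = open(self.template % (os.getpid(),), 'a')
        self.open_pid = os.getpid()

    def emit(self, record):
        # After os.fork() the child sees a different PID and switches to
        # its own file before writing.
        if self.open_pid != os.getpid():
            self._reopen()
        logging.StreamHandler.emit(self, record)
```

Attached to a logger with addHandler(), the first record emitted in each process opens that process's own file.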

emit(record)

Emit a record.

If a formatter is specified, it is used to format the record. The record is then written to the stream with a trailing newline. If exception information is present, it is formatted using traceback.print_exception and appended to the stream. If the stream has an ‘encoding’ attribute, it is used to determine how to do the output to the stream.

open_pid = None

PID that last opened the log file.

template = '/tmp/mitogen.%s.%s.log'

Output path template.

Side Class

class mitogen.core.Side(stream, fd, keep_alive=True)

Represent a single side of a BasicStream. This exists to allow streams implemented using unidirectional (e.g. UNIX pipe) and bidirectional (e.g. UNIX socket) file descriptors to operate identically.

Parameters:

During construction, the file descriptor has its os.O_NONBLOCK flag enabled using fcntl.fcntl().

stream

The Stream for which this is a read or write side.

fd

Integer file descriptor to perform IO on, or None if close() has been called.

keep_alive

If True, causes presence of this side in Broker’s active reader set to defer shutdown until the side is disconnected.

fileno()

Return fd if it is not None, otherwise raise StreamError. This method is implemented so that Side can be used directly by select.select().

close()

Call os.close() on fd if it is not None, then set it to None.

read(n=CHUNK_SIZE)

Read up to n bytes from the file descriptor, wrapping the underlying os.read() call with io_op() to trap common disconnection conditions.

read() always behaves as if it is reading from a regular UNIX file; socket, pipe, and TTY disconnection errors are masked and result in a 0-sized read just like a regular file.

Returns: Bytes read, or the empty string to indicate disconnection was detected.

write(s)

Write as many of the bytes from s as possible to the file descriptor, wrapping the underlying os.write() call with io_op() to trap common disconnection conditions.

Returns:Number of bytes written, or None if disconnection was detected.

Stream Classes

class mitogen.core.BasicStream
receive_side

A Side representing the stream’s receive file descriptor.

transmit_side

A Side representing the stream’s transmit file descriptor.

on_disconnect(broker)

Called by Broker to force disconnect the stream. The base implementation simply closes receive_side and transmit_side and unregisters the stream from the broker.

on_receive(broker)

Called by Broker when the stream’s receive_side has been marked readable using Broker.start_receive() and the broker has detected the associated file descriptor is ready for reading.

Subclasses must implement this method if Broker.start_receive() is ever called on them, and the method must call on_disconnect() if reading produces an empty string.

on_transmit(broker)

Called by Broker when the stream’s transmit_side has been marked writeable using Broker._start_transmit() and the broker has detected the associated file descriptor is ready for writing.

Subclasses must implement this method if Broker._start_transmit() is ever called on them.

on_shutdown(broker)

Called by Broker.shutdown() to allow the stream time to gracefully shut down. The base implementation simply calls on_disconnect().

class mitogen.core.Stream(router, remote_id, **kwargs)

BasicStream subclass implementing mitogen’s stream protocol.

pending_bytes()

Returns the number of bytes queued for transmission on this stream. This can be used to limit the amount of data buffered in RAM by an otherwise unlimited consumer.

For an accurate result, this method should be called from the Broker thread, using a wrapper like:

def get_pending_bytes(self, stream):
    latch = mitogen.core.Latch()
    self.broker.defer(
        lambda: latch.put(stream.pending_bytes())
    )
    return latch.get()

auth_id = None

If not None, Router stamps this into Message.auth_id of every message received on this stream.

is_privileged = False

If not False, indicates the stream has auth_id set and its value is the same as mitogen.context_id or appears in mitogen.parent_ids.

on_receive(broker)

Handle the next complete message on the stream. Raise StreamError on failure.

on_shutdown(broker)

Override BasicStream behaviour of immediately disconnecting.

on_transmit(broker)

Transmit buffered messages.

send(msg)

Send data to handle, and tell the broker we have output. May be called from any thread.

class mitogen.fork.Stream(*args, **kwargs)
construct(old_router, max_message_size, on_fork=None, debug=False, profiling=False, unidirectional=False, on_start=None)

Get the named context running on the local machine, creating it if it does not exist.

importer = None

Reference to the importer, if any, recovered from the parent.

on_fork = None

User-supplied function for cleaning up child process state.

class mitogen.parent.Stream(*args, **kwargs)

Base for streams capable of starting new slaves.

EC0_MARKER = 'MITO000\n'

Sentinel value emitted by the first stage to indicate it is ready to receive the compressed bootstrap. For mitogen.ssh this must have length of at least max(len(‘password’), len(‘debug1:’))

child_is_immediate_subprocess = True

If True, indicates the subprocess managed by us should not be killed during graceful detachment, as it is the actual process implementing the child context. In all other cases, the subprocess is SSH, sudo, or a similar tool that should be reminded to quit during disconnection.

connect_deadline = None

Derived from connect_timeout; absolute floating point UNIX timestamp after which the connection attempt should be abandoned.

connect_timeout = 30.0

Maximum time to wait for a connection attempt.

construct(max_message_size, remote_name=None, python_path=None, debug=False, connect_timeout=None, profiling=False, unidirectional=False, old_router=None, **kwargs)

Get the named context running on the local machine, creating it if it does not exist.

static create_child(merge_stdio=False, stderr_pipe=False, preexec_fn=None)

Create a child process whose stdin/stdout is connected to a socket.

Parameters:
  • args – Argument vector for execv() call.
  • merge_stdio (bool) – If True, arrange for stderr to be connected to the stdout socketpair, rather than inherited from the parent process. This may be necessary to ensure that no TTY is connected to any stdio handle, for instance when using LXC.
  • stderr_pipe (bool) – If True and merge_stdio is False, arrange for stderr to be connected to a separate pipe, to allow any ongoing debug logs generated by e.g. SSH to be output as the session progresses, without interfering with stdout.
Returns:

(pid, socket_obj, None or pipe_fd)

debug = False

True to cause context to write verbose /tmp/mitogen.<pid>.log.

get_python_argv()

Return the initial argument vector elements necessary to invoke Python, by returning a 1-element list containing python_path if it is a string, or simply returning it if it is already a list.

This allows emulation of existing tools where the Python invocation may be set to e.g. [‘/usr/bin/env’, ‘python’].
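
The normalisation described above amounts to a few lines. The sketch below is written as a free function taking python_path as an argument for illustration, whereas the documented method reads it from the stream.

```python
def get_python_argv(python_path):
    """Return the argv prefix for invoking Python.

    A string becomes a 1-element list; a list is returned unchanged.
    """
    if isinstance(python_path, str):
        return [python_path]
    return python_path


print(get_python_argv('python'))                    # ['python']
print(get_python_argv(['/usr/bin/env', 'python']))  # ['/usr/bin/env', 'python']
```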

max_message_size = None

Passed via Router wrapper methods, must eventually be passed to ExternalContext.main().

on_shutdown(broker)

Request the slave gracefully shut itself down.

pid = None

Set to the child’s PID by connect().

profiling = False

True to cause context to write /tmp/mitogen.stats.<pid>.<thread>.log.

python_path = sys.executable

The path to the remote Python interpreter.

routes = None

List of contexts reachable via this stream; used to cleanup routes during disconnection.

class mitogen.ssh.Stream(*args, **kwargs)
construct(hostname, username=None, ssh_path=None, port=None, check_host_keys='enforce', password=None, identity_file=None, compression=True, ssh_args=None, keepalive_enabled=True, keepalive_count=3, keepalive_interval=15, identities_only=True, ssh_debug_level=None, **kwargs)

Get the named context running on the local machine, creating it if it does not exist.

python_path = 'python'

Default to whatever is available as ‘python’ on the remote machine, overriding sys.executable use.

ssh_debug_level = 0

Number of -v invocations to pass on command line.

ssh_path = 'ssh'

The path to the SSH binary.

tty_stream = None

If batch_mode=False, points to the corresponding DiagLogStream, allowing it to be disconnected at the same time this stream is being torn down.

class mitogen.sudo.Stream(*args, **kwargs)
construct(username=None, sudo_path=None, password=None, preserve_env=None, set_home=None, sudo_args=None, login=None, **kwargs)

Get the named context running on the local machine, creating it if it does not exist.

static create_child()

Like tty_create_child(), except attach stdin/stdout to a socketpair like create_child(), but leave stderr and the controlling TTY attached to a TTY.

Parameters:args (list) – os.execl() argument list.
Returns:(pid, socketpair_fd, tty_fd)

tty_stream = None

Once connected, points to the corresponding DiagLogStream, allowing it to be disconnected at the same time this stream is being torn down.

Other Stream Subclasses

class mitogen.core.IoLogger(broker, name, dest_fd)

BasicStream subclass that sets up redirection of a standard UNIX file descriptor back into the Python logging package.

on_shutdown(broker)

Shut down the write end of the logging socket.

class mitogen.core.Waker(broker)

BasicStream subclass implementing the UNIX self-pipe trick. Used to wake the multiplexer when another thread needs to modify its state (via a cross-thread function call).

keep_alive

Prevent immediate Broker shutdown while deferred functions remain.

on_receive(broker)

Drain the pipe and fire callbacks. Reading multiple bytes is safe since new bytes corresponding to future .defer() calls are written only after .defer() takes _lock: either a byte we read corresponds to something already on the queue by the time we take _lock, or a byte remains buffered, causing another wake up, because it was written after we released _lock.
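
The self-pipe pattern and the locking argument above can be sketched as follows. TinyWaker is a hypothetical class for illustration; in this sketch the wake-up byte is written while the lock is held, which is one way to satisfy the ordering described, and on_receive() is assumed to be called only once the read end is readable.

```python
import os
import threading


class TinyWaker(object):
    """Illustrative self-pipe: other threads defer work to one IO thread."""

    def __init__(self):
        self._lock = threading.Lock()
        self._deferred = []
        self.read_fd, self.write_fd = os.pipe()

    def defer(self, func, *args):
        # The byte is written only after the lock is taken, so it always
        # corresponds to an entry already on the queue.
        with self._lock:
            self._deferred.append((func, args))
            os.write(self.write_fd, b' ')

    def on_receive(self):
        # Drain every buffered byte at once; several defer() calls may be
        # serviced by a single wake-up.
        os.read(self.read_fd, 4096)
        with self._lock:
            deferred, self._deferred = self._deferred, []
        for func, args in deferred:
            func(*args)
```

An IO loop would include read_fd in its select() set and call on_receive() whenever it becomes readable.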

Poller Class

class mitogen.core.Poller
class mitogen.parent.KqueuePoller
class mitogen.parent.EpollPoller

Importer Class

class mitogen.core.Importer(router, context, core_src, whitelist=(), blacklist=())

Import protocol implementation that fetches modules from the parent process.

Parameters:context – Context to communicate via.

Responder Class

class mitogen.master.ModuleResponder(router)
neutralize_main(path, src)

Given the source for the __main__ module, try to find where it begins conditional execution based on an "if __name__ == '__main__'" guard, and remove any code after that point.
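
One way to approximate this is a regular expression that finds the guard at the start of a line and truncates the source there. This is only a sketch under that assumption; strip_after_main_guard() is a hypothetical helper, and the real method may detect the guard differently.

```python
import re

# Matches a top-level guard such as: if __name__ == '__main__':
MAIN_GUARD = re.compile(
    r'''^if\s+__name__\s*==\s*['"]__main__['"]\s*:''',
    re.MULTILINE,
)


def strip_after_main_guard(src):
    """Return src truncated at the first top-level __main__ guard."""
    match = MAIN_GUARD.search(src)
    if match is None:
        return src  # No guard found; leave the source untouched.
    return src[:match.start()]
```

Definitions above the guard survive, so functions in the script remain importable, while the code that would run the script is dropped.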

Forwarder Class

class mitogen.parent.ModuleForwarder(router, parent_context, importer)

Respond to GET_MODULE requests in a slave by forwarding the request to our parent context, or satisfying the request from our local Importer cache.

ExternalContext Class

class mitogen.core.ExternalContext

External context implementation.

broker

The mitogen.core.Broker instance.

context

The mitogen.core.Context instance.

channel

The mitogen.core.Channel over which CALL_FUNCTION requests are received.

stdout_log

The mitogen.core.IoLogger connected to stdout.

importer

The mitogen.core.Importer instance.

stderr_log

The IoLogger connected to stderr.

_dispatch_calls()

Implementation for the main thread in every child context.

mitogen.master

class mitogen.master.ProcessMonitor

Install a signal.SIGCHLD handler that generates callbacks when a specific child process has exited.

add(pid, callback)

Add a callback function to be notified of the exit status of a process.

Parameters:
  • pid (int) – Process ID to be notified of.
  • callback – Function invoked as callback(status), where status is the raw exit status of the child process.

Blocking I/O Functions

These functions exist to support the blocking phase of setting up a new context. They will eventually be replaced with asynchronous equivalents.

mitogen.parent.discard_until(fd, s, deadline)

Read chunks from fd until one is encountered that ends with s. This is used to skip output produced by /etc/profile, /etc/motd and mandatory SSH banners while waiting for Stream.EC0_MARKER to appear, indicating the first stage is ready to receive the compressed mitogen.core source.

Parameters:
  • fd (int) – File descriptor to read from.
  • s (bytes) – Marker string to discard until encountered.
  • deadline (float) – Absolute UNIX timestamp after which timeout should occur.
Raises:
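
A blocking loop of this shape can be sketched with select.select() and os.read(). The 4096-byte read size and the exception types (built-in TimeoutError and EOFError) are assumptions made for the sketch, since the exceptions raised by the real function are not listed here.

```python
import os
import select
import time


def discard_until(fd, s, deadline):
    """Read chunks from fd until one ends with s, or the deadline passes."""
    while True:
        remaining = deadline - time.time()
        if remaining <= 0:
            raise TimeoutError('marker not seen before the deadline')
        readable, _, _ = select.select([fd], [], [], remaining)
        if not readable:
            continue  # Timed out; the next iteration raises TimeoutError.
        chunk = os.read(fd, 4096)
        if not chunk:
            raise EOFError('stream closed before the marker was seen')
        if chunk.endswith(s):
            return
```

For example, discard_until(fd, b'MITO000\n', time.time() + 30) would skip a login banner and return once the marker ends a chunk.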

mitogen.parent.iter_read(fds, deadline=None)

Return a generator that arranges for up to 4096-byte chunks to be read at a time from the file descriptor fd until the generator is destroyed.

Parameters:
  • fd (int) – File descriptor to read from.
  • deadline (float) – If not None, an absolute UNIX timestamp after which timeout should occur.
Raises:

mitogen.parent.write_all(fd, s, deadline=None)

Arrange for all of bytestring s to be written to the file descriptor fd.

Parameters:
  • fd (int) – File descriptor to write to.
  • s (bytes) – Bytestring to write to file descriptor.
  • deadline (float) – If not None, absolute UNIX timestamp after which timeout should occur.
Raises:
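
A write loop with an optional deadline might look like the sketch below. The use of select.select() and of the built-in TimeoutError are assumptions for illustration, not a description of mitogen's implementation.

```python
import os
import select
import time


def write_all(fd, s, deadline=None):
    """Write all of bytestring s to fd, honouring an optional deadline."""
    view = memoryview(s)
    written = 0
    while written < len(view):
        timeout = None
        if deadline is not None:
            timeout = deadline - time.time()
            if timeout <= 0:
                raise TimeoutError('write did not finish before the deadline')
        _, writable, _ = select.select([], [fd], [], timeout)
        if writable:
            # os.write() may accept fewer bytes than offered, so continue
            # from wherever the previous call stopped.
            written += os.write(fd, view[written:])
```

Slicing a memoryview avoids copying the unsent remainder on every iteration.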

Subprocess Creation Functions

mitogen.parent.create_child(args, merge_stdio=False, stderr_pipe=False, preexec_fn=None)

Create a child process whose stdin/stdout is connected to a socket.

Parameters:
  • args – Argument vector for execv() call.
  • merge_stdio (bool) – If True, arrange for stderr to be connected to the stdout socketpair, rather than inherited from the parent process. This may be necessary to ensure that no TTY is connected to any stdio handle, for instance when using LXC.
  • stderr_pipe (bool) – If True and merge_stdio is False, arrange for stderr to be connected to a separate pipe, to allow any ongoing debug logs generated by e.g. SSH to be output as the session progresses, without interfering with stdout.
Returns:

(pid, socket_obj, None or pipe_fd)
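
The core arrangement, one socket serving as both stdin and stdout of the child, can be sketched with the standard library. spawn_on_socketpair() is a hypothetical helper; it relies on subprocess.Popen rather than whatever mechanism mitogen uses, omits the merge_stdio and stderr_pipe options, and returns the Popen object instead of a bare PID.

```python
import socket
import subprocess
import sys


def spawn_on_socketpair(args):
    """Start args with stdin and stdout attached to one end of a socketpair.

    Returns (proc, parent_sock); stderr is inherited from the parent.
    """
    parent_sock, child_sock = socket.socketpair()
    try:
        proc = subprocess.Popen(
            args,
            stdin=child_sock.fileno(),
            stdout=child_sock.fileno(),
            close_fds=True,
        )
    finally:
        # The child holds its own copies; the parent keeps only its end.
        child_sock.close()
    return proc, parent_sock
```

Writing to parent_sock feeds the child's stdin, and reading from it yields the child's stdout, so a single descriptor serves both directions.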

mitogen.parent.hybrid_tty_create_child(args)

Like tty_create_child(), except attach stdin/stdout to a socketpair like create_child(), but leave stderr and the controlling TTY attached to a TTY.

Parameters:args (list) – os.execl() argument list.
Returns:(pid, socketpair_fd, tty_fd)

mitogen.parent.tty_create_child(args)

Return a file descriptor connected to the master end of a pseudo-terminal, whose slave end is connected to stdin/stdout/stderr of a new child process. The child is created such that the pseudo-terminal becomes its controlling TTY, ensuring access to /dev/tty returns a new file descriptor open on the slave end.

Parameters:args (list) – os.execl() argument list.
Returns:(pid, tty_fd, None)

Helper Functions

mitogen.core.to_text(o)

Coerce o to Unicode by decoding it from UTF-8 if it is an instance of bytes, otherwise pass it to the str constructor. The returned object is always a plain str; any subclass is removed.
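
Under Python 3 the described behaviour reduces to the sketch below; the class S exists only to demonstrate the subclass removal.

```python
def to_text(o):
    """Coerce o to a plain str, decoding bytes as UTF-8."""
    if isinstance(o, bytes):
        return o.decode('utf-8')
    return str(o)


class S(str):
    """A str subclass used only for the demonstration below."""


print(type(to_text(S('x'))) is str)  # True
```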

mitogen.core.has_parent_authority(msg, _stream=None)

Policy function for use with Receiver and Router.add_handler() that requires incoming messages to originate from a parent context, or on a Stream whose auth_id has been set to that of a parent context or the current context.

mitogen.core.set_cloexec(fd)

Set the file descriptor fd to automatically close on os.execve(). This has no effect on file descriptors inherited across os.fork(); they must be explicitly closed through some other means, such as mitogen.fork.on_fork().
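
On UNIX this is conventionally done by setting the FD_CLOEXEC flag with fcntl.fcntl(). A minimal sketch, assuming that is the approach taken:

```python
import fcntl
import os


def set_cloexec(fd):
    """Mark fd to be closed automatically when the process calls exec."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)


r, w = os.pipe()
os.set_inheritable(r, True)   # clear the flag so the change is visible
set_cloexec(r)
print(os.get_inheritable(r))  # False
```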

mitogen.core.set_nonblock(fd)

Set the file descriptor fd to non-blocking mode. For most underlying file types, this causes os.read() or os.write() to raise OSError with errno.EAGAIN rather than block the thread when the underlying kernel buffer is exhausted.

mitogen.core.set_block(fd)

Inverse of set_nonblock(), i.e. cause fd to block the thread when the underlying kernel buffer is exhausted.
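
Both helpers can be sketched by toggling os.O_NONBLOCK through fcntl.fcntl(), assuming that is how they are implemented. The demonstration reads from an empty pipe to show the errno.EAGAIN behaviour described above.

```python
import errno
import fcntl
import os


def set_nonblock(fd):
    """Enable O_NONBLOCK on fd."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)


def set_block(fd):
    """Disable O_NONBLOCK on fd."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)


r, w = os.pipe()
set_nonblock(r)
try:
    os.read(r, 1)  # nothing has been written, so this cannot succeed
except OSError as e:
    print(e.errno == errno.EAGAIN)  # True
```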

mitogen.core.io_op(func, *args)

Wrap func(*args) that may raise select.error, IOError, or OSError, trapping UNIX error codes relating to disconnection and retry events in various subsystems:

  • When a signal is delivered to the process on Python 2, system call retry is signalled through errno.EINTR. The invocation is automatically restarted.
  • When performing IO against a TTY, disconnection of the remote end is signalled by errno.EIO.
  • When performing IO against a socket, disconnection of the remote end is signalled by errno.ECONNRESET.
  • When performing IO against a pipe, disconnection of the remote end is signalled by errno.EPIPE.
Returns:Tuple of (return_value, disconnected), where return_value is the return value of func(*args), and disconnected is True if disconnection was detected, otherwise False.
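
The error handling listed above can be sketched as a retry loop around the call. This is an illustration for Python 3, where select.error and IOError are aliases of OSError; returning None as the value on disconnection is an assumption of the sketch.

```python
import errno
import os

# Error codes that signal the remote end has gone away.
DISCONNECT_ERRNOS = (errno.EIO, errno.ECONNRESET, errno.EPIPE)


def io_op(func, *args):
    """Call func(*args), returning (return_value, disconnected)."""
    while True:
        try:
            return func(*args), False
        except OSError as e:
            if e.errno == errno.EINTR:
                continue  # Interrupted by a signal; restart the call.
            if e.errno in DISCONNECT_ERRNOS:
                return None, True
            raise
```

Writing to a pipe whose read end has been closed, for example, yields (None, True) instead of raising.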

mitogen.parent.close_nonstandard_fds()
mitogen.parent.create_socketpair()

Create a socket.socketpair() for use as a child process's UNIX stdio channels. As socket pairs are bidirectional, they are economical on file descriptor usage as the same descriptor can be used for stdin and stdout. As they are sockets, their buffers are tunable, allowing large buffers to be configured in order to improve throughput for file transfers and reduce mitogen.core.Broker IO loop iterations.
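
Buffer tuning of this kind is done with setsockopt(). The sketch below assumes the buffers are sized to CHUNK_SIZE and that the send buffer is set on the parent end and the receive buffer on the child end; the kernel may round or cap the requested sizes.

```python
import socket

CHUNK_SIZE = 131072  # value of mitogen.core.CHUNK_SIZE documented earlier


def create_socketpair(size=CHUNK_SIZE):
    """Return (parent_sock, child_sock) with enlarged kernel buffers."""
    parent_sock, child_sock = socket.socketpair()
    parent_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
    child_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size)
    return parent_sock, child_sock
```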

mitogen.master.get_child_modules(path)

Return the suffixes of submodules directly nested beneath the package directory at path.

Parameters:path (str) – Path to the module’s source code on disk, or some PEP-302-recognized equivalent. Usually this is the module’s __file__ attribute, but is specified explicitly to avoid loading the module.
Returns:List of submodule name suffixes.
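
A comparable result can be obtained from the standard library with pkgutil.iter_modules(). The sketch below is an approximation using that function, not mitogen's implementation, and builds a throwaway package to demonstrate it.

```python
import os
import pkgutil
import tempfile


def get_child_modules(path):
    """List submodule name suffixes found beside the file at path."""
    package_dir = os.path.dirname(path)
    return [name for _, name, _ in pkgutil.iter_modules([package_dir])]


# Build a temporary package containing two submodules.
pkg_dir = tempfile.mkdtemp()
for filename in ('__init__.py', 'alpha.py', 'beta.py'):
    open(os.path.join(pkg_dir, filename), 'w').close()

children = sorted(get_child_modules(os.path.join(pkg_dir, '__init__.py')))
print(children)  # ['alpha', 'beta']
```

Because only the directory is scanned, the package itself is never imported.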

mitogen.minify.minimize_source(*args, **kwds)

Remove comments and docstrings from Python source, preserving line numbers and syntax of empty blocks.

Parameters:source (str) – The source to minimize.
Returns str:The minimized source.