欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

mitmproxy(TLS错误)

程序员文章站 2022-06-15 14:30:13
一.原来的基础上添加代码 ......

一.原来的基础上添加代码

"""
this inline script allows conditional tls interception based
on a user-defined strategy.
example:
    > mitmdump -s tls_passthrough.py
    1. curl --proxy http://localhost:8080 https://example.com --insecure
    // works - we'll also see the contents in mitmproxy
    2. curl --proxy http://localhost:8080 https://example.com --insecure
    // still works - we'll also see the contents in mitmproxy
    3. curl --proxy http://localhost:8080 https://example.com
    // fails with a certificate error, which we will also see in mitmproxy
    4. curl --proxy http://localhost:8080 https://example.com
    // works again, but mitmproxy does not intercept and we do *not* see the contents
authors: maximilian hils, matthew tuusberg
"""
import collections
import random

from enum import enum

import mitmproxy
from mitmproxy import ctx
from mitmproxy.exceptions import tlsprotocolexception
from mitmproxy.proxy.protocol import tlslayer, rawtcplayer


class interceptionresult(enum):
    success = true
    failure = false
    skipped = none


class _tlsstrategy:
    """
    abstract base class for interception strategies.
    """

    def __init__(self):
        # a server_address -> interception results mapping
        self.history = collections.defaultdict(lambda: collections.deque(maxlen=200))

    def should_intercept(self, server_address):
        """
        returns:
            true, if we should attempt to intercept the connection.
            false, if we want to employ pass-through instead.
        """
        raise notimplementederror()

    def record_success(self, server_address):
        self.history[server_address].append(interceptionresult.success)

    def record_failure(self, server_address):
        self.history[server_address].append(interceptionresult.failure)

    def record_skipped(self, server_address):
        self.history[server_address].append(interceptionresult.skipped)


class conservativestrategy(_tlsstrategy):
    """
    conservative interception strategy - only intercept if there haven't been any failed attempts
    in the history.
    """

    def should_intercept(self, server_address):
        if interceptionresult.failure in self.history[server_address]:
            return false
        return true


class probabilisticstrategy(_tlsstrategy):
    """
    fixed probability that we intercept a given connection.
    """

    def __init__(self, p):
        self.p = p
        super(probabilisticstrategy, self).__init__()

    def should_intercept(self, server_address):
        return random.uniform(0, 1) < self.p


class tlsfeedback(tlslayer):
    """
    monkey-patch _establish_tls_with_client to get feedback if tls could be established
    successfully on the client connection (which may fail due to cert pinning).
    """

    def _establish_tls_with_client(self):
        server_address = self.server_conn.address

        try:
            super(tlsfeedback, self)._establish_tls_with_client()
        except tlsprotocolexception as e:
            tls_strategy.record_failure(server_address)
            raise e
        else:
            tls_strategy.record_success(server_address)


# inline script hooks below.

tls_strategy = none


def load(l):
    l.add_option(
        "tlsstrat", int, 0, "tls passthrough strategy (0-100)",
    )


def configure(updated):
    global tls_strategy
    if ctx.options.tlsstrat > 0:
        tls_strategy = probabilisticstrategy(float(ctx.options.tlsstrat) / 100.0)
    else:
        tls_strategy = conservativestrategy()


def next_layer(next_layer):
    """
    this hook does the actual magic - if the next layer is planned to be a tls layer,
    we check if we want to enter pass-through mode instead.
    """
    if isinstance(next_layer, tlslayer) and next_layer._client_tls:
        server_address = next_layer.server_conn.address

        if tls_strategy.should_intercept(server_address):
            # we try to intercept.
            # monkey-patch the layer to get feedback from the tlslayer if interception worked.
            next_layer.__class__ = tlsfeedback
        else:
            # we don't intercept - reply with a pass-through layer and add a "skipped" entry.
            mitmproxy.ctx.log("tls passthrough for %s" % repr(next_layer.server_conn.address), "info")
            next_layer_replacement = rawtcplayer(next_layer.ctx, ignore=true)
            next_layer.reply.send(next_layer_replacement)
            tls_strategy.record_skipped(server_address)