python asyncore实现端口转发

Published on 2015 - 10 - 01

最近Digital Ocean的VPS直连一直连不上,怀疑是功夫王根据ssh的流量特征做了拦截,遂写了个脚本实现RC4加密的端口转发(主要目的是规避流量特征的分析)。
python的asyncore库是一个异步的socket通信库。
一个简单的使用示例如下:

import asyncore, socket
class HTTPClient(asyncore.dispatcher):              # 继承dispatcher类
    def __init__(self, host, path):
        asyncore.dispatcher.__init__(self)
                                                    # 这个示例是client端
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect( (host, 80) )
        self.buffer = 'GET %s HTTP/1.0\r\n\r\n' % path
    def handle_connect(self):                       # 此成员在连接成功时被调用
        pass
    def handle_close(self):                         # 远程关闭时被调用
        self.close()
    def handle_read(self):                          # 默认情况下loop会循环调用此函数,可自定义readable成员实现条件控制
        print self.recv(8192)                       # 应该会阻塞在这
    def writable(self):                             # 可发送的条件控制成员
        return (len(self.buffer) > 0)               # 当发送队列不为空时
    def handle_write(self):                         # 发送操作调用的回调
        sent = self.send(self.buffer)
        self.buffer = self.buffer[sent:]
client = HTTPClient('www.python.org', '/')
asyncore.loop()                                     # 此调用会在asyncore内部开始异步通信

dispatcher父类中handle开头的成员都是回调类,可在子类中定义回调函数。

在网上查了一些类似的程序都感觉代码写的不是很好,于是自己查了下manual重新造了遍轮子,发现还是挺别扭😂
大致代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket,asyncore,sys
from Crypto.Cipher import ARC4
from Crypto.Hash import SHA
DEBUG=False
cipher_rc4_obj=None
class Transmiter(asyncore.dispatcher):                  #数据交换类
    '''数据交换类,两个此类的实例可互换tx和rx,若初始化socket则表示server,否则是client'''
    def __init__(self,socket_info,transmitee=None):     
        '''参数为socket_info,另一个交换实例'''\
        '''(作为client时socket_info为tuple(ip,port),作为server时socket_info为accept得到的socket)'''\
        '''例如Transmiter((12.34.56.78,8080),transee)实例化了一个client端'''\
        '''Transmiter(socket_acptd,transee)实例化了一个server端'''
        self.buf_Tx=''
        self.buf_Rx=''                                              # Reserved
        self.transmitee=transmitee                                  # transmitee是要与之交换数据的另一个对象
        if None!=transmitee and None==transmitee.transmitee:
            transmitee.transmitee=self                              # transmitee与本实例交换数据
        if isinstance(socket_info,socket.socket):                   # server
            asyncore.dispatcher.__init__(self,socket_info)
            if DEBUG:
                print 'Transer server'
        elif isinstance(socket_info[0],str) and isinstance(socket_info[1],int):
            asyncore.dispatcher.__init__(self)                      # client
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)  # 建立套接字
            self.connect(socket_info)                               # (ip,port)
            if DEBUG:
                print 'Transer client'
        else:
            raise Exception

    def handle_read(self):                  # 读处理:recv并加入transee的发送队列或丢弃数据
        read = self.recv(4096)              # 有数据可读时将读取的数据加入transee的发送队列,无transee则丢弃
        if DEBUG:
            print 'Transer Read\t%d bytes' % (len(read))
        if None!=self.transmitee:
            self.transmitee.buf_Tx += read
        else:
            print 'Transer Drop\t%d bytes' % (len(read))
    def writable(self):                     # 可写判断:检测self的发送队列是否有数据
        return (len(self.buf_Tx) > 0 )
    def handle_write(self):                 # 写处理:send发送队列,保留未发送成功的部分
        sent = self.send(self.buf_Tx)
        self.buf_Tx = self.buf_Tx[sent:]
        if DEBUG:
            print 'Transer Write\t%d bytes' % (sent)
    def handle_close(self):                 # 关闭状态处理,关闭self及transee
        self.close()                        # 远端关闭后关闭连接
        if DEBUG:
            print 'Transer close'
        if None!=self.transmitee:
            self.transmitee.close()
            if DEBUG:
                print 'Transer\'s transee close'
    def handle_error(self):
        if DEBUG:
            print 'Transer error occur'
        self.handle_close()
    def handle_expt(self):
        if DEBUG:
            print 'Transer exception occur'
        self.handle_close()
class Connector(object):        # 如果断开不太好重新建立连接
    def __init__(self,remoteaddr1,remoteaddr2):
        self.remoteaddr1=remoteaddr1
        self.remoteaddr2=remoteaddr2
        self.trans1=Transmiter(remoteaddr1)
        self.trans2=Transmiter(remoteaddr2,self.trans1)
class Listener(asyncore.dispatcher):        # 两个accepted组成一对trans
    def __init__(self,localaddr,backlog=5):
        asyncore.dispatcher.__init__(self)
        self.lasttrans=None
        self.create_socket(socket.AF_INET,socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind(localaddr)
        self.listen(backlog)
    def handle_accept(self):
        conn, addr = self.accept()
        print 'accepted %s' % str(addr)
        if None==self.lasttrans:                #没有上一个trans
            print 'wait for another'
            self.lasttrans=Transmiter(conn)     #记录此trans
        else:
            print 'paired to last one'
            Transmiter(conn,self.lasttrans)     #有记录的trans,配对
            self.lasttrans=None                 #清除记录
class Forwarder(asyncore.dispatcher):
    def __init__(self, localaddr, remoteaddr, backlog=5):
        asyncore.dispatcher.__init__(self)
        self.remoteaddr=remoteaddr
        self.create_socket(socket.AF_INET,socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind(localaddr)
        self.listen(backlog)
    def handle_accept(self):
        conn, addr = self.accept()
        print 'Forward from %s to %s' % (str(addr),self.remoteaddr)
        trans_Acptd=Transmiter(conn)
        trans_toRemote=Transmiter(self.remoteaddr,trans_Acptd)
class TransmiterWithCipher(Transmiter):
    def __init__(self,socket_info,cipher_obj=None,isEnc=False,transmitee=None):
        Transmiter.__init__(self,socket_info,transmitee)
        self.cipher_obj=cipher_obj
        self.isEnc=isEnc
    def handle_read(self):
        read = self.recv(4096)
        if DEBUG:
            print 'Transer Read\t%d bytes' % (len(read))
        if None!=self.cipher_obj and False!=self.isEnc:
            read=self.cipher_obj.decrypt(read)
        if None!=self.transmitee:
            if None != self.transmitee.cipher_obj and False!=self.transmitee.isEnc:
                read = self.transmitee.cipher_obj.encrypt(read)
            self.transmitee.buf_Tx +=read
        else:
            print 'Transer Drop\t%d bytes' % (len(read))
class ForwarderWithCipher(Forwarder):
    def __init__(self, localaddr, remoteaddr, cipher_obj=None,localenc=False,remoteenc=False,backlog=5):
        self.cipher_obj=cipher_obj
        self.localenc=localenc
        self.remoteenc=remoteenc
        Forwarder.__init__(self, localaddr, remoteaddr, backlog)
    def handle_accept(self):
        conn, addr = self.accept()
        print 'Forward from %s to %s' % (str(addr),self.remoteaddr)
        trans_Acptd=TransmiterWithCipher(conn,self.cipher_obj,self.localenc)
        trans_toRemote=TransmiterWithCipher(self.remoteaddr,self.cipher_obj,self.remoteenc,trans_Acptd)

由于是规避流量特征所以拿rc4凑合一下,根据自己的脑洞顺手写了个Connector和Listenner没有测试,拒绝填坑。最后通过其他连接渠道把脚本部到vps后发现直连还是会reset,估计是校长并没有看流量特征直接把vps的C段都断了。 ( ̄へ ̄)
ref

发现了个小工具,socat,功能挺多。