Permalink: 2013-06-04 21:54:00+09:00 by ruy@ainoniwa.net in technical tags: dpkt python winpcapy social:

あらすじ

( ゚д゚)/ 3行で分かるlibpcapのpythonラッパーたち

あーしちゃん「あーしwindowsなんだけど」

そこでおじさんは考えた。ctypesしよう、と。

今日紹介するのは winpcapy http://code.google.com/p/winpcapy/ だよ

コイツはpythonから超簡単にwinpcap経由でデバイスを叩けるナイスなラッパーさ。

インストール? C:\python27\Lib\site-packages\ に置くだけだよ。

やっぱりさぁ、windowsでpythonでwinpcap使いたいとか言い始めるとさ、やれコンパイルしろとか色々うるさいわけじゃん。

俺はもっと環境負担の少ない、手軽な感じで救われたいんだよ。

Visual Studio Expressをダウンロードしてくるところから始まるpythonとか、手足を縛られてwindowsの前に転がされた状態からLinux入れてスタートとか、やっぱりしんどいんだよ。

でもwinpcapyはヤバイ。すべてをぶっ飛ばしてLet's パケットキャプチャ and パケットジェネレータなわけよ。

使い方をいちいち説明する気はないぜ。サンプルコードを載せて、どういう動きをするかだけ書く。

それで十分なのさ。

本家のサンプルコードでも良いんだが、そこはそれ。

ちょいと擬似サーバでも立てて、主に実験用途でしか役に立たない感じのエコーサーバを作ってみる。

基本機能

  • 利用可能なインタフェースの一覧を取得する(-d)
  • 待ち受けインタフェースとアドレスを指定して、ARP/pingに応答するサーバとして動作する(-i XXX -s Y.Y.Y.Y -m ZZ:ZZ:ZZ:ZZ:ZZ:ZZ)

ここまで出来ることが確認出来れば、パケットの中身は加工次第でどうにでもなる。

コード片。

# -*- coding: utf-8 -*-

from ctypes import *
from winpcapy import *
import dpkt
import socket
from optparse import OptionParser
version = u'%prog 1.1'

def pkt_send(dev, buf):
    pcap_sendpacket(dev,cast(buf,POINTER(c_ubyte)),len(buf))
    return

def _packet_decode(pkt_no, header, pkt_data, dev, opts):
    raw_data = string_at(pkt_data, header.contents.len)
    try:
        packet = dpkt.ethernet.Ethernet(raw_data)
    except:
        print("Decode fail")
        return
    reply = False
    eth_src = ''.join([chr(int(i,16)) for i in opts.server_mac.split(':')])
    eth_dst = packet.src
    eth_type = packet.type
    while 1:
        if type(packet.data) == dpkt.arp.ARP:
            packet = packet.data
            print '[ARP]',socket.inet_ntoa(packet.spa),"search for",socket.inet_ntoa(packet.tpa)
            if socket.inet_ntoa(packet.tpa) == opts.server_ip:
                eth = dpkt.ethernet.Ethernet(src=eth_src, dst=eth_dst, type=eth_type)
                arp = dpkt.arp.ARP(op = dpkt.arp.ARP_OP_REPLY)
                arp.sha = eth_src
                arp.spa = packet.tpa
                arp.tha = packet.sha
                arp.tpa = packet.spa
                eth.data = arp
                buf = str(eth)
                pkt_send(dev, buf)
                break
        elif type(packet.data) == dpkt.ip.IP:
            packet = packet.data
            if socket.inet_ntoa(packet.dst) == opts.server_ip:
                reply = True
        elif type(packet.data) == dpkt.icmp.ICMP:
            if reply and packet.data.type == dpkt.icmp.ICMP_ECHO:
                eth = dpkt.ethernet.Ethernet(src=eth_src, dst=eth_dst, type=eth_type)
                ip = dpkt.ip.IP(src=packet.dst, dst=packet.src, p=packet.p, ttl=packet.ttl)
                icmp = dpkt.icmp.ICMP(type=dpkt.icmp.ICMP_ECHOREPLY)
                icmp.data = packet.data.data
                ip.data = icmp
                ip.len = len(str(ip))
                ip.sum = 0
                eth.data = ip
                buf = str(eth)
                pkt_send(dev, buf)
                reply = False
            break
        else:
            break

def dev_discovery():
    alldevs = POINTER(pcap_if_t)()
    errbuf  = create_string_buffer(PCAP_ERRBUF_SIZE)
    if (pcap_findalldevs(byref(alldevs), errbuf) == -1):
        print ("Error in pcap_findalldevs: %s\n" % errbuf.value)
        return

    dev_count = 0
    try:
        dev = alldevs.contents
    except:
        print ("Error in pcap_findalldevs: %s" % errbuf.value)
        print ("Maybe you need admin privilege?\n")
        return

    while dev:
        dev_count = dev_count+1
        print("%d. %s" % (dev_count, dev.name))
        if (dev.description):
            print (" (%s)\n" % (dev.description))
        else:
            print (" (No description available)\n")
        if dev.next:
            dev = dev.next.contents
        else:
            dev = False

    if (dev_count==0):
        print ("\nNo interfaces found! Make sure WinPcap is installed.\n")
        return

def echo_server(opts):
    alldevs = POINTER(pcap_if_t)()
    errbuf  = create_string_buffer(PCAP_ERRBUF_SIZE)
    if (pcap_findalldevs(byref(alldevs), errbuf) == -1):
        print ("Error in pcap_findalldevs: %s\n" % errbuf.value)
        return

    dev_count = 0
    try:
        dev = alldevs.contents
    except:
        print ("Error in pcap_findalldevs: %s" % errbuf.value)
        print ("Maybe you need admin privilege?\n")
        return

    try:
        dev = alldevs
        for dev_count in range(0,opts.interface-1):
            dev = dev.contents.next
    except:
        print "Invalid interface number."
        pcap_freealldevs(alldevs)
        return

    dev = dev.contents
    adhandle = pcap_open(dev.name,65536,PCAP_OPENFLAG_PROMISCUOUS,1,None,errbuf)
    pcap_setmintocopy(adhandle,60)
    if (adhandle == None):
        print("\nUnable to open the adapter. %s is not supported by Pcap-WinPcap\n" % dev.name)
        pcap_freealldevs(alldevs)
        return

    print("\nlistening on %s...\n" % (dev.description))
    pcap_freealldevs(alldevs)

    pkt_no = 0
    pkt_hdr = POINTER(pcap_pkthdr)()
    pkt_data = POINTER(c_ubyte)()
    try:
        while 1:
            nx = pcap_next_ex(adhandle,pkt_hdr,pkt_data)
            if nx != 0:
                pkt_no += 1
                _packet_decode(pkt_no,pkt_hdr,pkt_data,adhandle,opts)
    except:
        print "exit."
    pcap_close(adhandle)

def main():
    p = OptionParser(version=version)
    p.add_option('-d', '--discovery', action='store_true', help="Discovery device.")
    p.add_option('-i', '--interface', action='store', type='int', help="Interface number.")
    p.add_option('-s', '--server-ip', action='store', help="Virtual IP")
    p.add_option('-m', '--server-mac', action='store', help="Virtual MAC")
    opts, args = p.parse_args()

    if opts.discovery:
        dev_discovery()
        return

    elif opts.server_ip is None or\
         opts.server_mac is None:
        print "Server ip and mac is requied."
    elif opts.interface < 1:
        print "Select interface number."
    else:
        echo_server(opts)
        return

    p.print_version()
    p.print_help()
    return

if __name__ == '__main__':
    main()

長い。

あと、整形とかif文で例外吐きそうになるとかそんな気はするけど。

試しにキャプチャして、動作を確認するとこんな感じ。

vechoserver_001

デバイス操作のひな形さえ作ってしまえば、存外何にでも使えると思うのだがどうだろう。