Replying to DNS queries with Python Scapy
Overview
Scapy is a Python program that enables users to send, sniff, dissect, and forge network packets. In this example, I use Scapy to intercept a specific DNS query and reply with a spoofed answer.
Details
Start the Scapy script (scroll down to see the code):
(.venv) root@home-server:/home/jemurray/scapy# ./scapy-dns.py
.
Sent 1 packets.
Send a query to the system running the Scapy script. For this example, there is no need to have a working DNS resolver. The Scapy program is listening on the raw interface waiting for a DNS packet to arrive.
The response to the query example.com
returns the bogus answer 1.2.3.4
:
jemurray@Jasons-MacBook-Pro ~ % dig @192.168.86.5 example.com
; <<>> DiG 9.10.6 <<>> @192.168.86.5 example.com
; (1 server found)
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 56703
;; flags: qr aa; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 0
;; QUESTION SECTION:
;example.com. IN A
;; ANSWER SECTION:
example.com. 600 IN A 1.2.3.4
;; Query time: 34 msec
;; SERVER: 192.168.86.5#53(192.168.86.5)
;; WHEN: Wed Dec 16 16:12:33 CST 2020
;; MSG SIZE rcvd: 56
Packet capture of the dig
query and Scapy response:
16:12:33.628252 IP 192.168.86.25.58657 > 192.168.86.5.53: 56703+ [1au] A? example.com. (40)
16:12:33.653489 IP 192.168.86.5.53 > 192.168.86.25.58657: 56703*- 1/0/0 A 1.2.3.4 (56)
The Scapy Python script to listen for and generate a spoofed DNS response:
#!/usr/bin/env python
# Import scapy libraries
from scapy.all import *
# Set the interface to listen and respond on
net_interface = "ens160"
# Berkeley Packet Filter for sniffing specific DNS packet only
packet_filter = " and ".join([
"udp dst port 53", # Filter UDP port 53
"udp[10] & 0x80 = 0", # DNS queries only
"src host 192.168.86.25" # IP source <ip>
])
# Function that replies to DNS query
def dns_reply(packet):
# Construct the DNS packet
# Construct the Ethernet header by looking at the sniffed packet
eth = Ether(
src=packet[Ether].dst,
dst=packet[Ether].src
)
# Construct the IP header by looking at the sniffed packet
ip = IP(
src=packet[IP].dst,
dst=packet[IP].src
)
# Construct the UDP header by looking at the sniffed packet
udp = UDP(
dport=packet[UDP].sport,
sport=packet[UDP].dport
)
# Construct the DNS response by looking at the sniffed packet and manually
dns = DNS(
id=packet[DNS].id,
qd=packet[DNS].qd,
aa=1,
rd=0,
qr=1,
qdcount=1,
ancount=1,
nscount=0,
arcount=0,
ar=DNSRR(
rrname=packet[DNS].qd.qname,
type='A',
ttl=600,
rdata='1.2.3.4')
)
# Put the full packet together
response_packet = eth / ip / udp / dns
# Send the DNS response
sendp(response_packet, iface=net_interface)
# Sniff for a DNS query matching the 'packet_filter' and send a specially crafted reply
sniff(filter=packet_filter, prn=dns_reply, store=0, iface=net_interface, count=1)