Exploring Cowrie Honeypot with scripts¶
Here are some useful scripts made by club members to view and analyze Cowrie honeypot logs.
Prerequisites¶
- You will need to install Cowrie; you can do that here
- You need to have the `requests` package installed for Python (e.g. `pip install requests`)
1. Pick any number of scripts that you are interested in¶
General data about all IPs
This includes IPs, countries and some statistics
import json
import os
# Cowrie's JSON event log is expected under the home directory of the
# account named by the USER environment variable.
user = os.getenv("USER")
logsPath = "/home/{}/cowrie/var/log/cowrie/cowrie.json".format(user)
def searchLog(searchIP: str, logPath: str) -> None:
    """Print every command that a given IP address entered in the honeypot.

    Reads the log one line at a time, parsing each line as a JSON object,
    and prints the ``input`` field of every event whose ``src_ip`` equals
    *searchIP*. Nothing is printed when the address issued no commands.

    Args:
        searchIP: Source IP address to look for.
        logPath: Path to Cowrie's ``cowrie.json`` log file.
    """
    with open(logPath, "r", encoding="utf-8") as f:
        for line in f:
            try:
                js = json.loads(line)
            except json.JSONDecodeError as e:
                # One corrupt or truncated line should not abort the search.
                print(f"Error decoding JSON: {e}")
                continue
            # .get() so an event without a source address is skipped
            # instead of raising KeyError.
            if js.get("src_ip") != searchIP:
                continue
            command = js.get("input")
            if command:
                print(command)
if __name__ == "__main__":
    # Strip surrounding whitespace so a stray space in the typed address
    # does not stop it from matching the src_ip values in the log.
    searchIP = input("Enter IP:").strip()
    searchLog(searchIP, logsPath)
Get commands made by IP address - Made by Milan
Takes an IP address and prints all commands entered from that address. If the address has not entered any commands, nothing is printed.
import json
import requests
from collections import defaultdict
def _lookup_country(ip):
    """Return the country ip-api.com reports for *ip*, or 'Unknown' on any failure."""
    try:
        # The timeout keeps one unresponsive lookup from stalling the whole run.
        response = requests.get(f'http://ip-api.com/json/{ip}', timeout=5)
        if response.status_code != 200:
            return 'Unknown'
        return response.json().get('country', 'Unknown')
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a response body that is not valid JSON.
        print(f"Error getting country for {ip}: {e}")
        return 'Unknown'


def parse_logs(file_path):
    """
    Summarize a Cowrie JSON log per source IP and per country.

    Each line of the file is parsed as one JSON event. For every distinct
    ``src_ip`` the country is looked up once via ip-api.com; every
    ``cowrie.session.closed`` event adds one connection and its duration to
    that IP's statistics. The per-IP summary and the per-country connection
    counts are printed to stdout. Lines that are not valid JSON are reported
    and skipped.

    Args:
        file_path: Path to Cowrie's ``cowrie.json`` log file.
    """
    ip_stats = {}
    country_stats = defaultdict(int)

    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            try:
                log = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON: {e}")
                continue

            src_ip = log.get('src_ip')
            if not src_ip:
                continue

            if src_ip not in ip_stats:
                # First sighting of this address: resolve its country once
                # and reuse the result for every later event.
                ip_stats[src_ip] = {
                    'country': _lookup_country(src_ip),
                    'connections': 0,
                    'durations': [],
                }

            if log.get('eventid') == 'cowrie.session.closed':
                # Coerce to float so a duration stored as a numeric string
                # can still be summed.
                # NOTE(review): confirm which type your Cowrie version logs.
                try:
                    duration = float(log.get('duration', 0))
                except (TypeError, ValueError):
                    duration = 0.0
                stats = ip_stats[src_ip]
                stats['connections'] += 1
                stats['durations'].append(duration)
                country_stats[stats['country']] += 1

    # Print the summary per IP
    for ip, stats in ip_stats.items():
        print(f"IP: {ip}")
        print(f"Country: {stats['country']}")
        print(f"Number of connections: {stats['connections']}")
        print(f"Durations: {stats['durations']}")
        if stats['durations']:
            total_duration = sum(stats['durations'])
            avg_duration = total_duration / len(stats['durations'])
            print(f"Total duration: {total_duration:.2f} seconds")
            print(f"Average duration: {avg_duration:.2f} seconds")
        print("-" * 60)

    # Print country statistics, busiest country first
    print("\nCountry Statistics:")
    sorted_countries = sorted(country_stats.items(), key=lambda x: x[1], reverse=True)
    for country, count in sorted_countries:
        print(f"{country}: {count} connections")
if __name__ == "__main__":
    import sys

    # An optional first command-line argument overrides the default path, so
    # the script can be pointed at a log without editing this file. Otherwise
    # replace <username> below with the account that runs Cowrie.
    log_file_path = "/home/<username>/cowrie/var/log/cowrie/cowrie.json"
    if len(sys.argv) > 1:
        log_file_path = sys.argv[1]
    parse_logs(log_file_path)
Reads sessions and shows the commands used, event id, src ip and log - Made by Doddi (Þórður)
import json
import requests
from collections import defaultdict
def get_ip_location(ip):
    """
    Get the continent and country of an IP address using ip-api.com.

    Returns a (continent, country) tuple. Both elements are 'Unknown' if the
    request fails or the server does not answer with HTTP 200; a field that is
    missing from the response is reported as 'Unknown' on its own.
    """
    try:
        response = requests.get(
            f'http://ip-api.com/json/{ip}',
            # NOTE(review): ip-api.com's default field set is documented
            # without `continent`, so request it explicitly - confirm against
            # the API documentation.
            params={'fields': 'status,continent,country'},
            # The timeout keeps one unresponsive lookup from hanging the report.
            timeout=5,
        )
        if response.status_code != 200:
            return 'Unknown', 'Unknown'
        data = response.json()
        continent = data.get('continent', 'Unknown')
        country = data.get('country', 'Unknown')
        return continent, country
    except Exception as e:
        # Deliberately broad: a failed lookup must never abort the caller.
        print(f"Error getting location for {ip}: {e}")
        return 'Unknown', 'Unknown'
def parse_logs(file_path):
    """
    Parse the Cowrie logs, group the entered commands by source IP, and print
    them together with each IP's geolocation.

    Each line of the file is parsed as one JSON event. Only
    ``cowrie.command.input`` events that carry both a ``src_ip`` and a
    non-empty ``input`` are kept. Lines that are not valid JSON are reported
    and skipped.

    Args:
        file_path: Path to Cowrie's ``cowrie.json`` log file.
    """
    keylogs_by_ip = defaultdict(list)

    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            try:
                log = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON: {e}")
                continue

            eventid = log.get('eventid')
            src_ip = log.get('src_ip')
            command = log.get('input')

            if eventid == "cowrie.command.input" and src_ip and command:
                keylogs_by_ip[src_ip].append(command)

    # Process and print the keylogs grouped by IP; the location is looked up
    # once per address rather than once per command.
    for ip, commands in keylogs_by_ip.items():
        continent, country = get_ip_location(ip)
        print(f"\nIP: {ip}")
        print(f"Location: {continent}, {country}")
        print("Commands entered by attacker:")
        for cmd in commands:
            print(f" {cmd}")
        print("-" * 60)
if __name__ == "__main__":
    import sys

    # An optional first command-line argument overrides the default path, so
    # the script can be pointed at a log without editing this file. Otherwise
    # change the username below so the path points to your actual cowrie.json
    # file.
    log_file_path = "/home/<username>/cowrie/var/log/cowrie/cowrie.json"
    if len(sys.argv) > 1:
        log_file_path = sys.argv[1]
    parse_logs(log_file_path)
**2. Save the file/files on your Cowrie machine and run them**¶
That's it :)