This setup lets you keep the backend service running continuously: each time you send it a curl request, it performs the search and returns only the summary information.
Creating a MongoDB Search API Using Flask
This post will guide you through setting up a Flask API to search through MongoDB collections based on a search string and return a summary of the results.
Step 1: Setting Up the Flask API
Create a Python script that sets up the Flask API and connects to MongoDB:
from flask import Flask, request, jsonify
from pymongo import MongoClient
import re
# Where the MongoDB server lives and which database holds the logs
MONGO_URI = "mongodb://localhost:27017/"
DB_NAME = "security_logs"

# Names of every collection the search walks over, in the order they are visited
collections = [
    "AlienVaultBlockedIPs",
    "AlienVaultIDSBlockedIPs",
    "CSFDenyLogs",
    "DDoSBlockedIPs",
    "IDSBlockedIPs",
    "maliciousDNSBlockedIPs",
    "OpenRestyAccessLogs",
    "OpenRestyErrorLogs",
    "SuricataFullFastLog",
    "SyslogFirewall",
    "TCPDumpAllIPs",
    "TotalDNSIPs",
    "WAF400URIs",
    "WAF403URIs",
    "WAFAllURIs",
]
# Create the MongoDB client and obtain a handle to the logs database
client = MongoClient(MONGO_URI)
db = client.get_database(DB_NAME)

# Create the Flask application object that the route below is registered on
app = Flask(__name__)
# Function to search for a string across multiple collections
def search_string_in_collections(search_string):
    """Search the configured collections for a literal, case-insensitive match.

    Each collection in the module-level ``collections`` list is queried on the
    single field known to hold searchable text for it. The search string is
    treated as plain text, not as a regular expression, so ``1.1.1.1`` matches
    only that exact sequence and input such as ``(`` cannot raise ``re.error``.

    Args:
        search_string: Text to look for inside the mapped field.

    Returns:
        A ``(search_results, unique_count)`` tuple. ``search_results`` maps a
        collection name to the list of matching documents found in it (only
        collections with at least one match appear). ``unique_count`` is the
        number of distinct documents seen across all collections.
    """
    # Which document field to search, keyed by the collections that use it.
    # Collections listed in ``collections`` but absent here (e.g. CSFDenyLogs,
    # OpenRestyAccessLogs, OpenRestyErrorLogs) have no mapped field and are
    # skipped without being queried.
    fields_by_collection = {
        "IP address": (
            "AlienVaultBlockedIPs",
            "AlienVaultIDSBlockedIPs",
            "DDoSBlockedIPs",
            "IDSBlockedIPs",
            "maliciousDNSBlockedIPs",
            "TCPDumpAllIPs",
            "TotalDNSIPs",
        ),
        "log_line": ("SuricataFullFastLog", "SyslogFirewall"),
        "URI": ("WAF400URIs", "WAF403URIs", "WAFAllURIs"),
    }
    field_for = {
        name: field
        for field, names in fields_by_collection.items()
        for name in names
    }

    # Escape the caller-supplied text so regex metacharacters are matched
    # literally, and compile once instead of once per collection.
    pattern = re.compile(re.escape(search_string), re.IGNORECASE)

    search_results = {}
    unique_entries = set()  # String forms of every document already counted

    for collection_name in collections:
        field = field_for.get(collection_name)
        if field is None:
            continue

        for result in db[collection_name].find({field: {"$regex": pattern}}):
            # Documents are compared by their string form, across collections.
            # NOTE(review): if documents carry a unique ``_id`` this only
            # removes exact repeats — confirm that is the intended dedup.
            result_str = str(result)
            if result_str in unique_entries:
                continue
            unique_entries.add(result_str)
            search_results.setdefault(collection_name, []).append(result)

    return search_results, len(unique_entries)
# GET /search?q=<text> -- run the search and reply with a summary
@app.route('/search', methods=['GET'])
def search():
    """Handle a search request and return per-collection match counts.

    Reads the ``q`` query parameter. When it is missing or empty, responds
    with a JSON error and HTTP 400. Otherwise runs the search and responds
    with the total number of unique matches plus, for each collection that
    had matches, its name and how many documents matched.
    """
    query_text = request.args.get('q', '')
    if not query_text:
        return jsonify({"error": "Search string is required"}), 400

    matches_by_collection, unique_total = search_string_in_collections(query_text)

    # Only counts are returned; the matching documents themselves are not sent
    per_collection = [
        {"collection_name": name, "occurrences": len(documents)}
        for name, documents in matches_by_collection.items()
    ]

    return jsonify({
        "total_unique_matches": unique_total,
        "collections": per_collection,
    })
# Start the server only when this file is executed directly, not on import
if __name__ == "__main__":
    # Listens on every network interface, port 5000
    app.run(host='0.0.0.0', port=5000)
Step 2: Running the Flask API
After saving the Python script (e.g., as search_mongo_service.py), run it with the following command:
python3 search_mongo_service.py
Step 3: Making a Search Request via curl
You can trigger a search by making a GET request to the Flask API, passing the search string as the q query parameter. For example:
curl -X GET "http://your_server_ip:5000/search?q=1.1.1.1"
Step 4: JSON Response
The API will return a JSON response containing the total number of unique matches and a list of collections where the search string was found. The response will look like this:
{
"total_unique_matches": 3,
"collections": [
{"collection_name": "DDoSBlockedIPs", "occurrences": 1},
{"collection_name": "SyslogFirewall", "occurrences": 1},
{"collection_name": "SuricataFullFastLog", "occurrences": 1}
]
}
Comments
Post a Comment