"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import base64
import binascii
import csv
import os
from collections import namedtuple
from contextlib import suppress
from pathlib import Path
from time import time
from . import AIBOLIT
# Names of the report sections that are handled specially by the parsers.
SUSPICIOUS = "suspicious"
VULNERS = "vulners"
EXTENDED_SUSPICIOUS = "extended-suspicious"
IGNORED_SUSPICIOUS = "ignored-suspicious"

# Short section code (first column of a CSV report row) -> section name
# (top-level key of a JSON report). Insertion order is the order in which
# parse_report_json walks the sections.
SECTIONS = {
    "p": "php_malware",
    "j": "js_malware",
    "c": "cloudhash",
    "s": SUSPICIOUS,
    "v": VULNERS,
    "es": EXTENDED_SUSPICIOUS,
    "is": IGNORED_SUSPICIOUS,
}

# Sections whose hits are reported with the "suspicious" flag set.
SUSPICIOUS_SECTIONS = {
    SUSPICIOUS,
    VULNERS,
    EXTENDED_SUSPICIOUS,
    IGNORED_SUSPICIOUS,
}
# One row of the CSV report. Rows are unpacked positionally into this tuple,
# so the field order must match the column order of the report file.
AiBolitCSVReport = namedtuple(
    "AiBolitCSVReport",
    "section path signature ctime mtime size etime "
    "signature_id hash signature_name sha256",
)
def parse_report_csv(report_path: Path):
    """Yield one hit dictionary per recognised row of a CSV report.

    Each non-blank row is unpacked positionally into ``AiBolitCSVReport``.
    Rows whose section code is not a key of ``SECTIONS`` are skipped, as
    are blank lines.

    :param report_path: path of the comma-delimited report file.
    :return: generator of dicts with the keys ``name``, ``file_name``,
        ``signature``, ``ctime``, ``modification_time``, ``suspicious``,
        ``size``, ``hash``, ``timestamp`` and ``extended_suspicious``.
    :raises TypeError: if a non-blank row does not have exactly as many
        columns as ``AiBolitCSVReport`` has fields.
    :raises ValueError: if ``ctime``, ``mtime``, ``size`` or ``etime``
        cannot be converted to a number.
    """
    with report_path.open(newline="") as report_stream:
        for raw_row in csv.reader(report_stream, delimiter=","):
            if not raw_row:
                # csv.reader yields [] for a blank line; there is nothing
                # to unpack, so skip it instead of failing with TypeError.
                continue
            row = AiBolitCSVReport(*raw_row)
            try:
                section = SECTIONS[row.section]
            except KeyError:
                # Unknown section code: not a hit we know how to report.
                continue
            # Fall back to "<section>.<signature_id>" when the row carries
            # no explicit signature name.
            sig = row.signature_name or "{}.{}".format(
                section, row.signature_id
            )
            # Rows of the "v" section get the current time instead of the
            # value of the etime column.
            timestamp = (
                int(float(row.etime)) if row.section != "v" else int(time())
            )
            file_name = row.path
            # The path is decoded as base64 when it is valid base64 and is
            # used as-is otherwise. b64decode reports malformed base64 with
            # binascii.Error, but a str containing non-ASCII characters
            # with a plain ValueError; ValueError (the parent class) covers
            # both, so a raw non-ASCII path no longer aborts the parse.
            with suppress(ValueError):
                file_name = base64.b64decode(file_name, validate=True)
            # bytes -> str using the filesystem encoding; str is unchanged.
            file_name = os.fsdecode(file_name)
            yield {
                "name": AIBOLIT,
                "file_name": file_name,
                "signature": sig,
                "ctime": int(row.ctime),
                "modification_time": int(row.mtime),
                "suspicious": section in SUSPICIOUS_SECTIONS,
                "size": int(row.size or 0),
                "hash": row.sha256 or row.hash or None,
                "timestamp": timestamp,
                "extended_suspicious": section == EXTENDED_SUSPICIOUS,
            }
def parse_report_json(report, base64_path=True):
    """Yield one hit dictionary per entry of a JSON report.

    The sections are visited in the order of ``SECTIONS.values()``;
    sections missing from *report* are treated as empty.

    :param report: mapping of section name to a list of hit mappings.
        Each hit is read through the keys ``fn``, ``sz``, ``ct``, ``mt``,
        ``sigid`` (only when ``sn`` is missing or empty), ``et`` (except
        in the vulners section) and optionally ``sn``, ``sha256``,
        ``hash``.
    :param base64_path: when true, ``fn`` is decoded as base64 if it is
        valid base64 and used as-is otherwise; when false, ``fn`` is never
        base64-decoded.
    :return: generator of dicts with the keys ``name``, ``file_name``,
        ``signature``, ``suspicious``, ``size``, ``ctime``,
        ``modification_time``, ``hash``, ``timestamp`` and
        ``extended_suspicious``.
    :raises KeyError: if a hit lacks one of the required keys.
    """
    for section in SECTIONS.values():
        for hit in report.get(section, []):
            # Fall back to "<section>.<sigid>" when no signature name
            # ('sn') is provided.
            sig = hit.get("sn") or ".".join([section, str(hit["sigid"])])
            # vulners section does not provide timestamp ('et' field)
            # so current time is used instead.
            # 'et' - time when the file was scanned
            timestamp = (
                int(float(hit["et"])) if section != VULNERS else int(time())
            )
            file_name = hit["fn"]
            if base64_path:
                # b64decode reports malformed base64 with binascii.Error,
                # but a str containing non-ASCII characters with a plain
                # ValueError; ValueError (the parent class) covers both,
                # so a raw non-ASCII path falls back to being used as-is.
                with suppress(ValueError):
                    file_name = base64.b64decode(file_name, validate=True)
            # bytes -> str using the filesystem encoding; str is unchanged.
            file_name = os.fsdecode(file_name)
            yield {
                "name": AIBOLIT,
                "file_name": file_name,
                "signature": sig,
                "suspicious": section in SUSPICIOUS_SECTIONS,
                "size": hit["sz"],
                "ctime": hit["ct"],
                "modification_time": hit["mt"],
                # 'hash' field is still used in 'cloudhash' section
                "hash": hit.get("sha256", hit.get("hash")),
                "timestamp": timestamp,
                "extended_suspicious": section == EXTENDED_SUSPICIOUS,
            }