PrxoyPool-免费代理池的设计与实现

前言

在进行渗透测试、漏洞挖掘的时候,使用自动化工具是必须的一步,但是令人😠的是,经常由于扫描等原因导致IP被ban,这是我们最不想看到的。所以本着学习编程的目的,根据网上搜索的资料,东拼西凑的开发了这个系统。

0x01 涉及技术

  • Python3

    使用Python3的原因是由于异步IO使用的是asyncioaiohttp,所以就学习尝试了下Python3,其实没多大区别感觉。(刚开始学编程,原谅我啥都不知道😜)

  • Flask

    使用Flask作为Web开发微框架,交与其他Python框架,具有简便、易扩展等优点。具体可看:Flask中文文档

  • Redis

    Redis 是速度非常快的非关系型(NoSQL)内存键值数据库,可以存储键和五种不同类型的值之间的映射。

    五种类型数据类型为:字符串、列表、集合、有序集合、散列表。本次数据存储方面就利用的redis的有序集合。

  • 爬虫

    爬虫部分是从11个免费代理网站上抓取免费、高可用性的代理IP,并进行数据处理获取统一格式。同时本着学习的理念使用了:lxmlpyquery正则json等不同的技术来实现。

  • 异步IO

    默认给代理池设置的阈值为50000,由于数量比较大,为了提高代理高可用性的检测效率,我们在这里使用异步请求库aiohttp来进行检测。具体参考:廖雪峰的官方网站-异步IO

  • 前端

    web前端部分我是真不懂(但是我会抄啊😝),直接wget的一个自己觉得比较简洁的网站的前端,哈哈😄~

0x02 架构部分

主要分为如下几个模块:

  • 爬虫模块:

    主要从11个免费代理网站上抓取免费的代理IP,并进行格式统一处理。同时为了保证代理池的枯竭,采用周期性抓取。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    www.66ip.cn
    www.xdaili.cn
    www.kuaidaili
    www.ip3366.net
    www.89ip.cn
    www.xicidaili.com
    www.iphai.com
    premproxy.com
    www.xroxy.com
    www.data5u.com
    www.kxdaili.com
  • 存储模块

    主要学习了GitHub上一个采用的redis的有序集合来进行的,一是保证代理数据的不重复,二是使用zscore来标记代理的可用性。

  • 检测模块

    定时检测数据库中的代理的可用性情况,并采用打分机制,初始化一个分值,若可用则标记为最高,不可用则在初始化分值基础上减1,直到为0,则从redis中删除。后续random获取的时候采用分值最高的优先展示的从高到低模式。同时保证代理的实时可用性,设置定时检测。

  • API模块

    采用Flask作为web服务器,前端copy的别人的改的。从redis数据库中优先选择出分数最高的,若同时存在多个,则随机选择一个。

0x03 具体实现

爬虫模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def crawl_daili66(self, page_count=4):
"""
获取代理:www.66ip.cn
方法:pyquery
"""
start_url = 'http://www.66ip.cn/{}.html'
urls = [start_url.format(page) for page in range(1, page_count + 1)]
for url in urls:
html = get_page(url)
if html:
doc = pq(html)
trs = doc('.containerbox table tr:gt(0)').items()
for tr in trs:
ip = tr.find('td:nth-child(1)').text()
port = tr.find('td:nth-child(2)').text()
yield ':'.join([ip, port])
def crawl_ip181(self, page_count=2):
"""
获取代理:www.xdaili.cn
方法:json
"""
start_url = 'http://www.xdaili.cn/ipagent//freeip/getFreeIps?page={}'
urls = [start_url.format(page) for page in range(1, page_count + 1)]
for url in urls:
try:
response = requests.get(url)
print('抓取成功 >>> ', url, response.status_code)
if response.status_code == 200:
html = response.json()
except ConnectionError:
print('抓取失败 >>> ', url)
pass
if html:
# ip_address = re.compile(r'"ip":"(.*?)","port":"(.*?)"')
# ip_address = ip_address.findall(html)
for key in html['RESULT']['rows']:
yield ':'.join([key['ip'], key['port']])
def crawl_kuaidaili(self):
"""
获取代理:www.kuaidaili
方法:xpath
"""
for i in range(1, 4):
start_url = 'http://www.kuaidaili.com/free/inha/{}/'.format(i)
html = get_page(start_url)
if html:
html = etree.HTML(html, parser=etree.HTMLParser(encoding='utf-8'))
_ip_xpath = '//*[@id="list"]/table/tbody/tr[{}]/td[1]/text()'
_port_xpath = '//*[@id="list"]/table/tbody/tr[{}]/td[2]/text()'
for i in range(1, 16):
ip_xpath = _ip_xpath.format(i)
port_xpath = _port_xpath.format(i)
ip = html.xpath(ip_xpath)[0]
port = html.xpath(port_xpath)[0]
yield ":".join([ip, port])
def crawl_ip3366(self):
"""
获取代理:www.ip3366.net
方法:正则
"""
for page in range(1, 4):
start_url = 'http://www.ip3366.net/free/?stype=1&page={}'.format(page)
html = get_page(start_url)
ip_address = re.compile('<tr>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')
re_ip_address = ip_address.findall(html)
for address, port in re_ip_address:
result = address + ':' + port
yield result.replace(' ', '')

本着学习的目的,同时采用了pyquery正则jsonxpath等不同的获取方法,使用yield来将这些方法定义成生成器,这样我们每次请求就会通过yield来返回一个代理IP。

同时使用crawl_开头的函数命名方法,来达到动态添加代理网站地址接口的目的(这部分借鉴的大神的),具体参考:python元类解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class ProxyMetaclass(type):
def __new__(cls, name, bases, attrs):
count = 0
attrs['__CrawlFunc__'] = []
for k, v in attrs.items():
if 'crawl_' in k:
attrs['__CrawlFunc__'].append(k)
count += 1
attrs['__CrawlFuncCount__'] = count
return type.__new__(cls, name, bases, attrs)
class Crawler(object, metaclass=ProxyMetaclass):
def get_proxies(self, callback):
proxies = []
for proxy in eval("self.{}()".format(callback)):
print(u'成功获取到代理 >>> ', proxy)
proxies.append(proxy)
return proxies

定义类ProxyMetaclass并声明一个__new__方法,同时Crawler类将它声明为元类,这个方法有固定的几个参数cls, name, bases, attrs,第四个参数attrs中包含了类的一些属性。我们可以遍历attrs这个参数即可获取类的所有方法信息,就像遍历字典一样,键名对应方法的名称。然后判断方法的开头是否crawl_,如果是,则将其加入到__CrawlFunc__属性中。这样我们就成功将所有以crawl_开头的方法定义成了一个属性,动态获取到所有以crawl_开头的方法列表。

然后定义了一个get_proxies()方法,将所有以crawl_开头的方法调用一遍,获取每个方法返回的代理并组合成列表形式返回。

存储模块

采用redis的有序集合,集合的特点是集合的每一个元素都是不重复的,这样就保证了代理的不重复。有序集合,顾名思义,这里存储的数据是有序的,我们就可以利用这点对代理IP进行打分、排序,实现权重的效果。具体可参考:Redis 有序集合(sorted set)

连接redis

1
db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)

添加数据,添加代理,设置分数为最高

1
2
3
4
5
6
def add(self, proxy, score=INITIAL_SCORE):
if not re.match('\d+\.\d+\.\d+\.\d+\:\d+', proxy):
print('代理不符合规范', proxy, u'丢弃')
return
if not self.db.zscore(REDIS_KEY, proxy):
return self.db.zadd(REDIS_KEY, score, proxy)

获取代理IP,随机获取有效代理,首先尝试获取最高分数代理,如果不存在,按照排名获取,否则异常

1
2
3
4
5
6
7
8
9
10
11
def random(self):
result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
if len(result):
return choice(result)
else:
result = self.db.zrevrange(REDIS_KEY, 0, 100)
if len(result):
return choice(result)
else:
raise PoolEmptyError

删除代理IP,代理值减一分,小于最小值则删除

1
2
3
4
5
6
7
8
9
def decrease(self, proxy):
score = self.db.zscore(REDIS_KEY, proxy)
if score and score > MIN_SCORE:
print('代理', proxy, u'当前分数', score, u'减1')
return self.db.zincrby(REDIS_KEY, proxy, -1)
else:
print('代理', proxy, u'当前分数', score, '移除')
return self.db.zrem(REDIS_KEY, proxy)

检测模块

代理检测第一次可用,则分数设置为100,代理不可用,分数减1,这样就可以实时改变每个代理的可用情况。API接口返回的时候只需要获取分数高的代理即可。使用的python3的异步IO库aiohttp,参考的大神的表演,学习了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class Tester(object):
def __init__(self):
self.redis = RedisClient()
async def test_single_proxy(self, proxy):
"""
异步测试单个代理IP并写入redis
"""
conn = aiohttp.TCPConnector(verify_ssl=False)
async with aiohttp.ClientSession(connector=conn) as session:
try:
if isinstance(proxy, bytes):
proxy = proxy.decode('utf-8')
real_proxy = 'http://' + proxy
print('正在测试 >>>', proxy)
async with session.get(TEST_URL, proxy=real_proxy, timeout=15, allow_redirects=False) as response:
if response.status in VALID_STATUS_CODES:
self.redis.max(proxy)
print('代理可用 >>> ', proxy)
else:
self.redis.decrease(proxy)
print('请求响应码不合法 >>>', response.status, 'IP', proxy)
except Exception as e:
self.redis.decrease(proxy)
print('代理请求失败 >>> ', proxy)
def run(self):
"""
测试主函数
"""
print('*** 测试器开始运行 ***')
try:
count = self.redis.count()
print('当前剩余:', count, u'个代理')
for i in range(0, count, BATCH_TEST_SIZE):
start = i
stop = min(i + BATCH_TEST_SIZE, count)
print('正在测试第', start + 1, '-', stop, u'个代理')
test_proxies = self.redis.batch(start, stop)
loop = asyncio.get_event_loop()
tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
loop.run_until_complete(asyncio.wait(tasks))
sys.stdout.flush()
time.sleep(5)
except Exception as e:
print('测试器发生错误', e.args)

定义了一个类Tester,接下来定义了一个test_single_proxy()方法,并在前面加了async关键词,这代表这个方法是异步的。方法内部首先创建了aiohttp的ClientSession对象,此对象类似于requests的Session对象,可以直接调用该对象的get()方法来访问页面。在这里,代理的设置是通过proxy参数传递给get()方法,程序在获取Response后需要判断响应的状态,如果状态码在VALID_STATUS_CODES列表里,则代表代理可用,可以调用RedisClientmax()方法将代理分数设为100,否则调用decrease()方法将代理分数减1,如果出现异常也同样将代理分数减1。

0x04 接口模块

使用Flask实现接口模块,前端wget的VulApps的前端模版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
__all__ = ['app']
app = Flask(__name__)
def get_conn():
if not hasattr(g, 'redis'):
g.redis = RedisClient()
return g.redis
@app.route('/')
def index():
render = render_template('index.html')
return make_response(render)
@app.route('/random')
def get_proxy():
"""
获取proxyAPI
:return: 随机代理
"""
conn = get_conn()
return conn.random()
@app.route('/count')
def get_counts():
"""
Get the count of proxies
:return: 代理池总量
"""
conn = get_conn()
return str(conn.count())
if __name__ == '__main__':
app.run()

前端模版index.html

项目地址:http://proxy.w2n1ck.com:9090/

API接口:http://proxy.w2n1ck.com:9090/random

0x05 使用

测试结果如下:

获取代理IP:

1
2
3
4
5
6
7
8
9
import requests
PROXY_POOL_URL = 'http://proxy.w2n1ck.com:9090/random'
def get_proxy():
try:
response = requests.get(PROXY_POOL_URL)
if response.status_code == 200:
return response.text
except ConnectionError:
return None

使用代理IP:

1
2
3
4
5
6
7
8
9
10
11
import requests
proxy = get_proxy()
proxies = {
'http': 'http://' + proxy,
'https': 'https://' + proxy,
}
try:
response = requests.get('http://ip.cn/', proxies=proxies)
print(response.text)
except:
pass
大爷,赏个铜板呗!