/*
** NSOperation subclass that issues one URL request and exposes the bytes it
** received through the read-only `data` property.
*/
@interface URLOperation : NSOperation {
    NSURLRequest* _request;        // request handed to NSURLConnection by -start
    NSURLConnection* _connection;  // -start keeps spinning its run loop while this is non-nil
}
// Creates an operation for the given URL string.
- (id)initWithURLString: (NSString*)url;
// Response bytes collected by the operation.
@property (readonly) NSData* data;
@end
@implementation URLOperation
/*
** Worker-thread entry point.
** Unless the operation was cancelled, opens a connection for _request with
** this object as its delegate, then keeps servicing the current run loop for
** as long as _connection is non-nil.
*/
- (void)start {
    if (![self isCancelled]) {
        _connection = [[NSURLConnection connectionWithRequest:_request delegate:self] retain];
        // NOTE(review): nothing in this implementation sets _connection back to
        // nil, so this loop has no visible exit - presumably a finish/failure
        // delegate callback is expected to release and clear it; confirm.
        while (_connection != nil) {
            [[NSRunLoop currentRunLoop] runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]];
        }
    }
}
#pragma mark NSURLConnection delegate Method
/*
** Delegate callback: appends each received chunk to the response buffer.
*/
- (void)connection: (NSURLConnection*)connection
    didReceiveData: (NSData*)data {
    // NOTE(review): -appendData: is an NSMutableData method while the `data`
    // property is declared as NSData, and no allocation of _data is visible
    // here - confirm how _data is declared and created.
    [_data appendData:data];
}
@end
/*
** Asynchronous HTTP request.
** The operation is only enqueued here; the queue's thread pool later pulls it
** off the queue and runs it on a worker thread.
*/
- (IBAction)pushButton: (id)sender { // main thread
    // NOTE(review): a fresh queue is allocated on every press and never
    // released; the underscore name suggests it was meant to be an instance
    // variable shared across presses - confirm before changing ownership.
    NSOperationQueue* _queue = [[NSOperationQueue alloc] init];
    URLOperation* operation = [[URLOperation alloc] initWithURLString:@"http://google.com"];
    [_queue addOperation:operation];
    // The queue keeps the operation until it finishes, so drop the reference
    // taken by alloc/init (this file uses manual retain/release).
    [operation release];
}
后续分析:
接收到服务器的 response 数据后,回调以观察者(observer)模式添加到主线程的消息处理队列中来实现。
代码实现:
#include <boost/asio.hpp>
#include <deque>
#include <functional>
#include <iostream>
#include <iterator>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <vector>
/*
** Observer interface: implementers are handed the text payload of a message
** through update().
*/
class Observer {
public:
    // Called with the payload of the message being dispatched.
    virtual void update(const std::string& data) = 0;

protected:
    // Protected so observers cannot be destroyed through an Observer pointer.
    virtual ~Observer() = default;
};
class ControlDelegate1 : public Observer {
public:
virtual void update(const std::string& data) {
std::cout << "ControlDelegate1 process data: " << data << std::endl;
}
};
class ControlDelegate2 : public Observer {
public:
virtual void update(const std::string& data) {
std::cout << "ControlDelegate2 process data: " << data << std::endl;
}
};
class Message {
public:
void notify() {
for(auto& obser: m_obsers) {
obser->update(m_data);
}
}
void addObserver(Observer* obser) {
if(m_obsers.end() == m_obsers.find(obser)) {
m_obsers.insert(obser);
}
}
void eraseObserver(Observer* obser) {
m_obsers.erase(obser);
}
inline void setData(const std::string& data) { m_data = data; }
inline const std::string& data() const { return m_data; }
private:
std::set<Observer*> m_obsers;
std::string m_data;
};
/*
** Mutex-protected FIFO of shared Message pointers.
*/
class MsgQueue {
public:
    MsgQueue() {}

    // Appends a message at the back of the queue.
    void push(std::shared_ptr<Message> msg) {
        std::lock_guard<std::mutex> guard(m_msgMtx);
        m_msgQue.push_back(msg);
    }

    // Removes and returns the front message, or an empty pointer when the
    // queue holds nothing. Never blocks waiting for data.
    std::shared_ptr<Message> pop() {
        std::lock_guard<std::mutex> guard(m_msgMtx);
        if (m_msgQue.empty()) {
            return std::shared_ptr<Message>();
        }
        std::shared_ptr<Message> head = m_msgQue.front();
        m_msgQue.pop_front();
        return head;
    }

private:
    std::mutex m_msgMtx;
    std::deque<std::shared_ptr<Message>> m_msgQue;
};
typedef std::function<void (const boost::system::error_code&)> timer_callback;
void main(int argc, char **argv) {
ControlDelegate1* deleagate1 = new ControlDelegate1;
ControlDelegate2* deleagate2 = new ControlDelegate2;
MsgQueue* inqueue = new MsgQueue;
MsgQueue* outqueue = new MsgQueue;
std::thread proctd([&]() { //need to be optimized with thread pool
boost::asio::io_service io;
boost::asio::deadline_timer timer(io, boost::posix_time::seconds(1));
timer_callback callback = [&](const boost::system::error_code& err) {
/*-------- on timer ---------*/
for(;;) {
std::shared_ptr<Message> msg = inqueue->pop();
if(nullptr == msg.get()) {
break;
}
std::string data = msg.get()->data();
msg.get()->setData(data.append(" end"));
outqueue->push(msg);
}
/*-------- on timer ---------*/
timer.expires_at(timer.expires_at() + boost::posix_time::seconds(1));
timer.async_wait(callback);
};
timer.async_wait(callback);
io.run();
});
proctd.detach();
boost::asio::io_service io;
boost::asio::deadline_timer timer(io, boost::posix_time::seconds(1));
timer_callback callback = [&](const boost::system::error_code& err) {
/*-------- on timer ---------*/
for(int i=0; i<2; i++) {
std::shared_ptr<Message> msg(new Message);
msg.get()->setData(std::string("hello ").append(std::to_string(i)));
if(i%2) {
msg.get()->addObserver(deleagate1);
} else {
msg.get()->addObserver(deleagate2);
}
inqueue->push(msg);
}
for(;;) {
std::shared_ptr<Message> msg = outqueue->pop();
if(nullptr == msg.get()) {
break;
}
msg.get()->notify();
}
/*-------- on timer ---------*/
timer.expires_at(timer.expires_at() + boost::posix_time::seconds(1));
timer.async_wait(callback);
};
timer.async_wait(callback);
io.run();
}
添加线程池后的代码实现:
/*
** thread pool source:
** http://threadpool.sourceforge.net/
**
** thread pool features:
** Policy-based thread pool implementation
** Scheduling policies: fifo, lifo and priority(优先级)
** Size policies: static_size
** Size policy controller: empty_controller, resize_controller
** Shutdown policies: wait_for_all_tasks, wait_for_active_tasks, immediately
** Smooth integration into STL and boost
**
*/
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/threadpool.hpp>
#include <thread>
#include <vector>
#include <string>
#include <iterator>
#include <iostream>
#include <deque>
#include <mutex>
#include <set>
#include <memory>
/*
** Interface for receivers of message payloads.
*/
class Observer {
public:
    // Invoked with the text carried by the message being dispatched.
    virtual void update(const std::string& data) = 0;

protected:
    // Not public: destruction through an Observer pointer is disallowed.
    virtual ~Observer() = default;
};
class ControlDelegate1 : public Observer {
public:
virtual void update(const std::string& data) {
std::cout << "ControlDelegate1 process data: " << data << std::endl;
}
};
class ControlDelegate2 : public Observer {
public:
virtual void update(const std::string& data) {
std::cout << "ControlDelegate2 process data: " << data << std::endl;
}
};
class Message;

/*
** Mutex-protected FIFO of shared Message pointers. Only a declaration of
** Message is needed here because the queue never dereferences its elements.
*/
class MsgQueue {
public:
    // Appends a message at the back of the queue.
    void push(std::shared_ptr<Message> msg) {
        std::lock_guard<std::mutex> guard(m_msgMtx);
        m_msgQue.push_back(msg);
    }

    // Removes and returns the front message, or an empty pointer when the
    // queue holds nothing. Never blocks waiting for data.
    std::shared_ptr<Message> pop() {
        std::lock_guard<std::mutex> guard(m_msgMtx);
        if (m_msgQue.empty()) {
            return std::shared_ptr<Message>();
        }
        std::shared_ptr<Message> head = m_msgQue.front();
        m_msgQue.pop_front();
        return head;
    }

private:
    std::mutex m_msgMtx;
    std::deque<std::shared_ptr<Message>> m_msgQue;
};
static MsgQueue outqueue;
class Message {
public:
void notify() {
for(auto& obser: m_obsers) {
obser->update(m_data);
}
}
void addObserver(Observer* obser) {
if(m_obsers.end() == m_obsers.find(obser)) {
m_obsers.insert(obser);
}
}
void eraseObserver(Observer* obser) {
m_obsers.erase(obser);
}
inline void setData(const std::string& data) { m_data = data; }
inline const std::string& data() const { return m_data; }
void process() {
m_data = m_data.append(" end");
std::shared_ptr<Message> msg(this);
outqueue.push(msg);
}
private:
std::set<Observer*> m_obsers;
std::string m_data;
};
typedef std::function<void (const boost::system::error_code&)> timer_callback;
void main(int argc, char **argv) {
boost::threadpool::pool tp(2); // thread pool initializes with 2 threads.
ControlDelegate1* deleagate1 = new ControlDelegate1;
ControlDelegate2* deleagate2 = new ControlDelegate2;
boost::asio::io_service io;
boost::asio::deadline_timer timer(io, boost::posix_time::seconds(1));
timer_callback callback = [&](const boost::system::error_code& err) {
/*-------- on timer ---------*/
for(int i=0; i<4; i++) {
Message* msg = new Message;
msg->setData(std::string("hello ").append(std::to_string(i)));
if(i%2) {
msg->addObserver(deleagate1);
} else {
msg->addObserver(deleagate2);
}
tp.schedule(boost::bind(&Message::process, msg)); // add task(message) to thread pool
}
for(;;) {
std::shared_ptr<Message> msg = outqueue.pop();
if(nullptr == msg.get()) {
break;
}
msg.get()->notify();
}
/*-------- on timer ---------*/
timer.expires_at(timer.expires_at() + boost::posix_time::seconds(1));
timer.async_wait(callback);
};
timer.async_wait(callback);
io.run();
}
参考: boost线程池